| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- # -*- coding: utf-8 -*-
- '''
- 获取 coinmarketcap 数字货币实时价格
- '''
- import sys
- import os
- from datetime import datetime
- sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
- from base.base_load_config import load_config, get_base_path
- config_json = load_config()
- base_project = get_base_path()
- from utils.utils_check_base import *
- from utils.utils_send_gotify import *
- class CheckCoinmarketcap:
- def __init__(self):
- self.currency = '¥'
- self.url_list = [
- {'BTC': 'https://www.coinmarketcap.com/currencies/bitcoin/'},
- {'ETH': 'https://www.coinmarketcap.com/currencies/ethereum/'},
- {'DOGE': 'https://coinmarketcap.com/currencies/dogecoin/'},
- {'ARB': 'https://coinmarketcap.com/currencies/arbitrum/'},
- {'ATH': 'https://coinmarketcap.com/currencies/aethir/'},
- {'SUI': 'https://coinmarketcap.com/currencies/sui/'},
- ]
- # self.selectors = ['#section-coin-overview > div.sc-65e7f566-0.czwNaM.flexStart.alignBaseline > span']
- self.selectors = ['div#section-coin-overview > div:nth-of-type(2)']
- def process_data(self, all_data):
- result = {}
- for data in all_data:
- for key, value in data[0].items():
- value_list = value.split('\xa0')
- value_list = [item for item in value_list if item]
- if key not in result:
- result[key] = value_list
- return result
- def send_data(self, all_data):
- # 打印结果,只包含货币名称和价格
- context = ''
- for key, value in all_data.items():
- context += f'{key}: {value[0]} {value[1]} {value[2]}\n'
- if context:
- context += '\n{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
- print(context)
- # 推送到 message
- GotifyNotifier('实时coin价格', context, 'coin').send_message()
- else:
- print('no data!')
- def main(self):
- # 运行异步函数
- all_data = None
- crawler = CryptoCrawler(self.url_list, self.selectors, headless=True)
- all_data = crawler.main() # 返回的数据格式: [[{'BTC': '¥483,784.76\xa0\xa01.02%\xa0(1天)'}], ...
- if not all_data:
- print('获取数据失败')
- return
- # 清理一下数据
- all_data = self.process_data(all_data) # {'BTC': ['¥483,482.98', '0.95%', '(1天)'], ...
- self.send_data(all_data)
- if __name__ == '__main__':
- CheckCoinmarketcap().main()
|