# -*- coding: utf-8 -*-
"""Fetch real-time cryptocurrency prices from coinmarketcap.

The script crawls a fixed list of coin pages, stores each price snapshot
through ``MongoHandle``, compares the newest snapshot with the previous one
and pushes the prices plus percentage change to Gotify and ServerChan.
"""
import sys
import os
import time
from datetime import datetime

# Extend sys.path with everything up to and including the first 'auto'
# substring of this file's absolute path, so the project-local imports below
# resolve. This must run before those imports.
sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))

from utils.utils_load_config import load_config, get_base_path

config_json = load_config()
base_project = get_base_path()

DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'

from utils.utils_mongo_handle import MongoHandle
# NOTE(review): CryptoCrawler, GotifyNotifier and ServerChanNotifier are not
# defined in this file; presumably they come from these star imports - confirm.
from message_check_base import *
from utils.utils_send_gotify import *
from utils.utils_send_serverchan import *


class CheckCoinmarketcap:
    """Crawl coin prices, persist them and report the change since last run."""

    def __init__(self):
        self.db = 'Message'
        self.collection = 'coin_price'
        self.client = MongoHandle(self.db, self.collection)
        # Currency symbol stripped from scraped prices and re-added on output.
        self.currency = '¥'
        # One single-entry dict per coin: {symbol: page URL}.
        self.url_list = [
            {'BTC': 'https://www.coinmarketcap.com/zh/currencies/bitcoin/'},
            {'ETH': 'https://www.coinmarketcap.com/zh/currencies/ethereum/'},
            {'DOGE': 'https://coinmarketcap.com/zh/currencies/dogecoin/'},
            {'ARB': 'https://coinmarketcap.com/zh/currencies/arbitrum/'},
            {'ATH': 'https://coinmarketcap.com/zh/currencies/aethir/'},
        ]
        # CSS selectors handed to the crawler to locate the price element.
        self.selectors = [
            '#section-coin-overview > div.sc-65e7f566-0.czwNaM.flexStart.alignBaseline > span'
        ]

    def _parse_price(self, raw_price):
        """Convert a scraped price such as '¥411,060.52' to a float.

        Returns None when the value is not a string or is not numeric after
        the currency symbol and thousands separators are removed.
        """
        try:
            return float(raw_price.replace(self.currency, '').replace(',', ''))
        except (AttributeError, ValueError):
            return None

    def processed_data(self, all_data):
        """Store the current prices and compare them with the previous snapshot.

        Args:
            all_data: Crawler output, a list of single-item lists each holding
                a ``{symbol: price_string}`` dict, e.g.
                ``[[{'BTC': '¥411,060.52'}], [{'ETH': '¥16,829.59'}]]``.

        Returns:
            ``{symbol: [price_text, change_text]}`` for every symbol present in
            both the newest and the previous stored snapshot, where
            ``change_text`` is the percentage change rounded to 3 decimals
            (``'N/A'`` when the previous price was 0). An empty dict is
            returned when nothing could be parsed or fewer than two snapshots
            are stored.
        """
        ts = str(int(time.time()))

        prices = {}
        for entry in all_data:
            if not entry:
                # The crawler returned nothing for this coin; skip it instead
                # of failing the whole run on entry[0].
                continue
            for coin, raw_price in entry[0].items():
                price = self._parse_price(raw_price)
                if price is None:
                    print(f'skip unparsable price for {coin}: {raw_price!r}')
                    continue
                prices[coin] = price

        if not prices:
            # Do not persist an empty snapshot: it would only make the next
            # comparison report nothing.
            return {}

        self.client.write_data({ts: prices})
        load_data = self.client.load_data()
        if not load_data or len(load_data) < 2:
            return {}

        # Each stored record is read as {timestamp: {symbol: price}}; take the
        # first value of the last two records.
        current_data = list(load_data[-1].values())[0]
        last_data = list(load_data[-2].values())[0]

        result_data = {}
        for coin, current_value in current_data.items():
            if coin not in last_data:
                continue
            last_value = last_data[coin]
            if last_value == 0:
                # Percentage change is undefined against a zero baseline.
                change_text = 'N/A'
            else:
                change = ((current_value - last_value) / last_value) * 100
                # An exact zero is shown as '0%' rather than '0.0%'.
                change = round(change, 3) if change != 0 else 0
                change_text = f'{change}%'
            result_data[coin] = [f'{self.currency}{current_value}', change_text]
        return result_data

    def main(self):
        """Crawl prices, compute changes and push the summary notifications."""
        crawler = CryptoCrawler(self.url_list, self.selectors, headless=True)
        # Returned data format:
        # [[{'BTC': '¥411,060.52'}], [{'ETH': '¥16,829.59'}], [{'DOGE': '¥0.726'}], ...]
        all_data = crawler.main()
        if not all_data:
            print('获取数据失败')
            return

        # Persist this snapshot and compare it with the previous check.
        processed_data = self.processed_data(all_data)
        if not processed_data:
            print('no data!')
            return

        # One line per coin: "<symbol>: <price> <change>", then a blank line
        # and the local timestamp.
        lines = [f'{key}: {value[0]} {value[1]}' for key, value in processed_data.items()]
        context = '\n'.join(lines) + '\n\n{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        # Push to Gotify.
        GotifyNotifier('实时coin价格', context).send_message()
        # Push to ServerChan; every newline is doubled for this channel.
        ServerChanNotifier('实时coin价格', context.replace('\n', '\n\n')).send_message()


if __name__ == '__main__':
    CheckCoinmarketcap().main()