| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- # -*- coding: utf-8 -*-
- '''
- 获取 coinmarketcap 数字货币实时价格
- '''
- import sys
- import os
- from datetime import datetime
- sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
- from utils.utils_load_config import load_config, get_base_path
- config_json = load_config()
- base_project = get_base_path()
- DB_USER = config_json.get('DB_USER')
- DB_PASSWORD = config_json.get('DB_PASSWORD')
- DB_IP = config_json.get('DB_IP')
- DB_PORT = config_json.get('DB_PORT')
- MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'
- from utils.utils_mongo_handle import MongoHandle
- from message_check_base import *
- from utils.utils_send_gotify import *
- from utils.utils_send_serverchan import *
- class CheckCoinmarketcap:
- def __init__(self):
- self.db = 'Message'
- self.collection = 'coin_price'
- self.client = MongoHandle(self.db, self.collection)
- self.currency = '¥'
- self.url_list = [
- {'BTC': 'https://www.coinmarketcap.com/zh/currencies/bitcoin/'},
- {'ETH': 'https://www.coinmarketcap.com/zh/currencies/ethereum/'},
- {'DOGE': 'https://coinmarketcap.com/zh/currencies/dogecoin/'},
- {'ARB': 'https://coinmarketcap.com/zh/currencies/arbitrum/'},
- {'ATH': 'https://coinmarketcap.com/zh/currencies/aethir/'},
- ]
- self.selectors = ['#section-coin-overview > div.sc-65e7f566-0.czwNaM.flexStart.alignBaseline > span']
- def processed_data(self, all_data):
- # TODO 写入并读取当前数据和上次数据作对比
- ts = str(int(time.time()))
- data = {ts: {}}
- for i in all_data:
- for key, value in i[0].items():
- data[ts].update({key: float(value.replace(self.currency, '').replace(',', ''))})
- self.client.write_data(data)
- load_data = self.client.load_data()
- if len(load_data) < 2:
- return {}
- current_data = [value for value in load_data[-1].values()][0]
- last_data = [value for value in load_data[-2].values()][0]
- result_data = {}
- for current_key, current_value in current_data.items():
- for last_key, last_value in last_data.items():
- if current_key == last_key:
- change = ((current_value - last_value) / last_value) * 100
- if change != 0:
- change = round(change, 3)
- else:
- change = 0
- result_data[current_key] = [f'{self.currency}{current_value}', f'{change}%']
- return result_data
- def main(self):
- # 运行异步函数
- all_data = None
- crawler = CryptoCrawler(self.url_list, self.selectors, headless=True)
- all_data = crawler.main() # 返回的数据格式: [[{'BTC': '¥411,060.52'}], [{'ETH': '¥16,829.59'}], [{'DOGE': '¥0.726'}], [{'ARB': '¥3.69'}], [{'ATH': '¥0.3642'}]])
- if not all_data:
- print('获取数据失败')
- return
- # TODO 加一个 mongo 存取历史数据, 并且对比上次检测时的增减
- processed_data = self.processed_data(all_data)
- # 打印结果,只包含货币名称和价格
- context = ''
- for key, value in processed_data.items():
- context += f'{key}: {value[0]} {value[1]}\n'
- if context:
- context += '\n{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
- # 推送到 message
- GotifyNotifier('实时coin价格', context).send_message()
- # 推送到 serverchan
- ServerChanNotifier('实时coin价格', context.replace('\n', '\n\n')).send_message()
- else:
- print('no data!')
- if __name__ == '__main__':
- CheckCoinmarketcap().main()
|