message_coinmarketcap.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # -*- coding: utf-8 -*-
  2. '''
  3. 获取 coinmarketcap 数字货币实时价格
  4. '''
  5. import sys
  6. import os
  7. from datetime import datetime
  8. sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
  9. from base.base_load_config import load_config, get_base_path
  10. config_json = load_config()
  11. base_project = get_base_path()
  12. from utils.utils_check_base import *
  13. from utils.utils_send_gotify import *
  14. class CheckCoinmarketcap:
  15. def __init__(self):
  16. self.currency = '¥'
  17. self.url_list = [
  18. {'BTC': 'https://www.coinmarketcap.com/currencies/bitcoin/'},
  19. {'ETH': 'https://www.coinmarketcap.com/currencies/ethereum/'},
  20. {'DOGE': 'https://coinmarketcap.com/currencies/dogecoin/'},
  21. {'ARB': 'https://coinmarketcap.com/currencies/arbitrum/'},
  22. {'ATH': 'https://coinmarketcap.com/currencies/aethir/'},
  23. {'SUI': 'https://coinmarketcap.com/currencies/sui/'},
  24. ]
  25. # self.selectors = ['#section-coin-overview > div.sc-65e7f566-0.czwNaM.flexStart.alignBaseline > span']
  26. self.selectors = ['div#section-coin-overview > div:nth-of-type(2)']
  27. def process_data(self, all_data):
  28. result = {}
  29. for data in all_data:
  30. for key, value in data[0].items():
  31. value_list = value.split('\xa0')
  32. value_list = [item for item in value_list if item]
  33. if key not in result:
  34. result[key] = value_list
  35. return result
  36. def send_data(self, all_data):
  37. # 打印结果,只包含货币名称和价格
  38. context = ''
  39. for key, value in all_data.items():
  40. context += f'{key}: {value[0]} {value[1]} {value[2]}\n'
  41. if context:
  42. context += '\n{}'.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  43. print(context)
  44. # 推送到 message
  45. GotifyNotifier('实时coin价格', context, 'coin').send_message()
  46. else:
  47. print('no data!')
  48. def main(self):
  49. # 运行异步函数
  50. all_data = None
  51. crawler = CryptoCrawler(self.url_list, self.selectors, headless=True)
  52. all_data = crawler.main() # 返回的数据格式: [[{'BTC': '¥483,784.76\xa0\xa01.02%\xa0(1天)'}], ...
  53. if not all_data:
  54. print('获取数据失败')
  55. return
  56. # 清理一下数据
  57. all_data = self.process_data(all_data) # {'BTC': ['¥483,482.98', '0.95%', '(1天)'], ...
  58. self.send_data(all_data)
  59. if __name__ == '__main__':
  60. CheckCoinmarketcap().main()