coin_detail.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import sys
  4. sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
  5. import httpx
  6. from datetime import datetime
  7. from utils.utils_send_gotify import *
# Maximum number of attempts main() makes for each fetch before it records a
# failure line for that item.
retry_count = 5
  9. def fetch_coin_data(target):
  10. url = "https://api.chainalert.me/"
  11. headers = {
  12. "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
  13. "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
  14. }
  15. payload = {
  16. "method": "listData",
  17. "params": [datetime.now().strftime("%Y-%m-%d"), "MARKETPRICE", '', 0, 9999.0, target]
  18. }
  19. with httpx.Client() as client:
  20. try:
  21. response = client.post(url, headers=headers, data=payload, timeout=3)
  22. except Exception as e:
  23. # print(f"Target: {target} failed to fetch data. error: {str(e)}")
  24. client.close()
  25. return False
  26. if response.status_code != 200:
  27. client.close()
  28. # print(f"{target} failed to fetch data. status code: {response.status_code}")
  29. return False
  30. else:
  31. text = ''
  32. data = response.json()
  33. try:
  34. target_data = eval(data['result'][0]['data'])
  35. except Exception as e:
  36. client.close()
  37. raise Exception(f"Failed to parse data: {data}, error: {str(e)}")
  38. target_data = target_data[0]
  39. # print(target_data)
  40. # 获取数据值
  41. name = target_data['name']
  42. rank = target_data['rank']
  43. price = target_data['item1']
  44. volume = target_data['item2']
  45. change = target_data['item3']
  46. market_cap = target_data['item4']
  47. dilute = target_data['item5']
  48. logoUrl = target_data['logoUrl']
  49. # 拼接到 text 中
  50. text = '{} {} {} {} {} {}'.format(name, price, change, volume, rank, market_cap)
  51. print(text)
  52. # text += f'Name: {name}\n'
  53. # text += f'Ranking: {rank}\n'
  54. # text += f'Price: {price}\n'
  55. # text += f'24H Transaction Volume: {volume}\n'
  56. # text += f'24H Price Change: {change}\n'
  57. # text += f'Market Capitalization: {market_cap}\n'
  58. # text += f'Diluted Market Value: {dilute}\n'
  59. # text += f'Logo: {logoUrl}\n'
  60. return text + '\n' + ('-' * len(text)) + '\n'
  61. def fetch_vix_data():
  62. url = "https://api.chainalert.me/"
  63. headers = {
  64. "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
  65. "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
  66. }
  67. payload = {
  68. "method": "listData",
  69. "params": ['', "GREEDY_INDEX", 1, 0, 1, '']
  70. }
  71. with httpx.Client() as client:
  72. try:
  73. response = client.post(url, headers=headers, data=payload, timeout=3)
  74. except Exception as e:
  75. # print(f"failed to fetch VIX data. error: {str(e)}")
  76. client.close()
  77. return False
  78. if response.status_code != 200:
  79. client.close()
  80. # print(f"Failed to fetch VIX data. status code: {response.status_code}")
  81. return False
  82. else:
  83. data = response.json()
  84. vix_data = eval(data['result'][0]['data'])
  85. vix_data = vix_data[0]
  86. print(vix_data)
  87. greedy = vix_data['greedy']
  88. level = vix_data['level']
  89. text = f'VIX data: {greedy}\nLevel: {level}'
  90. return text
  91. def fetch_gas_data():
  92. url = "https://a5.maiziqianbao.net/api/v1/chains/EVM/1/gas_price"
  93. headers = {
  94. "Host": "a5.maiziqianbao.net",
  95. "Connection": "keep-alive",
  96. "x-req-token": "MDbO4FsaSUPdjCdvTUs2zY4V3rnvvYatvYyjz7SfY+aCJ8r+RFm06X2dGR8eEDK7Gc5g1TLEQySEhGerRXbDT/NS+e5QAWRU68yD8m4y/aKK+TBkIv90VwvxmvYId2BVoDPDHQCGG4o3EqRWkS93eV0twYQ7w7qvNUj2e3tpDcUZYuplPyLozgYVTegFPnDk",
  97. "Accept": "*/*",
  98. "x-app-type": "iOS-5",
  99. "x-app-ver": "1.0.1",
  100. "x-app-udid": "419815AD-3015-4B5A-92CA-3BCBED24ACEC",
  101. "x-app-locale": "en",
  102. "Accept-Language": "zh-Hans-CN;q=1.0, en-CN;q=0.9",
  103. "Accept-Encoding": "br;q=1.0, gzip;q=0.9, deflate;q=0.8",
  104. "User-Agent": "MathGas/1.0.1 (MathWallet.MathGas; build:3; macOS 13.5.0) Alamofire/5.4.4"
  105. }
  106. with httpx.Client() as client:
  107. response = client.get(url, headers=headers)
  108. if response.status_code != 200:
  109. client.close()
  110. print("Error:", response.status_code)
  111. return False
  112. if not response.json():
  113. client.close()
  114. print("Not Find GAS Data. Error: No response")
  115. return False
  116. remove_last_n_chars = lambda n, n_chars=9: int(str(n)[:-n_chars]) if len(str(n)) > n_chars else n
  117. result = '\nGAS:\n'
  118. try:
  119. data = response.json()['data']
  120. fastest = remove_last_n_chars(data['fastest']['price'])
  121. fast = remove_last_n_chars(data['fast']['price'])
  122. standard = remove_last_n_chars(data['standard']['price'])
  123. low = remove_last_n_chars(data['low']['price'])
  124. base = remove_last_n_chars(data['base']['price'])
  125. print(f'fastest: {fastest} - fast: {fast} - standard: {standard} - low: {low} - base: {base}')
  126. result += f'fastest: {fastest}\nfast: {fast}\nstandard: {standard}\nlow: {low}\nbase: {base}'
  127. return result
  128. except Exception as e:
  129. print(e)
  130. return False
  131. def main():
  132. text = ''
  133. # 获取币币实时价格
  134. target_list = ['btc', 'eth', 'sol', 'grass', 'sui', 'doge', 'arb', 'ath', 'move', 'pepe', 'degen', 'act', 'plume']
  135. for target in target_list:
  136. for retry in range(1, retry_count + 1):
  137. result = fetch_coin_data(target)
  138. if result:
  139. text += result
  140. break
  141. else:
  142. print(f"{target} Failed to fetch data. retry: {retry}")
  143. if retry == retry_count:
  144. text += f"{target} Failed to fetch data. retry count: {retry}"
  145. # 获取恐慌指数
  146. for retry in range(1, retry_count + 1):
  147. result = fetch_vix_data()
  148. if result:
  149. text += result + '\n\n'
  150. break
  151. else:
  152. print(f"Failed to fetch VIX data. retry: {retry}")
  153. if retry == retry_count:
  154. text += f"Failed to fetch VIX data. retry count: {retry}"
  155. # 获取实时gas费
  156. for retry in range(1, retry_count + 1):
  157. result = fetch_gas_data()
  158. if result:
  159. text += '\n' + result + '\n\n'
  160. break
  161. else:
  162. # print(f"Failed to fetch Gas data. retry: {retry}")
  163. if retry == retry_count:
  164. text += f"Failed to fetch Gas data. retry count: {retry}"
  165. if text:
  166. GotifyNotifier('Real-time coin price\n', text, 'coin').send_message()
  167. else:
  168. print('No Data')
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()