Kaynağa Gözat

重写 coin 实时数据,消息推送

Jack 1 yıl önce
ebeveyn
işleme
4fc5c2eeb8
1 değiştirilmiş dosya ile 118 ekleme ve 82 silme
  1. 118 82
      message/message_coin_detail.py

+ 118 - 82
message/message_coin_detail.py

@@ -1,81 +1,96 @@
 # -*- coding: utf-8 -*-
-'''
-使用 httpx 获取 coinmarketcap 最新数字币数据
-'''
-import time
-import os
-import sys
 import httpx
-import re
-from playwright.sync_api import sync_playwright
-
-sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
-
+from datetime import datetime
 from utils.utils_send_gotify import *
 
+retry_count = 10
 
-def get_chainalert():
-    coin_follow_list = ['BTC', 'ETH', 'SOL', 'GRASS', 'SUI', 'DOGE', 'ARB', 'ATH', 'MOVE']
-    # coin_follow_list = ['DOGE']
-
-    text = ''
-
-    data = {}
-    with sync_playwright() as playwright:
-        browser = playwright.chromium.launch(headless=True)
-        context = browser.new_context()
-        page = context.new_page()
-
-        page.goto("https://chainalert.me/data/marketprice")
-
-        for coin in coin_follow_list:
-            page.fill("#marketprice_search_input", coin)
-
-            page.click("#marketprice_search_btn")
 
-            page.wait_for_timeout(1500)
-
-            elements = page.query_selector_all("#listdata_header > li:nth-child(2) > div")
-            text_content = [element.text_content() for element in elements]
-
-            temp_data = [s.replace('↑', '').replace('↓', '').split() for s in text_content]
-            print(temp_data[0])
-
-            # 去重
-            if temp_data[0][1] not in data:
-                data[temp_data[0][1]] = temp_data[0]
-
-        # 将去重后的数据拼接到 text 中
-        for coin, value in data.items():
-            text += f'Name: {coin}\n'
-            text += f'Ranking: {value[0]}\n'
-            text += f'Price: {value[2]}\n'
-            text += f'24H Transaction Volume: {value[3]}\n'
-            text += f'24H Price Change: {value[4]}\n'
-            text += f'Market Capitalization: {value[5]}\n'
-            text += f'Diluted Market Value: {value[6]}\n'
-            text += '\n\n'
+def fetch_coin_data(target):
+    url = "https://api.chainalert.me/"
+    headers = {
+        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
+        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
+    }
 
-        time.sleep(1)
+    payload = {
+        "method": "listData",
+        "params": [datetime.now().strftime("%Y-%m-%d"), "MARKETPRICE", '', 0, 9999.0, target]
+    }
 
-        page.goto("https://chainalert.me/data/")
-        page.wait_for_timeout(1500)
-        elements = page.query_selector_all(
-            '#greedy-galay-div > div.greedy_index-svg-value-div > div.greedy-svg-value > strong')
-        text_content = [element.text_content() for element in elements]
-        print(f'VIX: {text_content[0] if text_content else "No Data"}')
-        if text_content:
-            text += f'VIX: {text_content[0]}'
+    with httpx.Client() as client:
+        try:
+            response = client.post(url, headers=headers, data=payload, timeout=3)
+        except Exception as e:
+            # print(f"Target: {target} failed to fetch data. error: {str(e)}")
+            return False
+        if response.status_code != 200:
+            # print(f"{target} failed to fetch data. status code: {response.status_code}")
+            return False
         else:
-            text += 'VIX: No Data'
+            text = ''
+            data = response.json()
+            target_data = eval(data['result'][0]['data'])
+            target_data = target_data[0]
+
+            print(target_data)
+
+            # 获取数据值
+            name = target_data['name']
+            rank = target_data['rank']
+            price = target_data['item1']
+            volume = target_data['item2']
+            change = target_data['item3']
+            market_cap = target_data['item4']
+            dilute = target_data['item5']
+            logoUrl = target_data['logoUrl']
+
+            # 拼接到 text 中
+            text += f'Name: {name}\n'
+            text += f'Ranking: {rank}\n'
+            text += f'Price: {price}\n'
+            text += f'24H Transaction Volume: {volume}\n'
+            text += f'24H Price Change: {change}\n'
+            text += f'Market Capitalization: {market_cap}\n'
+            text += f'Diluted Market Value: {dilute}\n'
+            text += f'Logo: {logoUrl}\n'
+
+            return text
+
+
+def fetch_vix_data():
+    url = "https://api.chainalert.me/"
+    headers = {
+        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
+        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
+    }
 
-        context.close()
-        browser.close()
+    payload = {
+        "method": "listData",
+        "params": ['', "GREEDY_INDEX", 1, 0, 1, '']
+    }
 
-        return text
+    with httpx.Client() as client:
+        try:
+            response = client.post(url, headers=headers, data=payload, timeout=3)
+        except Exception as e:
+            # print(f"failed to fetch VIX data. error: {str(e)}")
+            return False
+        if response.status_code != 200:
+            # print(f"Failed to fetch VIX data. status code: {response.status_code}")
+            return False
+        else:
+            data = response.json()
+            vix_data = eval(data['result'][0]['data'])
+            vix_data = vix_data[0]
+            print(vix_data)
+            greedy = vix_data['greedy']
+            level = vix_data['level']
+            text = f'VIX data: {greedy}\nLevel: {level}'
+            return text
 
 
-def get_gas_value():
+def fetch_gas_data():
     url = "https://a5.maiziqianbao.net/api/v1/chains/EVM/1/gas_price"
 
     headers = {
@@ -95,11 +110,11 @@ def get_gas_value():
     response = httpx.get(url, headers=headers)
     if response.status_code != 200:
         print("Error:", response.status_code)
-        return None
+        return False
 
     if not response.json():
         print("Not Find GAS Data. Error: No response")
-        return None
+        return False
 
     remove_last_n_chars = lambda n, n_chars=9: int(str(n)[:-n_chars]) if len(str(n)) > n_chars else n
 
@@ -113,35 +128,56 @@ def get_gas_value():
         standard = remove_last_n_chars(data['standard']['price'])
         low = remove_last_n_chars(data['low']['price'])
         base = remove_last_n_chars(data['base']['price'])
+        print(f'fastest: {fastest} - fast: {fast} - standard: {standard} - low: {low} - base: {base}')
         result += f'fastest: {fastest}\nfast: {fast}\nstandard: {standard}\nlow: {low}\nbase: {base}'
         return result
     except Exception as e:
         print(e)
-        return None
+        return False
 
 
 def main():
-    retry = 5
     text = ''
-    for i in range(retry):
-        try:
-            text = get_chainalert()
-
-            gas = get_gas_value()
-            if gas:
-                text += gas
 
+    # 获取币币实时价格
+    target_list = ['btc', 'eth', 'sol', 'grass', 'sui', 'doge', 'arb', 'ath', 'move', 'pepe']
+    for target in target_list:
+        for retry in range(1, retry_count + 1):
+            result = fetch_coin_data(target)
+            if result:
+                text += result + '\n\n'
+                break
+            else:
+                print(f"{target} Failed to fetch data. retry: {retry}")
+                if retry == retry_count:
+                    text += f"{target} Failed to fetch data. retry count: {retry}"
+
+    # 获取恐慌指数
+    for retry in range(1, retry_count + 1):
+        result = fetch_vix_data()
+        if result:
+            text += result + '\n\n'
             break
-        except Exception as e:
-            print(f'retry {i + 1}')
-            print(e)
-            time.sleep(5)
+        else:
+            print(f"Failed to fetch VIX data. retry: {retry}")
+            if retry == retry_count:
+                text += f"Failed to fetch VIX data. retry count: {retry}"
+
+    # 获取gas
+    for retry in range(1, retry_count + 1):
+        result = fetch_gas_data()
+        if result:
+            text += result + '\n\n'
+            break
+        else:
+            print(f"Failed to fetch Gas data. retry: {retry}")
+            if retry == retry_count:
+                text += f"Failed to fetch Gas data. retry count: {retry}"
 
     if text:
         GotifyNotifier('Real-time coin price\n', text, 'AgfOJESqDKftBTQ').send_message()
     else:
         print('No Data')
 
-
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()