# -*- coding: utf-8 -*- ''' 使用 httpx 获取 coinmarketcap 最新数字币数据 ''' import time import os import sys import httpx import re sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto')) from utils.utils_send_gotify import * def get_coinmarketcap_coin_price(): url_list = [ ['BTC', 'https://coinmarketcap.com/currencies/bitcoin/'], ['ETH', 'https://coinmarketcap.com/currencies/ethereum/'], ['SOL', 'https://coinmarketcap.com/currencies/solana/'], ['SUI', 'https://coinmarketcap.com/currencies/sui/'], ['DOGE', 'https://coinmarketcap.com/currencies/dogecoin/'], ['ARB', 'https://coinmarketcap.com/currencies/arbitrum/'], ['ATH', 'https://coinmarketcap.com/currencies/aethir/'] ] headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107" } result = '' for data_list in url_list: url = data_list[1] target = data_list[0] try: resp = httpx.get(url=url, headers=headers) except Exception as e: print(f'target: {target}, {e}') time.sleep(5) continue result += f'target: {target}\n' if resp.status_code == 301: print(resp.text) return None elif resp.status_code != 200: print(resp.status_code) return None resp.encoding = 'utf-8' page = resp.text text = re.findall('(.*?)

', '', text) text = re.sub(r'', '', text) prices = re.findall(r'\$(\d+\.\d+)', text) if prices: result += f'prices: ${prices[0]}\n' volumes = re.findall(r'\$(\d{1,3}(?:,\d{3})*(?:\.\d+)?)', text) if volumes: result += f'24-hour trading volume: ${volumes[1]}\n' change = re.findall(r'(up|down) (\d+\.\d+)%', text) if change: c = ' '.join(change[0]) result += f'change: {c}%\n' live_market_cap = re.findall(r'\$(\d{1,3}(?:,\d{3})*(?:\.\d+)?)', text) if live_market_cap: result += f'live market cap: ${live_market_cap[2]}\n' max_circulating_supply = re.findall(r'(\d{1,3}(?:,\d{3})*)', text) if max_circulating_supply: result += f'max circulating supply: {max_circulating_supply[-1]}\n' time.sleep(2) return result + '\n\n' def get_vix_data(): url = 'https://api-ddc-wscn.awtmt.com/market/real?' headers = { "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9", "Connection": "keep-alive", "Host": "api-ddc-wscn.awtmt.com", "If-None-Match": "AMJ+W5ydwGIw9oqT3fOBeQ==", "Origin": "https://wallstreetcn.com", "Referer": "https://wallstreetcn.com/", "Sec-CH-UA": '"Chromium";v="130", "Brave";v="130", "Not?A_Brand";v="99"', "Sec-CH-UA-Mobile": "?0", "Sec-CH-UA-Platform": '"macOS"', "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", "Sec-GPC": "1", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36" } params = { "fields": "symbol,en_name,prod_name,last_px,px_change,px_change_rate,high_px,low_px,open_px,preclose_px,market_value,turnover_volume,turnover_ratio,turnover_value,dyn_pb_rate,amplitude,dyn_pe,trade_status,circulation_value,update_time,price_precision,week_52_high,week_52_low,static_pe,source", "prod_code": "VIX.OTC" } resp = httpx.get(url=url, headers=headers, params=params) if resp.status_code != 200: print(resp.status_code) return None data = resp.json()['data']['snapshot']['VIX.OTC'] result = f'{data[1]}:{data[2]}' return result def get_crypto_fear_and_greed_index(): url = 'https://coinmarketcap.com/charts/fear-and-greed-index/' def main(): text = get_coinmarketcap_coin_price() res = get_vix_data() if res: text += res if text: GotifyNotifier('Real-time coin price\n', text).send_message() else: print('No Data') if __name__ == '__main__': main()