| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- # -*- coding: utf-8 -*-
- '''
- 使用 httpx 获取 coinmarketcap 最新数字币数据
- '''
- import time
- import os
- import sys
- import httpx
- import re
- sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
- from utils.utils_send_gotify import *
def get_coinmarketcap_coin_price():
    """Scrape the summary sentence of several CoinMarketCap coin pages.

    For every (symbol, url) pair the page is downloaded, the text between the
    first ``<strong>`` and the ``sc-65e7f566-0`` class marker is extracted,
    and the dollar amounts / percentage change / supply figure found in that
    text are appended to the report.

    Returns:
        str | None: The formatted report (one paragraph per coin, terminated
        by two extra newlines), or ``None`` as soon as any page answers with
        a status code other than 200.  A coin whose request raises is skipped.
    """
    url_list = [
        ['BTC', 'https://coinmarketcap.com/currencies/bitcoin/'],
        ['ETH', 'https://coinmarketcap.com/currencies/ethereum/'],
        ['SOL', 'https://coinmarketcap.com/currencies/solana/'],
        ['GRASS', 'https://coinmarketcap.com/currencies/grass/'],
        ['SUI', 'https://coinmarketcap.com/currencies/sui/'],
        ['DOGE', 'https://coinmarketcap.com/currencies/dogecoin/'],
        ['ARB', 'https://coinmarketcap.com/currencies/arbitrum/'],
        ['ATH', 'https://coinmarketcap.com/currencies/aethir/'],
    ]
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 Edg/107"
    }

    # Patterns are loop-invariant, so compile them once.
    summary_re = re.compile(r'<strong>(.*?)sc-65e7f566-0')
    # Dollar amount, either comma-grouped ("97,123.45") or plain ("0.1234").
    # The grouped alternative comes first so "$97,123.45" is not cut at "97".
    amount_re = re.compile(r'\$(\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)')
    change_re = re.compile(r'(up|down) (\d+\.\d+)%')
    count_re = re.compile(r'(\d{1,3}(?:,\d{3})*)')
    # Markup fragments stripped from the summary, in this exact order.
    markup_noise = ('</strong>', '<!-- -->', '</p></div></div><div class="')

    sections = []
    for target, url in url_list:
        try:
            resp = httpx.get(url=url, headers=headers)
        except Exception as e:
            # Deliberate best effort: log, back off, move on to the next coin.
            print(f'target: {target}, {e}')
            time.sleep(5)
            continue

        if resp.status_code == 301:
            print(resp.text)
            return None
        if resp.status_code != 200:
            print(resp.status_code)
            return None

        resp.encoding = 'utf-8'
        found = summary_re.search(resp.text)
        text = found.group(1) if found else 'No Data'
        for fragment in markup_noise:
            text = text.replace(fragment, '')

        lines = [target]

        # The summary is assumed to list price, 24h volume and market cap in
        # that order (same assumption the positional indexing always made).
        amounts = amount_re.findall(text)
        if amounts:
            lines.append(f'prices: ${amounts[0]}')
        if len(amounts) > 1:
            # Guarded: a summary with a single amount used to raise IndexError.
            lines.append(f'24-hour trading volume: ${amounts[1]}')

        change = change_re.search(text)
        if change:
            lines.append(f'change: {change.group(1)} {change.group(2)}%')

        # Market cap is only reported when exactly three amounts were found.
        if len(amounts) == 3:
            lines.append(f'live market cap: ${amounts[2]}')

        # The last comma-grouped number in the summary is reported as supply.
        counts = count_re.findall(text)
        if counts:
            lines.append(f'max circulating supply: {counts[-1]}')

        sections.append('\n'.join(lines) + '\n\n')
        print(f'已获取 {target} 数据')
        time.sleep(2)

    return ''.join(sections) + '\n\n'
def get_vix_data():
    """Fetch the ``VIX.OTC`` snapshot from the wallstreetcn market API.

    Returns:
        str | None: ``'<snapshot[1]>:<snapshot[2]>'`` followed by two
        newlines, or ``None`` when the request fails, the status code is not
        200, or the payload does not have the expected structure.
    """
    url = 'https://api-ddc-wscn.awtmt.com/market/real?'
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "Host": "api-ddc-wscn.awtmt.com",
        "If-None-Match": "AMJ+W5ydwGIw9oqT3fOBeQ==",
        "Origin": "https://wallstreetcn.com",
        "Referer": "https://wallstreetcn.com/",
        "Sec-CH-UA": '"Chromium";v="130", "Brave";v="130", "Not?A_Brand";v="99"',
        "Sec-CH-UA-Mobile": "?0",
        "Sec-CH-UA-Platform": '"macOS"',
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "cross-site",
        "Sec-GPC": "1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
    }
    params = {
        "fields": "symbol,en_name,prod_name,last_px,px_change,px_change_rate,high_px,low_px,open_px,preclose_px,market_value,turnover_volume,turnover_ratio,turnover_value,dyn_pb_rate,amplitude,dyn_pe,trade_status,circulation_value,update_time,price_precision,week_52_high,week_52_low,static_pe,source",
        "prod_code": "VIX.OTC"
    }

    # A transport failure must not abort the whole report; treat it like any
    # other "no data" outcome.
    try:
        resp = httpx.get(url=url, headers=headers, params=params)
    except httpx.HTTPError as e:
        print(e)
        return None

    if resp.status_code != 200:
        print(resp.status_code)
        return None

    try:
        data = resp.json()['data']['snapshot']['VIX.OTC']
        # NOTE(review): positions 1 and 2 are presumably a name and the last
        # price -- confirm against the API's field order.
        result = f'{data[1]}:{data[2]}'
    except (ValueError, KeyError, TypeError, IndexError) as e:
        # ValueError covers a body that is not valid JSON.
        print(e)
        return None

    print('已获取 vix 指数')
    return result + '\n\n'
def get_crypto_fear_and_greed_index():
    """Scrape the current crypto fear-and-greed index from CoinMarketCap.

    The score, its label and the update time are pulled out of the
    ``fearGreedIndexData`` JSON fragment embedded in the page source.

    Returns:
        str | None: A three-line summary, or ``None`` when the request fails,
        the status code is not 200, or the fragment is not found in the page.
    """
    url = 'https://coinmarketcap.com/charts/fear-and-greed-index/'
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
    }

    time.sleep(1)
    try:
        resp = httpx.get(url=url, headers=headers)
    except httpx.HTTPError as e:
        # Keep the rest of the report alive when this page is unreachable.
        print(e)
        return None

    if resp.status_code != 200:
        print(resp.status_code)
        return None

    resp.encoding = 'utf-8'
    # Raw string: "\{" is an invalid escape sequence in a normal literal.
    found = re.search(
        r'fearGreedIndexData":\{"currentIndex":\{"score":(.*?),"maxScore":100,"name":"(.*?)","updateTime":"(.*?)"}',
        resp.text)
    if found is None:
        return None

    # Only announce success once the data has actually been extracted.
    print('已获取 crypto 贪婪指数')
    score, name, update_time = found.groups()
    return f'crypto fear and greed index: {score}\n{name}\nupdate time: {update_time}\n'
def get_gas_value():
    """Fetch the gas price tiers for EVM chain 1 from the MathWallet gas API.

    Returns:
        str | None: A block starting with ``'\\nGAS:\\n'`` that lists the
        fastest / fast / standard / low / base prices, or ``None`` when the
        request fails, the status code is not 200, the body is not JSON, is
        empty, or lacks one of the expected keys.
    """
    url = "https://a5.maiziqianbao.net/api/v1/chains/EVM/1/gas_price"
    # NOTE(review): the request token and device id below are hard-coded
    # credentials; consider loading them from configuration instead.
    headers = {
        "Host": "a5.maiziqianbao.net",
        "Connection": "keep-alive",
        "x-req-token": "MDbO4FsaSUPdjCdvTUs2zY4V3rnvvYatvYyjz7SfY+aCJ8r+RFm06X2dGR8eEDK7Gc5g1TLEQySEhGerRXbDT/NS+e5QAWRU68yD8m4y/aKK+TBkIv90VwvxmvYId2BVoDPDHQCGG4o3EqRWkS93eV0twYQ7w7qvNUj2e3tpDcUZYuplPyLozgYVTegFPnDk",
        "Accept": "*/*",
        "x-app-type": "iOS-5",
        "x-app-ver": "1.0.1",
        "x-app-udid": "419815AD-3015-4B5A-92CA-3BCBED24ACEC",
        "x-app-locale": "en",
        "Accept-Language": "zh-Hans-CN;q=1.0, en-CN;q=0.9",
        "Accept-Encoding": "br;q=1.0, gzip;q=0.9, deflate;q=0.8",
        "User-Agent": "MathGas/1.0.1 (MathWallet.MathGas; build:3; macOS 13.5.0) Alamofire/5.4.4"
    }

    try:
        response = httpx.get(url, headers=headers)
    except httpx.HTTPError as e:
        print("Error:", e)
        return None

    if response.status_code != 200:
        print("Error:", response.status_code)
        return None

    # Parse the body once; a non-JSON body is reported instead of crashing.
    try:
        payload = response.json()
    except ValueError as e:
        print(e)
        return None
    if not payload:
        print("Not Find GAS Data. Error: No response")
        return None

    def drop_trailing_digits(n, n_chars=9):
        """Cut the last *n_chars* characters off ``str(n)`` and return an int.

        Values whose text form has at most *n_chars* characters are returned
        unchanged.  NOTE(review): presumably a wei -> gwei conversion, which
        would leave sub-1-gwei prices in the raw unit -- confirm.
        """
        digits = str(n)
        return int(digits[:-n_chars]) if len(digits) > n_chars else n

    tiers = ('fastest', 'fast', 'standard', 'low', 'base')
    try:
        data = payload['data']
        lines = [
            f"{tier}: {drop_trailing_digits(data[tier]['price'])}"
            for tier in tiers
        ]
    except (KeyError, TypeError, ValueError) as e:
        print(e)
        return None

    return '\nGAS:\n' + '\n'.join(lines)
def main():
    """Collect every data section and push the combined report to Gotify.

    Each fetcher returns either a string or ``None``; sections that produced
    nothing are skipped.  When no section produced any text, ``'No Data'`` is
    printed and no notification is sent.
    """
    # Fetch in the order the sections should appear in the report.
    sections = [
        get_coinmarketcap_coin_price(),
        get_vix_data(),
        get_crypto_fear_and_greed_index(),
        get_gas_value(),
    ]
    # Filtering here also covers the coin-price fetcher returning None, which
    # previously made the first `text += ...` raise TypeError.
    text = ''.join(section for section in sections if section)

    if text:
        print(text)
        # NOTE(review): the Gotify token is hard-coded; consider moving it to
        # configuration.
        GotifyNotifier('Real-time coin price\n', text, 'AgfOJESqDKftBTQ').send_message()
    else:
        print('No Data')


if __name__ == '__main__':
    main()
|