# -*- coding: utf-8 -*-
- import re
- from fastapi import FastAPI, HTTPException
- import uvicorn
- import httpx
- from fastapi.responses import HTMLResponse
# ASGI application object; the route decorator below registers on it and the
# __main__ guard points uvicorn at it via "main:app".
app = FastAPI()
# Short coin symbol (as used in the request path) -> CoinMarketCap page URL.
COIN_ITEMS = {
    "btc": "https://coinmarketcap.com/currencies/bitcoin/",
    "eth": "https://coinmarketcap.com/currencies/ethereum/",
    "sol": "https://coinmarketcap.com/currencies/solana/",
    "sui": "https://coinmarketcap.com/currencies/sui/",
    "doge": "https://coinmarketcap.com/currencies/dogecoin/",
    "x": "https://coinmarketcap.com/currencies/x-empire/",
    "arb": "https://coinmarketcap.com/currencies/arbitrum/",
    "pepe": "https://coinmarketcap.com/currencies/pepe/",
    "grass": "https://coinmarketcap.com/currencies/grass/",
    "bome": "https://coinmarketcap.com/currencies/book-of-meme/",
}

# Proxy mapping handed to httpx when a request asks for proxying; both URL
# schemes are routed through the same local endpoint.
PROXIES = dict.fromkeys(("http://", "https://"), "http://127.0.0.1:7890")

# Request headers sent with every upstream fetch (a desktop browser User-Agent).
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/107.0.0.0 Safari/537.36 Edg/107"
    ),
}
@app.get("/coin/{proxy_type}/{coin_name}", response_class=HTMLResponse)
async def read_coin(coin_name: str, proxy_type: int = 0):
    """Serve an HTML page summarising the coin registered under ``coin_name``.

    Args:
        coin_name: Key looked up in ``COIN_ITEMS`` to find the page to fetch.
        proxy_type: Truthy to fetch through ``PROXIES``; ``0`` fetches directly.

    Returns:
        An ``HTMLResponse`` with status 200 in every case. Unknown coins and
        fetch/parse failures are rendered as a centred error paragraph rather
        than being reported through the HTTP status code.
    """
    # Function-scope imports keep this handler self-contained.
    from html import escape

    from fastapi.concurrency import run_in_threadpool

    coin_url = COIN_ITEMS.get(coin_name)
    if not coin_url:
        # coin_name comes straight from the URL path, so it must be escaped
        # before being echoed back into the page.
        return HTMLResponse(
            content=f"<p style='text-align: center;'>error: {escape(coin_name)}</p>",
            status_code=200,
        )

    try:
        # get_coin_data performs a blocking HTTP request; run it in a worker
        # thread so the event loop keeps serving other requests meanwhile.
        result = await run_in_threadpool(
            get_coin_data, coin_url, HEADERS, PROXIES, proxy_type
        )
    except httpx.RequestError as e:
        return HTMLResponse(
            content=f"<p style='text-align: center;'>Request Error: {escape(str(e))}</p>",
            status_code=200,
        )
    except Exception as e:
        # Catch-all boundary: any other failure (including the HTTPException
        # raised for a non-200 upstream status) is shown to the user as text.
        return HTMLResponse(
            content=f"<p style='text-align: center;'>Internal Error: {escape(str(e))}</p>",
            status_code=200,
        )

    # Wrap the fragment in a minimal page with a centred, left-aligned column.
    # `result` already contains <br> markup, so it is inserted verbatim.
    html_result = (
        "<html>\n"
        "<body>\n"
        '<div style="width: 80%; margin: auto; text-align: left; font-size: 16px; padding: 20px;">\n'
        f"{result}\n"
        "</div>\n"
        "</body>\n"
        "</html>"
    )
    return HTMLResponse(content=html_result, status_code=200)
def _summarise_coin_page(page: str) -> str:
    """Extract the summary paragraph and key figures from a coin page's HTML."""
    found = re.search('<strong>(.*?)sc-65e7f566-0', page)
    if not found:
        return 'No Data'

    # Strip the markup fragments left inside the captured span.
    text = found.group(1)
    for fragment in ('</strong>', '<!-- -->', '</p></div></div><div class="'):
        text = text.replace(fragment, '')

    # NOTE(review): this pattern cannot match amounts written with thousands
    # separators (e.g. "$67,123.45"); confirm that is intended.
    prices = re.findall(r'\$(\d+\.\d+)', text)
    # Every comma-grouped dollar amount, in order of appearance. The trading
    # volume and market cap are picked from this one list by position.
    dollar_amounts = re.findall(r'\$(\d{1,3}(?:,\d{3})*(?:\.\d+)?)', text)
    change = re.findall(r'(up|down) (\d+\.\d+)%', text)
    grouped_numbers = re.findall(r'(\d{1,3}(?:,\d{3})*)', text)

    result = text.replace('我们会实时更新SUI兑换为CNY的价格。 ', '')
    if prices:
        result += f'\n\nprices: ${prices[0]}'
    # The second amount is reported as the volume, so at least two matches are
    # needed; a lone match used to raise IndexError here.
    if len(dollar_amounts) > 1:
        result += f'\n24-hour trading volume: ${dollar_amounts[1]}'
    if change:
        direction_and_percent = ' '.join(change[0])
        result += f'\nchange: {direction_and_percent}%'
    # The market cap is only reported when exactly three amounts were found.
    if len(dollar_amounts) == 3:
        result += f'\nlive market cap: ${dollar_amounts[2]}'
    if grouped_numbers:
        result += f'\nmax circulating supply: {grouped_numbers[-1]}'

    # Greedy match: drops everything from the first "price" up to the last
    # "coins." that is directly followed by a newline.
    result = re.sub(r'price([\S\s]*)?coins\.\n', '', result)
    return result.replace('\n', '<br>').replace('. ', '<br>')


def get_coin_data(url: str, headers: dict, proxies: dict, proxy_type: int):
    """Download a coin page and return its summary as an HTML fragment.

    Args:
        url: Page to download.
        headers: HTTP headers sent with the request.
        proxies: Proxy mapping passed to ``httpx.get``; only used when
            ``proxy_type`` is truthy.
        proxy_type: Truthy to send the request through ``proxies``.

    Returns:
        ``'No Data'`` when the summary marker is not found in the page;
        otherwise the summary text plus the extracted figures, with newlines
        and ``'. '`` sentence breaks turned into ``<br>``.

    Raises:
        HTTPException: If the upstream response status is not 200.

    Errors raised by ``httpx.get`` itself are not caught here and propagate to
    the caller.
    """
    request_kwargs = {"url": url, "headers": headers, "timeout": 3}
    if proxy_type:
        request_kwargs["proxies"] = proxies
    resp = httpx.get(**request_kwargs)

    if resp.status_code != 200:
        raise HTTPException(
            status_code=resp.status_code,
            detail=f"Failed to retrieve data, status code: {resp.status_code}",
        )
    return _summarise_coin_page(resp.text)
if __name__ == "__main__":
    # Script entry point: serve the app from module "main" on all interfaces,
    # with auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=32900,
        reload=True,
    )
|