core.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import asyncio
  2. import json
  3. import ssl
  4. import sys
  5. import time
  6. import uuid
  7. from datetime import datetime
  8. from typing import Optional
  9. import socks
  10. import websockets
  11. from faker import Faker
  12. from loguru import logger
  13. from websockets import WebSocketCommonProtocol
  14. from utils import parse_proxy_url, Status
  # Remove loguru's default console handler, then install a new console
  # handler on stdout that only shows records at INFO level and above.
  logger.remove()
  logger.add(sys.stdout, level="INFO")

  # Level names handed to AsyncGrassWs.log().
  DEBUG = 'DEBUG'
  INFO = 'INFO'
  class AsyncGrassWs:
      """Maintain one user's WebSocket session with the server, optionally via a proxy.

      ``run()`` connects (through the proxy when ``proxy_url`` is set), answers
      the server's AUTH and PONG messages, and reconnects after every failure
      until ``stop()`` is called. A background task started by ``run()`` sends
      a PING about every 20 seconds. Recent log messages are kept in ``logs``.
      """

      # Maximum number of (timestamp, message) entries retained in ``self.logs``.
      _MAX_LOG_ENTRIES = 100
      # Seconds to wait before reconnecting after a failed or dropped connection.
      _RECONNECT_DELAY = 5

      def __init__(self, user_id, proxy_url=None):
          """Initialise connection state.

          Args:
              user_id: Account id echoed back to the server in the AUTH reply.
              proxy_url: Optional proxy URL parsed with ``parse_proxy_url``;
                  ``None`` (or empty) means a direct connection.
          """
          self.user_id = user_id
          self.user_agent = Faker().chrome()
          # uuid3 is name-based, so the same proxy URL always yields the same device id.
          self.device_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, proxy_url or ""))
          self.proxy_url = proxy_url
          self.ws: Optional[WebSocketCommonProtocol] = None
          self.status: Status = Status.disconnect
          self._stop = False          # set by stop(); asks run() and send_ping() to finish
          self._stopped = False       # set by run() once it has shut down completely
          self._ping_stopped = False  # set by send_ping() when it exits
          # Strong reference to the heartbeat task created in run().
          self._ping_task: Optional[asyncio.Task] = None
          self.server_hostname = "proxy.wynd.network"
          self.server_port = 4444
          self.server_url = f"wss://{self.server_hostname}:{self.server_port}/"
          self.proxy_timeout = 60  # seconds allowed for the proxy tunnel to connect
          self.logs = []

      def log(self, level, message):
          """Emit ``message`` through loguru at ``level`` and remember it in ``self.logs``.

          ``self.logs`` holds ``(timestamp, message)`` tuples and is capped at the
          most recent ``_MAX_LOG_ENTRIES`` entries.
          """
          logger.log(logger.level(level).name, message)
          self.logs.append((datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message))
          if len(self.logs) > self._MAX_LOG_ENTRIES:
              self.logs = self.logs[-self._MAX_LOG_ENTRIES:]

      async def send_ping(self):
          """Send a PING message about every 20 seconds until a stop is requested.

          Send failures are logged at DEBUG and otherwise ignored, so the
          heartbeat keeps running across reconnects. ``_ping_stopped`` is set
          however the coroutine ends, because ``run()`` waits on that flag
          while shutting down.
          """
          try:
              await asyncio.sleep(5)
              while not self._stop:
                  try:
                      send_message = json.dumps(
                          {"id": str(uuid.uuid4()), "version": "1.0.0", "action": "PING", "data": {}})
                      if self.ws:
                          self.log(DEBUG, f'[发送消息] [{self.user_id}] [{self.proxy_url}] [{send_message}]')
                          await self.ws.send(send_message)
                  except Exception as e:
                      self.log(DEBUG, f'[PING Error] {e}')
                  # Sleep in one-second slices so a stop request is noticed quickly.
                  for _ in range(20):
                      if self._stop:
                          break
                      await asyncio.sleep(1)
          finally:
              self._ping_stopped = True

      def auth_response(self, message):
          """Build the reply to a server AUTH message.

          Args:
              message: Decoded server message; its ``"id"`` is echoed back.

          Returns:
              A JSON-serialisable dict carrying this client's identity.

          Raises:
              KeyError: If ``message`` has no ``"id"`` key.
          """
          return {
              "id": message["id"],
              "origin_action": "AUTH",
              "result": {
                  "browser_id": self.device_id,
                  "user_id": self.user_id,
                  "user_agent": self.user_agent,
                  "timestamp": int(time.time()),
                  "device_type": "extension",
                  "version": "3.3.2"
              }
          }

      async def run(self):
          """Connect and serve until ``stop()`` is called, reconnecting after each failure.

          Sets ``_stopped`` to True just before returning.
          """
          self.log(INFO, f'[启动] [{self.user_id}] [{self.proxy_url}]')
          # Keep a reference: the event loop only holds weak references to
          # tasks, so an unreferenced task may be garbage-collected mid-run.
          self._ping_task = asyncio.create_task(self.send_ping())
          # Checking _stop at the loop head honours a stop() that arrives
          # during the reconnect delay instead of reconnecting once more.
          while not self._stop:
              ws_proxy = None
              try:
                  self.status = Status.connecting
                  if self.proxy_url:
                      ws_proxy = await self._open_proxy_socket()
                  await self._open_websocket(ws_proxy)
                  await self._receive_loop()
              except TimeoutError as e:
                  self.log(INFO, f'[连接超时] [{self.user_id}] [{self.proxy_url}] {e}')
              except Exception as e:
                  self.log(INFO, f'[连接断开] [{self.user_id}] [{self.proxy_url}] {e}')
              self.status = Status.disconnect
              await self._close_connection(ws_proxy)
              if self._stop:
                  break
              self.log(DEBUG, f'[重新连接] [{self.user_id}] [{self.proxy_url}]')
              await asyncio.sleep(self._RECONNECT_DELAY)

          self.status = Status.disconnect
          # Let the heartbeat task observe _stop and finish before reporting exit.
          while not self._ping_stopped:
              await asyncio.sleep(1)
          self.log(INFO, f'手动退出 [{self.user_id}] [{self.proxy_url}]')
          self._stopped = True

      async def _open_proxy_socket(self):
          """Return a socket connected to the server through ``self.proxy_url``.

          The socket is closed before any failure (including timeout or
          cancellation) propagates, so a failed attempt does not leak it.
          """
          proxy_type, http_proxy_host, http_proxy_port, http_proxy_auth = parse_proxy_url(self.proxy_url)
          if http_proxy_auth:
              username, password = http_proxy_auth[0], http_proxy_auth[1]
          else:
              username = password = None
          self.log(DEBUG, f'[连接代理] [{self.user_id}] [{self.proxy_url}]')
          sock = socks.socksocket()
          try:
              sock.set_proxy(socks.PROXY_TYPES[proxy_type.upper()], http_proxy_host, http_proxy_port,
                             username=username, password=password)
              async with asyncio.timeout(self.proxy_timeout):
                  # connect() blocks, so run it in the default executor.
                  await asyncio.get_running_loop().run_in_executor(
                      None, sock.connect, (self.server_hostname, self.server_port))
          except BaseException:
              self._close_socket(sock)
              raise
          self.log(DEBUG, f'[连接代理成功] [{self.user_id}] [{self.proxy_url}]')
          return sock

      async def _open_websocket(self, ws_proxy):
          """Open the WebSocket (over ``ws_proxy`` when given) and store it in ``self.ws``."""
          ssl_context = ssl.create_default_context()
          # NOTE(review): hostname and certificate verification are switched
          # off here, so the server is not authenticated - confirm intentional.
          ssl_context.check_hostname = False
          ssl_context.verify_mode = ssl.CERT_NONE
          custom_headers = {
              "User-Agent": self.user_agent
          }
          self.log(DEBUG, f'[连接服务器] [{self.user_id}] [{self.proxy_url}]')
          self.ws = await websockets.connect(
              self.server_url,
              ssl=ssl_context,
              sock=ws_proxy,
              extra_headers=custom_headers,
              server_hostname=self.server_hostname,
              open_timeout=60
          )
          self.log(DEBUG, f'[连接服务器成功] [{self.user_id}] [{self.proxy_url}]')

      async def _receive_loop(self):
          """Answer AUTH and PONG messages until the connection fails or a stop is requested."""
          # Testing _stop before the first recv() catches a stop() issued while
          # the connection was still being opened, when stop() had no
          # connection it could close.
          while not self._stop:
              response = await self.ws.recv()
              message = json.loads(response)
              self.log(DEBUG, f'[收到消息] [{self.user_id}] [{self.proxy_url}] [{message}]')
              action = message.get("action")
              if action == "AUTH":
                  auth_response = self.auth_response(message)
                  self.log(DEBUG, f'[发送消息] [{self.user_id}] [{self.proxy_url}] [{auth_response}]')
                  await self.ws.send(json.dumps(auth_response))
                  self.status = Status.connected
                  self.log(INFO, f'[在线] [{self.user_id}] [{self.proxy_url}]')
              elif action == "PONG":
                  pong_response = {"id": message["id"], "origin_action": "PONG"}
                  self.log(DEBUG, f'[发送消息] [{self.user_id}] [{self.proxy_url}] [{pong_response}]')
                  await self.ws.send(json.dumps(pong_response))

      async def _close_connection(self, ws_proxy):
          """Release the current WebSocket and proxy socket; close errors are logged, not raised."""
          # Clear self.ws first so the heartbeat does not try to send on a
          # connection that is being torn down.
          ws, self.ws = self.ws, None
          if ws is not None:
              try:
                  await ws.close()
              except Exception as e:
                  self.log(DEBUG, f'[Close Error] {e}')
          self._close_socket(ws_proxy)

      def _close_socket(self, sock):
          """Best-effort close of a proxy socket; ``None`` is ignored."""
          if sock is None:
              return
          try:
              sock.close()
          except Exception as e:
              # A failing close must not mask the error that led here.
              self.log(DEBUG, f'[Close Error] {e}')

      async def stop(self):
          """Request shutdown of ``run()`` and the heartbeat, closing the live connection if any."""
          self._stop = True
          if self.ws:
              await self.ws.close()