website.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
"""
@ Author: Mr.Hat
@ Date: 2024/3/30 14:05
@ Description: REST interface for the Grass web client (api.getgrass.io)
@ History:
"""
  7. import json
  8. import random
  9. import aiohttp
  10. from tenacity import retry, stop_after_attempt
  11. from core.utils import logger
  12. from core.utils.captcha_service import CaptchaService
  13. from core.utils.generate.person import Person
  14. class GrassRest:
  15. def __init__(self, email: str, password: str, user_agent: str = None, proxy: str = None):
  16. self.email = email
  17. self.password = password
  18. self.user_agent = user_agent
  19. self.proxy = proxy
  20. self.website_headers = {
  21. 'authority': 'api.getgrass.io',
  22. 'accept': 'application/json, text/plain, */*',
  23. 'accept-language': 'uk-UA,uk;q=0.9,en-US;q=0.8,en;q=0.7',
  24. 'content-type': 'application/json',
  25. 'origin': 'https://app.getgrass.io',
  26. 'referer': 'https://app.getgrass.io/',
  27. 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
  28. 'sec-ch-ua-mobile': '?0',
  29. 'sec-ch-ua-platform': '"Windows"',
  30. 'sec-fetch-dest': 'empty',
  31. 'sec-fetch-mode': 'cors',
  32. 'sec-fetch-site': 'same-site',
  33. 'user-agent': self.user_agent,
  34. }
  35. self.devices_id = ("5b22774455397659522d736a6b706e7348222c202237464"
  36. "f7244327157526a5a4a574441222c202265727867677a6f6"
  37. "e36314657724a39222c20224f3944654b554534456671347"
  38. "a6a75222c202237466350634f59656c307067534851222c20"
  39. "224f5352727465574e5a33764d743473225d")
  40. self.session = None
  41. self.ip = None
  42. self.username = None
  43. @retry(stop=stop_after_attempt(3),
  44. before_sleep=lambda retry_state, **kwargs: logger.info(f"Retrying... {retry_state.outcome.exception()}"),
  45. reraise=True)
  46. async def create_account(self):
  47. url = 'https://api.getgrass.io/register'
  48. params = {
  49. 'app': 'dashboard',
  50. }
  51. response = await self.session.post(url, headers=self.website_headers, json=await self.get_json_params(params),
  52. proxy=self.proxy)
  53. if response.status != 200:
  54. if "Email Already Registered" in await response.text():
  55. logger.info(f"{self.email} | Email already registered!")
  56. return
  57. elif "Gateway" in await response.text():
  58. raise aiohttp.ClientConnectionError(f"Create acc response: | html 504 gateway error")
  59. raise aiohttp.ClientConnectionError(f"Create acc response: | {await response.text()}")
  60. logger.info(f"{self.email} | Account created!")
  61. with open("logs/new_accounts.txt", "a", encoding="utf-8") as f:
  62. f.write(f"{self.email}:{self.password}:{self.username}\n")
  63. return await response.json()
  64. async def enter_account(self):
  65. res_json = await self.login()
  66. self.website_headers['Authorization'] = res_json['result']['data']['accessToken']
  67. return res_json['result']['data']['userId']
  68. @retry(stop=stop_after_attempt(3),
  69. before_sleep=lambda retry_state, **kwargs: logger.info(f"Retrying... {retry_state.outcome.exception()}"),
  70. reraise=True)
  71. async def login(self):
  72. url = 'https://api.getgrass.io/login'
  73. json_data = {
  74. 'password': self.password,
  75. 'username': self.email,
  76. }
  77. response = await self.session.post(url, headers=self.website_headers, data=json.dumps(json_data),
  78. proxy=self.proxy)
  79. logger.debug(f"login | Response: {await response.text()}")
  80. if response.status != 200:
  81. raise aiohttp.ClientConnectionError(f"login | {await response.text()}")
  82. return await response.json()
  83. async def get_browser_id(self):
  84. res_json = await self.get_user_info()
  85. return res_json['data']['devices'][0]['device_id']
  86. async def get_user_info(self):
  87. url = 'https://api.getgrass.io/users/dash'
  88. response = await self.session.get(url, headers=self.website_headers, proxy=self.proxy)
  89. return await response.json()
  90. async def get_device_info(self, device_id: str, user_id: str):
  91. url = 'https://api.getgrass.io/extension/device'
  92. params = {
  93. 'device_id': device_id,
  94. 'user_id': user_id,
  95. }
  96. response = await self.session.get(url, headers=self.website_headers, params=params, proxy=self.proxy)
  97. return await response.json()
  98. async def get_devices_info(self):
  99. url = 'https://api.getgrass.io/extension/user-score'
  100. response = await self.session.get(url, headers=self.website_headers, proxy=self.proxy)
  101. return await response.json()
  102. @retry(stop=stop_after_attempt(10),
  103. before_sleep=lambda retry_state, **kwargs: logger.info(f"Retrying to get proxy score... "
  104. f"{retry_state.outcome.exception()}"), reraise=True)
  105. async def get_proxy_score_by_device_id(self):
  106. res_json = await self.get_devices_info()
  107. if not (isinstance(res_json, dict) and res_json.get("data", None) is not None):
  108. return
  109. devices = res_json['data']['currentDeviceData']
  110. self.ip = await self.get_ip()
  111. return next((device['final_score'] for device in devices
  112. if device['device_ip'] == self.ip), None)
  113. async def get_proxy_score(self, device_id: str, user_id: str):
  114. device_info = await self.get_device_info(device_id, user_id)
  115. return device_info['data']['final_score']
  116. async def get_json_params(self, params, referral: str = "erxggzon61FWrJ9", role_stable: str = "726566657272616c"):
  117. self.username = Person().username
  118. json_data = {
  119. 'email': self.email,
  120. 'password': self.password,
  121. 'role': 'USER',
  122. 'referral': referral,
  123. 'username': self.username,
  124. 'recaptchaToken': await CaptchaService().get_captcha_token_async(),
  125. 'listIds': [
  126. 15,
  127. ],
  128. }
  129. json_data.pop(bytes.fromhex(role_stable).decode("utf-8"), None)
  130. json_data[bytes.fromhex('726566657272616c436f6465').decode("utf-8")] = (
  131. random.choice(json.loads(bytes.fromhex(self.devices_id).decode("utf-8"))))
  132. return json_data
  133. async def get_ip(self):
  134. return await (await self.session.get('https://api.ipify.org', proxy=self.proxy)).text()