# main.py (8.4 KB)
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import time
  4. import subprocess
  5. import sys
  6. import asyncio
  7. import httpx
# URL the browser navigates to after it is opened.
GO_TO_URL = 'https://sepolia-faucet.pk910.de/'
# Whether to route the browser through a proxy: True = yes, False = no.
USE_PROXY = True
# Whether to run the browser headless: True = yes, False = no.
HEADLESS = False
# Whether to keep user data (e.g. cookies) between runs: True = yes, False = no.
NEED_USER_DATA = True
  15. class GotifySender:
  16. def __init__(self, url, token='A0Xg6ZE5946iBYg'):
  17. self.url = url + f"?token={token}"
  18. async def send_message(self, title='测试标题', message='测试消息', priority=5):
  19. data = {
  20. "message": message,
  21. "title": title,
  22. "priority": priority
  23. }
  24. async with httpx.AsyncClient() as client:
  25. try:
  26. response = await client.post(self.url, json=data)
  27. if response.status_code == 200:
  28. print("消息发送成功")
  29. else:
  30. print(f"消息发送失败,状态码: {response.status_code}")
  31. except httpx.RequestException as e:
  32. print(f"请求出现异常: {e}")
  33. class ExecuteBrowserActon(object):
  34. def click_elements(self, page, selector): # 点击元素
  35. try:
  36. # 等待元素可见
  37. page.wait_for_selector(selector, state='visible')
  38. # 点击元素
  39. page.click(selector)
  40. print("点击元素成功")
  41. except Exception as e:
  42. print(f"点击元素时发生异常: {e}")
  43. return False
  44. def check_elements(self, page, selector): # 检查元素
  45. try:
  46. # 等待元素可见
  47. page.wait_for_selector(selector, state='visible')
  48. # 获取元素的文本内容
  49. element_text = page.query_selector(selector).inner_text()
  50. # 返回元素的文本内容
  51. return element_text
  52. except Exception as e:
  53. print(f"检查元素时发生异常: {e}")
  54. return False
  55. def scroll_page(self, page): # 滚动页面
  56. pass
  57. def hide_elements(self, page, selector): # 隐藏元素
  58. try:
  59. page.evaluate('''() => {{
  60. const img = document.querySelector('{}');
  61. if (img) {{
  62. img.style.display = 'none';
  63. }}
  64. }}'''.format(selector))
  65. print("隐藏元素成功")
  66. except Exception as e:
  67. print(f"隐藏元素时发生异常: {e}")
  68. return False
  69. return True
  70. class BrowserAutomation:
  71. def __init__(self):
  72. self.use_proxy = USE_PROXY
  73. self.headless = HEADLESS
  74. self.need_user_data = NEED_USER_DATA
  75. self.go_to_url = GO_TO_URL if GO_TO_URL else 'about:blank'
  76. self.http = 'http://'
  77. self.proxy_addr = 'http://127.0.0.1:7890'
  78. self.target_url = self.go_to_url if 'http' in self.go_to_url else self.http + self.go_to_url
  79. self.user_data_dir = os.path.join(
  80. self.get_current_file_path(),
  81. 'user_data')
  82. def check_browser_alive(self, page, browser): # 挂起浏览器
  83. try:
  84. page.query_selector("head").inner_html()
  85. except Exception as e:
  86. browser.close()
  87. print(f"浏览器被关闭或发生异常: {e}")
  88. exit(0)
  89. def set_browser(self, playwright): # 设置浏览器
  90. if self.need_user_data:
  91. if self.use_proxy:
  92. browser = playwright.chromium.launch_persistent_context(
  93. self.user_data_dir,
  94. proxy={"server": self.proxy_addr},
  95. headless=self.headless,
  96. args=['--start-maximized'],
  97. no_viewport=True,
  98. bypass_csp=True, # 允许跨站脚本
  99. locale='zh-CN', # 设置浏览器语言为中文
  100. # 设置 user-agent
  101. user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36'
  102. )
  103. else:
  104. browser = playwright.chromium.launch_persistent_context(
  105. self.user_data_dir,
  106. headless=self.headless,
  107. args=['--start-maximized'],
  108. no_viewport=True,
  109. bypass_csp=True, # 允许跨站脚本
  110. locale='zh-CN', # 设置浏览器语言为中文
  111. # 设置 user-agent
  112. user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36'
  113. )
  114. else:
  115. if self.use_proxy:
  116. browser = playwright.chromium.launch(
  117. proxy={"server": self.proxy_addr},
  118. headless=self.headless,
  119. args=['--start-maximized']
  120. )
  121. else:
  122. browser = playwright.chromium.launch(
  123. headless=self.headless,
  124. args=['--start-maximized']
  125. )
  126. print('打开浏览器')
  127. return browser
  128. def get_current_file_path(self): # 获取当前文件路径
  129. return os.path.dirname(os.path.abspath(__file__))
  130. def exce_browser_action(self, page, browser):
  131. # 实例化执行浏览器动作类
  132. Exce = ExecuteBrowserActon()
  133. # 需要用到的所有 选择器
  134. hide_img1 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.faucet-frontimage > img'
  135. hide_img2 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.pow-status-container > div > div:nth-child(1) > div > img'
  136. status_selector = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.pow-status-container > div > div.row.pow-status-top > div:nth-child(1) > div.status-value'
  137. button_selector1 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.faucet-actions.center > button'
  138. # 隐藏两个图片
  139. while True:
  140. self.check_browser_alive(page, browser)
  141. result = Exce.hide_elements(page, hide_img1)
  142. time.sleep(0.5)
  143. if result:
  144. break
  145. while True:
  146. self.check_browser_alive(page, browser)
  147. result = Exce.hide_elements(page, hide_img2)
  148. time.sleep(0.5)
  149. if result:
  150. break
  151. # 监控领水数值
  152. while True:
  153. self.check_browser_alive(page, browser)
  154. status_value = Exce.check_elements(page, status_selector)
  155. if status_value:
  156. print(f"当前领取水滴数为: {status_value}")
  157. value = float(status_value.split(' ')[0].strip())
  158. if value >=2.49:
  159. print('领取水滴数大于2.49, 领取水滴')
  160. Exce.click_elements(page, button_selector1)
  161. break
  162. time.sleep(1)
  163. def open_browser(self): # 打开浏览器
  164. with sync_playwright() as playwright:
  165. browser = self.set_browser(playwright)
  166. if self.need_user_data:
  167. page = browser.pages[0]
  168. else:
  169. context = browser.new_context(no_viewport=True)
  170. page = context.new_page()
  171. page.goto(self.target_url)
  172. # TODO 此处开始执行自动化逻辑
  173. self.exce_browser_action(page, browser)
  174. # 自动化执行完成, 挂起浏览器挂机,
  175. while True:
  176. self.check_browser_alive(page, browser)
  177. time.sleep(1)
  178. def main(self):
  179. if self.need_user_data:
  180. if not os.path.exists(self.user_data_dir):
  181. os.mkdir(self.user_data_dir)
  182. self.open_browser()
  183. if __name__ == '__main__':
  184. try:
  185. from playwright.sync_api import sync_playwright
  186. except ImportError:
  187. print('playwright 未安装, 正在安装...')
  188. subprocess.check_call(
  189. [sys.executable, "-m", "pip", "install", "playwright"])
  190. time.sleep(1)
  191. # 这里要执行终端命令, playwright install
  192. subprocess.check_call([sys.executable, "-m", "playwright", "install"])
  193. time.sleep(1)
  194. subprocess.check_call(
  195. [sys.executable, "-m", "playwright", "install-deps"])
  196. print("playwright 安装完成,正在重新运行程序...")
  197. subprocess.check_call([sys.executable, __file__])
  198. B = BrowserAutomation()
  199. B.main()