# -*- coding: utf-8 -*- import os import time import subprocess import sys import asyncio import httpx GO_TO_URL = 'https://sepolia-faucet.pk910.de/' # 是否使用代理, 是 True 否 False USE_PROXY = True # 是否无头模式, 是 True 否 False HEADLESS = False # 是否记录用户数据, 例如cookies, 是 True 否 False NEED_USER_DATA = True class GotifySender: def __init__(self, url, token='A0Xg6ZE5946iBYg'): self.url = url + f"?token={token}" async def send_message(self, title='测试标题', message='测试消息', priority=5): data = { "message": message, "title": title, "priority": priority } async with httpx.AsyncClient() as client: try: response = await client.post(self.url, json=data) if response.status_code == 200: print("消息发送成功") else: print(f"消息发送失败,状态码: {response.status_code}") except httpx.RequestException as e: print(f"请求出现异常: {e}") class ExecuteBrowserActon(object): def click_elements(self, page, selector): # 点击元素 try: # 等待元素可见 page.wait_for_selector(selector, state='visible') # 点击元素 page.click(selector) print("点击元素成功") except Exception as e: print(f"点击元素时发生异常: {e}") return False def check_elements(self, page, selector): # 检查元素 try: # 等待元素可见 page.wait_for_selector(selector, state='visible') # 获取元素的文本内容 element_text = page.query_selector(selector).inner_text() # 返回元素的文本内容 return element_text except Exception as e: print(f"检查元素时发生异常: {e}") return False def scroll_page(self, page): # 滚动页面 pass def hide_elements(self, page, selector): # 隐藏元素 try: page.evaluate('''() => {{ const img = document.querySelector('{}'); if (img) {{ img.style.display = 'none'; }} }}'''.format(selector)) print("隐藏元素成功") except Exception as e: print(f"隐藏元素时发生异常: {e}") return False return True class BrowserAutomation: def __init__(self): self.use_proxy = USE_PROXY self.headless = HEADLESS self.need_user_data = NEED_USER_DATA self.go_to_url = GO_TO_URL if GO_TO_URL else 'about:blank' self.http = 'http://' self.proxy_addr = 'http://127.0.0.1:7890' self.target_url = self.go_to_url if 'http' in self.go_to_url else self.http + self.go_to_url self.user_data_dir = os.path.join( self.get_current_file_path(), 'user_data') def check_browser_alive(self, page, browser): # 挂起浏览器 try: page.query_selector("head").inner_html() except Exception as e: browser.close() print(f"浏览器被关闭或发生异常: {e}") exit(0) def set_browser(self, playwright): # 设置浏览器 if self.need_user_data: if self.use_proxy: browser = playwright.chromium.launch_persistent_context( self.user_data_dir, proxy={"server": self.proxy_addr}, headless=self.headless, args=['--start-maximized'], no_viewport=True, bypass_csp=True, # 允许跨站脚本 locale='zh-CN', # 设置浏览器语言为中文 # 设置 user-agent user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36' ) else: browser = playwright.chromium.launch_persistent_context( self.user_data_dir, headless=self.headless, args=['--start-maximized'], no_viewport=True, bypass_csp=True, # 允许跨站脚本 locale='zh-CN', # 设置浏览器语言为中文 # 设置 user-agent user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36' ) else: if self.use_proxy: browser = playwright.chromium.launch( proxy={"server": self.proxy_addr}, headless=self.headless, args=['--start-maximized'] ) else: browser = playwright.chromium.launch( headless=self.headless, args=['--start-maximized'] ) print('打开浏览器') return browser def get_current_file_path(self): # 获取当前文件路径 return os.path.dirname(os.path.abspath(__file__)) def exce_browser_action(self, page, browser): # 实例化执行浏览器动作类 Exce = ExecuteBrowserActon() # 需要用到的所有 选择器 hide_img1 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.faucet-frontimage > img' hide_img2 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.pow-status-container > div > div:nth-child(1) > div > img' status_selector = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.pow-status-container > div > div.row.pow-status-top > div:nth-child(1) > div.status-value' button_selector1 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.faucet-actions.center > button' # 隐藏两个图片 while True: self.check_browser_alive(page, browser) result = Exce.hide_elements(page, hide_img1) time.sleep(0.5) if result: break while True: self.check_browser_alive(page, browser) result = Exce.hide_elements(page, hide_img2) time.sleep(0.5) if result: break # 监控领水数值 while True: self.check_browser_alive(page, browser) status_value = Exce.check_elements(page, status_selector) if status_value: print(f"当前领取水滴数为: {status_value}") value = float(status_value.split(' ')[0].strip()) if value >=2.49: print('领取水滴数大于2.49, 领取水滴') Exce.click_elements(page, button_selector1) break time.sleep(1) def open_browser(self): # 打开浏览器 with sync_playwright() as playwright: browser = self.set_browser(playwright) if self.need_user_data: page = browser.pages[0] else: context = browser.new_context(no_viewport=True) page = context.new_page() page.goto(self.target_url) # TODO 此处开始执行自动化逻辑 self.exce_browser_action(page, browser) # 自动化执行完成, 挂起浏览器挂机, while True: self.check_browser_alive(page, browser) time.sleep(1) def main(self): if self.need_user_data: if not os.path.exists(self.user_data_dir): os.mkdir(self.user_data_dir) self.open_browser() if __name__ == '__main__': try: from playwright.sync_api import sync_playwright except ImportError: print('playwright 未安装, 正在安装...') subprocess.check_call( [sys.executable, "-m", "pip", "install", "playwright"]) time.sleep(1) # 这里要执行终端命令, playwright install subprocess.check_call([sys.executable, "-m", "playwright", "install"]) time.sleep(1) subprocess.check_call( [sys.executable, "-m", "playwright", "install-deps"]) print("playwright 安装完成,正在重新运行程序...") subprocess.check_call([sys.executable, __file__]) B = BrowserAutomation() B.main()