# -*- coding: utf-8 -*- import os import time import subprocess import sys import asyncio import httpx GO_TO_URL = 'https://sepolia-faucet.pk910.de/' # 是否使用代理, 是 True 否 False USE_PROXY = True # 是否无头模式, 是 True 否 False HEADLESS = False # 是否记录用户数据, 例如cookies, 是 True 否 False NEED_USER_DATA = False class GotifySender: def __init__(self, url, token='A0Xg6ZE5946iBYg'): self.url = url + f"?token={token}" async def send_message(self, title='测试标题', message='测试消息', priority=5): data = { "message": message, "title": title, "priority": priority } async with httpx.AsyncClient() as client: try: response = await client.post(self.url, json=data) if response.status_code == 200: print("消息发送成功") else: print(f"消息发送失败,状态码: {response.status_code}") except httpx.RequestException as e: print(f"请求出现异常: {e}") class ExecuteBrowserActon(object): def click_elements(self, page): # 点击元素 pass def check_elements(self, page): # 检查元素 pass def scroll_page(self): # 滚动页面 pass class BrowserAutomation: def __init__(self): self.use_proxy = USE_PROXY self.headless = HEADLESS self.need_user_data = NEED_USER_DATA self.go_to_url = GO_TO_URL if GO_TO_URL else 'about:blank' self.http = 'http://' self.proxy_addr = 'http://127.0.0.1:7890' self.target_url = self.go_to_url if 'http' in self.go_to_url else self.http + self.go_to_url self.user_data_dir = os.path.join( self.get_current_file_path(), 'user_data') def hang_up_browser(self, page): # 挂起浏览器 try: while True: page.query_selector("head").inner_html() time.sleep(1) except Exception as e: print(f"浏览器被关闭或发生异常: {e}") print('关闭浏览器') def set_browser(self, playwright): # 设置浏览器 if self.need_user_data: if self.use_proxy: browser = playwright.chromium.launch_persistent_context( self.user_data_dir, proxy={"server": self.proxy_addr}, headless=self.headless, args=['--start-maximized'], no_viewport=True, bypass_csp=True, # 允许跨站脚本 locale='zh-CN', # 设置浏览器语言为中文 # 设置 user-agent user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36' ) else: browser = playwright.chromium.launch_persistent_context( self.user_data_dir, headless=self.headless, args=['--start-maximized'], no_viewport=True, bypass_csp=True, # 允许跨站脚本 locale='zh-CN', # 设置浏览器语言为中文 # 设置 user-agent user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36' ) else: if self.use_proxy: browser = playwright.chromium.launch( proxy={"server": self.proxy_addr}, headless=self.headless, args=['--start-maximized'] ) else: browser = playwright.chromium.launch( headless=self.headless, args=['--start-maximized'] ) print('打开浏览器') return browser def get_current_file_path(self): # 获取当前文件路径 return os.path.dirname(os.path.abspath(__file__)) def exce_browser_action(self, page): # 实例化执行浏览器动作类 Exce = ExecuteBrowserActon() # 模拟各种动作 def open_browser(self): # 打开浏览器 with sync_playwright() as playwright: browser = self.set_browser(playwright) if self.need_user_data: page = browser.pages[0] else: context = browser.new_context(no_viewport=True) page = context.new_page() page.goto(self.target_url) # TODO 此处开始执行自动化逻辑 self.exce_browser_action(page) # 自动化执行完成, 挂起浏览器挂机, self.hang_up_browser(page) browser.close() def main(self): if self.need_user_data: if not os.path.exists(self.user_data_dir): os.mkdir(self.user_data_dir) self.open_browser() if __name__ == '__main__': try: from playwright.sync_api import sync_playwright except ImportError: print('playwright 未安装, 正在安装...') subprocess.check_call( [sys.executable, "-m", "pip", "install", "playwright"]) time.sleep(1) # 这里要执行终端命令, playwright install subprocess.check_call([sys.executable, "-m", "playwright", "install"]) time.sleep(1) subprocess.check_call( [sys.executable, "-m", "playwright", "install-deps"]) print("playwright 安装完成,正在重新运行程序...") subprocess.check_call([sys.executable, __file__]) B = BrowserAutomation() B.main()