|
@@ -0,0 +1,168 @@
|
|
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
|
|
+import os
|
|
|
|
|
+import time
|
|
|
|
|
+import subprocess
|
|
|
|
|
+import sys
|
|
|
|
|
+import asyncio
|
|
|
|
|
+import httpx
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+GO_TO_URL = 'https://sepolia-faucet.pk910.de/'
|
|
|
|
|
+# 是否使用代理, 是 True 否 False
|
|
|
|
|
+USE_PROXY = True
|
|
|
|
|
+# 是否无头模式, 是 True 否 False
|
|
|
|
|
+HEADLESS = False
|
|
|
|
|
+# 是否记录用户数据, 例如cookies, 是 True 否 False
|
|
|
|
|
+NEED_USER_DATA = False
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class GotifySender:
|
|
|
|
|
+ def __init__(self, url, token='A0Xg6ZE5946iBYg'):
|
|
|
|
|
+ self.url = url + f"?token={token}"
|
|
|
|
|
+
|
|
|
|
|
+ async def send_message(self, title='测试标题', message='测试消息', priority=5):
|
|
|
|
|
+ data = {
|
|
|
|
|
+ "message": message,
|
|
|
|
|
+ "title": title,
|
|
|
|
|
+ "priority": priority
|
|
|
|
|
+ }
|
|
|
|
|
+ async with httpx.AsyncClient() as client:
|
|
|
|
|
+ try:
|
|
|
|
|
+ response = await client.post(self.url, json=data)
|
|
|
|
|
+ if response.status_code == 200:
|
|
|
|
|
+ print("消息发送成功")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"消息发送失败,状态码: {response.status_code}")
|
|
|
|
|
+ except httpx.RequestException as e:
|
|
|
|
|
+ print(f"请求出现异常: {e}")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class ExecuteBrowserActon(object):
|
|
|
|
|
+ def click_elements(self, page): # 点击元素
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ def check_elements(self, page): # 检查元素
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ def scroll_page(self): # 滚动页面
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class BrowserAutomation:
|
|
|
|
|
+ def __init__(self):
|
|
|
|
|
+ self.use_proxy = USE_PROXY
|
|
|
|
|
+ self.headless = HEADLESS
|
|
|
|
|
+ self.need_user_data = NEED_USER_DATA
|
|
|
|
|
+ self.go_to_url = GO_TO_URL if GO_TO_URL else 'about:blank'
|
|
|
|
|
+ self.http = 'http://'
|
|
|
|
|
+ self.proxy_addr = 'http://127.0.0.1:7890'
|
|
|
|
|
+ self.target_url = self.go_to_url if 'http' in self.go_to_url else self.http + self.go_to_url
|
|
|
|
|
+ self.user_data_dir = os.path.join(
|
|
|
|
|
+ self.get_current_file_path(),
|
|
|
|
|
+ 'user_data')
|
|
|
|
|
+
|
|
|
|
|
+ def hang_up_browser(self, page): # 挂起浏览器
|
|
|
|
|
+ try:
|
|
|
|
|
+ while True:
|
|
|
|
|
+ page.query_selector("head").inner_html()
|
|
|
|
|
+ time.sleep(1)
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"浏览器被关闭或发生异常: {e}")
|
|
|
|
|
+ print('关闭浏览器')
|
|
|
|
|
+
|
|
|
|
|
+ def set_browser(self, playwright): # 设置浏览器
|
|
|
|
|
+ if self.need_user_data:
|
|
|
|
|
+ if self.use_proxy:
|
|
|
|
|
+ browser = playwright.chromium.launch_persistent_context(
|
|
|
|
|
+ self.user_data_dir,
|
|
|
|
|
+ proxy={"server": self.proxy_addr},
|
|
|
|
|
+ headless=self.headless,
|
|
|
|
|
+ args=['--start-maximized'],
|
|
|
|
|
+ no_viewport=True,
|
|
|
|
|
+ bypass_csp=True, # 允许跨站脚本
|
|
|
|
|
+ locale='zh-CN', # 设置浏览器语言为中文
|
|
|
|
|
+ # 设置 user-agent
|
|
|
|
|
+ user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36'
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ browser = playwright.chromium.launch_persistent_context(
|
|
|
|
|
+ self.user_data_dir,
|
|
|
|
|
+ headless=self.headless,
|
|
|
|
|
+ args=['--start-maximized'],
|
|
|
|
|
+ no_viewport=True,
|
|
|
|
|
+ bypass_csp=True, # 允许跨站脚本
|
|
|
|
|
+ locale='zh-CN', # 设置浏览器语言为中文
|
|
|
|
|
+ # 设置 user-agent
|
|
|
|
|
+ user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36'
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ if self.use_proxy:
|
|
|
|
|
+ browser = playwright.chromium.launch(
|
|
|
|
|
+ proxy={"server": self.proxy_addr},
|
|
|
|
|
+ headless=self.headless,
|
|
|
|
|
+ args=['--start-maximized']
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ browser = playwright.chromium.launch(
|
|
|
|
|
+ headless=self.headless,
|
|
|
|
|
+ args=['--start-maximized']
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ print('打开浏览器')
|
|
|
|
|
+
|
|
|
|
|
+ return browser
|
|
|
|
|
+
|
|
|
|
|
+ def get_current_file_path(self): # 获取当前文件路径
|
|
|
|
|
+ return os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
+
|
|
|
|
|
+ def exce_browser_action(self, page):
|
|
|
|
|
+ # 实例化执行浏览器动作类
|
|
|
|
|
+ Exce = ExecuteBrowserActon()
|
|
|
|
|
+
|
|
|
|
|
+ # 模拟各种动作
|
|
|
|
|
+
|
|
|
|
|
+ def open_browser(self): # 打开浏览器
|
|
|
|
|
+ with sync_playwright() as playwright:
|
|
|
|
|
+ browser = self.set_browser(playwright)
|
|
|
|
|
+
|
|
|
|
|
+ if self.need_user_data:
|
|
|
|
|
+ page = browser.pages[0]
|
|
|
|
|
+ else:
|
|
|
|
|
+ context = browser.new_context(no_viewport=True)
|
|
|
|
|
+ page = context.new_page()
|
|
|
|
|
+
|
|
|
|
|
+ page.goto(self.target_url)
|
|
|
|
|
+
|
|
|
|
|
+ # TODO 此处开始执行自动化逻辑
|
|
|
|
|
+ self.exce_browser_action(page)
|
|
|
|
|
+
|
|
|
|
|
+ # 自动化执行完成, 挂起浏览器挂机,
|
|
|
|
|
+ self.hang_up_browser(page)
|
|
|
|
|
+ browser.close()
|
|
|
|
|
+
|
|
|
|
|
+ def main(self):
|
|
|
|
|
+ if self.need_user_data:
|
|
|
|
|
+ if not os.path.exists(self.user_data_dir):
|
|
|
|
|
+ os.mkdir(self.user_data_dir)
|
|
|
|
|
+
|
|
|
|
|
+ self.open_browser()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
|
+ try:
|
|
|
|
|
+ from playwright.sync_api import sync_playwright
|
|
|
|
|
+ except ImportError:
|
|
|
|
|
+ print('playwright 未安装, 正在安装...')
|
|
|
|
|
+ subprocess.check_call(
|
|
|
|
|
+ [sys.executable, "-m", "pip", "install", "playwright"])
|
|
|
|
|
+ time.sleep(1)
|
|
|
|
|
+ # 这里要执行终端命令, playwright install
|
|
|
|
|
+ subprocess.check_call([sys.executable, "-m", "playwright", "install"])
|
|
|
|
|
+ time.sleep(1)
|
|
|
|
|
+ subprocess.check_call(
|
|
|
|
|
+ [sys.executable, "-m", "playwright", "install-deps"])
|
|
|
|
|
+ print("playwright 安装完成,正在重新运行程序...")
|
|
|
|
|
+ subprocess.check_call([sys.executable, __file__])
|
|
|
|
|
+
|
|
|
|
|
+ B = BrowserAutomation()
|
|
|
|
|
+ B.main()
|