| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- # -*- coding: utf-8 -*-
- import os
- import time
- import subprocess
- import sys
- import asyncio
- import httpx
- GO_TO_URL = 'https://sepolia-faucet.pk910.de/'
- # 是否使用代理, 是 True 否 False
- USE_PROXY = True
- # 是否无头模式, 是 True 否 False
- HEADLESS = False
- # 是否记录用户数据, 例如cookies, 是 True 否 False
- NEED_USER_DATA = False
- class GotifySender:
- def __init__(self, url, token='A0Xg6ZE5946iBYg'):
- self.url = url + f"?token={token}"
- async def send_message(self, title='测试标题', message='测试消息', priority=5):
- data = {
- "message": message,
- "title": title,
- "priority": priority
- }
- async with httpx.AsyncClient() as client:
- try:
- response = await client.post(self.url, json=data)
- if response.status_code == 200:
- print("消息发送成功")
- else:
- print(f"消息发送失败,状态码: {response.status_code}")
- except httpx.RequestException as e:
- print(f"请求出现异常: {e}")
- class ExecuteBrowserActon(object):
- def click_elements(self, page): # 点击元素
- pass
- def check_elements(self, page): # 检查元素
- pass
- def scroll_page(self): # 滚动页面
- pass
- class BrowserAutomation:
- def __init__(self):
- self.use_proxy = USE_PROXY
- self.headless = HEADLESS
- self.need_user_data = NEED_USER_DATA
- self.go_to_url = GO_TO_URL if GO_TO_URL else 'about:blank'
- self.http = 'http://'
- self.proxy_addr = 'http://127.0.0.1:7890'
- self.target_url = self.go_to_url if 'http' in self.go_to_url else self.http + self.go_to_url
- self.user_data_dir = os.path.join(
- self.get_current_file_path(),
- 'user_data')
- def hang_up_browser(self, page): # 挂起浏览器
- try:
- while True:
- page.query_selector("head").inner_html()
- time.sleep(1)
- except Exception as e:
- print(f"浏览器被关闭或发生异常: {e}")
- print('关闭浏览器')
- def set_browser(self, playwright): # 设置浏览器
- if self.need_user_data:
- if self.use_proxy:
- browser = playwright.chromium.launch_persistent_context(
- self.user_data_dir,
- proxy={"server": self.proxy_addr},
- headless=self.headless,
- args=['--start-maximized'],
- no_viewport=True,
- bypass_csp=True, # 允许跨站脚本
- locale='zh-CN', # 设置浏览器语言为中文
- # 设置 user-agent
- user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36'
- )
- else:
- browser = playwright.chromium.launch_persistent_context(
- self.user_data_dir,
- headless=self.headless,
- args=['--start-maximized'],
- no_viewport=True,
- bypass_csp=True, # 允许跨站脚本
- locale='zh-CN', # 设置浏览器语言为中文
- # 设置 user-agent
- user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36'
- )
- else:
- if self.use_proxy:
- browser = playwright.chromium.launch(
- proxy={"server": self.proxy_addr},
- headless=self.headless,
- args=['--start-maximized']
- )
- else:
- browser = playwright.chromium.launch(
- headless=self.headless,
- args=['--start-maximized']
- )
- print('打开浏览器')
- return browser
- def get_current_file_path(self): # 获取当前文件路径
- return os.path.dirname(os.path.abspath(__file__))
- def exce_browser_action(self, page):
- # 实例化执行浏览器动作类
- Exce = ExecuteBrowserActon()
- # 模拟各种动作
- def open_browser(self): # 打开浏览器
- with sync_playwright() as playwright:
- browser = self.set_browser(playwright)
- if self.need_user_data:
- page = browser.pages[0]
- else:
- context = browser.new_context(no_viewport=True)
- page = context.new_page()
- page.goto(self.target_url)
- # TODO 此处开始执行自动化逻辑
- self.exce_browser_action(page)
- # 自动化执行完成, 挂起浏览器挂机,
- self.hang_up_browser(page)
- browser.close()
- def main(self):
- if self.need_user_data:
- if not os.path.exists(self.user_data_dir):
- os.mkdir(self.user_data_dir)
- self.open_browser()
- if __name__ == '__main__':
- try:
- from playwright.sync_api import sync_playwright
- except ImportError:
- print('playwright 未安装, 正在安装...')
- subprocess.check_call(
- [sys.executable, "-m", "pip", "install", "playwright"])
- time.sleep(1)
- # 这里要执行终端命令, playwright install
- subprocess.check_call([sys.executable, "-m", "playwright", "install"])
- time.sleep(1)
- subprocess.check_call(
- [sys.executable, "-m", "playwright", "install-deps"])
- print("playwright 安装完成,正在重新运行程序...")
- subprocess.check_call([sys.executable, __file__])
- B = BrowserAutomation()
- B.main()
|