| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- # -*- coding: utf-8 -*-
- import os
- import time
- import subprocess
- import sys
- import asyncio
- import httpx
# Page the browser navigates to once it has been launched.
GO_TO_URL = 'https://sepolia-faucet.pk910.de/'
# Whether to route browser traffic through a proxy (True = yes, False = no).
USE_PROXY = True
# Whether to run the browser in headless mode (True = yes, False = no).
HEADLESS = False
# Whether to persist user data such as cookies (True = yes, False = no).
NEED_USER_DATA = True
class GotifySender:
    """Minimal asynchronous client that posts notifications to a Gotify endpoint."""

    # NOTE(review): a default application token is hard-coded in the signature
    # below; consider loading it from configuration instead.
    def __init__(self, url, token='A0Xg6ZE5946iBYg'):
        """Remember the endpoint with the token appended as a query parameter.

        Args:
            url: URL that messages are POSTed to (without a query string).
            token: Token appended to ``url`` as ``?token=...``.
        """
        self.url = url + f"?token={token}"

    async def send_message(self, title='测试标题', message='测试消息', priority=5):
        """POST one message as JSON and print whether it was accepted.

        Args:
            title: Title of the notification.
            message: Body text of the notification.
            priority: Priority value forwarded to the server.

        Returns:
            None. Request failures are reported on stdout, never raised.
        """
        data = {
            "message": message,
            "title": title,
            "priority": priority
        }
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(self.url, json=data)
            except httpx.RequestError as e:
                # httpx has no ``RequestException``; ``RequestError`` is the
                # base class for failures raised while issuing a request.
                print(f"请求出现异常: {e}")
                return
        if response.status_code == 200:
            print("消息发送成功")
        else:
            print(f"消息发送失败,状态码: {response.status_code}")
class ExecuteBrowserActon(object):
    """Exception-safe wrappers around the page operations the automation needs.

    Each method takes a Playwright-style ``page`` object and a CSS selector,
    prints the outcome, and signals failure by returning ``False`` rather than
    raising.
    """

    def click_elements(self, page, selector):
        """Wait for ``selector`` to become visible, then click it.

        Returns:
            True when the click was issued, False when waiting or clicking
            raised.
        """
        try:
            page.wait_for_selector(selector, state='visible')
            page.click(selector)
            print("点击元素成功")
        except Exception as e:
            print(f"点击元素时发生异常: {e}")
            return False
        # Report success explicitly so the result is a real boolean, matching
        # hide_elements.
        return True

    def check_elements(self, page, selector):
        """Wait for ``selector`` to become visible and return its inner text.

        Returns:
            The element's text, or False when the element could not be read.
        """
        try:
            page.wait_for_selector(selector, state='visible')
            element_text = page.query_selector(selector).inner_text()
            return element_text
        except Exception as e:
            print(f"检查元素时发生异常: {e}")
            return False

    def scroll_page(self, page):
        """Placeholder for scrolling the page; currently does nothing."""
        pass

    def hide_elements(self, page, selector):
        """Set ``display: none`` on the first element matching ``selector``.

        A selector that matches nothing is not treated as an error.

        Returns:
            True when the script ran without raising, False otherwise.
        """
        try:
            # The selector is handed over as an evaluate() argument instead of
            # being spliced into the script text, so selectors containing
            # quotes cannot break the JavaScript.
            page.evaluate(
                '''(selector) => {
                    const element = document.querySelector(selector);
                    if (element) {
                        element.style.display = 'none';
                    }
                }''',
                selector,
            )
            print("隐藏元素成功")
        except Exception as e:
            print(f"隐藏元素时发生异常: {e}")
            return False
        return True
class BrowserAutomation:
    """Launch Chromium, open the configured page and run the faucet automation.

    Configuration is read from the module-level constants ``USE_PROXY``,
    ``HEADLESS``, ``NEED_USER_DATA`` and ``GO_TO_URL``.
    """

    # URLs starting with one of these are navigated to unchanged.
    _URL_PREFIXES = ('http://', 'https://', 'about:', 'file:', 'data:')
    # Status value at or above which the action button is clicked.
    _CLAIM_THRESHOLD = 2.49
    _USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.72 Safari/537.36'

    def __init__(self):
        """Copy the module-level settings and derive the target URL and data dir."""
        self.use_proxy = USE_PROXY
        self.headless = HEADLESS
        self.need_user_data = NEED_USER_DATA
        self.go_to_url = GO_TO_URL if GO_TO_URL else 'about:blank'
        self.http = 'http://'
        self.proxy_addr = 'http://127.0.0.1:7890'
        self.target_url = self._normalize_url(self.go_to_url, self.http)
        self.user_data_dir = os.path.join(
            self.get_current_file_path(),
            'user_data')

    @classmethod
    def _normalize_url(cls, url, default_scheme='http://'):
        """Return ``url`` unchanged if it already has a known prefix, else prepend ``default_scheme``.

        A prefix check is used instead of a substring test so that
        ``about:blank`` is not turned into ``http://about:blank`` and a host
        such as ``httpbin.org`` still receives a scheme.
        """
        if url.lower().startswith(cls._URL_PREFIXES):
            return url
        return default_scheme + url

    @staticmethod
    def _parse_amount(status_text):
        """Return the leading number of ``status_text`` as a float, or None if absent."""
        parts = status_text.split()
        if not parts:
            return None
        try:
            return float(parts[0])
        except ValueError:
            return None

    def check_browser_alive(self, page, browser):
        """Exit the process with status 0 once the page can no longer be queried.

        Args:
            page: Page probed by reading the inner HTML of its ``head`` element.
            browser: Object whose ``close()`` is called before exiting.

        Raises:
            SystemExit: When probing the page raises.
        """
        try:
            page.query_selector("head").inner_html()
        except Exception as e:
            try:
                browser.close()
            except Exception as close_error:
                # Closing may fail as well when the browser is already gone;
                # that must not prevent the clean exit below.
                print(f"关闭浏览器时发生异常: {close_error}")
            print(f"浏览器被关闭或发生异常: {e}")
            # sys.exit instead of the ``exit`` builtin, which only exists when
            # the ``site`` module has been loaded.
            sys.exit(0)

    def set_browser(self, playwright):
        """Launch Chromium according to the instance settings.

        Args:
            playwright: Object exposing ``chromium.launch`` and
                ``chromium.launch_persistent_context``.

        Returns:
            The result of ``launch_persistent_context`` when user data is
            kept, otherwise the result of ``launch``.
        """
        # Options shared by both launch modes.
        launch_options = {
            'headless': self.headless,
            'args': ['--start-maximized'],
        }
        if self.use_proxy:
            launch_options['proxy'] = {"server": self.proxy_addr}

        if self.need_user_data:
            browser = playwright.chromium.launch_persistent_context(
                self.user_data_dir,
                no_viewport=True,
                bypass_csp=True,  # bypass the page's Content-Security-Policy
                locale='zh-CN',  # browser language: Chinese
                user_agent=self._USER_AGENT,
                **launch_options,
            )
        else:
            browser = playwright.chromium.launch(**launch_options)
        print('打开浏览器')
        return browser

    def get_current_file_path(self):
        """Return the absolute path of the directory containing this file."""
        return os.path.dirname(os.path.abspath(__file__))

    def _hide_until_done(self, actions, page, browser, selector):
        """Retry hiding ``selector`` every 0.5 s until ``hide_elements`` reports success."""
        while True:
            self.check_browser_alive(page, browser)
            hidden = actions.hide_elements(page, selector)
            time.sleep(0.5)
            if hidden:
                break

    def exce_browser_action(self, page, browser):
        """Hide two images, poll the status value and click the button once it is high enough.

        The status text is polled about once per second. Text that does not
        start with a number is ignored and polling continues.
        """
        Exce = ExecuteBrowserActon()
        # All selectors used by this routine.
        hide_img1 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.faucet-frontimage > img'
        hide_img2 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.pow-status-container > div > div:nth-child(1) > div > img'
        status_selector = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.pow-status-container > div > div.row.pow-status-top > div:nth-child(1) > div.status-value'
        button_selector1 = 'body > div.faucet-wrapper > div > div > div > div.faucet-body > div > div.faucet-actions.center > button'

        # Hide the two images.
        for image_selector in (hide_img1, hide_img2):
            self._hide_until_done(Exce, page, browser, image_selector)

        # Watch the status value until it reaches the threshold.
        while True:
            self.check_browser_alive(page, browser)
            status_value = Exce.check_elements(page, status_selector)
            if status_value:
                print(f"当前领取水滴数为: {status_value}")
                value = self._parse_amount(status_value)
                if value is not None and value >= self._CLAIM_THRESHOLD:
                    print('领取水滴数大于2.49, 领取水滴')
                    Exce.click_elements(page, button_selector1)
                    break
            time.sleep(1)

    def open_browser(self):
        """Open the browser, run the automation, then idle until the page dies."""
        # Imported here so the method does not depend on the ``__main__``
        # section having defined ``sync_playwright`` as a module global.
        from playwright.sync_api import sync_playwright

        with sync_playwright() as playwright:
            browser = self.set_browser(playwright)
            if self.need_user_data:
                # Reuse the first existing page, or open one if there is none.
                page = browser.pages[0] if browser.pages else browser.new_page()
            else:
                context = browser.new_context(no_viewport=True)
                page = context.new_page()
            page.goto(self.target_url)
            # Automation logic starts here.
            self.exce_browser_action(page, browser)
            # Automation finished: keep the browser open and poll once per
            # second; check_browser_alive exits the process when the page is gone.
            while True:
                self.check_browser_alive(page, browser)
                time.sleep(1)

    def main(self):
        """Create the user-data directory if it is needed, then open the browser."""
        if self.need_user_data:
            os.makedirs(self.user_data_dir, exist_ok=True)
        self.open_browser()
# Script entry point: make sure Playwright is importable (installing it on
# first use), then run the automation.
if __name__ == '__main__':
    try:
        # Kept as a module-level name on purpose: BrowserAutomation.open_browser
        # calls sync_playwright().
        from playwright.sync_api import sync_playwright
    except ImportError:
        print('playwright 未安装, 正在安装...')
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "playwright"])
        time.sleep(1)
        # Equivalent of running ``playwright install`` in a terminal.
        subprocess.check_call([sys.executable, "-m", "playwright", "install"])
        time.sleep(1)
        subprocess.check_call(
            [sys.executable, "-m", "playwright", "install-deps"])
        print("playwright 安装完成,正在重新运行程序...")
        # The re-run child process does the actual work. Exit with its status
        # instead of falling through: in this process ``sync_playwright`` was
        # never imported, so continuing would fail with a NameError.
        sys.exit(subprocess.call([sys.executable, __file__]))
    B = BrowserAutomation()
    B.main()
|