# -*- coding: utf-8 -*-
"""
Message module base: used for opening the browser and related operations.
"""
- import random
- from playwright.sync_api import sync_playwright
- import sys
- import os
- import time
- sys.path.append(os.path.join(os.path.abspath(__file__).split('auto')[0] + 'auto'))
- from utils.utils_logs_handle import LogsHandle
class CryptoCrawler:
    """Scrape text content from a list of URLs with Playwright (WebKit).

    Parameters
    ----------
    url_list : list[dict]
        A list of {key: url} mappings to visit.
    selectors : list[str]
        CSS selectors whose text content is collected from each page.
    check_difference : bool
        Switch for detecting whether the data has changed.
    headless : bool
        Whether to launch the browser headless.
    proxy : bool
        When True, route traffic through the local proxy at 127.0.0.1:7890.
    """

    def __init__(self, url_list, selectors, check_difference=False, headless=True, proxy=False):
        self.url_list = url_list
        self.selectors = selectors
        self.check_difference = check_difference  # switch: detect whether data changed
        self.data_difference = False              # result: whether data changed (default: no)
        self.logs_handle = LogsHandle()           # log writer
        self.db = 'CHECK'
        self.collection = 'check'
        self.headless = headless
        self.proxy = proxy

    def main(self):
        """Visit every URL, scrape each selector's text, and return the results.

        Returns
        -------
        list[list[dict]] | None
            One list of {key: text} entries per URL that yielded any data,
            or None when nothing was collected at all.
        """
        with sync_playwright() as playwright:
            # Build launch kwargs once instead of duplicating the launch call.
            launch_kwargs = {'headless': self.headless}
            if self.proxy:
                launch_kwargs['proxy'] = {'server': '127.0.0.1:7890'}
            browser = playwright.webkit.launch(**launch_kwargs)
            try:
                context = browser.new_context(viewport={'width': 1920, 'height': 1080})
                page = context.new_page()
                all_data = []
                for url_info in self.url_list:
                    for key, url in url_info.items():
                        result_list = []
                        try:
                            page.goto(url)
                            page.wait_for_load_state('load')
                            time.sleep(5)  # extra settle time for JS-rendered content
                            for selector in self.selectors:
                                element = page.query_selector(selector)
                                if element:
                                    text = element.text_content()
                                    # text_content() may return None; guard so one
                                    # missing node doesn't abort the remaining selectors.
                                    if text is not None:
                                        result_list.append({key: text.strip()})
                        except Exception as e:
                            err_str = f"Error fetching {url}: {e}"
                            self.logs_handle.logs_write(self.collection, err_str, 'error', False)
                            continue
                        if result_list:
                            all_data.append(result_list)
                        time.sleep(random.randint(1, 3))  # polite random delay between URLs
            finally:
                # Always release the browser process, even if an unexpected
                # error escapes the crawl loop (the original leaked it here).
                browser.close()
        return all_data if all_data else None