Ver código fonte

first commit

jack 1 ano atrás
commit
8c479b5ccd
4 arquivos alterados com 301 adições e 0 exclusões
  1. 63 0
      .gitignore
  2. 45 0
      base_playwright_browser.py
  3. 40 0
      base_send_gotify.py
  4. 153 0
      main.py

+ 63 - 0
.gitignore

@@ -0,0 +1,63 @@
+.DS_Store
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+.idea/*
+xml_files/
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+tasks_config

+ 45 - 0
base_playwright_browser.py

@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+"""
+消息模块基础, 用于打开浏览器等相关操作
+"""
+import random
+
+from playwright.sync_api import sync_playwright
+import time
+
+
+class CryptoCrawler:
+    def __init__(self, url, selector, headless=True):
+        self.url = url
+        self.selector = selector
+        self.headless = headless
+
+    def main(self):
+        with sync_playwright() as playwright:
+            browser = playwright.webkit.launch(headless=self.headless)
+            if self.headless:
+                context = browser.new_context()
+            else:
+                context = browser.new_context(viewport={'width': 1920, 'height': 1080})
+            page = context.new_page()
+
+            result = ""
+            try:
+                page.goto(self.url)
+                page.wait_for_load_state('load')
+                time.sleep(2)  # 确保页面完全加载
+
+                element = page.query_selector(self.selector)
+                if element:
+                    res = element.text_content().strip()
+                    result = res
+            except Exception as e:
+                err_str = f"Error fetching {self.url}: {e}"
+                return err_str
+
+            page.close()
+            browser.close()
+
+            if result:
+                return result
+            return None

+ 40 - 0
base_send_gotify.py

@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+
+import httpx
+
+
+class GotifyNotifier:
+    def __init__(self, title, message):
+        self.gotify_url = 'https://gotify.erhe.top'
+        self.app_token = 'Aw7XKE2Ppk7Dgwk'
+        self.title = title
+        self.message = message
+
+    def send_message(self):
+        # 构建POST请求的headers
+        headers = {
+            'Content-Type': 'application/json'
+        }
+
+        # 构建POST请求的body
+        body = {
+            'title': self.title,
+            'message': self.message
+        }
+
+        # 发送POST请求
+        with httpx.Client() as client:
+            response = client.post(
+                url=f"{self.gotify_url}/message?token={self.app_token}",
+                headers=headers,
+                json=body
+            )
+
+        # 或者可以使用 curl
+        # curl -k "https://gotify.erhe.top/message?token=A0Xg6ZE5946iBYg" -F "title=测试发送信息" -F "message=假装有信息,测试发送" -F "priority=5"
+
+        # 检查响应状态码
+        if response.status_code == 200:
+            print('Gotify Message sent successfully!')
+        else:
+            print('Failed to send message:', response.text)

+ 153 - 0
main.py

@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+
+import time
+from datetime import datetime, timedelta
+import glob
+import os
+import json
+from base_playwright_browser import CryptoCrawler
+from base_send_gotify import GotifyNotifier
+
+
+class RunTasks():
+    def __init__(self):
+        pass
+
+    def check_result(self, task, browser_result_data):
+        task_config_path = task[0]
+        task_config_data = task[1]
+
+        # 更新 result_value 字段
+        task_config_data['result_value'] = browser_result_data
+
+        # 减少 execution_times 字段
+        task_config_data['execution_times'] -= 1
+
+        # 检查 task_interval 字段是否存在并且是正整数
+        if 'task_interval' in task_config_data and isinstance(task_config_data['task_interval'], int) and \
+                task_config_data['task_interval'] > 0:
+            # 读取 execution_time 字段的值
+            execution_time_str = task_config_data['execution_time']
+            execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S')
+
+            # 计算新的 execution_time
+            new_execution_time = execution_time_dt + timedelta(minutes=task_config_data['task_interval'])
+
+            # 更新 execution_time 字段
+            task_config_data['execution_time'] = new_execution_time.strftime('%Y-%m-%d %H:%M:%S')
+
+            # 更新最新完成时间
+            task_config_data['completion_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+
+            # 保存更新后的配置数据
+            with open(task_config_path, 'w', encoding='utf-8') as f:
+                json.dump(task_config_data, f, indent=4, ensure_ascii=False)
+
+    def check_config(self, data):
+        # 检查配置文件,执行时间是否达到要求,推送次数是否不等于0
+        if data['execution_times'] != 0:
+            # 将字符串格式的日期时间转换为datetime对象
+            execution_time_str = data['execution_time']
+            execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S')
+
+            # 获取当前时间的datetime对象
+            current_time_dt = datetime.now()
+
+            # 将datetime对象转换为Unix时间戳(整数)
+            execution_time = int(execution_time_dt.timestamp())
+            current_time = int(current_time_dt.timestamp())
+
+            # 比较配置的执行时间和当前时间
+            if execution_time < current_time:
+                print("Execution time is in the future.")
+                return True
+            else:
+                print("Execution time has passed or is now.")
+                return False
+        else:
+            return False
+
+    def load_config(self):
+        # 先获取当前路径, 然后获取 tasks_config 文件夹里面的 json 文件
+        current_path = os.getcwd()
+        config_path = os.path.join(current_path, 'tasks_config')
+
+        if not os.path.exists(config_path):
+            os.makedirs(config_path)
+
+        # 构建配置文件目录下所有.json文件的路径模式
+        json_files_pattern = os.path.join(config_path, '*.json')
+
+        # 使用glob模块查找所有匹配的.json文件
+        json_files = glob.glob(json_files_pattern)
+
+        all_tasks = []
+
+        if not json_files:
+            # 创建一个 json 文件, 名字叫 default.json
+            default_json_path = os.path.join(config_path, 'default.json')
+
+            # 定义默认的键值对
+            default_data = {
+                "task_name": "任务名称",
+                "description": "任务说明",
+                "target_url": "目标地址",
+                "target_selector": "目标css选择器",
+                "result_value": "返回值,留空",
+                "execution_time": "9999-12-31 23:59:59",
+                "task_interval": "任务执行间隔",
+                "completion_time": "完成时间,留空",
+                "execution_times": 0,
+                "logs": "日志,留空"
+            }
+
+            # 将默认数据写入 default.json 文件
+            with open(default_json_path, 'w', encoding='utf-8') as file:
+                json.dump(default_data, file, indent=4, ensure_ascii=False)
+
+            print(f"Created default.json at {default_json_path}")
+            exit(0)
+        else:
+            for json_file in json_files:
+                with open(json_file, 'r', encoding='utf-8') as file:
+                    data = json.load(file)
+
+                    # 需要检查一下 config 文件, 执行时间是否达到要求, 推送次数是否大于 0
+                    if self.check_config(data):
+                        all_tasks.append([json_file, data])
+
+            print(f'Successfully read all tasks, {len(all_tasks)} task in total')
+
+            return all_tasks
+
+    def run(self, task):
+        task_config_path = task[0]
+        task_config_data = task[1]
+        task_name = task_config_data['task_name']
+        target_url = task_config_data['target_url']
+        target_selector = task_config_data['target_selector']
+
+        print(f'开始执行任务 {task_name}')
+
+        # 打开浏览器, 获取目标页面数据
+        # browser_result_data = CryptoCrawler(url=target_url, selector=target_selector).main()
+        browser_result_data = '网页返回值'
+
+        # 1, 如果有数据, 则更新 task_config_data 的 result_value 字段, 并且 execution_times 字段减 1
+        # 2, 如果 task_interval 这个字段是正整数, 并且大于 0, 则读取 execution_time 字段的值, 然后加上 task_interval
+        # 注: execution_time 格式 为 1970-01-01 00:00:00, task_interval 为分钟
+        if browser_result_data:
+            self.check_result(task, browser_result_data)
+            GotifyNotifier(title=task_name, message=browser_result_data).send_message()
+
+    def main(self):
+        all_tasks = self.load_config()
+
+        result_config_data = []
+        for task in all_tasks:
+            self.run(task)
+
+
+if __name__ == '__main__':
+    run_tasks = RunTasks()
+    run_tasks.main()