# -*- coding: utf-8 -*- import time from datetime import datetime, timedelta import glob import os import json from base_playwright_browser import CryptoCrawler from base_send_gotify import GotifyNotifier class RunTasks(): def __init__(self): pass def check_result(self, task, browser_result_data): task_config_path = task[0] task_config_data = task[1] # 更新 result_value 字段 task_config_data['result_value'] = browser_result_data # 减少 execution_times 字段 task_config_data['execution_times'] -= 1 # 检查 task_interval 字段是否存在并且是正整数 if 'task_interval' in task_config_data and isinstance(task_config_data['task_interval'], int) and \ task_config_data['task_interval'] > 0: if task_config_data['task_interval'] != 0: # 读取 execution_time 字段的值 execution_time_str = task_config_data['execution_time'] execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S') # 计算新的 execution_time new_execution_time = execution_time_dt + timedelta(minutes=task_config_data['task_interval']) # 更新 execution_time 字段 task_config_data['execution_time'] = new_execution_time.strftime('%Y-%m-%d %H:%M:%S') else: task_config_data['execution_times'] = 0 # 更新最新完成时间 task_config_data['completion_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 保存更新后的配置数据 with open(task_config_path, 'w', encoding='utf-8') as f: json.dump(task_config_data, f, indent=4, ensure_ascii=False) def check_config(self, data): # 检查配置文件,执行时间是否达到要求,推送次数是否不等于0 if data['execution_times'] != 0: # 将字符串格式的日期时间转换为datetime对象 execution_time_str = data['execution_time'] execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S') # 获取当前时间的datetime对象 current_time_dt = datetime.now() # 将datetime对象转换为Unix时间戳(整数) execution_time = int(execution_time_dt.timestamp()) current_time = int(current_time_dt.timestamp()) # 比较配置的执行时间和当前时间 if execution_time < current_time: print("Execution time is in the future.") return True else: print("Execution time has passed or is now.") return False else: return False def load_config(self): # 先获取当前路径, 然后获取 tasks_config 文件夹里面的 json 文件 current_path = os.getcwd() config_path = os.path.join(current_path, 'tasks_config') if not os.path.exists(config_path): os.makedirs(config_path) # 构建配置文件目录下所有.json文件的路径模式 json_files_pattern = os.path.join(config_path, '*.json') # 使用glob模块查找所有匹配的.json文件 json_files = glob.glob(json_files_pattern) all_tasks = [] if not json_files: # 创建一个 json 文件, 名字叫 default.json default_json_path = os.path.join(config_path, 'default.json') # 定义默认的键值对 default_data = { "task_name": "任务名称", "description": "任务说明", "target_url": "目标地址", "target_selector": "目标css选择器", "result_value": "返回值,留空", "execution_time": "9999-12-31 23:59:59", "task_interval": 0, "completion_time": "完成时间,留空", "execution_times": 0, "logs": "日志,留空" } # 将默认数据写入 default.json 文件 with open(default_json_path, 'w', encoding='utf-8') as file: json.dump(default_data, file, indent=4, ensure_ascii=False) print(f"Created default.json at {default_json_path}") exit(0) else: for json_file in json_files: with open(json_file, 'r', encoding='utf-8') as file: data = json.load(file) # 需要检查一下 config 文件, 执行时间是否达到要求, 推送次数是否大于 0 if self.check_config(data): all_tasks.append([json_file, data]) print(f'Successfully read all tasks, {len(all_tasks)} task in total') return all_tasks def run(self, task): task_config_path = task[0] task_config_data = task[1] task_name = task_config_data['task_name'] target_url = task_config_data['target_url'] target_selector = task_config_data['target_selector'] print(f'开始执行任务 {task_name}') # 打开浏览器, 获取目标页面数据 browser_result_data = CryptoCrawler(url=target_url, selector=target_selector).main() # 1, 如果有数据, 则更新 task_config_data 的 result_value 字段, 并且 execution_times 字段减 1 # 2, 如果 task_interval 这个字段是正整数, 并且大于 0, 则读取 execution_time 字段的值, 然后加上 task_interval # 注: execution_time 格式 为 1970-01-01 00:00:00, task_interval 为分钟 if browser_result_data: self.check_result(task, browser_result_data) GotifyNotifier(title=task_name, message=browser_result_data).send_message() def main(self): all_tasks = self.load_config() result_config_data = [] for task in all_tasks: self.run(task) if __name__ == '__main__': run_tasks = RunTasks() run_tasks.main()