# -*- coding: utf-8 -*-
"""Run due tasks from the ``config`` table: fetch a value, store it, notify."""
from datetime import datetime, timedelta
import re
import sys

import psycopg2

from base_playwright_browser import CryptoCrawler
from base_send_gotify import GotifyNotifier


class RunTasks():
    """Load task rows from PostgreSQL, run the due ones and record the results."""

    # Timestamp format used for values written back to the database.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        """Set up the PostgreSQL connection parameters."""
        # NOTE(review): credentials are hard-coded; consider moving them to
        # environment variables or a config file.
        self.db_params = {
            'dbname': 'remind',
            'user': 'psql',
            'password': 'psql',
            # Alternative (remote) endpoint:
            # 'host': 'home.erhe.link',
            # 'port': 55701,
            'host': '192.168.100.146',
            'port': 5432,
        }

    def clean_string(self, input_string):
        """Return *input_string* with HTML tags removed and whitespace collapsed.

        Args:
            input_string: Raw text, possibly containing HTML markup.

        Returns:
            The text without ``<...>`` tags, with newlines/tabs turned into
            spaces, runs of whitespace collapsed to one space, and leading and
            trailing whitespace stripped.
        """
        # Remove HTML tags.
        clean_string = re.sub(r'<.*?>', '', input_string)
        # Replace newlines and tabs with spaces.
        clean_string = re.sub(r'[\n\t]', ' ', clean_string)
        # Collapse repeated whitespace.
        clean_string = re.sub(r'\s+', ' ', clean_string).strip()
        return clean_string

    def check_result(self, task, browser_result_data):
        """Record a finished run of *task* and persist it to the ``config`` table.

        The *task* dict is updated in place: ``result_value`` is set,
        ``execution_times`` is decremented, and ``completion_time`` is set to
        the current time. For a recurring task (``task_interval`` > 0) the
        ``execution_time`` is advanced by ``task_interval`` minutes; otherwise
        ``execution_times`` is forced to 0 so the task does not run again.

        Args:
            task: Task dict as produced by :meth:`load_config`.
            browser_result_data: Cleaned result text to store.
        """
        task_config_data = task
        task_config_data['result_value'] = browser_result_data
        task_config_data['execution_times'] -= 1

        # A NULL interval in the database arrives as None; treat it as
        # "not recurring" instead of failing on the comparison.
        task_interval = task_config_data['task_interval'] or 0
        if task_interval > 0:
            next_execution_time = (
                task_config_data['execution_time']
                + timedelta(minutes=task_interval)
            )
            task_config_data['execution_time'] = (
                next_execution_time.strftime(self._TIME_FORMAT)
            )
        else:
            task_config_data['execution_times'] = 0

        # Record the latest completion time.
        task_config_data['completion_time'] = (
            datetime.now().strftime(self._TIME_FORMAT)
        )

        update_statement = """
            UPDATE config
            SET execution_times = %s,
                result_value = %s,
                execution_time = %s,
                completion_time = %s,
                logs = %s
            WHERE id = %s;
        """
        conn = psycopg2.connect(**self.db_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute(update_statement, (
                    task_config_data['execution_times'],
                    task_config_data['result_value'],
                    task_config_data['execution_time'],
                    task_config_data['completion_time'],
                    task_config_data['logs'],
                    task_config_data['id'],
                ))
            conn.commit()
        finally:
            # Always release the connection, even if the update fails.
            conn.close()

        task_name = task_config_data['task_name']
        print(f'tasks {task_name} config updated successfully.')

    def check_config(self, data):
        """Return True when the task described by *data* is due to run.

        A task is due when its ``execution_times`` is non-zero and its
        ``execution_time`` is strictly earlier than the current time
        (compared at whole-second resolution).

        Args:
            data: Task dict with ``execution_times`` and a datetime
                ``execution_time``.

        Returns:
            True if the task should be executed now, False otherwise.
        """
        if data['execution_times'] == 0:
            return False

        # Compare Unix timestamps rather than datetime objects so that the
        # check works for both naive and timezone-aware values.
        execution_time = int(data['execution_time'].timestamp())
        current_time = int(datetime.now().timestamp())

        if execution_time < current_time:
            print("Execution time has passed; task is due.")
            return True

        print("Execution time has not been reached yet.")
        return False

    def load_config(self):
        """Read the ``config`` table and return the tasks that are due.

        Returns:
            A list of task dicts (one per due row).
        """
        conn = psycopg2.connect(**self.db_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT * FROM config")
                rows = cursor.fetchall()
        finally:
            # Always release the connection, even if the query fails.
            conn.close()

        all_tasks = []
        for row in rows:
            # Skip rows with no remaining executions.
            if row[1] == 0:
                continue
            # NOTE(review): relies on the column order of ``SELECT *``;
            # confirm it matches the table definition.
            config_data = {
                "id": row[0],
                "execution_times": row[1],
                "task_name": row[2],
                "description": row[3],
                "target_url": row[4],
                "target_selector": row[5],
                "result_value": row[6],
                "execution_time": row[7],
                "task_interval": row[8],
                "completion_time": row[9],
                "logs": row[10],
            }
            if self.check_config(config_data):
                all_tasks.append(config_data)
        return all_tasks

    def run(self, task):
        """Execute one task: fetch its value, persist the result, send a notice.

        When the task has a ``target_url`` the value is obtained from
        ``CryptoCrawler``; otherwise the literal ``'default'`` is used. If a
        non-empty value was obtained it is cleaned, stored via
        :meth:`check_result` and pushed through ``GotifyNotifier``.

        Args:
            task: Task dict as produced by :meth:`load_config`.
        """
        task_name = task['task_name']
        target_url = task['target_url']
        target_selector = task['target_selector']
        print(f'task {task_name} start, target url: {target_url}')

        # Open the browser and fetch the target page data.
        if target_url:
            result_data = CryptoCrawler(
                url=target_url, selector=target_selector
            ).main()
        else:
            result_data = 'default'

        # Only update the task and notify when there is a result.
        if result_data:
            browser_result_data = self.clean_string(result_data)
            self.check_result(task, browser_result_data)
            GotifyNotifier(
                title=task_name, message=browser_result_data
            ).send_message()
            print(f'Task {task_name} message has been sent')

    def main(self):
        """Load all due tasks and run them; exit with status 0 if none are due."""
        all_tasks = self.load_config()
        print(f'A total of {len(all_tasks)} task needs to be executed.')

        if not all_tasks:
            print('Program exit')
            # sys.exit instead of the site-provided exit() builtin, which is
            # not guaranteed to exist in every interpreter setup.
            sys.exit(0)

        for task in all_tasks:
            self.run(task)


if __name__ == '__main__':
    run_tasks = RunTasks()
    run_tasks.main()