|
|
@@ -1,21 +1,38 @@
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
-import time
|
|
|
from datetime import datetime, timedelta
|
|
|
-import glob
|
|
|
-import os
|
|
|
-import json
|
|
|
+import psycopg2
|
|
|
+import re
|
|
|
from base_playwright_browser import CryptoCrawler
|
|
|
from base_send_gotify import GotifyNotifier
|
|
|
|
|
|
|
|
|
class RunTasks():
|
|
|
def __init__(self):
|
|
|
- pass
|
|
|
+ self.db_params = {
|
|
|
+ 'dbname': 'remind',
|
|
|
+ 'user': 'psql',
|
|
|
+ 'password': 'psql',
|
|
|
+ # 'host': 'home.erhe.link',
|
|
|
+ # 'port': 55701,
|
|
|
+ 'host': '192.168.100.146',
|
|
|
+ 'port': 5432,
|
|
|
+ }
|
|
|
+
|
|
|
def clean_string(self, input_string):
    """Strip HTML tags from *input_string* and normalise its whitespace.

    Args:
        input_string: Text that may contain HTML markup. ``None`` and
            the empty string are both treated as empty input.

    Returns:
        The text with every ``<...>`` tag removed, each run of
        whitespace (spaces, tabs, newlines) collapsed to a single space,
        and leading/trailing whitespace trimmed.
    """
    if not input_string:
        return ''

    # DOTALL lets "." cross line breaks, so a tag whose attributes wrap
    # onto the next line is removed as one unit instead of being left in.
    without_tags = re.sub(r'<.*?>', '', input_string, flags=re.DOTALL)

    # "\s+" already matches newlines and tabs, so one pass both replaces
    # them and collapses repeated spaces.
    return re.sub(r'\s+', ' ', without_tags).strip()
|
|
|
|
|
|
def check_result(self, task, browser_result_data):
|
|
|
- task_config_path = task[0]
|
|
|
- task_config_data = task[1]
|
|
|
+ task_config_data = task
|
|
|
|
|
|
# 更新 result_value 字段
|
|
|
task_config_data['result_value'] = browser_result_data
|
|
|
@@ -23,36 +40,59 @@ class RunTasks():
|
|
|
# 减少 execution_times 字段
|
|
|
task_config_data['execution_times'] -= 1
|
|
|
|
|
|
- # 检查 task_interval 字段是否存在并且是正整数
|
|
|
- if 'task_interval' in task_config_data and isinstance(task_config_data['task_interval'], int) and \
|
|
|
- task_config_data['task_interval'] > 0:
|
|
|
-
|
|
|
- if task_config_data['task_interval'] != 0:
|
|
|
- # 读取 execution_time 字段的值
|
|
|
- execution_time_str = task_config_data['execution_time']
|
|
|
- execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S')
|
|
|
+ if task_config_data['task_interval'] > 0:
|
|
|
+ # 读取 execution_time 字段的值
|
|
|
+ execution_time_dt = task_config_data['execution_time']
|
|
|
|
|
|
- # 计算新的 execution_time
|
|
|
- new_execution_time = execution_time_dt + timedelta(minutes=task_config_data['task_interval'])
|
|
|
+ # 计算新的 execution_time
|
|
|
+ new_execution_time = execution_time_dt + timedelta(minutes=task_config_data['task_interval'])
|
|
|
|
|
|
- # 更新 execution_time 字段
|
|
|
- task_config_data['execution_time'] = new_execution_time.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
- else:
|
|
|
- task_config_data['execution_times'] = 0
|
|
|
-
|
|
|
- # 更新最新完成时间
|
|
|
- task_config_data['completion_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ # 更新 execution_time 字段
|
|
|
+ task_config_data['execution_time'] = new_execution_time.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ else:
|
|
|
+ task_config_data['execution_times'] = 0
|
|
|
+
|
|
|
+ # 更新最新完成时间
|
|
|
+ task_config_data['completion_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+
|
|
|
+ # 连接到数据库
|
|
|
+ conn = psycopg2.connect(**self.db_params)
|
|
|
+ cursor = conn.cursor()
|
|
|
+
|
|
|
+ # 构建更新语句
|
|
|
+ update_statement = """
|
|
|
+ UPDATE config
|
|
|
+ SET
|
|
|
+ execution_times = %s,
|
|
|
+ result_value = %s,
|
|
|
+ execution_time = %s,
|
|
|
+ completion_time = %s,
|
|
|
+ logs = %s
|
|
|
+ WHERE id = %s;
|
|
|
+ """
|
|
|
+
|
|
|
+ # 执行更新操作
|
|
|
+ cursor.execute(update_statement, (
|
|
|
+ task_config_data['execution_times'],
|
|
|
+ task_config_data['result_value'],
|
|
|
+ task_config_data['execution_time'],
|
|
|
+ task_config_data['completion_time'],
|
|
|
+ task_config_data['logs'],
|
|
|
+ task_config_data['id']
|
|
|
+ ))
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+ # 关闭连接
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
|
|
|
- # 保存更新后的配置数据
|
|
|
- with open(task_config_path, 'w', encoding='utf-8') as f:
|
|
|
- json.dump(task_config_data, f, indent=4, ensure_ascii=False)
|
|
|
+ task_name = task_config_data['task_name']
|
|
|
+ print(f'tasks {task_name} config updated successfully.')
|
|
|
|
|
|
def check_config(self, data):
|
|
|
# 检查配置文件,执行时间是否达到要求,推送次数是否不等于0
|
|
|
if data['execution_times'] != 0:
|
|
|
- # 将字符串格式的日期时间转换为datetime对象
|
|
|
- execution_time_str = data['execution_time']
|
|
|
- execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S')
|
|
|
+ execution_time_dt = data['execution_time']
|
|
|
|
|
|
# 获取当前时间的datetime对象
|
|
|
current_time_dt = datetime.now()
|
|
|
@@ -72,66 +112,47 @@ class RunTasks():
|
|
|
return False
|
|
|
|
|
|
def load_config(self):
    """Load the tasks that are currently due from the ``config`` table.

    Every row of ``config`` is fetched. Rows whose second column
    (``execution_times``) is 0 are skipped, and the remaining rows are
    kept only when ``self.check_config`` accepts them.

    Returns:
        A list of task dicts with the keys ``id``, ``execution_times``,
        ``task_name``, ``description``, ``target_url``,
        ``target_selector``, ``result_value``, ``execution_time``,
        ``task_interval``, ``completion_time`` and ``logs``.
    """
    # The query is SELECT *, so values are mapped by position.
    # NOTE(review): this assumes the table's column order matches the
    # tuple below -- confirm against the schema.
    field_names = (
        'id',
        'execution_times',
        'task_name',
        'description',
        'target_url',
        'target_selector',
        'result_value',
        'execution_time',
        'task_interval',
        'completion_time',
        'logs',
    )

    # try/finally guarantees the cursor and connection are released even
    # when the query raises; previously they leaked on any error.
    conn = psycopg2.connect(**self.db_params)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT * FROM config")
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    all_tasks = []
    for row in rows:
        # Index 1 is execution_times; 0 means no runs are left.
        if row[1] == 0:
            continue
        config_data = {name: row[idx] for idx, name in enumerate(field_names)}
        if self.check_config(config_data):
            all_tasks.append(config_data)

    return all_tasks
|
|
|
|
|
|
def run(self, task):
|
|
|
- task_config_path = task[0]
|
|
|
- task_config_data = task[1]
|
|
|
- task_name = task_config_data['task_name']
|
|
|
- target_url = task_config_data['target_url']
|
|
|
- target_selector = task_config_data['target_selector']
|
|
|
+ task_name = task['task_name']
|
|
|
+ target_url = task['target_url']
|
|
|
+ target_selector = task['target_selector']
|
|
|
|
|
|
- print(f'开始执行任务 {task_name}')
|
|
|
+ print(f'task {task_name} start, target url: {target_url}')
|
|
|
|
|
|
# 打开浏览器, 获取目标页面数据
|
|
|
browser_result_data = CryptoCrawler(url=target_url, selector=target_selector).main()
|
|
|
@@ -140,12 +161,22 @@ class RunTasks():
|
|
|
# 2, 如果 task_interval 这个字段是正整数, 并且大于 0, 则读取 execution_time 字段的值, 然后加上 task_interval
|
|
|
# 注: execution_time 格式 为 1970-01-01 00:00:00, task_interval 为分钟
|
|
|
if browser_result_data:
|
|
|
+ # 清理字符串
|
|
|
+ browser_result_data = self.clean_string(browser_result_data)
|
|
|
self.check_result(task, browser_result_data)
|
|
|
GotifyNotifier(title=task_name, message=browser_result_data).send_message()
|
|
|
|
|
|
+ print(f'Task {task_name} message has been sent')
|
|
|
+
|
|
|
def main(self):
|
|
|
all_tasks = self.load_config()
|
|
|
|
|
|
+ print(f'A total of {len(all_tasks)} task needs to be executed.')
|
|
|
+
|
|
|
+ if not all_tasks:
|
|
|
+ print(f'Program exit')
|
|
|
+ exit(0)
|
|
|
+
|
|
|
result_config_data = []
|
|
|
for task in all_tasks:
|
|
|
self.run(task)
|