# -*- coding: utf-8 -*-
import re
import sys
from datetime import datetime, timedelta

import psycopg2

from base_playwright_browser import CryptoCrawler
from base_send_gotify import GotifyNotifier


class RunTasks():
    """Load due tasks from the ``config`` table, run them, and store results.

    Each row of ``config`` is turned into a task dict (see ``load_config``).
    A task that is due is executed by ``run``, which fetches a value from the
    task's target page, writes the outcome back to the database and sends a
    notification.
    """

    # Keys assigned, in order, to the positional columns returned by
    # ``SELECT * FROM config``.
    # NOTE(review): this relies on the table's column order — confirm it
    # against the actual schema.
    _CONFIG_FIELDS = (
        'id',
        'execution_times',
        'task_name',
        'description',
        'target_url',
        'target_selector',
        'result_value',
        'execution_time',
        'task_interval',
        'completion_time',
        'logs',
    )

    # Format used when timestamps are written back as strings.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, db_params=None):
        """Store the connection settings passed to ``psycopg2.connect``.

        Args:
            db_params: Optional mapping of ``psycopg2.connect`` keyword
                arguments. When omitted, the built-in defaults below are used,
                which is the original behaviour.
        """
        if db_params is not None:
            self.db_params = dict(db_params)
            return
        # NOTE(review): credentials are hard-coded in source; consider
        # supplying them through ``db_params`` from a secret store instead.
        # Alternative endpoint kept from the original source:
        # host 'home.erhe.link', port 55701.
        self.db_params = {
            'dbname': 'remind',
            'user': 'psql',
            'password': 'psql',
            'host': '192.168.100.146',
            'port': 5432,
        }

    def clean_string(self, input_string):
        """Return ``input_string`` without tags and with whitespace collapsed.

        Args:
            input_string: Text that may contain ``<...>`` markup.

        Returns:
            The text with every ``<...>`` span removed, each run of whitespace
            replaced by a single space, and leading/trailing whitespace
            stripped.
        """
        # DOTALL lets a tag whose attributes wrap onto another line match too;
        # without it such tags were left in the output.
        without_tags = re.sub(r'<.*?>', '', input_string, flags=re.DOTALL)
        # ``\s+`` already covers newlines and tabs, so one pass is enough.
        return re.sub(r'\s+', ' ', without_tags).strip()

    def check_result(self, task, browser_result_data):
        """Record a finished run on ``task`` and persist it to ``config``.

        The ``task`` dict is updated in place:

        * ``result_value`` is set to ``browser_result_data``;
        * ``execution_times`` is decremented by one;
        * if ``task_interval`` is greater than 0, ``execution_time`` is moved
          forward by that many minutes (stored as a formatted string),
          otherwise ``execution_times`` is forced to 0;
        * ``completion_time`` is set to the current time.

        Args:
            task: Task dict as produced by ``load_config``.
            browser_result_data: Value to store in ``result_value``.

        Raises:
            Whatever ``psycopg2`` raises on connection or query failure; the
            connection is closed in that case as well.
        """
        task['result_value'] = browser_result_data
        task['execution_times'] -= 1

        if task['task_interval'] > 0:
            # Schedule the next run relative to the previous scheduled time.
            next_run = task['execution_time'] + timedelta(minutes=task['task_interval'])
            task['execution_time'] = next_run.strftime(self._TIME_FORMAT)
        else:
            # No positive interval: no further runs are left.
            task['execution_times'] = 0

        # Record the latest completion time.
        task['completion_time'] = datetime.now().strftime(self._TIME_FORMAT)

        update_statement = """
            UPDATE config
            SET
                execution_times = %s,
                result_value = %s,
                execution_time = %s,
                completion_time = %s,
                logs = %s
            WHERE id = %s;
        """
        conn = psycopg2.connect(**self.db_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute(update_statement, (
                    task['execution_times'],
                    task['result_value'],
                    task['execution_time'],
                    task['completion_time'],
                    task['logs'],
                    task['id'],
                ))
            conn.commit()
        finally:
            # Always release the connection, even when the update fails.
            conn.close()

        task_name = task['task_name']
        print(f'tasks {task_name} config updated successfully.')

    def check_config(self, data):
        """Tell whether a task should run now.

        Args:
            data: Task dict with ``execution_times`` and ``execution_time``
                (a ``datetime``).

        Returns:
            ``True`` when ``execution_times`` is non-zero and the scheduled
            ``execution_time`` is not later than the current time; ``False``
            otherwise.
        """
        if data['execution_times'] == 0:
            return False

        # Compare untruncated timestamps: truncating to whole seconds made a
        # task whose time passed within the current second look "not due".
        is_due = data['execution_time'].timestamp() <= datetime.now().timestamp()
        if is_due:
            print("Execution time has passed or is now.")
        else:
            print("Execution time is in the future.")
        return is_due

    def load_config(self):
        """Read every row of ``config`` and return the tasks that are due.

        Returns:
            A list of task dicts (keys from ``_CONFIG_FIELDS``) whose
            ``execution_times`` is non-zero and for which ``check_config``
            returns ``True``.

        Raises:
            Whatever ``psycopg2`` raises on connection or query failure; the
            connection is closed in that case as well.
        """
        conn = psycopg2.connect(**self.db_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT * FROM config")
                rows = cursor.fetchall()
        finally:
            conn.close()

        all_tasks = []
        for row in rows:
            # Column 1 is ``execution_times``; 0 means nothing left to run.
            if row[1] == 0:
                continue
            config_data = {
                name: row[index] for index, name in enumerate(self._CONFIG_FIELDS)
            }
            if self.check_config(config_data):
                all_tasks.append(config_data)
        return all_tasks

    def run(self, task):
        """Execute one task: fetch its value, persist it, send a notification.

        When the task has a ``target_url`` the value comes from
        ``CryptoCrawler(url=..., selector=...).main()``; otherwise the literal
        ``'default'`` is used. A falsy result leaves the task untouched and no
        notification is sent.

        Args:
            task: Task dict as produced by ``load_config``.
        """
        task_name = task['task_name']
        target_url = task['target_url']
        target_selector = task['target_selector']
        print(f'task {task_name} start, target url: {target_url}')

        # Open the browser and fetch the data from the target page.
        if target_url:
            result_data = CryptoCrawler(url=target_url, selector=target_selector).main()
        else:
            result_data = 'default'

        # With data in hand, ``check_result`` stores it, decrements
        # ``execution_times`` and, for a positive ``task_interval`` (minutes),
        # advances ``execution_time``.
        if result_data:
            browser_result_data = self.clean_string(result_data)
            self.check_result(task, browser_result_data)
            GotifyNotifier(title=task_name, message=browser_result_data).send_message()
            print(f'Task {task_name} message has been sent')

    def main(self):
        """Run every due task once; exit with status 0 when none is due."""
        all_tasks = self.load_config()
        print(f'A total of {len(all_tasks)} task needs to be executed.')
        if not all_tasks:
            print('Program exit')
            # ``sys.exit`` instead of the ``exit`` builtin, which is injected
            # by the ``site`` module and is not always available.
            sys.exit(0)
        for task in all_tasks:
            self.run(task)
if __name__ == '__main__':
    # Script entry point: build the runner and process every due task once.
    RunTasks().main()