main.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # -*- coding: utf-8 -*-
  2. from datetime import datetime, timedelta
  3. import psycopg2
  4. import re
  5. from base_playwright_browser import CryptoCrawler
  6. from base_send_gotify import GotifyNotifier
  7. class RunTasks():
  8. def __init__(self):
  9. self.db_params = {
  10. 'dbname': 'remind',
  11. 'user': 'psql',
  12. 'password': 'psql',
  13. # 'host': 'home.erhe.link',
  14. # 'port': 55701,
  15. 'host': '192.168.100.146',
  16. 'port': 5432,
  17. }
  18. def clean_string(self, input_string):
  19. # 移除HTML标签
  20. clean_string = re.sub(r'<.*?>', '', input_string)
  21. # 移除换行符和制表符
  22. clean_string = re.sub(r'[\n\t]', ' ', clean_string)
  23. # 移除多余的空格
  24. clean_string = re.sub(r'\s+', ' ', clean_string).strip()
  25. return clean_string
  26. def check_result(self, task, browser_result_data):
  27. task_config_data = task
  28. # 更新 result_value 字段
  29. task_config_data['result_value'] = browser_result_data
  30. # 减少 execution_times 字段
  31. task_config_data['execution_times'] -= 1
  32. if task_config_data['task_interval'] > 0:
  33. # 读取 execution_time 字段的值
  34. execution_time_dt = task_config_data['execution_time']
  35. # 计算新的 execution_time
  36. new_execution_time = execution_time_dt + timedelta(minutes=task_config_data['task_interval'])
  37. # 更新 execution_time 字段
  38. task_config_data['execution_time'] = new_execution_time.strftime('%Y-%m-%d %H:%M:%S')
  39. else:
  40. task_config_data['execution_times'] = 0
  41. # 更新最新完成时间
  42. task_config_data['completion_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  43. # 连接到数据库
  44. conn = psycopg2.connect(**self.db_params)
  45. cursor = conn.cursor()
  46. # 构建更新语句
  47. update_statement = """
  48. UPDATE config
  49. SET
  50. execution_times = %s,
  51. result_value = %s,
  52. execution_time = %s,
  53. completion_time = %s,
  54. logs = %s
  55. WHERE id = %s;
  56. """
  57. # 执行更新操作
  58. cursor.execute(update_statement, (
  59. task_config_data['execution_times'],
  60. task_config_data['result_value'],
  61. task_config_data['execution_time'],
  62. task_config_data['completion_time'],
  63. task_config_data['logs'],
  64. task_config_data['id']
  65. ))
  66. conn.commit()
  67. # 关闭连接
  68. cursor.close()
  69. conn.close()
  70. task_name = task_config_data['task_name']
  71. print(f'tasks {task_name} config updated successfully.')
  72. def check_config(self, data):
  73. # 检查配置文件,执行时间是否达到要求,推送次数是否不等于0
  74. if data['execution_times'] != 0:
  75. execution_time_dt = data['execution_time']
  76. # 获取当前时间的datetime对象
  77. current_time_dt = datetime.now()
  78. # 将datetime对象转换为Unix时间戳(整数)
  79. execution_time = int(execution_time_dt.timestamp())
  80. current_time = int(current_time_dt.timestamp())
  81. # 比较配置的执行时间和当前时间
  82. if execution_time < current_time:
  83. print("Execution time is in the future.")
  84. return True
  85. else:
  86. print("Execution time has passed or is now.")
  87. return False
  88. else:
  89. return False
  90. def load_config(self):
  91. # 连接到数据库
  92. conn = psycopg2.connect(**self.db_params)
  93. cursor = conn.cursor()
  94. # 读取config表的所有数据
  95. cursor.execute("SELECT * FROM config")
  96. rows = cursor.fetchall()
  97. all_tasks = []
  98. # 打印查询结果
  99. for row in rows:
  100. if row[1] != 0:
  101. config_data = {
  102. "id": row[0],
  103. "execution_times": row[1],
  104. "task_name": row[2],
  105. "description": row[3],
  106. "target_url": row[4],
  107. "target_selector": row[5],
  108. "result_value": row[6],
  109. "execution_time": row[7],
  110. "task_interval": row[8],
  111. "completion_time": row[9],
  112. "logs": row[10]
  113. }
  114. if self.check_config(config_data):
  115. all_tasks.append(config_data)
  116. # 关闭连接
  117. cursor.close()
  118. conn.close()
  119. return all_tasks
  120. def run(self, task):
  121. task_name = task['task_name']
  122. target_url = task['target_url']
  123. target_selector = task['target_selector']
  124. print(f'task {task_name} start, target url: {target_url}')
  125. # 打开浏览器, 获取目标页面数据
  126. if target_url:
  127. result_data = CryptoCrawler(url=target_url, selector=target_selector).main()
  128. else:
  129. result_data = 'default'
  130. # 1, 如果有数据, 则更新 task_config_data 的 result_value 字段, 并且 execution_times 字段减 1
  131. # 2, 如果 task_interval 这个字段是正整数, 并且大于 0, 则读取 execution_time 字段的值, 然后加上 task_interval
  132. # 注: execution_time 格式 为 1970-01-01 00:00:00, task_interval 为分钟
  133. if result_data:
  134. # 清理字符串
  135. browser_result_data = self.clean_string(result_data)
  136. self.check_result(task, browser_result_data)
  137. GotifyNotifier(title=task_name, message=browser_result_data).send_message()
  138. print(f'Task {task_name} message has been sent')
  139. def main(self):
  140. all_tasks = self.load_config()
  141. print(f'A total of {len(all_tasks)} task needs to be executed.')
  142. if not all_tasks:
  143. print(f'Program exit')
  144. exit(0)
  145. result_config_data = []
  146. for task in all_tasks:
  147. self.run(task)
  148. if __name__ == '__main__':
  149. run_tasks = RunTasks()
  150. run_tasks.main()