# -*- coding: utf-8 -*-
# main.py
import glob
import json
import os
import sys
import time
from datetime import datetime, timedelta

from base_playwright_browser import CryptoCrawler
from base_send_gotify import GotifyNotifier
  9. class RunTasks():
  10. def __init__(self):
  11. pass
  12. def check_result(self, task, browser_result_data):
  13. task_config_path = task[0]
  14. task_config_data = task[1]
  15. # 更新 result_value 字段
  16. task_config_data['result_value'] = browser_result_data
  17. # 减少 execution_times 字段
  18. task_config_data['execution_times'] -= 1
  19. # 检查 task_interval 字段是否存在并且是正整数
  20. if 'task_interval' in task_config_data and isinstance(task_config_data['task_interval'], int) and \
  21. task_config_data['task_interval'] > 0:
  22. if task_config_data['task_interval'] != 0:
  23. # 读取 execution_time 字段的值
  24. execution_time_str = task_config_data['execution_time']
  25. execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S')
  26. # 计算新的 execution_time
  27. new_execution_time = execution_time_dt + timedelta(minutes=task_config_data['task_interval'])
  28. # 更新 execution_time 字段
  29. task_config_data['execution_time'] = new_execution_time.strftime('%Y-%m-%d %H:%M:%S')
  30. else:
  31. task_config_data['execution_times'] = 0
  32. # 更新最新完成时间
  33. task_config_data['completion_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  34. # 保存更新后的配置数据
  35. with open(task_config_path, 'w', encoding='utf-8') as f:
  36. json.dump(task_config_data, f, indent=4, ensure_ascii=False)
  37. def check_config(self, data):
  38. # 检查配置文件,执行时间是否达到要求,推送次数是否不等于0
  39. if data['execution_times'] != 0:
  40. # 将字符串格式的日期时间转换为datetime对象
  41. execution_time_str = data['execution_time']
  42. execution_time_dt = datetime.strptime(execution_time_str, '%Y-%m-%d %H:%M:%S')
  43. # 获取当前时间的datetime对象
  44. current_time_dt = datetime.now()
  45. # 将datetime对象转换为Unix时间戳(整数)
  46. execution_time = int(execution_time_dt.timestamp())
  47. current_time = int(current_time_dt.timestamp())
  48. # 比较配置的执行时间和当前时间
  49. if execution_time < current_time:
  50. print("Execution time is in the future.")
  51. return True
  52. else:
  53. print("Execution time has passed or is now.")
  54. return False
  55. else:
  56. return False
  57. def load_config(self):
  58. # 先获取当前路径, 然后获取 tasks_config 文件夹里面的 json 文件
  59. current_path = os.getcwd()
  60. config_path = os.path.join(current_path, 'tasks_config')
  61. if not os.path.exists(config_path):
  62. os.makedirs(config_path)
  63. # 构建配置文件目录下所有.json文件的路径模式
  64. json_files_pattern = os.path.join(config_path, '*.json')
  65. # 使用glob模块查找所有匹配的.json文件
  66. json_files = glob.glob(json_files_pattern)
  67. all_tasks = []
  68. if not json_files:
  69. # 创建一个 json 文件, 名字叫 default.json
  70. default_json_path = os.path.join(config_path, 'default.json')
  71. # 定义默认的键值对
  72. default_data = {
  73. "task_name": "任务名称",
  74. "description": "任务说明",
  75. "target_url": "目标地址",
  76. "target_selector": "目标css选择器",
  77. "result_value": "返回值,留空",
  78. "execution_time": "9999-12-31 23:59:59",
  79. "task_interval": 0,
  80. "completion_time": "完成时间,留空",
  81. "execution_times": 0,
  82. "logs": "日志,留空"
  83. }
  84. # 将默认数据写入 default.json 文件
  85. with open(default_json_path, 'w', encoding='utf-8') as file:
  86. json.dump(default_data, file, indent=4, ensure_ascii=False)
  87. print(f"Created default.json at {default_json_path}")
  88. exit(0)
  89. else:
  90. for json_file in json_files:
  91. with open(json_file, 'r', encoding='utf-8') as file:
  92. data = json.load(file)
  93. # 需要检查一下 config 文件, 执行时间是否达到要求, 推送次数是否大于 0
  94. if self.check_config(data):
  95. all_tasks.append([json_file, data])
  96. print(f'Successfully read all tasks, {len(all_tasks)} task in total')
  97. return all_tasks
  98. def run(self, task):
  99. task_config_path = task[0]
  100. task_config_data = task[1]
  101. task_name = task_config_data['task_name']
  102. target_url = task_config_data['target_url']
  103. target_selector = task_config_data['target_selector']
  104. print(f'开始执行任务 {task_name}')
  105. # 打开浏览器, 获取目标页面数据
  106. browser_result_data = CryptoCrawler(url=target_url, selector=target_selector).main()
  107. # 1, 如果有数据, 则更新 task_config_data 的 result_value 字段, 并且 execution_times 字段减 1
  108. # 2, 如果 task_interval 这个字段是正整数, 并且大于 0, 则读取 execution_time 字段的值, 然后加上 task_interval
  109. # 注: execution_time 格式 为 1970-01-01 00:00:00, task_interval 为分钟
  110. if browser_result_data:
  111. self.check_result(task, browser_result_data)
  112. GotifyNotifier(title=task_name, message=browser_result_data).send_message()
  113. def main(self):
  114. all_tasks = self.load_config()
  115. result_config_data = []
  116. for task in all_tasks:
  117. self.run(task)
  118. if __name__ == '__main__':
  119. run_tasks = RunTasks()
  120. run_tasks.main()