utils_daily_task_scheduler.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import time
  4. from datetime import datetime
  5. """
  6. TaskScheduler 类用于管理定时任务的执行时间。
  7. 该类通过在指定的文件中存储时间戳来记录任务的上次执行时间和下次执行时间。
  8. 它提供了两个主要方法:load_start_time 和 save_start_time。
  9. load_start_time 方法用于检查是否到达任务的执行时间,如果未到达则返回 0,否则返回上次执行时间。
  10. save_start_time 方法用于更新下次执行时间,默认为当前时间加上指定的时间间隔(默认为 24 小时)。
  11. Attributes:
  12. file_path (str): 存储时间戳的文件路径。
  13. time_span (int): 任务执行的时间间隔,默认为 86400 秒(24 小时)。
  14. Methods:
  15. load_start_time(): 检查是否到达任务的执行时间。
  16. save_start_time(): 更新下次执行时间。
  17. Example:
  18. >>> scheduler = TaskScheduler("task_time.txt")
  19. >>> last_start_time = scheduler.load_start_time()
  20. >>> if last_start_time != 0:
  21. >>> # 执行任务
  22. >>> scheduler.save_start_time()
  23. Note:
  24. 该类假设文件路径是有效的,并且有足够的权限进行读写操作。
  25. 如果文件不存在,将自动创建并写入当前时间戳。
  26. """
  27. class TaskScheduler:
  28. def __init__(self, file_path, time_span=86400):
  29. self.file_path = file_path
  30. self.time_span = time_span
  31. def load_start_time(self):
  32. timestamp_now = int(time.time())
  33. start_time = 0
  34. if not os.path.exists(self.file_path):
  35. with open(self.file_path, "w") as file:
  36. file.write(str(timestamp_now))
  37. print('创建执行时间文件 {}'.format(datetime.fromtimestamp(timestamp_now).strftime('%Y-%m-%d %H:%M:%S')))
  38. return 0
  39. else:
  40. with open(self.file_path, "r") as file:
  41. start_time = int(file.read())
  42. if start_time > timestamp_now:
  43. print('任务未到执行时间, 下次执行时间: {}'.format(datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')))
  44. return 0
  45. print('开始执行定时任务')
  46. return start_time
  47. def save_start_time(self):
  48. timestamp_now = int(time.time())
  49. date_now = datetime.fromtimestamp(timestamp_now).strftime('%Y-%m-%d %H:%M:%S')
  50. # 时间戳 +24 小时
  51. timestamp_now += self.time_span
  52. print('写入下次执行时间: {}'.format(datetime.fromtimestamp(timestamp_now).strftime('%Y-%m-%d %H:%M:%S')))
  53. with open(self.file_path, "w") as file:
  54. file.write(str(timestamp_now))