utils.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import time
  4. from datetime import datetime
  5. import os
  6. import sys
  7. import httpx
  8. import pymongo
  9. sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
  10. PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')
  11. from utils.utils_mongo_handle import MongoHandle
  12. from base.base_load_config import load_config, get_base_path
  13. config_json = load_config()
  14. base_project = get_base_path()
  15. DB_USER = config_json.get('DB_USER')
  16. DB_PASSWORD = config_json.get('DB_PASSWORD')
  17. DB_IP = config_json.get('DB_IP')
  18. DB_PORT = config_json.get('DB_PORT')
  19. MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'
  20. class LogsHandle(object):
  21. def __init__(self):
  22. self.now_day = time.strftime('%Y-%m-%d', time.localtime())
  23. db = 'logs'
  24. collection = 'logs_' + self.now_day
  25. self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)
  26. def logs_generate(self):
  27. data_to_insert = {
  28. "title": "logs",
  29. "context": 'generate logs',
  30. "state": "create",
  31. "create_time": int(time.time()),
  32. "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  33. }
  34. self.mongo.collection.insert_one(data_to_insert)
  35. def logs_send(self):
  36. subject = 'auto collection logs'
  37. title = 'auto collection - logs: {}'.format(self.now_day)
  38. text = ''
  39. # TODO
  40. # 从 mongodb 读取日志, 拼接 text, 发送邮件
  41. # 查询所有文档
  42. cursor = self.mongo.collection.find()
  43. # 遍历结果集
  44. for record in cursor:
  45. text += "logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}\n\n".format(
  46. record.setdefault('title'),
  47. record.setdefault('content'),
  48. record.setdefault('state'),
  49. record.setdefault('create_datetime'),
  50. )
  51. GotifyNotifier(title=title, message=text, token_name='logs').send_message()
  52. def logs_write(self, title_source=None, content=None, state=None, send_now=False):
  53. data_to_insert = {
  54. "title": title_source,
  55. "context": content,
  56. "state": state,
  57. "create_time": int(time.time()),
  58. "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  59. }
  60. self.mongo.collection.insert_one(data_to_insert)
  61. if send_now:
  62. title = 'Auto Info - running logs: {}'.format(self.now_day)
  63. text = 'logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}'.format(
  64. data_to_insert.setdefault('title'),
  65. data_to_insert.setdefault('content'),
  66. data_to_insert.setdefault('state'),
  67. data_to_insert.setdefault('create_datetime'),
  68. )
  69. GotifyNotifier(title=title, message=text, token_name='logs').send_message()
  70. class MongoHandle(object):
  71. def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
  72. self.client = pymongo.MongoClient(MONGO_LINK)
  73. self.db = db
  74. self.collection = collection
  75. if del_db and db:
  76. # 检查数据库是否存在
  77. if db in self.client.list_database_names():
  78. # 删除数据库
  79. self.client.drop_database(db)
  80. self.db = self.client[db]
  81. if del_collection and self.collection:
  82. # 检查集合是否存在
  83. if self.collection in self.db.list_collection_names():
  84. # 删除集合
  85. self.db.drop_collection(collection)
  86. self.collection = self.db[collection]
  87. if auto_remove:
  88. self.auto_remove_data(auto_remove)
  89. def write_data(self, data):
  90. self.collection.insert_one(data)
  91. def load_data(self):
  92. # MongoDB 会在第一次写入时自动创建数据库和集合
  93. return list(self.collection.find({}, {'_id': False}))
  94. def auto_remove_data(self, day):
  95. for data in self.collection.find({'create_time': {'$lt': int(time.time()) - day * 24 * 60 * 60}}):
  96. self.collection.delete_one({'_id': data['_id']})
  97. class GotifyNotifier:
  98. def __init__(self, title, message, token_name=''):
  99. self.gotify_url = 'https://gotify.erhe.top'
  100. self.app_token = self.match_token_name(token_name)
  101. self.title = title
  102. self.message = message
  103. def match_token_name(self, name):
  104. token_name_dict = {}
  105. # 读取项目根目录下的 gotify_config.json 文件
  106. gotify_config_path = os.path.join(PROJECT_PATH, 'gotify_config.json')
  107. with open(gotify_config_path, 'r') as f:
  108. token_name_dict = json.load(f)
  109. token = token_name_dict.get(name)
  110. if token:
  111. return token
  112. else:
  113. return token_name_dict['base']
  114. def send_message(self):
  115. # 构建POST请求的headers
  116. headers = {
  117. 'Content-Type': 'application/json'
  118. }
  119. # 构建POST请求的body
  120. body = {
  121. 'title': self.title,
  122. 'message': self.message
  123. }
  124. # 发送POST请求
  125. with httpx.Client() as client:
  126. response = client.post(
  127. url=f"{self.gotify_url}/message?token={self.app_token}",
  128. headers=headers,
  129. json=body
  130. )
  131. # 或者可以使用 curl
  132. # curl -k "https://gotify.erhe.top/message?token=A0Xg6ZE5946iBYg" -F "title=测试发送信息" -F "message=假装有信息,测试发送" -F "priority=5"
  133. # 检查响应状态码
  134. if response.status_code == 200:
  135. print('Gotify Message sent successfully!')
  136. else:
  137. print('Failed to send message:', response.text)