utils.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import smtplib
  4. import time
  5. from datetime import datetime
  6. import os
  7. import sys
  8. from email.header import Header
  9. from email.mime.text import MIMEText
  10. import httpx
  11. import pymongo
  12. sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
  13. PROJECT_PATH = os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')
  14. class LogsHandle(object):
  15. def __init__(self):
  16. self.now_day = time.strftime('%Y-%m-%d', time.localtime())
  17. db = 'logs'
  18. collection = 'logs_' + self.now_day
  19. self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)
  20. def logs_generate(self):
  21. data_to_insert = {
  22. "title": "logs",
  23. "context": 'generate logs',
  24. "state": "create",
  25. "create_time": int(time.time()),
  26. "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  27. }
  28. self.mongo.collection.insert_one(data_to_insert)
  29. def logs_send(self):
  30. title = 'autoinfo - logs: {}'.format(self.now_day)
  31. text = ''
  32. # TODO
  33. # 从 mongodb 读取日志, 拼接 text, 发送邮件
  34. # 查询所有文档
  35. cursor = self.mongo.collection.find()
  36. # 遍历结果集
  37. for record in cursor:
  38. text += "logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}\n\n".format(
  39. record.setdefault('title'),
  40. record.setdefault('content'),
  41. record.setdefault('state'),
  42. record.setdefault('create_datetime'),
  43. )
  44. GotifyNotifier(title=title, message=text, token_name='logs').send_message()
  45. def logs_write(self, title_source=None, content=None, state=None, send_now=False):
  46. data_to_insert = {
  47. "title": title_source,
  48. "context": content,
  49. "state": state,
  50. "create_time": int(time.time()),
  51. "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  52. }
  53. self.mongo.collection.insert_one(data_to_insert)
  54. if send_now:
  55. title = 'Auto Info - running logs: {}'.format(self.now_day)
  56. text = 'logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}'.format(
  57. data_to_insert.setdefault('title'),
  58. data_to_insert.setdefault('content'),
  59. data_to_insert.setdefault('state'),
  60. data_to_insert.setdefault('create_datetime'),
  61. )
  62. GotifyNotifier(title=title, message=text, token_name='logs').send_message()
  63. class MongoHandle(object):
  64. def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
  65. config_json = LoadConfig().load_config()
  66. base_project = LoadConfig().get_base_path()
  67. db_user = config_json.get('DB_USER')
  68. db_password = config_json.get('DB_PASSWORD')
  69. db_ip = config_json.get('DB_IP')
  70. db_port = config_json.get('DB_PORT')
  71. mongo_link = f'mongodb://{db_user}:{db_password}@{db_ip}:{db_port}/'
  72. self.client = pymongo.MongoClient(mongo_link)
  73. self.db = db
  74. self.collection = collection
  75. if del_db and db:
  76. # 检查数据库是否存在
  77. if db in self.client.list_database_names():
  78. # 删除数据库
  79. self.client.drop_database(db)
  80. self.db = self.client[db]
  81. if del_collection and self.collection:
  82. # 检查集合是否存在
  83. if self.collection in self.db.list_collection_names():
  84. # 删除集合
  85. self.db.drop_collection(collection)
  86. self.collection = self.db[collection]
  87. if auto_remove:
  88. self.auto_remove_data(auto_remove)
  89. def write_data(self, data):
  90. self.collection.insert_one(data)
  91. def load_data(self):
  92. # MongoDB 会在第一次写入时自动创建数据库和集合
  93. return list(self.collection.find({}, {'_id': False}))
  94. def auto_remove_data(self, day):
  95. for data in self.collection.find({'create_time': {'$lt': int(time.time()) - day * 24 * 60 * 60}}):
  96. self.collection.delete_one({'_id': data['_id']})
  97. class SendEmail(object):
  98. def __init__(self, subject='AutoInfo subject', title='AutoInfo title', text='auto text') -> None:
  99. config_json = LoadConfig().load_config()
  100. mail_host = config_json.get('MAIL_HOST')
  101. mail_user = config_json.get('MAIL_USER')
  102. mail_pass = config_json.get('MAIL_PASS')
  103. mail_sender = config_json.get('MAIL_SENDER')
  104. mail_receivers = config_json.get('MAIL_RECEIVERS')
  105. # 第三方 SMTP 服务
  106. self.mail_host = mail_host # 设置服务器
  107. self.mail_user = mail_user # 用户名
  108. self.mail_pass = mail_pass # 口令
  109. self.sender = mail_sender
  110. self.receivers = [mail_receivers]
  111. self.subject = subject
  112. self.title = title
  113. self.text = text
  114. def send(self):
  115. message = MIMEText(self.text, 'plain', 'utf-8')
  116. message['From'] = Header(self.title, 'utf-8')
  117. message['To'] = Header("auto", 'utf-8')
  118. message['Subject'] = Header(self.subject, 'utf-8')
  119. try:
  120. smtpObj = smtplib.SMTP_SSL(self.mail_host)
  121. smtpObj.login(self.mail_user, self.mail_pass)
  122. smtpObj.sendmail(self.sender, self.receivers, message.as_string())
  123. print("邮件发送成功")
  124. except smtplib.SMTPException as e:
  125. print("Error: 无法发送邮件", e)
  126. class GotifyNotifier:
  127. def __init__(self, title, message, token_name=''):
  128. config_json = LoadConfig().load_config()
  129. self.gotify_url = config_json.get('GOTIFY_URL', 'https://gotify.erhe.top')
  130. self.app_token = self.match_token_name(token_name or 'test')
  131. self.title = title
  132. self.message = message
  133. def match_token_name(self, name):
  134. token_name_dict = {}
  135. # 读取项目根目录下的 gotify_config.json 文件
  136. gotify_config_path = os.path.join(str(PROJECT_PATH), 'gotify_config.json')
  137. with open(gotify_config_path, 'r') as f:
  138. token_name_dict = json.load(f)
  139. token = token_name_dict.get(name)
  140. if token:
  141. return token
  142. else:
  143. return token_name_dict['base']
  144. def send_message(self):
  145. # 发送POST请求
  146. with httpx.Client() as client:
  147. response = client.post(
  148. url=f"{self.gotify_url}/message?token={self.app_token}",
  149. headers={'Content-Type': 'application/json'},
  150. json={'title': self.title, 'message': self.message}
  151. )
  152. # 或者可以使用 curl
  153. # curl -k "https://gotify.erhe.top/message?token=A0Xg6ZE5946iBYg" -F "title=测试发送信息" -F "message=测试信息,测试发送" -F "priority=5"
  154. # 检查响应状态码
  155. if response.status_code == 200:
  156. print(self.title)
  157. print('Gotify Message sent successfully!')
  158. else:
  159. print('Failed to send message:', response.text)
  160. class LoadConfig:
  161. def load_config(self):
  162. try:
  163. config_path = os.path.join(PROJECT_PATH, 'config.json')
  164. config_json = {}
  165. with open(config_path, 'r') as f:
  166. config_json = json.load(f)
  167. if not config_json:
  168. print('No config file found')
  169. exit(0)
  170. except Exception as e:
  171. print(e)
  172. exit(0)
  173. return config_json
  174. def get_base_path(self):
  175. return os.path.join(os.getcwd().split('AutoInfo')[0], 'AutoInfo')