| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- # -*- coding: utf-8 -*-
- import json
- import smtplib
- import time
- from datetime import datetime
- import os
- import sys
- from email.header import Header
- from email.mime.text import MIMEText
- import httpx
- import pymongo
# Project root: the text of this file's absolute path before the first
# occurrence of 'AutoInfo', with 'AutoInfo' appended again.
PROJECT_PATH = os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'
# Put the project root on the import search path.
sys.path.append(PROJECT_PATH)
class LogsHandle(object):
    """Store run logs in a per-day MongoDB collection and push them via Gotify.

    Each instance is bound to the collection ``logs_<YYYY-MM-DD>`` of the
    ``logs`` database, where the date is the local date at construction time.
    """

    def __init__(self):
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        db = 'logs'
        collection = 'logs_' + self.now_day
        self.mongo = MongoHandle(db=db, collection=collection, del_db=False, del_collection=False, auto_remove=0)

    @staticmethod
    def _format_record(record):
        """Render one log document as a single human-readable line.

        The log body is stored under the key ``context`` by ``logs_generate``
        and ``logs_write``; ``content`` is accepted as a fallback. Missing
        fields are rendered as ``None``. The record itself is not modified.

        Args:
            record: A mapping holding the log fields.

        Returns:
            The formatted line, without a trailing newline.
        """
        return 'logs_source: {}, logs_detail: {}, state: {} logs_create_time: {}'.format(
            record.get('title'),
            record.get('context', record.get('content')),
            record.get('state'),
            record.get('create_datetime'),
        )

    def logs_generate(self):
        """Insert a placeholder "create" log entry into today's collection."""
        data_to_insert = {
            "title": "logs",
            "context": 'generate logs',
            "state": "create",
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.mongo.collection.insert_one(data_to_insert)

    def logs_send(self):
        """Read every document of today's collection and push them as one Gotify message."""
        title = 'autoinfo - logs: {}'.format(self.now_day)
        # One formatted line per document, each followed by a blank line.
        text = ''.join(
            self._format_record(record) + '\n\n'
            for record in self.mongo.collection.find()
        )
        GotifyNotifier(title=title, message=text, token_name='logs').send_message()

    def logs_write(self, title_source=None, content=None, state=None, send_now=False):
        """Insert one log entry and optionally push it to Gotify right away.

        Args:
            title_source: Stored as the entry's ``title``.
            content: Stored as the entry's ``context``.
            state: Stored as the entry's ``state``.
            send_now: When truthy, also send this single entry via Gotify.
        """
        data_to_insert = {
            "title": title_source,
            "context": content,
            "state": state,
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        self.mongo.collection.insert_one(data_to_insert)

        if send_now:
            title = 'Auto Info - running logs: {}'.format(self.now_day)
            text = self._format_record(data_to_insert)
            GotifyNotifier(title=title, message=text, token_name='logs').send_message()
class MongoHandle(object):
    """Connection to one MongoDB collection, configured from the project config.

    After construction ``self.client`` is the ``pymongo.MongoClient``,
    ``self.db`` the database object and ``self.collection`` the collection
    object.
    """

    def __init__(self, db, collection, del_db=False, del_collection=False, auto_remove=0):
        """Connect and optionally reset or prune the target database/collection.

        Args:
            db: Database name.
            collection: Collection name.
            del_db: When truthy, drop the database first if it exists.
            del_collection: When truthy, drop the collection first if it exists.
            auto_remove: When truthy, passed to ``auto_remove_data`` as the
                retention period in days.
        """
        config_json = LoadConfig().load_config()
        db_user = config_json.get('DB_USER')
        db_password = config_json.get('DB_PASSWORD')
        db_ip = config_json.get('DB_IP')
        db_port = config_json.get('DB_PORT')
        # NOTE(review): credentials are interpolated as-is; if they can contain
        # reserved URI characters they presumably need percent-encoding - confirm
        # against how the config values are stored.
        mongo_link = f'mongodb://{db_user}:{db_password}@{db_ip}:{db_port}/'
        self.client = pymongo.MongoClient(mongo_link)

        # Drop the database only when asked to and only if it exists.
        if del_db and db and db in self.client.list_database_names():
            self.client.drop_database(db)
        self.db = self.client[db]

        # Drop the collection only when asked to and only if it exists.
        if del_collection and collection and collection in self.db.list_collection_names():
            self.db.drop_collection(collection)
        self.collection = self.db[collection]

        if auto_remove:
            self.auto_remove_data(auto_remove)

    def write_data(self, data):
        """Insert a single document into the collection."""
        self.collection.insert_one(data)

    def load_data(self):
        """Return every document of the collection as a list, without ``_id``."""
        return list(self.collection.find({}, {'_id': False}))

    @staticmethod
    def _expired_filter(day, now=None):
        """Build the query matching documents older than ``day`` days.

        Args:
            day: Retention period in days.
            now: Reference Unix timestamp; defaults to the current time.

        Returns:
            A filter selecting documents whose ``create_time`` is less than
            ``now - day * 86400``.
        """
        if now is None:
            now = int(time.time())
        return {'create_time': {'$lt': now - day * 24 * 60 * 60}}

    def auto_remove_data(self, day):
        """Delete all documents whose ``create_time`` is older than ``day`` days."""
        # A single bulk delete instead of one delete call per matching document.
        self.collection.delete_many(self._expired_filter(day))
class SendEmail(object):
    """Send a plain-text e-mail through the SMTP account defined in the project config."""

    def __init__(self, subject='AutoInfo subject', title='AutoInfo title', text='auto text') -> None:
        """Read the mail settings and remember the message parts.

        Args:
            subject: Value of the ``Subject`` header.
            title: Value of the ``From`` header.
            text: Plain-text message body.
        """
        config_json = LoadConfig().load_config()
        # Third-party SMTP service settings.
        self.mail_host = config_json.get('MAIL_HOST')  # server
        self.mail_user = config_json.get('MAIL_USER')  # user name
        self.mail_pass = config_json.get('MAIL_PASS')  # password
        self.sender = config_json.get('MAIL_SENDER')
        self.receivers = self._normalize_receivers(config_json.get('MAIL_RECEIVERS'))

        self.subject = subject
        self.title = title
        self.text = text

    @staticmethod
    def _normalize_receivers(receivers):
        """Return the configured receivers as a flat list.

        A list or tuple is copied as-is; any other value (for example a single
        address string) is wrapped in a one-element list.
        """
        if isinstance(receivers, (list, tuple)):
            return list(receivers)
        return [receivers]

    def send(self):
        """Build the message and send it over SMTP-over-SSL.

        SMTP errors are printed, not raised.
        """
        message = MIMEText(self.text, 'plain', 'utf-8')
        message['From'] = Header(self.title, 'utf-8')
        message['To'] = Header("auto", 'utf-8')
        message['Subject'] = Header(self.subject, 'utf-8')

        try:
            # The context manager closes the connection even if login or
            # sending fails.
            with smtplib.SMTP_SSL(self.mail_host) as smtp_client:
                smtp_client.login(self.mail_user, self.mail_pass)
                smtp_client.sendmail(self.sender, self.receivers, message.as_string())
                print("邮件发送成功")
        except smtplib.SMTPException as e:
            print("Error: 无法发送邮件", e)
class GotifyNotifier:
    """Push a titled message to a Gotify server using a named application token."""

    def __init__(self, title, message, token_name=''):
        """Resolve the server URL and application token, and store the message.

        Args:
            title: Message title.
            message: Message body.
            token_name: Key of the token in ``gotify_config.json``; an empty
                value is looked up as ``'test'``.
        """
        settings = LoadConfig().load_config()
        self.gotify_url = settings.get('GOTIFY_URL', 'https://gotify.erhe.top')
        self.app_token = self.match_token_name(token_name or 'test')
        self.title = title
        self.message = message

    def match_token_name(self, name):
        """Return the token stored under ``name``, or the ``'base'`` token.

        Tokens are read from ``gotify_config.json`` in the project root. The
        ``'base'`` entry is used when ``name`` is missing or maps to a falsy
        value.
        """
        config_file = os.path.join(str(PROJECT_PATH), 'gotify_config.json')
        with open(config_file, 'r') as handle:
            tokens = json.load(handle)
        return tokens.get(name) or tokens['base']

    def send_message(self):
        """POST the message to the Gotify ``/message`` endpoint and print the outcome."""
        endpoint = f"{self.gotify_url}/message?token={self.app_token}"
        payload = {'title': self.title, 'message': self.message}

        with httpx.Client() as client:
            response = client.post(
                url=endpoint,
                headers={'Content-Type': 'application/json'},
                json=payload,
            )

        # Anything other than HTTP 200 is reported as a failure.
        if response.status_code != 200:
            print('Failed to send message:', response.text)
            return

        print(self.title)
        print('Gotify Message sent successfully!')
class LoadConfig:
    """Locate and read the project's ``config.json``."""

    def load_config(self):
        """Load ``config.json`` from ``PROJECT_PATH`` and return its content.

        The file is read as UTF-8. If it cannot be opened or parsed, or if it
        parses to an empty value, a message is printed and the process exits
        with status 1.

        Returns:
            The parsed, non-empty JSON content.

        Raises:
            SystemExit: On an unreadable, invalid or empty config file.
        """
        config_path = os.path.join(PROJECT_PATH, 'config.json')
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config_json = json.load(f)
        except (OSError, ValueError) as e:
            # OSError: file missing or unreadable.
            # ValueError: invalid JSON or undecodable bytes.
            print(e)
            sys.exit(1)

        if not config_json:
            print('No config file found')
            sys.exit(1)
        return config_json

    def get_base_path(self):
        """Return the ``AutoInfo`` directory derived from the current working directory.

        This is based on ``os.getcwd()``, whereas ``PROJECT_PATH`` is based on
        this file's location.
        """
        return os.path.join(os.getcwd().split('AutoInfo')[0], 'AutoInfo')
|