"""Collect recently crawled news documents from MongoDB and e-mail a digest."""

import re
import smtplib
import time
from datetime import datetime, timedelta
from email.header import Header
from email.mime.text import MIMEText

from pymongo import MongoClient

# Project-local module; assumed to be importable and to expose load_config().
import tools_load_config

config_json = tools_load_config.load_config()

PROJECT_NAME = config_json.get('PROJECT_NAME')
DB_USER = config_json.get('DB_USER')
DB_PASSWORD = config_json.get('DB_PASSWORD')
DB_IP = config_json.get('DB_IP')
DB_PORT = config_json.get('DB_PORT')
MAIL_HOST = config_json.get('MAIL_HOST')
MAIL_USER = config_json.get('MAIL_USER')
MAIL_PASS = config_json.get('MAIL_PASS')
MAIL_SENDER = config_json.get('MAIL_SENDER')
MAIL_RECEIVERS = config_json.get('MAIL_RECEIVERS')
DB_NAME = config_json.get('DB_NAME')  # make sure the config file has this key

# The f-string already interpolates every value. A further ``.format()`` call
# on the result was redundant and raised if a credential contained '{' or '}'.
# NOTE(review): pymongo expects user name and password to be percent-escaped
# when they contain reserved characters (':', '/', '@', '%') -- confirm the
# config stores them in that form.
MONGO_LINK = f'mongodb://{DB_USER}:{DB_PASSWORD}@{DB_IP}:{DB_PORT}/'

now_day = datetime.now().strftime('%Y-%m-%d')  # today's date


class NewsDataCollation(object):
    """Load recent news documents, normalise them and mail a text digest.

    Attributes set in ``__init__``:
        processed_data: list of normalised article dicts filled by load_data().
        filter: truthy to enable keyword filtering in process_data().
        filter_key: keywords used by the filter; empty disables filtering.
        days: size of the look-back window in days, today included.
    """

    def __init__(self):
        # Third-party SMTP service settings.
        self.mail_host = MAIL_HOST  # SMTP server
        self.mail_user = MAIL_USER  # user name
        self.mail_pass = MAIL_PASS  # password
        self.sender = MAIL_SENDER
        # The config may hold one address or a list of them; wrapping an
        # existing list again would hand sendmail() a nested list.
        if isinstance(MAIL_RECEIVERS, (list, tuple)):
            self.receivers = list(MAIL_RECEIVERS)
        else:
            self.receivers = [MAIL_RECEIVERS]
        self.processed_data = []
        self.filter = 1
        self.filter_key = []
        self.days = 1

    def _date_prefix_query(self, today=None):
        """Return the MongoDB filter for the last ``self.days`` days.

        Args:
            today: datetime used as the end of the window; defaults to now.

        Returns:
            A filter dict matching ``create_datetime`` values that start with
            any 'YYYY-MM-DD' date in the window (today included).
        """
        if today is None:
            today = datetime.now()
        # A window smaller than one day makes no sense; fall back to today.
        span = max(int(self.days), 1)
        day_strings = [
            (today - timedelta(days=offset)).strftime('%Y-%m-%d')
            for offset in range(span - 1, -1, -1)
        ]
        # One anchored group over every day of the window: without the group
        # '^' binds only to the first alternative, and listing just the two
        # end points would skip the days in between.
        # Presumably create_datetime is stored as a string beginning with
        # 'YYYY-MM-DD' -- verify against the crawler that writes it.
        pattern = '^(?:{})'.format('|'.join(day_strings))
        return {"create_datetime": {"$regex": pattern}}

    def load_data(self):
        """Read matching documents from every collection into processed_data.

        Documents without a title are skipped, as are documents rejected by
        process_data().
        """
        query = self._date_prefix_query()
        client = MongoClient(MONGO_LINK)
        try:
            # NOTE(review): DB_NAME is read from the config but the database
            # name is hard-coded here -- confirm which one is intended.
            db = client['NEWS']
            # Walk every collection in the database.
            for collection_name in db.list_collection_names():
                for document in db[collection_name].find(query):
                    if not document.get('title'):
                        continue
                    data = self.process_data(document)
                    if data:
                        self.processed_data.append(data)
        finally:
            # Close the MongoDB connection even if a query fails.
            client.close()

    def process_data(self, document):
        """Normalise one document, applying the keyword filter if enabled.

        Args:
            document: mapping with a ``get`` method (a MongoDB document).

        Returns:
            A dict of the known fields with missing/falsy values replaced by
            '' and whitespace stripped from title and context, or None when
            the filter is active and the document does not match.
        """
        field_names = (
            'title', 'context', 'source_url', 'link', 'article_type',
            'article_source', 'img_url', 'keyword', 'posted_date',
            'create_time', 'create_datetime',
        )
        data = {name: document.get(name) or '' for name in field_names}

        # Filter first (on the raw text), then clean the strings.
        if self.filter and self.filter_key:
            title, context = data['title'], data['context']
            # NOTE(review): every keyword must occur in the title or context,
            # which is what the original loop enforced -- confirm that "any
            # keyword" was not the intent.
            if not all(key in title or key in context
                       for key in self.filter_key):
                return None

        data['title'] = self.clean_string(data['title'])
        data['context'] = self.clean_string(data['context'])
        return data

    def clean_string(self, input_string):
        """Return input_string with all whitespace removed.

        Non-string input yields ''. A single '\\s+' pass already covers
        newlines, tabs and spaces.
        """
        if not isinstance(input_string, str):
            return ''
        return re.sub(r'\s+', '', input_string)

    def send_email(self):
        """Send processed_data as a plain-text mail through the SMTP server.

        Prints a success or failure message; SMTP and connection errors are
        reported, not raised.
        """
        subject = '新闻汇总sub'
        title = '新闻汇总title'

        chunks = ['********************************************************\n']
        for data in self.processed_data:
            chunks.append('标题: {}\n'.format(data['title']))
            chunks.append('正文: {}\n'.format(data['context']))
            chunks.append('文章地址: {}\n'.format(data['link']))
            chunks.append('类型: {}\n'.format(data['article_type']))
            chunks.append('板块: {}\n'.format(data['article_source']))
            chunks.append('文章时间: {}\n'.format(data['posted_date']))
            chunks.append('获取时间: {}\n'.format(data['create_datetime']))
            chunks.append(
                '********************************************************\n\n')
        text = ''.join(chunks)

        message = MIMEText(text, 'plain', 'utf-8')
        message['From'] = Header(title, 'utf-8')
        message['To'] = Header("auto collection", 'utf-8')
        message['Subject'] = Header(subject, 'utf-8')

        try:
            # The context manager sends QUIT and closes the socket on exit,
            # including when login or sendmail fails.
            with smtplib.SMTP() as smtp_obj:
                smtp_obj.connect(self.mail_host, 25)
                smtp_obj.login(self.mail_user, self.mail_pass)
                smtp_obj.sendmail(
                    self.sender, self.receivers, message.as_string())
                print("邮件发送成功")
        except (smtplib.SMTPException, OSError) as exc:
            # OSError covers refused connections, DNS failures and timeouts,
            # which are not SMTPException subclasses.
            print("Error: 无法发送邮件")
            print(exc)

    def main(self):
        """Load the data and mail it; do nothing when no data was found."""
        # Load the data.
        self.load_data()
        if not self.processed_data:
            print("没有找到任何数据")
            return
        self.send_email()


if __name__ == '__main__':
    NewsDataCollation().main()