# -*- coding: utf-8 -*-
"""
Fetch 36kr news data through RSSHub feeds (a proxy may be required).

https://www.36kr.com/
"""
import datetime
import json
import random
import re
import time
from xml.parsers.expat import ExpatError

import httpx
import xmltodict

from tools_mongo_handle import MongoHandle
from tools_logs_handle import LogsHandle
from tools_send_email import SendEmail

# Matches anything shaped like an HTML/XML tag; used to strip markup from
# feed descriptions. Compiled once instead of on every item.
_TAG_PATTERN = re.compile(r'<[^>]+>')


class Get36krInfo:
    """Pull 36kr RSS feeds, store unseen articles in MongoDB and email them."""

    def __init__(self):
        # Feed URLs are built as base_url + one entry of local_key.
        self.base_url = 'https://rsshub.app/36kr/'
        self.local_key = [
            'news',
            'newsflashes',
            'recommend',
            'life',
            'estate',
            'workplace',
        ]
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.db = 'NEWS'
        self.collection = '36kr_info'
        self.send_email_datas = {}

    def req(self):
        """Request every configured feed and return the parsed article records.

        Feeds are visited in random order. A feed whose request fails, whose
        status code is not 200, or whose body cannot be parsed is skipped
        instead of aborting the whole run.

        Returns:
            list[dict]: one record per feed item (see ``_parse_feed``).
        """
        result_data = []
        local_key = self.local_key[:]
        random.shuffle(local_key)
        last_index = len(local_key) - 1

        for index, key in enumerate(local_key):
            url = self.base_url + key
            try:
                response = httpx.get(url)
            except (httpx.HTTPError, TimeoutError) as request_error:
                # httpx reports timeouts and connection failures through its
                # own hierarchy (rooted at httpx.HTTPError), not through the
                # builtin TimeoutError alone.
                print(request_error)
                continue

            if response.status_code != 200:
                self.logs_handle.logs_write(
                    '36kr_info',
                    '请求失败, 状态码: %s' % response.status_code,
                    'error',
                    False,
                )
                time.sleep(20)
                continue

            response.encoding = 'utf-8'
            result_data.extend(self._parse_feed(response.text, url, key))

            # Pause between requests only; there is nothing to wait for after
            # the final feed (this also covers the single-feed case).
            if index != last_index:
                time.sleep(random.uniform(10, 15))

        return result_data

    def _parse_feed(self, xml_text, url, key):
        """Turn one RSS document into a list of article records.

        Args:
            xml_text: response body of the feed request.
            url: the feed URL, stored as ``source_url`` on every record.
            key: the feed name, stored as ``article_source`` on every record.

        Returns:
            list[dict]: parsed records; empty when the document is not valid
            XML or contains no usable items.
        """
        try:
            xml_dict = xmltodict.parse(xml_text)
        except ExpatError as parse_error:
            # A 200 response is not guaranteed to be well-formed XML.
            print('解析 xml 失败: %s' % parse_error)
            return []

        source = ''
        items = []
        try:
            source = xml_dict['rss']['channel']['title']
        except (KeyError, TypeError):
            print('获取 source 失败')
        try:
            items = xml_dict['rss']['channel']['item']
        except (KeyError, TypeError):
            print('获取 items 失败')

        # xmltodict yields a mapping (not a list) when the channel holds
        # exactly one <item>; normalise so the loop always sees a list.
        if isinstance(items, dict):
            items = [items]
        elif not isinstance(items, list):
            items = []

        records = []
        for item in items:
            if not isinstance(item, dict):
                # e.g. a text-only <item>; it has no fields to read.
                continue

            # Clean-up: drop markup from the description. Non-string values
            # (missing element, nested structure) become an empty string.
            description = item.get('description')
            if isinstance(description, str):
                context = _TAG_PATTERN.sub('', description)
            else:
                context = ''

            records.append({
                "title": item.get('title') or '',
                "context": context,
                "source_url": url,
                'link': item.get('link') or '',
                "article_type": source,
                "article_source": key,
                "img_url": '',
                'keyword': '',
                "posted_date": item.get('pubDate') or '',
                "create_time": int(time.time()),
                "create_datetime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            })
        return records

    def save_to_mongo(self, result_data):
        """Insert records whose title is not yet stored and return them.

        Args:
            result_data: records produced by ``req``.

        Returns:
            list[dict]: the records inserted during this call. A record
            that raises ``TypeError`` while being checked or written is logged
            and skipped, so records inserted before it are still returned.
        """
        new_datas = []
        print(f'正在处理 {self.collection}数据')
        mongo = MongoHandle(
            db=self.db,
            collection=self.collection,
            del_db=False,
            del_collection=False,
            auto_remove=0,
        )

        for data_to_insert in result_data:
            try:
                # Look for an existing document with the same title.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    # Nothing matched: store it and queue it for the email.
                    mongo.collection.insert_one(data_to_insert)
                    new_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write(
                    f'{self.collection}',
                    '写入数据库报错: %s' % te,
                    'error',
                    False,
                )
                # Skip only the offending record; returning early here would
                # drop already-inserted records from the notification.
                continue

        print(f'处理 {self.collection} 数据完成')
        return new_datas

    def send_to_email(self, new_datas):
        """Format the given records as plain text and send them by email.

        Args:
            new_datas: records to report; each must provide the keys
                ``title``, ``context``, ``link``, ``posted_date`` and
                ``create_datetime``.
        """
        title = self.collection
        subject = self.collection
        separator = '********************************************************\n'

        parts = [separator]
        for data in new_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('文章地址: {}\n'.format(data['link']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append(separator + '\n')
        text = ''.join(parts)

        send_email = SendEmail(subject=subject, title=title, text=text)
        send_email.send()

        # Log under the collection name itself; the previous f-string had no
        # braces and logged the literal text "self.collection".
        self.logs_handle.logs_write(self.collection, f'{title}-发送邮件完成', 'done', False)

    def main(self):
        """Run one full cycle: fetch feeds, store new records, email them."""
        self.logs_handle.logs_write('36kr - info', '任务开始', 'start', False)

        result_data = self.req()
        if not result_data:
            self.logs_handle.logs_write('36kr - info', '无法获取 36kr - info 数据', 'error', False)
            return

        new_datas = self.save_to_mongo(result_data)
        if new_datas:
            self.send_to_email(new_datas)
        else:
            print('无新数据')

        self.logs_handle.logs_write('36kr - info', '36kr - info 数据获取完成', 'done', False)
        print('done')


if __name__ == '__main__':
    g = Get36krInfo()
    g.main()