| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # -*- coding: utf-8 -*-
- '''
- 获取36kr讯息数据, 通过rsshub获取数据, 可能需要使用代理
- https://www.36kr.com/
- '''
- import datetime
- import json
- import random
- import re
- import xmltodict
- import time
- import httpx
- from tools_mongo_handle import MongoHandle
- from tools_logs_handle import LogsHandle
- from tools_send_email import SendEmail
class Get36krKey:
    """Collect 36kr search-result articles via RSSHub and store new ones in MongoDB.

    For every configured keyword the RSSHub search feed is downloaded, parsed
    into flat article records, and each record whose title is not yet present
    in the target collection is inserted. Newly inserted records can optionally
    be sent out by e-mail (controlled by ``send_email_now``).
    """

    def __init__(self):
        # RSSHub route; the search keyword is appended directly to this URL.
        self.base_url = 'https://rsshub.app/36kr/search/articles/'
        self.local_key = [
            '数字币',
            # '测试网',  # disabled search keyword
        ]
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.db = 'NEWS'
        self.collection = '36kr_key'
        self.send_email_datas = {}
        # Falsy value disables the e-mail notification in main().
        self.send_email_now = 0

    @staticmethod
    def _build_record(item, url, key, source):
        """Turn one parsed RSS ``<item>`` mapping into the stored article record."""
        description = item.get('description')
        if isinstance(description, str):
            # Strip HTML tags so only the plain text is stored.
            description = re.sub(r'<[^>]+>', '', description)
        return {
            "title": item.get('title') or '',
            "context": description or '',
            "source_url": url,
            'link': item.get('link') or '',
            "article_type": source.replace(' ', ''),
            "article_source": '36kr-Search',
            "img_url": '',
            'keyword': key,
            "posted_date": item.get('pubDate') or '',
            "create_time": int(time.time()),
            "create_datetime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

    @staticmethod
    def _parse_feed(xml_dict, url, key):
        """Extract article records from a parsed RSS document.

        Args:
            xml_dict: Mapping produced by ``xmltodict.parse`` for the feed.
            url: Feed URL the document was fetched from.
            key: Search keyword the feed belongs to.

        Returns:
            A list of article records; empty when the feed has no usable items.
        """
        try:
            channel = xml_dict['rss']['channel']
        except (KeyError, TypeError):
            channel = None
        if not isinstance(channel, dict):
            print('获取 source 失败')
            print('获取 items 失败')
            return []

        source = channel.get('title')
        if not isinstance(source, str):
            # An empty <title/> is parsed as None; fall back to an empty name.
            print('获取 source 失败')
            source = ''

        raw_items = channel.get('item')
        if raw_items is None:
            print('获取 items 失败')
            return []
        # xmltodict yields a single mapping (not a list) when the feed
        # contains exactly one <item>; normalise to a list either way.
        if isinstance(raw_items, dict):
            raw_items = [raw_items]

        return [
            Get36krKey._build_record(item, url, key, source)
            for item in raw_items
            if isinstance(item, dict)
        ]

    @staticmethod
    def _build_email_text(new_datas):
        """Render the plain-text e-mail body listing every new article."""
        separator = '********************************************************'
        parts = [separator + '\n']
        for data in new_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            parts.append('文章地址: {}\n'.format(data['link']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append(separator + '\n\n')
        return ''.join(parts)

    def req(self):
        """Download and parse the search feed of every configured keyword.

        Keywords are processed in random order. A keyword whose request fails,
        returns a non-200 status, or yields malformed XML is logged and
        skipped so the remaining keywords are still processed.

        Returns:
            A list with the article records of all successfully fetched feeds.
        """
        # Local import: xmltodict reports malformed XML with this exception.
        from xml.parsers.expat import ExpatError

        result_data = []
        local_key = self.local_key[:]
        random.shuffle(local_key)
        last_index = len(local_key) - 1

        for index, key in enumerate(local_key):
            url = self.base_url + key
            try:
                response = httpx.get(url)
            except (httpx.HTTPError, TimeoutError) as request_error:
                # httpx signals timeouts and connection problems through its
                # own hierarchy, which does not derive from builtin TimeoutError.
                print(request_error)
                self.logs_handle.logs_write('36kr_key', '请求失败: %s' % request_error, 'error', False)
                continue

            if response.status_code != 200:
                self.logs_handle.logs_write('36kr_key', '请求失败, 状态码: %s' % response.status_code, 'error', False)
                time.sleep(20)
                continue

            response.encoding = 'utf-8'
            try:
                xml_dict = xmltodict.parse(response.text)
            except ExpatError as parse_error:
                self.logs_handle.logs_write('36kr_key', '解析 XML 失败: %s' % parse_error, 'error', False)
                continue

            result_data.extend(self._parse_feed(xml_dict, url, key))

            # Pause between requests only while more keywords remain.
            if index != last_index:
                time.sleep(random.uniform(10, 15))

        return result_data

    def save_to_mongo(self, result_data):
        """Insert every record whose title is not yet stored.

        Args:
            result_data: Article records as produced by ``req``.

        Returns:
            The list of records inserted by this call. If a ``TypeError``
            interrupts processing, the records inserted up to that point are
            returned so they can still be reported.
        """
        new_datas = []
        print(f'正在处理 {self.collection}数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)
        for data_to_insert in result_data:
            try:
                # A record counts as known when a document with the same title exists.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    mongo.collection.insert_one(data_to_insert)
                    new_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write(f'{self.collection}', '写入数据库报错: %s' % te, 'error', False)
                # Stop here, but keep what was already written instead of
                # discarding it, otherwise those records are never e-mailed.
                return new_datas
        print(f'处理 {self.collection} 数据完成')
        return new_datas

    def send_to_email(self, new_datas):
        """Send one e-mail summarising ``new_datas`` and log the completion."""
        title = self.collection
        subject = self.collection
        text = self._build_email_text(new_datas)
        send_email = SendEmail(subject=subject, title=title, text=text)
        send_email.send()
        # Log under the real collection name (previously the literal text
        # 'self.collection' was logged because the f-string had no placeholder).
        self.logs_handle.logs_write(self.collection, f'{title}-发送邮件完成', 'done', False)

    def main(self):
        """Run one full cycle: fetch feeds, store new articles, optionally e-mail them."""
        self.logs_handle.logs_write('36kr - key', '任务开始', 'start', False)
        result_data = self.req()
        if not result_data:
            self.logs_handle.logs_write('36kr - key', '无法获取 36kr - key 数据', 'error', False)
            return

        new_datas = self.save_to_mongo(result_data)
        if self.send_email_now:
            if new_datas:
                self.send_to_email(new_datas)
            else:
                print('无新数据')
        self.logs_handle.logs_write('36kr - key', '36kr - key 数据获取完成', 'done', False)
        print('done')
- # if __name__ == '__main__':
- # g = Get36krKey()
- # g.main()
|