# -*- coding: utf-8 -*-
"""Hello Github.

Fetch repository listings from the HelloGithub API, store the ones that are
not yet in MongoDB and, when enabled, send the newly stored ones by email.
"""
import os
import sys

# Make the project root importable so the ``tools_*`` helpers below resolve
# regardless of which sub-directory the script is started from.
sys.path.append(os.path.join(os.getcwd().split('auto_news_scheduler')[0], 'auto_news_scheduler'))
import threading
import time
from datetime import datetime

import httpx

from tools_mongo_handle import MongoHandle
from tools_logs_handle import LogsHandle
from tools_send_email import SendEmail


class HelloGithub(object):
    """Scraper for the HelloGithub listing API.

    Attributes are plain configuration values; ``send_email_datas`` collects
    the documents inserted during the current run and ``send_email_now``
    (falsy by default) switches the email notification on.
    """

    def __init__(self):
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.base_url = 'https://api.hellogithub.com/v1/?sort_by=last&tid=&page={}'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        self.db = 'NEWS'
        self.collection = 'HelloGithub_info'
        self.source_url = 'https://hellogithub.com/repository/'
        self.send_email_datas = []
        self.send_email_now = 0

    def main(self):
        """Run one full cycle: fetch, store, and optionally email new items."""
        self.logs_handle.logs_write('HelloGithub', '开始获取 HelloGithub 数据', 'start', False)

        targets = ['featured']
        response_datas = []
        for target in targets:
            # ``or []`` keeps this loop safe even if ``req`` yields nothing.
            response_datas.extend(self.req(target) or [])

        if response_datas:
            self.save_to_mongo(response_datas)
        else:
            self.logs_handle.logs_write('HelloGithub', '获取 HelloGithub 数据失败', 'error', False)

        self.logs_handle.logs_write('HelloGithub', 'HelloGithub 数据获取完成', 'done', False)
        print('获取 HelloGithub 数据 done')

        if self.send_email_now:
            if self.send_email_datas:
                self.send_to_email()
            else:
                print('没有新数据, 不发送邮件')

    def _build_record(self, target, item):
        """Convert one API item (a dict) into the document stored in MongoDB.

        Missing or null ``summary``/``description``/``item_id`` values fall
        back to an empty string instead of raising ``TypeError``.
        """
        summary = item.get('summary') or ''
        description = item.get('description') or ''
        item_id = item.get('item_id') or ''
        return {
            "title": item.get('title', ''),
            "context": '{} --- {}'.format(summary, description),
            "source_url": 'https://hellogithub.com',
            'link': '{}{}'.format(self.source_url, item_id),
            "article_type": '',
            "article_source": target,
            "img_url": '',
            'keyword': '',
            "posted_date": item.get('updated_at'),
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

    def req(self, target):
        """Download pages 1-4 of the listing sorted by ``target``.

        Args:
            target: value passed as the ``sort_by`` query parameter.

        Returns:
            A list of record dicts (see ``_build_record``); an empty list when
            the API returned no items.

        Raises:
            SystemExit: when any page answers with a non-200 status code.
        """
        print('开始获取 HelloGithub {} 数据'.format(target))
        response_data = []
        for i in range(1, 5):
            url = 'https://api.hellogithub.com/v1/?sort_by={}&tid=&page={}'.format(target, i)
            response = httpx.get(url=url, headers=self.headers)
            if response.status_code != 200:
                print(
                    '获取 HelloGithub {} 数据, 状态码: {}, 程序退出\n检查目标地址: {}'.format(
                        target, response.status_code, url))
                self.logs_handle.logs_write('HelloGithub', '请求失败, 状态码: %s' % response.status_code, 'error', False)
                # sys.exit instead of the site-provided ``exit`` builtin,
                # which is not guaranteed to exist; exit status is unchanged.
                sys.exit(0)

            json_data = response.json()
            # A payload without 'data' (or with null) is treated as an empty page.
            for d in json_data.get('data') or []:
                response_data.append(self._build_record(target, d))

        if not response_data:
            self.logs_handle.logs_write('HelloGithub', '获取数据失败', 'error', False)
        # Always hand back a list so callers can concatenate the result.
        return response_data

    def save_to_mongo(self, data):
        """Insert every record whose title is not yet in the collection.

        Newly inserted records are also appended to ``send_email_datas``.

        Args:
            data: iterable of record dicts as produced by ``req``.

        Returns:
            ``0`` if a ``TypeError`` interrupted the run, otherwise ``None``.
        """
        print('开始储存 HelloGithub 数据')
        if not data:
            print('处理 HelloGithub 数据完成')
            return None

        # One handle for the whole batch instead of one per document.
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False,
                            auto_remove=0)
        for data_to_insert in data:
            try:
                # Look for an existing document with the same title.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    # Nothing matched: insert it and queue it for the email.
                    mongo.collection.insert_one(data_to_insert)
                    self.send_email_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('HelloGithub', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print('处理 HelloGithub 数据完成')

    def send_to_email(self):
        """Send one email listing every record in ``send_email_datas``."""
        title = 'HelloGithub - info'
        subject = 'HelloGithub - info'

        parts = ['********************************************************\n']
        for data in self.send_email_datas:
            parts.append('标题: {}\n'.format(data['title']))
            parts.append('正文: {}\n'.format(data['context']))
            # NOTE(review): 'source_url' is the site root for every record; the
            # per-repository URL lives in 'link' - confirm which one is wanted.
            parts.append('文章地址: {}\n'.format(data['source_url']))
            parts.append('文章时间: {}\n'.format(data['posted_date']))
            parts.append('获取时间: {}\n'.format(data['create_datetime']))
            parts.append('********************************************************\n\n')
        text = ''.join(parts)

        send_email = SendEmail(subject=subject, title=title, text=text)
        send_email.send()

        self.logs_handle.logs_write('HelloGithub', f'{title}-发送邮件完成', 'done', False)

# if __name__ == "__main__":
#     H = HelloGithub()
#     H.main()