| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- # -*- coding: utf-8 -*-
- '''
- Hello Github
- '''
- import os
- import sys
- sys.path.append(os.path.join(os.getcwd().split('auto_news_scheduler')[0], 'auto_news_scheduler'))
- import threading
- import time
- from datetime import datetime
- import httpx
- from tools_mongo_handle import MongoHandle
- from tools_logs_handle import LogsHandle
- from tools_send_email import SendEmail
class HelloGithub(object):
    """Collect repository entries from the HelloGithub API.

    ``main`` downloads the listing pages for every sort target, stores the
    entries whose title is not yet present in the MongoDB collection and,
    when ``send_email_now`` is truthy, mails the newly stored entries.
    """

    def __init__(self):
        """Set up logging, request headers and storage/e-mail settings."""
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.base_url = 'https://api.hellogithub.com/v1/?sort_by=last&tid=&page={}'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        self.db = 'NEWS'
        self.collection = 'HelloGithub_info'
        # Prefix of the public repository page; the entry's item_id is appended.
        self.source_url = 'https://hellogithub.com/repository/'
        # Records inserted during this run; they form the body of the e-mail.
        self.send_email_datas = []
        # Truthy value enables the e-mail step at the end of main().
        self.send_email_now = 0

    def main(self):
        """Run one full cycle: fetch, store, then optionally send the e-mail."""
        self.logs_handle.logs_write('HelloGithub', '开始获取 HelloGithub 数据', 'start', False)

        targets = ['featured']
        response_datas = []
        for target in targets:
            # "or []" keeps the accumulation safe even if an overriding req()
            # still returns None for an empty result.
            response_datas.extend(self.req(target) or [])

        if response_datas:
            self.save_to_mongo(response_datas)
        else:
            self.logs_handle.logs_write('HelloGithub', '获取 HelloGithub 数据失败', 'error', False)

        self.logs_handle.logs_write('HelloGithub', 'HelloGithub 数据获取完成', 'done', False)
        print('获取 HelloGithub 数据 done')

        if self.send_email_now:
            if self.send_email_datas:
                self.send_to_email()
            else:
                print('没有新数据, 不发送邮件')

    def req(self, target):
        """Fetch listing pages 1-4 for ``target`` and return the parsed records.

        Args:
            target: value placed in the API's ``sort_by`` query parameter.

        Returns:
            A list of record dicts (see ``_build_record``); empty when the
            API returned no entries.

        Raises:
            SystemExit: with status 0 when a page answers with a non-200
                status code (same behaviour as the former ``exit(0)``).
        """
        print('开始获取 HelloGithub {} 数据'.format(target))
        response_data = []
        for page in range(1, 5):
            url = 'https://api.hellogithub.com/v1/?sort_by={}&tid=&page={}'.format(target, page)
            response = httpx.get(url=url, headers=self.headers)
            if response.status_code != 200:
                print(
                    '获取 HelloGithub {} 数据, 状态码: {}, 程序退出\n检查目标地址: {}'.format(
                        target, response.status_code, url))
                self.logs_handle.logs_write('HelloGithub', '请求失败, 状态码: %s' % response.status_code, 'error',
                                            False)
                # Equivalent to exit(0) but does not rely on the `site` helper,
                # which is absent under "python -S" or in frozen builds.
                raise SystemExit(0)

            # A missing or null "data" field is treated as an empty page
            # instead of raising TypeError while iterating None.
            items = response.json().get('data') or []
            response_data.extend(self._build_record(item, target) for item in items)

        if not response_data:
            self.logs_handle.logs_write('HelloGithub', '获取数据失败', 'error', False)
        # Always a list, so callers can concatenate the result safely.
        return response_data

    def _build_record(self, item, target):
        """Convert one API entry into the dict stored in MongoDB.

        Missing or null text fields become empty strings so that the string
        concatenations below cannot fail.
        """
        summary = item.get('summary') or ''
        description = item.get('description') or ''
        item_id = item.get('item_id') or ''
        return {
            "title": item.get('title') or '',
            "context": summary + ' --- ' + description,
            "source_url": 'https://hellogithub.com',
            'link': self.source_url + str(item_id),
            "article_type": '',
            "article_source": target,
            "img_url": '',
            'keyword': '',
            "posted_date": item.get('updated_at'),
            "create_time": int(time.time()),
            "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

    def save_to_mongo(self, data):
        """Insert every record whose title is not yet stored.

        Newly inserted records are also appended to ``send_email_datas``.

        Args:
            data: iterable of record dicts as produced by ``req``.

        Returns:
            0 when a ``TypeError`` interrupted the processing, otherwise None.
        """
        print('开始储存 HelloGithub 数据')
        # One handle serves the whole batch; it used to be rebuilt per record.
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False,
                            auto_remove=0)
        for record in data:
            try:
                # The title acts as the de-duplication key.
                already_stored = mongo.collection.count_documents({'title': record.get('title', '')})
                if already_stored == 0:
                    mongo.collection.insert_one(record)
                    # Remember the new record for the e-mail report.
                    self.send_email_datas.append(record)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('HelloGithub', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print('处理 HelloGithub 数据完成')

    @staticmethod
    def _render_email_text(records):
        """Build the plain-text e-mail body for ``records``."""
        separator = '********************************************************\n'
        parts = [separator]
        for record in records:
            parts.append('标题: {}\n'.format(record['title']))
            parts.append('正文: {}\n'.format(record['context']))
            # "source_url" is the same site home page for every record; the
            # per-entry page lives in "link", so prefer it for the address.
            parts.append('文章地址: {}\n'.format(record.get('link') or record['source_url']))
            parts.append('文章时间: {}\n'.format(record['posted_date']))
            parts.append('获取时间: {}\n'.format(record['create_datetime']))
            parts.append(separator + '\n')
        return ''.join(parts)

    def send_to_email(self):
        """Mail the records collected in ``send_email_datas`` and log completion."""
        title = 'HelloGithub - info'
        subject = 'HelloGithub - info'
        text = self._render_email_text(self.send_email_datas)
        send_email = SendEmail(subject=subject, title=title, text=text)
        send_email.send()
        self.logs_handle.logs_write('HelloGithub', f'{title}-发送邮件完成', 'done', False)
- # if __name__ == "__main__":
- # H = HelloGithub()
- # H.main()
|