| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- # -*- coding: utf-8 -*-
- '''
- chiphell
- '''
- import os
- import random
- import sys
- import threading
- import re
- sys.path.append(os.path.join(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo'))
- from utils.utils import *
# Project configuration, loaded once at import time. LoadConfig is expected
# to come from the `utils.utils` star import above.
config_json = LoadConfig().load_config()

# Value written into the "repush_times" field of every scraped record.
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
class CHIPHELL(object):
    """Scrape chiphell portal listings, store new ones in MongoDB and e-mail them.

    The helpers ``LogsHandle``, ``MongoHandle`` and ``SendEmail`` as well as
    ``time``, ``datetime`` and ``httpx`` are not imported explicitly in this
    file; presumably they are provided by ``from utils.utils import *``.
    """

    # Patterns used to pick one listing entry, and its fields, out of the
    # portal HTML. Raw strings keep ``\S`` / ``\s`` from being treated as
    # (invalid) string escapes; compiling once avoids repeated lookups.
    _ENTRY_RE = re.compile(r'<dt class="xs2">([\S\s]*?)</dl>')
    _URL_RE = re.compile(r'<a href="(.*?)" target="_blank" ')
    _TITLE_RE = re.compile(r'class="xi2" style="">(.*?)</a> </dt>')
    _IMG_RE = re.compile(r'target="_blank"><img src="(.*?)"')
    _CONTEXT_RE = re.compile(r'class="tn" /></a></div>([\S\s]*?)</dd>')
    _POST_TIME_RE = re.compile(r'<span class="xg1"> (.*?)</span>')

    # Separator line placed between articles in the notification e-mail.
    _EMAIL_SEPARATOR = '********************************************************'

    def __init__(self):
        """Set up logging, target URLs, storage names and request headers."""
        self.logs_handle = LogsHandle()
        self.now_day = time.strftime('%Y-%m-%d', time.localtime())
        self.base_url = 'https://www.chiphell.com/'
        # ``{}`` is filled with the numeric category id of a board.
        self.href_url = 'portal.php?mod=list&catid={}'
        self.db = 'NEWS'
        self.collection = 'chiphell_info'
        self.headers = {
            'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; Media Center PC 6.0; InfoPath.2; MS-RTC LM 8'
        }
        # Records newly inserted during this run; they become the e-mail body.
        self.temp_datas = []

    @staticmethod
    def _clean_context(context):
        """Strip spaces and newlines from a summary; turn ``\\r`` into a space."""
        return context.replace(' ', '').replace('\n', '').replace('\r', ' ')

    @classmethod
    def _extract_entries(cls, html):
        """Return ``(url, title, img_url, context, post_time)`` tuples found in *html*.

        Each listing entry is searched independently. Within an entry the
        five field lists are zipped, so an entry missing any field yields
        nothing. The context is already cleaned with ``_clean_context``.
        """
        entries = []
        for chunk in cls._ENTRY_RE.findall(html):
            if not chunk:
                continue
            fields = zip(
                cls._URL_RE.findall(chunk),
                cls._TITLE_RE.findall(chunk),
                cls._IMG_RE.findall(chunk),
                cls._CONTEXT_RE.findall(chunk),
                cls._POST_TIME_RE.findall(chunk),
            )
            for href, title, img_url, context, post_time in fields:
                entries.append(
                    (href, title, img_url, cls._clean_context(context), post_time)
                )
        return entries

    def _build_records(self, html, source):
        """Turn listing *html* into the record dicts stored in MongoDB.

        *source* has the form ``'<section> - <board>'``; the first part is
        stored as ``article_source`` and the second as ``article_type``.
        """
        entries = self._extract_entries(html)
        if not entries:
            return []

        # Split once instead of twice per record.
        source_parts = source.split(' - ')
        article_source, article_type = source_parts[0], source_parts[1]

        records = []
        for href, title, img_url, context, post_time in entries:
            records.append({
                "title": title,
                "context": context,
                "source_url": self.base_url + href,
                'link': '',
                "article_type": article_type,
                "article_source": article_source,
                "img_url": img_url,
                'keyword': '',
                "posted_date": post_time,
                "create_time": int(time.time()),
                "create_datetime": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                "repush_times": DEFAULT_RE_PUSH_TIMES
            })
        return records

    def req(self, source, target):
        """Fetch one board listing and parse it into record dicts.

        Args:
            source: label of the form ``'<section> - <board>'``.
            target: category id substituted into the listing URL.

        Returns:
            A list of record dicts (possibly empty) on success, or ``0`` when
            the request raised or the response status was not 200.
        """
        print(f'正在获取 {source} 数据')
        # Short random pause before each request.
        sleep_time = random.uniform(1, 2)
        print(f'睡眠 {sleep_time} 秒')
        time.sleep(sleep_time)

        page_url = self.base_url + self.href_url.format(target)
        print(page_url)
        try:
            resp = httpx.get(url=page_url, headers=self.headers)
        except Exception as e:
            # Best effort: a failed board must not abort the whole run.
            print(e)
            return 0

        if resp.status_code != 200:
            print(resp.status_code)
            return 0

        resp.encoding = 'utf-8'
        return self._build_records(resp.text, source)

    def save_to_mongo(self, collection, source_data):
        """Insert records whose title is not stored yet and queue them for e-mail.

        Args:
            collection: section label, used only in the completion message;
                the Mongo collection written to is always ``self.collection``.
            source_data: iterable of record dicts as produced by ``req``.

        Returns:
            ``0`` if a ``TypeError`` interrupted processing, otherwise ``None``.
        """
        print(f'正在处理 {self.collection} 数据')
        mongo = MongoHandle(db=self.db, collection=self.collection, del_db=False, del_collection=False, auto_remove=0)

        for data_to_insert in source_data:
            try:
                # The title is the de-duplication key.
                filter_criteria = {'title': data_to_insert.get('title', '')}
                if mongo.collection.count_documents(filter_criteria) == 0:
                    mongo.collection.insert_one(data_to_insert)
                    # Remember the new record for the notification e-mail.
                    self.temp_datas.append(data_to_insert)
            except TypeError as te:
                print('\n%s' % te)
                self.logs_handle.logs_write('chiphell', '写入数据库报错: %s' % te, 'error', False)
                return 0
        print(f'处理 chiphell - {collection}数据完成')

    def _build_email_text(self):
        """Render ``self.temp_datas`` as the plain-text e-mail body."""
        separator = self._EMAIL_SEPARATOR
        parts = [separator + '\n']
        for data in self.temp_datas:
            parts.append(
                '标题: {}\n'.format(data['title'])
                + '正文: {}\n'.format(data['context'])
                + '板块: {}\n'.format(data['article_source'])
                + '类型: {}\n'.format(data['article_type'])
                + '文章地址: {}\n'.format(data['source_url'])
                + '文章时间: {}\n'.format(data['posted_date'])
                + '获取时间: {}\n'.format(data['create_datetime'])
                + separator + '\n\n'
            )
        return ''.join(parts)

    def send_to_email(self):
        """E-mail every record collected in ``self.temp_datas`` and log completion."""
        text = self._build_email_text()
        title = 'chiphell - info - ' + str(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        sub = 'chiphell - info'
        SendEmail(subject=sub, title=title, text=text).send()
        # GotifyNotifier(title=title, message=text, token_name='news').send_message()
        self.logs_handle.logs_write('chiphell', f'{title}-发送邮件完成', 'done', False)

    def main(self):
        """Scrape every enabled board, persist new articles and send one e-mail.

        Returns:
            ``None`` after a run that collected at least one record, or
            ``False`` (after logging an error) when nothing was collected.
        """
        # section -> {board name: category id}. Commented-out entries are
        # boards that are currently disabled; they are kept so the ids are
        # at hand when a board needs to be re-enabled.
        category = {
            '评测': {
                '笔记本': '19',
                '机箱': '11',
                # '处理器': '13',
                # '散热器': '14',
                # '主板': '15',
                # '内存': '137',
                # '外设': '18',
                # '电源': '35',
                '存储': '23',
                '显示设备': '21',
                # '台式机': '88',
                '显卡': '10',
                # '相机': '116'
            },
            '电脑': {
                '配件开箱': '98',
                '整机搭建': '99',
                '桌面书房': '101'
            },
            '掌设': {
                '智能手机': '40',
                '智能穿戴': '89',
                '笔电平板': '41',
                # '周边附件': '92'
            },
            # '摄影': {
            #     '微单卡片': '52',
            #     '单反单电': '51',
            #     '经典旁轴': '53',
            #     '怀旧菲林': '54',
            #     '影音摄像': '57',
            #     '周边附件': '55'
            # },
            # '汽车': {
            #     '买菜车': '58',
            #     '商务车': '59',
            #     '性能车': '63',
            #     '旅行车': '60',
            #     'SUV': '61',
            #     'MPV': '95',
            #     '摩托轻骑': '65',
            #     '改装配件': '96'
            # },
            # '单车': {
            #     '山地车': '108',
            #     '公路车': '109',
            #     '折叠车': '110',
            #     '休旅车': '111'
            # },
            # '腕表': {
            #     '机械表': '128',
            #     '电子表': '126'
            # },
            '视听': {
                '耳机耳放': '71',
                '音箱功放': '72',
                # '解码转盘': '73',
                '随身设备': '74'
            },
            '美食': {
                '当地美食': '68',
                '世界美食': '117',
                '私房菜品': '69',
                '美食器材': '70'
            },
            # '家居': {
            #     '家居': '132'
            # },
        }

        # Records grouped by top-level section name.
        response_datas = {}
        for source1, tags in category.items():
            section_records = response_datas.setdefault(source1, [])
            for source2, target in tags.items():
                response_data = self.req(source1 + ' - ' + source2, target)
                # ``req`` signals failure with 0; an empty list is a valid result.
                if response_data != 0:
                    section_records += response_data

        # Every section key is pre-seeded with an empty list, so the dict
        # itself is always truthy; check for actual records instead, otherwise
        # the "no data" branch can never run.
        if not any(response_datas.values()):
            self.logs_handle.logs_write('chiphell - info', '获取数据为空', 'error', False)
            return False

        # One writer thread per section; wait for all of them before mailing.
        threads = []
        for section, records in response_datas.items():
            thread = threading.Thread(target=self.save_to_mongo, args=(section, records,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

        if self.temp_datas:
            self.send_to_email()

        return None
# Run a full scrape -> store -> notify cycle when executed as a script.
if __name__ == '__main__':
    scraper = CHIPHELL()
    scraper.main()
|