# -*- coding: utf-8 -*-
'''
Crawl several web news sites.
Results are stored in MongoDB, which is used only to check whether an
item has already been pushed.
'''
import os
import re
import sys
import threading
import time
from datetime import datetime
from html import unescape

import httpx

# Make the project root (everything up to and including 'AutoInfo') importable
# before loading the project's own modules.
sys.path.append(os.path.abspath(__file__).split('AutoInfo')[0] + 'AutoInfo')

from utils.utils_mongo_handle import MongoHandle
from base.base_load_config import load_config

config_json = load_config()
DEFAULT_RE_PUSH_TIMES = config_json['DEFAULT_RE_PUSH_TIMES']
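# Note on DEFAULT_RE_PUSH_TIMES: judging by the key name, it caps how many
# times an unsent item may be re-pushed; the config itself is not shown here,
# so that reading is an assumption.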


class MessageSearchKey(object):
    def __init__(self):
        db_name = 'NEWS'
        collection_name = 'web3_news'
        self.mongo = MongoHandle(db=db_name, collection=collection_name, del_db=False, del_collection=False,
                                 auto_remove=0)
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Content-Type": "application/json"
        }
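        # Added note (not in the original): the scrapers below dedup with
        # count_documents() followed by insert_one(), which is not atomic, so
        # two threads could still insert the same title. Assuming MongoHandle
        # exposes a plain pymongo collection, a unique index would close the
        # race:
        #     self.mongo.collection.create_index('title', unique=True)
        # with pymongo.errors.DuplicateKeyError caught around insert_one().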

    def techflow(self):
        # 深潮TechFlow, scraped from its 163.com channel page:
        # https://www.163.com/dy/media/T1561634363944.html
        tag_title = '深潮TechFlow'
        target = ['https://www.163.com/dy/media/T1561634363944.html']
        for url in target:
            print('Fetching url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('深潮TechFlow - failed to fetch data, status code: {}'.format(resp.status_code))
                return False
            resp.encoding = 'utf-8'
            html = resp.text
            # Pull article links, titles and post times out of the listing markup.
            context_urls = re.findall('<a href="(.*?)" class="title">', html)
            title_list = re.findall('class="title">(.*?)</a>', html)
            posted_time_list = re.findall('<span class="time">(.*?)</span>', html)
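            # Added note: these regexes (here and in the other scrapers, with
            # their Vue scoped attributes such as data-v-559b28aa) are tied to
            # each site's current markup; after a redesign, findall() silently
            # returns empty lists. A cheap guard would be something like:
            #     if not title_list:
            #         print('{}: selectors matched nothing'.format(tag_title))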
            for title, context_url, posted_time in zip(title_list, context_urls, posted_time_list):
                data = {
                    'title': title,
                    'context': title,  # the title doubles as the context here
                    'source_url': url,
                    'link': context_url,
                    'article_type': tag_title,
                    'article_source': tag_title,
                    'img_url': '',
                    'keyword': '',
                    'posted_date': posted_time,
                    'create_time': int(time.time()),
                    'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    'repush_times': DEFAULT_RE_PUSH_TIMES
                }
                # Insert only if no document with this title exists yet.
                filter_criteria = {'title': data['title']}
                if self.mongo.collection.count_documents(filter_criteria) == 0:
                    self.mongo.collection.insert_one(data)

    def panewslab(self):
        tag_title = 'panewslab'
        base_url = 'https://www.panewslab.com'
        # -- JSON feed ---------------------------------------------------------
        try:
            url = 'https://www.panewslab.com/webapi/index/list?Rn=20&LId=1&LastTime=1724891115&TagId=&tw=0'
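            # Added note: LastTime is pinned to a fixed epoch (1724891115,
            # i.e. late August 2024); it looks like a pagination cursor, so a
            # current timestamp would presumably fetch the latest page. Left
            # as-is, since the API's exact semantics are not documented here.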
            print('Fetching url: {}'.format(url))
            resp = httpx.get(url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
                return False
            resp.encoding = 'utf-8'
            resp_json = resp.json()
            for resp_data in resp_json['data']:
                try:
                    data = {
                        'title': resp_data['share']['title'],
                        'context': resp_data['desc'],
                        'source_url': url,
                        'link': resp_data['share']['url'],
                        'article_type': tag_title,
                        'article_source': tag_title,
                        'img_url': '',
                        'keyword': '',
                        # utcfromtimestamp() is deprecated since Python 3.12;
                        # datetime.fromtimestamp(ts, tz=timezone.utc) is the
                        # modern equivalent.
                        'posted_date': datetime.utcfromtimestamp(int(resp_data['publishTime'])).strftime(
                            '%Y-%m-%d %H:%M:%S'),
                        'create_time': int(time.time()),
                        'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        'repush_times': DEFAULT_RE_PUSH_TIMES
                    }
                    filter_criteria = {'title': data['title']}
                    if self.mongo.collection.count_documents(filter_criteria) == 0:
                        self.mongo.collection.insert_one(data)
                except Exception as e:
                    print(f'{tag_title}: failed to extract a record, {e}')
                    continue
        except Exception as e:
            print(f'{tag_title}: failed to fetch the JSON feed, {e}')
        # -- profundity listing page --------------------------------------------
        url = 'https://www.panewslab.com/zh/profundity/index.html'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        context_urls = re.findall('<div class="list-left" data-v-559b28aa><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('target="_blank" class="n-title" data-v-559b28aa>(.*?)</a>', html)
        context_list = re.findall('<p class="description" data-v-559b28aa>(.*?)</p>', html)
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            if self.mongo.collection.count_documents(filter_criteria) == 0:
                self.mongo.collection.insert_one(data)
        # -- news listing page ---------------------------------------------------
        url = 'https://www.panewslab.com/zh/news/index.html'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        context_urls = re.findall('class="content" data-v-3376a1f2><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('target="_blank" class="n-title" data-v-3376a1f2>(.*?)</a>', html)
        context_list = re.findall('</a> <p data-v-3376a1f2>(.*?)</p>', html)
        for title, context, context_url in zip(title_list, context_list, context_urls):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': data['title']}
            if self.mongo.collection.count_documents(filter_criteria) == 0:
                self.mongo.collection.insert_one(data)

    def foresightnews(self):
        # Fetch news data from the foresightnews front page.
        tag_title = 'foresightnews'
        base_url = 'https://foresightnews.pro/'
        url = 'https://foresightnews.pro/'
        print('Fetching url: {}'.format(url))
        resp = httpx.get(url, headers=self.headers, timeout=10)
        if resp.status_code != 200:
            print('{} - failed to fetch data, status code: {}'.format(tag_title, resp.status_code))
            return False
        resp.encoding = 'utf-8'
        html = resp.text
        html = unescape(html)
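        # unescape() above converts HTML entities (&amp;, &quot;, ...) back into
        # plain characters so that the regexes below match the rendered text.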
        context_urls = re.findall('</div></div></div></a><a href="(.*?)" target="_blank"', html)
        title_list = re.findall('<div class="topic-body-title" data-v-3171afda>(.*?)</div>', html)
        context_list = re.findall('<div class="topic-body-content" data-v-3171afda>(.*?)</div>', html)
        posted_time_list = re.findall('div class="topic-time" data-v-3171afda>(.*?)</div>', html)
        # posted_time is captured but, as in the other HTML scrapers, the
        # record stores the crawl time rather than the site's own timestamp.
        for title, context, context_url, posted_time in zip(title_list, context_list, context_urls, posted_time_list):
            data = {
                'title': title,
                'context': context,
                'source_url': url,
                'link': base_url + context_url,
                'article_type': tag_title,
                'article_source': tag_title,
                'img_url': '',
                'keyword': '',
                'posted_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'create_time': int(time.time()),
                'create_datetime': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'repush_times': DEFAULT_RE_PUSH_TIMES
            }
            filter_criteria = {'title': title}
            if self.mongo.collection.count_documents(filter_criteria) == 0:
                self.mongo.collection.insert_one(data)

    def main(self):
        # Run each site-specific scraper. Every scraper writes the records it
        # finds straight to Mongo, using the title to decide whether an item
        # has already been pushed (see the per-method count_documents check).
        functions = [
            self.techflow,
            self.panewslab,
            self.foresightnews
        ]
        # Create and start one thread per scraper.
        print('Creating and starting scraper threads')
        threads = []
        for func in functions:
            thread = threading.Thread(target=func)
            thread.start()
            threads.append(thread)
        # Wait for all threads to finish.
        for thread in threads:
            thread.join()
        print('Program finished')
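

# A possible variant (a sketch, not part of the original code): an unhandled
# exception inside a threading.Thread only prints a traceback, and join()
# never re-raises it, so a crashed scraper is easy to miss. concurrent.futures
# surfaces such failures instead:
#     from concurrent.futures import ThreadPoolExecutor
#     with ThreadPoolExecutor(max_workers=3) as pool:
#         futures = [pool.submit(f) for f in functions]
#         for future in futures:
#             future.result()  # re-raises any exception from that scraper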


if __name__ == "__main__":
    m = MessageSearchKey()
    m.main()