import httpx import json import pandas as pd from typing import List, Dict, Any import time class NewsNowSpider: def __init__(self): self.base_url = "https://newsnow.busiyi.world/api/s/entire" self.headers = { 'authority': 'newsnow.busiyi.world', 'accept': 'application/json', 'accept-encoding': 'gzip, deflate, br, zstd', 'accept-language': 'zh-CN,zh;q=0.7', 'content-type': 'application/json', 'origin': 'https://newsnow.busiyi.world', 'priority': 'u=1, i', 'referer': 'https://newsnow.busiyi.world/c/focus', 'sec-ch-ua': '"Chromium";v="142", "Brave";v="142", "Not_A Brand";v="99"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'sec-gpc': '1', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36' } self.payload = { "sources": [ "cls-depth", "cls-hot", "cls-telegraph", "fastbull-express", "fastbull-news", "gelonghui", "jin10", "mktnews-flash", "wallstreetcn-hot", "wallstreetcn-news", "wallstreetcn-quick", "xueqiu-hotstock" ] } def fetch_news(self, timeout: float = 30.0) -> Dict[str, Any]: """ 获取新闻数据 Args: timeout: 请求超时时间 Returns: 返回API的JSON响应数据 """ try: with httpx.Client(headers=self.headers, timeout=timeout) as client: response = client.post( self.base_url, json=self.payload, headers=self.headers ) response.raise_for_status() # 如果状态码不是200,抛出异常 data = response.json() print(f"成功获取数据,共 {len(data)} 条新闻") return data except httpx.HTTPStatusError as e: print(f"HTTP错误: {e.response.status_code} - {e.response.text}") return {} except httpx.RequestError as e: print(f"请求错误: {e}") return {} except json.JSONDecodeError as e: print(f"JSON解析错误: {e}") return {} except Exception as e: print(f"未知错误: {e}") return {} def parse_news_data(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: """ 解析新闻数据 Args: data: API返回的原始数据 Returns: 解析后的新闻列表 """ parsed_news = [] if not data: return parsed_news for item in data: try: news_item = { 'id': item.get('id', ''), 'title': item.get('title', ''), 'content': item.get('content', ''), 'source': item.get('source', ''), 'publish_time': item.get('publishTime', ''), 'create_time': item.get('createTime', ''), 'url': item.get('url', ''), 'image_url': item.get('imageUrl', ''), 'importance': item.get('importance', 0), 'sentiment': item.get('sentiment', 0), 'tags': item.get('tags', []), 'related_stocks': item.get('relatedStocks', []) } parsed_news.append(news_item) except Exception as e: print(f"解析新闻项时出错: {e}") continue return parsed_news def save_to_json(self, data: List[Dict[str, Any]], filename: str = None): """ 保存数据到JSON文件 Args: data: 要保存的数据 filename: 文件名,如果为None则使用时间戳 """ if not filename: timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"news_data_{timestamp}.json" try: with open(filename, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"数据已保存到: {filename}") except Exception as e: print(f"保存文件时出错: {e}") def save_to_excel(self, data: List[Dict[str, Any]], filename: str = None): """ 保存数据到Excel文件 Args: data: 要保存的数据 filename: 文件名,如果为None则使用时间戳 """ if not data: print("没有数据可保存") return if not filename: timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"news_data_{timestamp}.xlsx" try: df = pd.DataFrame(data) df.to_excel(filename, index=False, engine='openpyxl') print(f"数据已保存到: {filename}") except Exception as e: print(f"保存Excel文件时出错: {e}") def run(self, save_json: bool = True, save_excel: bool = True): """ 运行爬虫 Args: save_json: 是否保存为JSON文件 save_excel: 是否保存为Excel文件 """ print("开始获取新闻数据...") # 获取数据 raw_data = self.fetch_news() if not raw_data: print("未能获取到数据") return # 解析数据 parsed_data = self.parse_news_data(raw_data) if not parsed_data: print("没有解析到有效数据") return print(f"成功解析 {len(parsed_data)} 条新闻") # 保存数据 if save_json: self.save_to_json(parsed_data) if save_excel: self.save_to_excel(parsed_data) return parsed_data def get_news_by_source(self, source: str) -> List[Dict[str, Any]]: """ 根据来源筛选新闻 Args: source: 新闻来源 Returns: 筛选后的新闻列表 """ raw_data = self.fetch_news() if not raw_data: return [] all_news = self.parse_news_data(raw_data) filtered_news = [news for news in all_news if news.get('source') == source] print(f"来源 '{source}' 的新闻数量: {len(filtered_news)}") return filtered_news # 使用示例 if __name__ == "__main__": # 创建爬虫实例 spider = NewsNowSpider() # 方法1: 运行完整爬虫(获取所有数据并保存) news_data = spider.run(save_json=True, save_excel=True) # 方法2: 只获取特定来源的新闻 # wallstreet_news = spider.get_news_by_source("wallstreetcn") # 方法3: 只获取数据不保存 # raw_data = spider.fetch_news() # parsed_data = spider.parse_news_data(raw_data) # 打印前几条新闻 if news_data: print("\n前3条新闻标题:") for i, news in enumerate(news_data[:3]): print(f"{i + 1}. {news['title']} (来源: {news['source']})")