Jack 1 month ago
parent
commit
879cb36c70
1 changed file with 230 additions and 0 deletions

+ 230 - 0
to_email/newsnow.py

@@ -0,0 +1,230 @@
+import httpx
+import json
+import pandas as pd
+from typing import List, Dict, Any, Optional
+import time
+
+
+class NewsNowSpider:
+    def __init__(self):
+        self.base_url = "https://newsnow.busiyi.world/api/s/entire"
+        self.headers = {
+            'authority': 'newsnow.busiyi.world',
+            'accept': 'application/json',
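+            # note: decoding 'br'/'zstd' responses needs httpx's optional
+            # brotli/zstandard dependencies; drop them from this header if those are not installed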
+            'accept-encoding': 'gzip, deflate, br, zstd',
+            'accept-language': 'zh-CN,zh;q=0.7',
+            'content-type': 'application/json',
+            'origin': 'https://newsnow.busiyi.world',
+            'priority': 'u=1, i',
+            'referer': 'https://newsnow.busiyi.world/c/focus',
+            'sec-ch-ua': '"Chromium";v="142", "Brave";v="142", "Not_A Brand";v="99"',
+            'sec-ch-ua-mobile': '?0',
+            'sec-ch-ua-platform': '"macOS"',
+            'sec-fetch-dest': 'empty',
+            'sec-fetch-mode': 'cors',
+            'sec-fetch-site': 'same-origin',
+            'sec-gpc': '1',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36'
+        }
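+        # Source feed IDs requested in a single call (these appear to mirror the
+        # site's /c/focus page; the list can be adjusted as needed)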
+        self.payload = {
+            "sources": [
+                "cls-depth", "cls-hot", "cls-telegraph",
+                "fastbull-express", "fastbull-news", "gelonghui",
+                "jin10", "mktnews-flash", "wallstreetcn-hot",
+                "wallstreetcn-news", "wallstreetcn-quick", "xueqiu-hotstock"
+            ]
+        }
+
+    def fetch_news(self, timeout: float = 30.0) -> List[Dict[str, Any]]:
+        """
+        Fetch news data from the API.
+
+        Args:
+            timeout: request timeout in seconds
+
+        Returns:
+            The JSON response from the API, assumed to be a list of news
+            objects; an empty list on failure.
+        """
+        try:
+            with httpx.Client(headers=self.headers, timeout=timeout) as client:
+                # headers are already set on the client, so only the JSON body is passed here
+                response = client.post(self.base_url, json=self.payload)
+
+                response.raise_for_status()  # raise an exception for 4xx/5xx responses
+
+                data = response.json()
+                print(f"Successfully fetched {len(data)} news items")
+                return data
+
+        except httpx.HTTPStatusError as e:
+            print(f"HTTP error: {e.response.status_code} - {e.response.text}")
+            return []
+        except httpx.RequestError as e:
+            print(f"Request error: {e}")
+            return []
+        except json.JSONDecodeError as e:
+            print(f"JSON decode error: {e}")
+            return []
+        except Exception as e:
+            print(f"Unexpected error: {e}")
+            return []
+
+    def parse_news_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """
+        Parse the raw news data into a flat list of dicts.
+
+        Args:
+            data: raw data returned by the API
+
+        Returns:
+            The parsed list of news items.
+        """
+        parsed_news = []
+
+        if not data:
+            return parsed_news
+
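+        # Each item is assumed to be a dict with the fields extracted below;
+        # missing fields fall back to empty defaults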
+        for item in data:
+            try:
+                news_item = {
+                    'id': item.get('id', ''),
+                    'title': item.get('title', ''),
+                    'content': item.get('content', ''),
+                    'source': item.get('source', ''),
+                    'publish_time': item.get('publishTime', ''),
+                    'create_time': item.get('createTime', ''),
+                    'url': item.get('url', ''),
+                    'image_url': item.get('imageUrl', ''),
+                    'importance': item.get('importance', 0),
+                    'sentiment': item.get('sentiment', 0),
+                    'tags': item.get('tags', []),
+                    'related_stocks': item.get('relatedStocks', [])
+                }
+                parsed_news.append(news_item)
+            except Exception as e:
+                print(f"解析新闻项时出错: {e}")
+                continue
+
+        return parsed_news
+
+    def save_to_json(self, data: List[Dict[str, Any]], filename: Optional[str] = None):
+        """
+        Save the data to a JSON file.
+
+        Args:
+            data: the data to save
+            filename: output filename; a timestamped name is used if None
+        """
+        if not filename:
+            timestamp = time.strftime("%Y%m%d_%H%M%S")
+            filename = f"news_data_{timestamp}.json"
+
+        try:
+            with open(filename, 'w', encoding='utf-8') as f:
+                json.dump(data, f, ensure_ascii=False, indent=2)
+            print(f"数据已保存到: {filename}")
+        except Exception as e:
+            print(f"保存文件时出错: {e}")
+
+    def save_to_excel(self, data: List[Dict[str, Any]], filename: Optional[str] = None):
+        """
+        Save the data to an Excel file.
+
+        Args:
+            data: the data to save
+            filename: output filename; a timestamped name is used if None
+        """
+        if not data:
+            print("No data to save")
+            return
+
+        if not filename:
+            timestamp = time.strftime("%Y%m%d_%H%M%S")
+            filename = f"news_data_{timestamp}.xlsx"
+
+        try:
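+            # .xlsx output requires the openpyxl package to be installed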
+            df = pd.DataFrame(data)
+            df.to_excel(filename, index=False, engine='openpyxl')
+            print(f"数据已保存到: {filename}")
+        except Exception as e:
+            print(f"保存Excel文件时出错: {e}")
+
+    def run(self, save_json: bool = True, save_excel: bool = True):
+        """
+        Run the scraper: fetch, parse, and optionally save the data.
+
+        Args:
+            save_json: whether to save results to a JSON file
+            save_excel: whether to save results to an Excel file
+        """
+        print("开始获取新闻数据...")
+
+        # 获取数据
+        raw_data = self.fetch_news()
+
+        if not raw_data:
+            print("未能获取到数据")
+            return
+
+        # Parse the data
+        parsed_data = self.parse_news_data(raw_data)
+
+        if not parsed_data:
+            print("没有解析到有效数据")
+            return
+
+        print(f"成功解析 {len(parsed_data)} 条新闻")
+
+        # Save the data
+        if save_json:
+            self.save_to_json(parsed_data)
+
+        if save_excel:
+            self.save_to_excel(parsed_data)
+
+        return parsed_data
+
+    def get_news_by_source(self, source: str) -> List[Dict[str, Any]]:
+        """
+        Filter news items by source.
+
+        Args:
+            source: the news source to filter by
+
+        Returns:
+            The filtered list of news items.
+        """
+        raw_data = self.fetch_news()
+        if not raw_data:
+            return []
+
+        all_news = self.parse_news_data(raw_data)
+        filtered_news = [news for news in all_news if news.get('source') == source]
+
+        print(f"来源 '{source}' 的新闻数量: {len(filtered_news)}")
+        return filtered_news
+
+
+# Usage example
+if __name__ == "__main__":
+    # Create a scraper instance
+    spider = NewsNowSpider()
+
+    # Option 1: run the full scraper (fetch all data and save it)
+    news_data = spider.run(save_json=True, save_excel=True)
+
+    # Option 2: fetch news from a specific source only
+    # wallstreet_news = spider.get_news_by_source("wallstreetcn")
+
+    # Option 3: fetch and parse data without saving
+    # raw_data = spider.fetch_news()
+    # parsed_data = spider.parse_news_data(raw_data)
+
+    # Print the first few news items
+    if news_data:
+        print("\n前3条新闻标题:")
+        for i, news in enumerate(news_data[:3]):
+            print(f"{i + 1}. {news['title']} (来源: {news['source']})")