# -*- coding: utf-8 -*-
import asyncio
import os
import re
import sys
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import fromstring, ParseError

import httpx

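# The parser below assumes the common OPML layout in which each feed sits one
# level below a category <outline> under <body>. Illustrative sketch only; the
# feed name and URL are placeholders, not taken from the original file:
#
# <opml version="2.0">
#   <body>
#     <outline text="Tech">
#       <outline text="Example Feed" xmlUrl="https://example.com/rss"/>
#     </outline>
#   </body>
# </opml>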

class OPMLParser:
    def __init__(self, file_path):
        """
        Initialize the OPML parser.
        :param file_path: path to the OPML file
        """
        self.file_path = file_path
        self.data = None  # holds the parsed data

    def clean_string(self, input_string):
        """
        Remove illegal characters and extra whitespace from a string.
        Legal characters are letters, digits and underscores.
        """
        # Replace every non-word character with an empty string
        cleaned_string = re.sub(r'[^\w]', '', input_string)
        return cleaned_string

    def parse(self):
        """
        Parse the OPML file starting from the body node and return the
        flattened list of feed entries.
        """
        tree = ET.parse(self.file_path)
        root = tree.getroot()
        # Locate the body node
        body = root.find(".//body")
        if body is None:
            raise ValueError("No body node found in the OPML file!")
        self.data = self._parse_outline(body)
        # Flatten the two-level outline (category -> feed) into a plain list of feeds
        result = []
        for group in self.data.get("children", []):
            for feed in group.get("children", []):
                result.append(feed)
        return result

    def _parse_outline(self, element):
        """
        Recursively parse an outline element of the OPML document.
        """
        item = {
            "title": self.clean_string(element.get("text")) if element.get("text") else '',
            "xmlUrl": element.get("xmlUrl")
        }
        # Drop keys whose value is None
        item = {k: v for k, v in item.items() if v is not None}
        # Recurse into child elements, if any
        children = []
        for child in element:
            children.append(self._parse_outline(child))
        if children:
            item["children"] = children
        return item

    def get_data(self):
        """
        Return the parsed data.
        """
        if self.data is None:
            raise ValueError("No data parsed yet, call parse() first!")
        return self.data


class GetNews:
    def __init__(self, parsed_data):
        """
        Initialize the GetNews class.
        :param parsed_data: the feed list produced by OPMLParser.parse()
        """
        self.parsed_data = parsed_data

    async def fetch_news(self, url):
        """
        Fetch a single RSS feed asynchronously and parse its XML content.
        :param url: RSS feed URL
        :return: list of parsed news items; an empty list when the request
                 fails or the status code is not 200
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                if response.status_code != 200:
                    return []  # Non-200 response: return an empty list
                xml_content = response.text
                try:
                    root = fromstring(xml_content)
                    items = root.findall(".//item")
                    news_list = []
                    for item in items:
                        title = self.clean_text(item.find("title").text) if item.find("title") is not None else "No title"
                        link = self.clean_text(item.find("link").text) if item.find("link") is not None else "No link"
                        description = self.clean_text(item.find("description").text) if item.find("description") is not None else "No description"
                        news_list.append({
                            "title": title,
                            "link": link,
                            "description": description
                        })
                    return news_list
                except ParseError:
                    return []  # XML could not be parsed: return an empty list
        except httpx.RequestError:
            return []  # Request failed: return an empty list

    def clean_text(self, text):
        """
        Clean a text fragment: strip HTML tags and extra whitespace,
        returning plain text.
        """
        if not text:
            return ""
        # Strip HTML tags
        clean_text = re.sub(r'<.*?>', '', text)
        # Collapse extra whitespace and newlines
        clean_text = re.sub(r'\s+', ' ', clean_text).strip()
        return clean_text

    async def get_all_news(self):
        """
        Fetch the news of every RSS feed asynchronously.
        :return: list of news lists, one per feed
        """
        tasks = []
        for data in self.parsed_data:
            url = data.get("xmlUrl")
            if url:
                tasks.append(self.fetch_news(url))
        results = await asyncio.gather(*tasks)
        return results


class SearchByKeyword:
    def __init__(self, data):
        self.data = data

    def search(self, keyword):
        """
        Return a dict mapping the keyword to every news item whose title
        contains it (case-insensitive).
        """
        result = {}
        for item in self.data:
            if keyword.lower() in item['title'].lower():
                if keyword not in result:
                    result[keyword] = []
                result[keyword].append(item)
        return result


# Usage example
if __name__ == "__main__":
    opml_file_path = "read_news.opml"
    opml_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), opml_file_path)
    if not os.path.exists(opml_file_path):
        print(f"File {opml_file_path} does not exist!")
        sys.exit(1)
    parser = OPMLParser(opml_file_path)
    parsed_data = parser.parse()
    print(f'Found {len(parsed_data)} feeds in total')
    get_news = GetNews(parsed_data)
    # Fetch all feeds asynchronously
    all_news = asyncio.run(get_news.get_all_news())
    # Keep only the items from feeds that returned at least one entry
    valid_data = []
    for news_list in all_news:
        if news_list:
            for news in news_list:
                valid_data.append(news)
    S = SearchByKeyword(valid_data)
    result = S.search('deepseek')
    for keyword, items in result.items():
        print(f'Keyword "{keyword}" matched {len(items)} news items:')
        for news in items:
            print(f'Title: {news["title"]}')
            print(f'Link: {news["link"]}')
            print('-' * 200)