# read_news.py — fetch RSS feeds listed in an OPML file and search them by keyword.
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import re
  4. import xml.etree.ElementTree as ET
  5. from xml.etree.ElementTree import fromstring, ParseError
  6. import asyncio
  7. import httpx
  8. class OPMLParser:
  9. def __init__(self, file_path):
  10. """
  11. 初始化OPML解析器
  12. :param file_path: OPML文件路径
  13. """
  14. self.file_path = file_path
  15. self.data = None # 用于存储解析后的数据
  16. def clean_string(self, input_string):
  17. """
  18. 清除字符串中的非法字符和多余的空格。
  19. 合法字符包括字母、数字和下划线。
  20. """
  21. # 使用正则表达式替换非法字符为空字符串
  22. cleaned_string = re.sub(r'[^\w]', '', input_string)
  23. return cleaned_string
  24. def parse(self):
  25. """
  26. 解析OPML文件为字典,从body节点开始
  27. """
  28. tree = ET.parse(self.file_path)
  29. root = tree.getroot()
  30. # 找到body节点
  31. body = root.find(".//body")
  32. if body is None:
  33. raise ValueError("OPML文件中未找到body节点!")
  34. self.data = self._parse_outline(body)
  35. result = []
  36. for children in self.data['children']:
  37. for k, v in children.items():
  38. if k == 'children':
  39. for d in v:
  40. result.append(d)
  41. return result
  42. def _parse_outline(self, element):
  43. """
  44. 递归解析OPML中的outline元素
  45. """
  46. item = {
  47. "title": self.clean_string(element.get("text")) if element.get("text") else '',
  48. "xmlUrl": element.get("xmlUrl")
  49. }
  50. # 去除值为None的键
  51. item = {k: v for k, v in item.items() if v is not None}
  52. # 如果有子元素,递归解析
  53. children = []
  54. for child in element:
  55. children.append(self._parse_outline(child))
  56. if children:
  57. item["children"] = children
  58. return item
  59. def get_data(self):
  60. """
  61. 获取解析后的数据
  62. """
  63. if self.data is None:
  64. raise ValueError("尚未解析数据,请先调用 parse 方法!")
  65. return self.data
  66. class GetNews:
  67. def __init__(self, parsed_data):
  68. """
  69. 初始化 GetNews 类
  70. :param parsed_data: OPMLParser 解析后的数据
  71. """
  72. self.parsed_data = parsed_data
  73. async def fetch_news(self, url):
  74. """
  75. 异步请求单个 RSS 链接并解析 XML 数据
  76. :param url: RSS 链接
  77. :return: 解析后的新闻数据,请求失败或状态码非200时返回空列表
  78. """
  79. try:
  80. async with httpx.AsyncClient() as client:
  81. response = await client.get(url)
  82. if response.status_code != 200:
  83. return [] # 如果状态码不是200,直接返回空列表
  84. xml_content = response.text
  85. try:
  86. root = fromstring(xml_content)
  87. items = root.findall(".//item")
  88. news_list = []
  89. for item in items:
  90. title = self.clean_text(item.find("title").text) if item.find("title") is not None else "无标题"
  91. link = self.clean_text(item.find("link").text) if item.find("link") is not None else "无链接"
  92. description = self.clean_text(item.find("description").text) if item.find(
  93. "description") is not None else "无描述"
  94. news_list.append({
  95. "title": title,
  96. "link": link,
  97. "description": description
  98. })
  99. return news_list
  100. except ParseError:
  101. return [] # XML 解析失败时返回空列表
  102. except httpx.RequestError:
  103. return [] # 请求失败时返回空列表
  104. def clean_text(self, text):
  105. """
  106. 清洗文本,去除HTML标签和特殊字符,返回纯文本
  107. """
  108. if not text:
  109. return ""
  110. # 去除HTML标签
  111. clean_text = re.sub(r'<.*?>', '', text)
  112. # 去除多余的空格和换行符
  113. clean_text = re.sub(r'\s+', ' ', clean_text).strip()
  114. return clean_text
  115. async def get_all_news(self):
  116. """
  117. 异步获取所有 RSS 链接的新闻数据
  118. :return: 所有新闻数据的列表
  119. """
  120. tasks = []
  121. for data in self.parsed_data:
  122. url = data.get("xmlUrl")
  123. if url:
  124. tasks.append(self.fetch_news(url))
  125. results = await asyncio.gather(*tasks)
  126. return results
  127. class SearchByKeyword:
  128. def __init__(self, data):
  129. self.data = data
  130. def search(self, keyword):
  131. result = {}
  132. for item in self.data:
  133. if keyword.lower() in item['title'].lower():
  134. if keyword.lower() not in result:
  135. result[keyword] = []
  136. result[keyword].append(item)
  137. return result
  138. # 使用示例
  139. if __name__ == "__main__":
  140. opml_file_path = "read_news.opml"
  141. opml_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), opml_file_path)
  142. if not os.path.exists(opml_file_path):
  143. print(f"文件 {opml_file_path} 不存在!")
  144. exit(1)
  145. parser = OPMLParser(opml_file_path)
  146. parsed_data = parser.parse()
  147. print(f'一共有 {len(parsed_data)} 个订阅源')
  148. get_news = GetNews(parsed_data)
  149. # 异步获取所有新闻数据
  150. loop = asyncio.get_event_loop()
  151. all_news = loop.run_until_complete(get_news.get_all_news())
  152. valid_data = []
  153. for news_list in all_news:
  154. if news_list:
  155. for news in news_list:
  156. valid_data.append(news)
  157. S = SearchByKeyword(valid_data)
  158. result = S.search('deepseek')
  159. for keyword, item in result.items():
  160. print(f'关键词 {keyword} 的新闻有:{len(item)} 条')
  161. for news in item:
  162. print(f'标题:{news["title"]}')
  163. print(f'链接:{news["link"]}')
  164. print('-' * 200)