parser.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. # coding=utf-8
  2. """
  3. RSS 解析器
  4. 支持 RSS 2.0、Atom 和 JSON Feed 1.1 格式的解析
  5. """
  6. import re
  7. import html
  8. import json
  9. from dataclasses import dataclass
  10. from datetime import datetime
  11. from typing import List, Optional, Dict, Any
  12. from email.utils import parsedate_to_datetime
  13. try:
  14. import feedparser
  15. HAS_FEEDPARSER = True
  16. except ImportError:
  17. HAS_FEEDPARSER = False
  18. feedparser = None
  19. @dataclass
  20. class ParsedRSSItem:
  21. """解析后的 RSS 条目"""
  22. title: str
  23. url: str
  24. published_at: Optional[str] = None
  25. summary: Optional[str] = None
  26. author: Optional[str] = None
  27. guid: Optional[str] = None
  28. class RSSParser:
  29. """RSS 解析器"""
  30. def __init__(self, max_summary_length: int = 500):
  31. """
  32. 初始化解析器
  33. Args:
  34. max_summary_length: 摘要最大长度
  35. """
  36. if not HAS_FEEDPARSER:
  37. raise ImportError("RSS 解析需要安装 feedparser: pip install feedparser")
  38. self.max_summary_length = max_summary_length
  39. def parse(self, content: str, feed_url: str = "") -> List[ParsedRSSItem]:
  40. """
  41. 解析 RSS/Atom/JSON Feed 内容
  42. Args:
  43. content: Feed 内容(XML 或 JSON)
  44. feed_url: Feed URL(用于错误提示)
  45. Returns:
  46. 解析后的条目列表
  47. """
  48. # 先尝试检测 JSON Feed
  49. if self._is_json_feed(content):
  50. return self._parse_json_feed(content, feed_url)
  51. # 使用 feedparser 解析 RSS/Atom
  52. feed = feedparser.parse(content)
  53. if feed.bozo and not feed.entries:
  54. raise ValueError(f"RSS 解析失败 ({feed_url}): {feed.bozo_exception}")
  55. items = []
  56. for entry in feed.entries:
  57. item = self._parse_entry(entry)
  58. if item:
  59. items.append(item)
  60. return items
  61. def _is_json_feed(self, content: str) -> bool:
  62. """
  63. 检测内容是否为 JSON Feed 格式
  64. JSON Feed 必须包含 version 字段,值为 https://jsonfeed.org/version/1 或 1.1
  65. """
  66. content = content.strip()
  67. if not content.startswith("{"):
  68. return False
  69. try:
  70. data = json.loads(content)
  71. version = data.get("version", "")
  72. return "jsonfeed.org" in version
  73. except (json.JSONDecodeError, TypeError):
  74. return False
  75. def _parse_json_feed(self, content: str, feed_url: str = "") -> List[ParsedRSSItem]:
  76. """
  77. 解析 JSON Feed 1.1 格式
  78. JSON Feed 规范: https://www.jsonfeed.org/version/1.1/
  79. Args:
  80. content: JSON Feed 内容
  81. feed_url: Feed URL(用于错误提示)
  82. Returns:
  83. 解析后的条目列表
  84. """
  85. try:
  86. data = json.loads(content)
  87. except json.JSONDecodeError as e:
  88. raise ValueError(f"JSON Feed 解析失败 ({feed_url}): {e}")
  89. items_data = data.get("items", [])
  90. if not items_data:
  91. return []
  92. items = []
  93. for item_data in items_data:
  94. item = self._parse_json_feed_item(item_data)
  95. if item:
  96. items.append(item)
  97. return items
  98. def _parse_json_feed_item(self, item_data: Dict[str, Any]) -> Optional[ParsedRSSItem]:
  99. """解析单个 JSON Feed 条目"""
  100. # 标题:优先 title,否则使用 content_text 的前 100 字符
  101. title = item_data.get("title", "")
  102. if not title:
  103. content_text = item_data.get("content_text", "")
  104. if content_text:
  105. title = content_text[:100] + ("..." if len(content_text) > 100 else "")
  106. title = self._clean_text(title)
  107. if not title:
  108. return None
  109. # URL
  110. url = item_data.get("url", "") or item_data.get("external_url", "")
  111. # 发布时间(ISO 8601 格式)
  112. published_at = None
  113. date_str = item_data.get("date_published") or item_data.get("date_modified")
  114. if date_str:
  115. published_at = self._parse_iso_date(date_str)
  116. # 摘要:优先 summary,否则使用 content_text
  117. summary = item_data.get("summary", "")
  118. if not summary:
  119. content_text = item_data.get("content_text", "")
  120. content_html = item_data.get("content_html", "")
  121. summary = content_text or self._clean_text(content_html)
  122. if summary:
  123. summary = self._clean_text(summary)
  124. if len(summary) > self.max_summary_length:
  125. summary = summary[:self.max_summary_length] + "..."
  126. # 作者
  127. author = None
  128. authors = item_data.get("authors", [])
  129. if authors:
  130. names = [a.get("name", "") for a in authors if isinstance(a, dict) and a.get("name")]
  131. if names:
  132. author = ", ".join(names)
  133. # GUID
  134. guid = item_data.get("id", "") or url
  135. return ParsedRSSItem(
  136. title=title,
  137. url=url,
  138. published_at=published_at,
  139. summary=summary or None,
  140. author=author,
  141. guid=guid,
  142. )
  143. def _parse_iso_date(self, date_str: str) -> Optional[str]:
  144. """解析 ISO 8601 日期格式"""
  145. if not date_str:
  146. return None
  147. try:
  148. # 处理常见的 ISO 8601 格式
  149. # 替换 Z 为 +00:00
  150. date_str = date_str.replace("Z", "+00:00")
  151. dt = datetime.fromisoformat(date_str)
  152. return dt.isoformat()
  153. except (ValueError, TypeError):
  154. pass
  155. return None
  156. def parse_url(self, url: str, timeout: int = 10) -> List[ParsedRSSItem]:
  157. """
  158. 从 URL 解析 RSS
  159. Args:
  160. url: RSS URL
  161. timeout: 超时时间(秒)
  162. Returns:
  163. 解析后的条目列表
  164. """
  165. import requests
  166. response = requests.get(url, timeout=timeout, headers={
  167. "User-Agent": "TrendRadar/2.0 RSS Reader"
  168. })
  169. response.raise_for_status()
  170. return self.parse(response.text, url)
  171. def _parse_entry(self, entry: Any) -> Optional[ParsedRSSItem]:
  172. """解析单个条目"""
  173. title = self._clean_text(entry.get("title", ""))
  174. if not title:
  175. return None
  176. url = entry.get("link", "")
  177. if not url:
  178. # 尝试从 links 中获取
  179. links = entry.get("links", [])
  180. for link in links:
  181. if link.get("rel") == "alternate" or link.get("type", "").startswith("text/html"):
  182. url = link.get("href", "")
  183. break
  184. if not url and links:
  185. url = links[0].get("href", "")
  186. published_at = self._parse_date(entry)
  187. summary = self._parse_summary(entry)
  188. author = self._parse_author(entry)
  189. guid = entry.get("id") or entry.get("guid", {}).get("value") or url
  190. return ParsedRSSItem(
  191. title=title,
  192. url=url,
  193. published_at=published_at,
  194. summary=summary,
  195. author=author,
  196. guid=guid,
  197. )
  198. def _clean_text(self, text: str) -> str:
  199. """清理文本"""
  200. if not text:
  201. return ""
  202. # 解码 HTML 实体
  203. text = html.unescape(text)
  204. # 移除 HTML 标签
  205. text = re.sub(r'<[^>]+>', '', text)
  206. # 移除多余空白
  207. text = re.sub(r'\s+', ' ', text)
  208. return text.strip()
  209. def _parse_date(self, entry: Any) -> Optional[str]:
  210. """解析发布日期"""
  211. # feedparser 会自动解析日期到 published_parsed
  212. date_struct = entry.get("published_parsed") or entry.get("updated_parsed")
  213. if date_struct:
  214. try:
  215. dt = datetime(*date_struct[:6])
  216. return dt.isoformat()
  217. except (ValueError, TypeError):
  218. pass
  219. # 尝试手动解析
  220. date_str = entry.get("published") or entry.get("updated")
  221. if date_str:
  222. try:
  223. dt = parsedate_to_datetime(date_str)
  224. return dt.isoformat()
  225. except (ValueError, TypeError):
  226. pass
  227. # 尝试 ISO 格式
  228. try:
  229. dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
  230. return dt.isoformat()
  231. except (ValueError, TypeError):
  232. pass
  233. return None
  234. def _parse_summary(self, entry: Any) -> Optional[str]:
  235. """解析摘要"""
  236. summary = entry.get("summary") or entry.get("description", "")
  237. if not summary:
  238. # 尝试从 content 获取
  239. content = entry.get("content", [])
  240. if content and isinstance(content, list):
  241. summary = content[0].get("value", "")
  242. if not summary:
  243. return None
  244. summary = self._clean_text(summary)
  245. # 截断过长的摘要
  246. if len(summary) > self.max_summary_length:
  247. summary = summary[:self.max_summary_length] + "..."
  248. return summary
  249. def _parse_author(self, entry: Any) -> Optional[str]:
  250. """解析作者"""
  251. author = entry.get("author")
  252. if author:
  253. return self._clean_text(author)
  254. # 尝试从 dc:creator 获取
  255. author = entry.get("dc_creator")
  256. if author:
  257. return self._clean_text(author)
  258. # 尝试从 authors 列表获取
  259. authors = entry.get("authors", [])
  260. if authors:
  261. names = [a.get("name", "") for a in authors if a.get("name")]
  262. if names:
  263. return ", ".join(names)
  264. return None