parser.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. # coding=utf-8
  2. """
  3. RSS 解析器
  4. 支持 RSS 2.0、Atom 和 JSON Feed 1.1 格式的解析
  5. """
  6. import re
  7. import html
  8. import json
  9. from dataclasses import dataclass
  10. from datetime import datetime
  11. from typing import List, Optional, Dict, Any
  12. from email.utils import parsedate_to_datetime
  13. try:
  14. import feedparser
  15. HAS_FEEDPARSER = True
  16. except ImportError:
  17. HAS_FEEDPARSER = False
  18. feedparser = None
  19. @dataclass
  20. class ParsedRSSItem:
  21. """解析后的 RSS 条目"""
  22. title: str
  23. url: str
  24. published_at: Optional[str] = None
  25. summary: Optional[str] = None
  26. author: Optional[str] = None
  27. guid: Optional[str] = None
  28. class RSSParser:
  29. """RSS 解析器"""
  30. def __init__(self, max_summary_length: int = 500):
  31. """
  32. 初始化解析器
  33. Args:
  34. max_summary_length: 摘要最大长度
  35. """
  36. if not HAS_FEEDPARSER:
  37. raise ImportError("RSS 解析需要安装 feedparser: pip install feedparser")
  38. self.max_summary_length = max_summary_length
  39. def parse(self, content: str, feed_url: str = "") -> List[ParsedRSSItem]:
  40. """
  41. 解析 RSS/Atom/JSON Feed 内容
  42. Args:
  43. content: Feed 内容(XML 或 JSON)
  44. feed_url: Feed URL(用于错误提示)
  45. Returns:
  46. 解析后的条目列表
  47. """
  48. # 先尝试检测 JSON Feed
  49. if self._is_json_feed(content):
  50. return self._parse_json_feed(content, feed_url)
  51. # 使用 feedparser 解析 RSS/Atom
  52. feed = feedparser.parse(content)
  53. if feed.bozo and not feed.entries:
  54. raise ValueError(f"RSS 解析失败 ({feed_url}): {feed.bozo_exception}")
  55. items = []
  56. for entry in feed.entries:
  57. item = self._parse_entry(entry)
  58. if item:
  59. items.append(item)
  60. return items
  61. def _is_json_feed(self, content: str) -> bool:
  62. """
  63. 检测内容是否为 JSON Feed 格式
  64. JSON Feed 必须包含 version 字段,值为 https://jsonfeed.org/version/1 或 1.1
  65. """
  66. content = content.strip()
  67. if not content.startswith("{"):
  68. return False
  69. try:
  70. data = json.loads(content)
  71. version = data.get("version", "")
  72. return "jsonfeed.org" in version
  73. except (json.JSONDecodeError, TypeError):
  74. return False
  75. def _parse_json_feed(self, content: str, feed_url: str = "") -> List[ParsedRSSItem]:
  76. """
  77. 解析 JSON Feed 1.1 格式
  78. JSON Feed 规范: https://www.jsonfeed.org/version/1.1/
  79. Args:
  80. content: JSON Feed 内容
  81. feed_url: Feed URL(用于错误提示)
  82. Returns:
  83. 解析后的条目列表
  84. """
  85. try:
  86. data = json.loads(content)
  87. except json.JSONDecodeError as e:
  88. raise ValueError(f"JSON Feed 解析失败 ({feed_url}): {e}")
  89. items_data = data.get("items", [])
  90. if not items_data:
  91. return []
  92. items = []
  93. for item_data in items_data:
  94. item = self._parse_json_feed_item(item_data)
  95. if item:
  96. items.append(item)
  97. return items
  98. def _parse_json_feed_item(self, item_data: Dict[str, Any]) -> Optional[ParsedRSSItem]:
  99. """解析单个 JSON Feed 条目"""
  100. url = item_data.get("url", "") or item_data.get("external_url", "")
  101. title = item_data.get("title", "")
  102. if not title:
  103. content_text = item_data.get("content_text", "")
  104. if content_text:
  105. title = content_text[:20] + ("..." if len(content_text) > 20 else "")
  106. title = self._clean_text(title)
  107. if not title and url:
  108. title = url
  109. if not title:
  110. return None
  111. # 发布时间(ISO 8601 格式)
  112. published_at = None
  113. date_str = item_data.get("date_published") or item_data.get("date_modified")
  114. if date_str:
  115. published_at = self._parse_iso_date(date_str)
  116. # 摘要:优先 summary,否则使用 content_text
  117. summary = item_data.get("summary", "")
  118. if not summary:
  119. content_text = item_data.get("content_text", "")
  120. content_html = item_data.get("content_html", "")
  121. summary = content_text or self._clean_text(content_html)
  122. if summary:
  123. summary = self._clean_text(summary)
  124. if len(summary) > self.max_summary_length:
  125. summary = summary[:self.max_summary_length] + "..."
  126. # 作者
  127. author = None
  128. authors = item_data.get("authors", [])
  129. if authors:
  130. names = [a.get("name", "") for a in authors if isinstance(a, dict) and a.get("name")]
  131. if names:
  132. author = ", ".join(names)
  133. # GUID
  134. guid = item_data.get("id", "") or url
  135. return ParsedRSSItem(
  136. title=title,
  137. url=url,
  138. published_at=published_at,
  139. summary=summary or None,
  140. author=author,
  141. guid=guid,
  142. )
  143. def _parse_iso_date(self, date_str: str) -> Optional[str]:
  144. """解析 ISO 8601 日期格式"""
  145. if not date_str:
  146. return None
  147. try:
  148. # 处理常见的 ISO 8601 格式
  149. # 替换 Z 为 +00:00
  150. date_str = date_str.replace("Z", "+00:00")
  151. dt = datetime.fromisoformat(date_str)
  152. return dt.isoformat()
  153. except (ValueError, TypeError):
  154. pass
  155. return None
  156. def parse_url(self, url: str, timeout: int = 10) -> List[ParsedRSSItem]:
  157. """
  158. 从 URL 解析 RSS
  159. Args:
  160. url: RSS URL
  161. timeout: 超时时间(秒)
  162. Returns:
  163. 解析后的条目列表
  164. """
  165. import requests
  166. response = requests.get(url, timeout=timeout, headers={
  167. "User-Agent": "TrendRadar/2.0 RSS Reader"
  168. })
  169. response.raise_for_status()
  170. return self.parse(response.text, url)
  171. def _parse_entry(self, entry: Any) -> Optional[ParsedRSSItem]:
  172. """解析单个条目"""
  173. title = self._clean_text(entry.get("title", ""))
  174. url = entry.get("link", "")
  175. if not url:
  176. links = entry.get("links", [])
  177. for link in links:
  178. if link.get("rel") == "alternate" or link.get("type", "").startswith("text/html"):
  179. url = link.get("href", "")
  180. break
  181. if not url and links:
  182. url = links[0].get("href", "")
  183. if not title:
  184. raw_summary = entry.get("summary") or entry.get("description", "")
  185. if not raw_summary:
  186. content = entry.get("content", [])
  187. if content and isinstance(content, list):
  188. raw_summary = content[0].get("value", "")
  189. if raw_summary:
  190. title = self._clean_text(raw_summary)
  191. if len(title) > 20:
  192. title = title[:20] + "..."
  193. if not title and url:
  194. title = url
  195. if not title:
  196. return None
  197. published_at = self._parse_date(entry)
  198. summary = self._parse_summary(entry)
  199. author = self._parse_author(entry)
  200. guid = entry.get("id") or entry.get("guid", {}).get("value") or url
  201. return ParsedRSSItem(
  202. title=title,
  203. url=url,
  204. published_at=published_at,
  205. summary=summary,
  206. author=author,
  207. guid=guid,
  208. )
  209. def _clean_text(self, text: str) -> str:
  210. """清理文本"""
  211. if not text:
  212. return ""
  213. # 解码 HTML 实体
  214. text = html.unescape(text)
  215. # 移除 HTML 标签
  216. text = re.sub(r'<[^>]+>', '', text)
  217. # 移除多余空白
  218. text = re.sub(r'\s+', ' ', text)
  219. return text.strip()
  220. def _parse_date(self, entry: Any) -> Optional[str]:
  221. """解析发布日期"""
  222. # feedparser 会自动解析日期到 published_parsed
  223. date_struct = entry.get("published_parsed") or entry.get("updated_parsed")
  224. if date_struct:
  225. try:
  226. dt = datetime(*date_struct[:6])
  227. return dt.isoformat()
  228. except (ValueError, TypeError):
  229. pass
  230. # 尝试手动解析
  231. date_str = entry.get("published") or entry.get("updated")
  232. if date_str:
  233. try:
  234. dt = parsedate_to_datetime(date_str)
  235. return dt.isoformat()
  236. except (ValueError, TypeError):
  237. pass
  238. # 尝试 ISO 格式
  239. try:
  240. dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
  241. return dt.isoformat()
  242. except (ValueError, TypeError):
  243. pass
  244. return None
  245. def _parse_summary(self, entry: Any) -> Optional[str]:
  246. """解析摘要"""
  247. summary = entry.get("summary") or entry.get("description", "")
  248. if not summary:
  249. # 尝试从 content 获取
  250. content = entry.get("content", [])
  251. if content and isinstance(content, list):
  252. summary = content[0].get("value", "")
  253. if not summary:
  254. return None
  255. summary = self._clean_text(summary)
  256. # 截断过长的摘要
  257. if len(summary) > self.max_summary_length:
  258. summary = summary[:self.max_summary_length] + "..."
  259. return summary
  260. def _parse_author(self, entry: Any) -> Optional[str]:
  261. """解析作者"""
  262. author = entry.get("author")
  263. if author:
  264. return self._clean_text(author)
  265. # 尝试从 dc:creator 获取
  266. author = entry.get("dc_creator")
  267. if author:
  268. return self._clean_text(author)
  269. # 尝试从 authors 列表获取
  270. authors = entry.get("authors", [])
  271. if authors:
  272. names = [a.get("name", "") for a in authors if a.get("name")]
  273. if names:
  274. return ", ".join(names)
  275. return None