| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- # coding=utf-8
- """
- RSS 解析器
- 支持 RSS 2.0、Atom 和 JSON Feed 1.1 格式的解析
- """
- import re
- import html
- import json
- from dataclasses import dataclass
- from datetime import datetime
- from typing import List, Optional, Dict, Any
- from email.utils import parsedate_to_datetime
- try:
- import feedparser
- HAS_FEEDPARSER = True
- except ImportError:
- HAS_FEEDPARSER = False
- feedparser = None
- @dataclass
- class ParsedRSSItem:
- """解析后的 RSS 条目"""
- title: str
- url: str
- published_at: Optional[str] = None
- summary: Optional[str] = None
- author: Optional[str] = None
- guid: Optional[str] = None
- class RSSParser:
- """RSS 解析器"""
- def __init__(self, max_summary_length: int = 500):
- """
- 初始化解析器
- Args:
- max_summary_length: 摘要最大长度
- """
- if not HAS_FEEDPARSER:
- raise ImportError("RSS 解析需要安装 feedparser: pip install feedparser")
- self.max_summary_length = max_summary_length
- def parse(self, content: str, feed_url: str = "") -> List[ParsedRSSItem]:
- """
- 解析 RSS/Atom/JSON Feed 内容
- Args:
- content: Feed 内容(XML 或 JSON)
- feed_url: Feed URL(用于错误提示)
- Returns:
- 解析后的条目列表
- """
- # 先尝试检测 JSON Feed
- if self._is_json_feed(content):
- return self._parse_json_feed(content, feed_url)
- # 使用 feedparser 解析 RSS/Atom
- feed = feedparser.parse(content)
- if feed.bozo and not feed.entries:
- raise ValueError(f"RSS 解析失败 ({feed_url}): {feed.bozo_exception}")
- items = []
- for entry in feed.entries:
- item = self._parse_entry(entry)
- if item:
- items.append(item)
- return items
- def _is_json_feed(self, content: str) -> bool:
- """
- 检测内容是否为 JSON Feed 格式
- JSON Feed 必须包含 version 字段,值为 https://jsonfeed.org/version/1 或 1.1
- """
- content = content.strip()
- if not content.startswith("{"):
- return False
- try:
- data = json.loads(content)
- version = data.get("version", "")
- return "jsonfeed.org" in version
- except (json.JSONDecodeError, TypeError):
- return False
- def _parse_json_feed(self, content: str, feed_url: str = "") -> List[ParsedRSSItem]:
- """
- 解析 JSON Feed 1.1 格式
- JSON Feed 规范: https://www.jsonfeed.org/version/1.1/
- Args:
- content: JSON Feed 内容
- feed_url: Feed URL(用于错误提示)
- Returns:
- 解析后的条目列表
- """
- try:
- data = json.loads(content)
- except json.JSONDecodeError as e:
- raise ValueError(f"JSON Feed 解析失败 ({feed_url}): {e}")
- items_data = data.get("items", [])
- if not items_data:
- return []
- items = []
- for item_data in items_data:
- item = self._parse_json_feed_item(item_data)
- if item:
- items.append(item)
- return items
- def _parse_json_feed_item(self, item_data: Dict[str, Any]) -> Optional[ParsedRSSItem]:
- """解析单个 JSON Feed 条目"""
- url = item_data.get("url", "") or item_data.get("external_url", "")
- title = item_data.get("title", "")
- if not title:
- content_text = item_data.get("content_text", "")
- if content_text:
- title = content_text[:20] + ("..." if len(content_text) > 20 else "")
- title = self._clean_text(title)
- if not title and url:
- title = url
- if not title:
- return None
- # 发布时间(ISO 8601 格式)
- published_at = None
- date_str = item_data.get("date_published") or item_data.get("date_modified")
- if date_str:
- published_at = self._parse_iso_date(date_str)
- # 摘要:优先 summary,否则使用 content_text
- summary = item_data.get("summary", "")
- if not summary:
- content_text = item_data.get("content_text", "")
- content_html = item_data.get("content_html", "")
- summary = content_text or self._clean_text(content_html)
- if summary:
- summary = self._clean_text(summary)
- if len(summary) > self.max_summary_length:
- summary = summary[:self.max_summary_length] + "..."
- # 作者
- author = None
- authors = item_data.get("authors", [])
- if authors:
- names = [a.get("name", "") for a in authors if isinstance(a, dict) and a.get("name")]
- if names:
- author = ", ".join(names)
- # GUID
- guid = item_data.get("id", "") or url
- return ParsedRSSItem(
- title=title,
- url=url,
- published_at=published_at,
- summary=summary or None,
- author=author,
- guid=guid,
- )
- def _parse_iso_date(self, date_str: str) -> Optional[str]:
- """解析 ISO 8601 日期格式"""
- if not date_str:
- return None
- try:
- # 处理常见的 ISO 8601 格式
- # 替换 Z 为 +00:00
- date_str = date_str.replace("Z", "+00:00")
- dt = datetime.fromisoformat(date_str)
- return dt.isoformat()
- except (ValueError, TypeError):
- pass
- return None
- def parse_url(self, url: str, timeout: int = 10) -> List[ParsedRSSItem]:
- """
- 从 URL 解析 RSS
- Args:
- url: RSS URL
- timeout: 超时时间(秒)
- Returns:
- 解析后的条目列表
- """
- import requests
- response = requests.get(url, timeout=timeout, headers={
- "User-Agent": "TrendRadar/2.0 RSS Reader"
- })
- response.raise_for_status()
- return self.parse(response.text, url)
- def _parse_entry(self, entry: Any) -> Optional[ParsedRSSItem]:
- """解析单个条目"""
- title = self._clean_text(entry.get("title", ""))
- url = entry.get("link", "")
- if not url:
- links = entry.get("links", [])
- for link in links:
- if link.get("rel") == "alternate" or link.get("type", "").startswith("text/html"):
- url = link.get("href", "")
- break
- if not url and links:
- url = links[0].get("href", "")
- if not title:
- raw_summary = entry.get("summary") or entry.get("description", "")
- if not raw_summary:
- content = entry.get("content", [])
- if content and isinstance(content, list):
- raw_summary = content[0].get("value", "")
- if raw_summary:
- title = self._clean_text(raw_summary)
- if len(title) > 20:
- title = title[:20] + "..."
- if not title and url:
- title = url
- if not title:
- return None
- published_at = self._parse_date(entry)
- summary = self._parse_summary(entry)
- author = self._parse_author(entry)
- guid = entry.get("id") or entry.get("guid", {}).get("value") or url
- return ParsedRSSItem(
- title=title,
- url=url,
- published_at=published_at,
- summary=summary,
- author=author,
- guid=guid,
- )
- def _clean_text(self, text: str) -> str:
- """清理文本"""
- if not text:
- return ""
- # 解码 HTML 实体
- text = html.unescape(text)
- # 移除 HTML 标签
- text = re.sub(r'<[^>]+>', '', text)
- # 移除多余空白
- text = re.sub(r'\s+', ' ', text)
- return text.strip()
- def _parse_date(self, entry: Any) -> Optional[str]:
- """解析发布日期"""
- # feedparser 会自动解析日期到 published_parsed
- date_struct = entry.get("published_parsed") or entry.get("updated_parsed")
- if date_struct:
- try:
- dt = datetime(*date_struct[:6])
- return dt.isoformat()
- except (ValueError, TypeError):
- pass
- # 尝试手动解析
- date_str = entry.get("published") or entry.get("updated")
- if date_str:
- try:
- dt = parsedate_to_datetime(date_str)
- return dt.isoformat()
- except (ValueError, TypeError):
- pass
- # 尝试 ISO 格式
- try:
- dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
- return dt.isoformat()
- except (ValueError, TypeError):
- pass
- return None
- def _parse_summary(self, entry: Any) -> Optional[str]:
- """解析摘要"""
- summary = entry.get("summary") or entry.get("description", "")
- if not summary:
- # 尝试从 content 获取
- content = entry.get("content", [])
- if content and isinstance(content, list):
- summary = content[0].get("value", "")
- if not summary:
- return None
- summary = self._clean_text(summary)
- # 截断过长的摘要
- if len(summary) > self.max_summary_length:
- summary = summary[:self.max_summary_length] + "..."
- return summary
- def _parse_author(self, entry: Any) -> Optional[str]:
- """解析作者"""
- author = entry.get("author")
- if author:
- return self._clean_text(author)
- # 尝试从 dc:creator 获取
- author = entry.get("dc_creator")
- if author:
- return self._clean_text(author)
- # 尝试从 authors 列表获取
- authors = entry.get("authors", [])
- if authors:
- names = [a.get("name", "") for a in authors if a.get("name")]
- if names:
- return ", ".join(names)
- return None
|