fetcher.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. # coding=utf-8
  2. """
  3. RSS 抓取器
  4. 负责从配置的 RSS 源抓取数据并转换为标准格式
  5. """
  6. import time
  7. import random
  8. from dataclasses import dataclass
  9. from typing import List, Dict, Optional, Tuple
  10. import requests
  11. from .parser import RSSParser
  12. from trendradar.storage.base import RSSItem, RSSData
  13. from trendradar.utils.time import get_configured_time, is_within_days, DEFAULT_TIMEZONE
  14. @dataclass
  15. class RSSFeedConfig:
  16. """RSS 源配置"""
  17. id: str # 源 ID
  18. name: str # 显示名称
  19. url: str # RSS URL
  20. max_items: int = 0 # 最大条目数(0=不限制)
  21. enabled: bool = True # 是否启用
  22. max_age_days: Optional[int] = None # 文章最大年龄(天),覆盖全局设置;None=使用全局,0=禁用过滤
  23. class RSSFetcher:
  24. """RSS 抓取器"""
  25. def __init__(
  26. self,
  27. feeds: List[RSSFeedConfig],
  28. request_interval: int = 2000,
  29. timeout: int = 15,
  30. use_proxy: bool = False,
  31. proxy_url: str = "",
  32. timezone: str = DEFAULT_TIMEZONE,
  33. freshness_enabled: bool = True,
  34. default_max_age_days: int = 3,
  35. ):
  36. """
  37. 初始化抓取器
  38. Args:
  39. feeds: RSS 源配置列表
  40. request_interval: 请求间隔(毫秒)
  41. timeout: 请求超时(秒)
  42. use_proxy: 是否使用代理
  43. proxy_url: 代理 URL
  44. timezone: 时区配置(如 'Asia/Shanghai')
  45. freshness_enabled: 是否启用新鲜度过滤
  46. default_max_age_days: 默认最大文章年龄(天)
  47. """
  48. self.feeds = [f for f in feeds if f.enabled]
  49. self.request_interval = request_interval
  50. self.timeout = timeout
  51. self.use_proxy = use_proxy
  52. self.proxy_url = proxy_url
  53. self.timezone = timezone
  54. self.freshness_enabled = freshness_enabled
  55. self.default_max_age_days = default_max_age_days
  56. self.parser = RSSParser()
  57. self.session = self._create_session()
  58. def _create_session(self) -> requests.Session:
  59. """创建请求会话"""
  60. session = requests.Session()
  61. session.headers.update({
  62. "User-Agent": "TrendRadar/2.0 RSS Reader (https://github.com/trendradar)",
  63. "Accept": "application/feed+json, application/json, application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
  64. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
  65. })
  66. if self.use_proxy and self.proxy_url:
  67. session.proxies = {
  68. "http": self.proxy_url,
  69. "https": self.proxy_url,
  70. }
  71. return session
  72. def _filter_by_freshness(
  73. self,
  74. items: List[RSSItem],
  75. feed: RSSFeedConfig,
  76. ) -> Tuple[List[RSSItem], int]:
  77. """
  78. 根据新鲜度过滤文章
  79. Args:
  80. items: 待过滤的文章列表
  81. feed: RSS 源配置
  82. Returns:
  83. (过滤后的文章列表, 被过滤的文章数)
  84. """
  85. # 如果全局禁用,直接返回
  86. if not self.freshness_enabled:
  87. return items, 0
  88. # 确定此 feed 的 max_age_days
  89. max_days = feed.max_age_days
  90. if max_days is None:
  91. max_days = self.default_max_age_days
  92. # 如果设为 0,禁用此 feed 的过滤
  93. if max_days == 0:
  94. return items, 0
  95. # 过滤逻辑:无发布时间的文章保留
  96. filtered = []
  97. for item in items:
  98. if not item.published_at:
  99. # 无发布时间,保留
  100. filtered.append(item)
  101. elif is_within_days(item.published_at, max_days, self.timezone):
  102. # 在指定天数内,保留
  103. filtered.append(item)
  104. # 否则过滤掉
  105. filtered_count = len(items) - len(filtered)
  106. return filtered, filtered_count
  107. def fetch_feed(self, feed: RSSFeedConfig) -> Tuple[List[RSSItem], Optional[str]]:
  108. """
  109. 抓取单个 RSS 源
  110. Args:
  111. feed: RSS 源配置
  112. Returns:
  113. (条目列表, 错误信息) 元组
  114. """
  115. try:
  116. response = self.session.get(feed.url, timeout=self.timeout)
  117. response.raise_for_status()
  118. parsed_items = self.parser.parse(response.text, feed.url)
  119. # 限制条目数量(0=不限制)
  120. if feed.max_items > 0:
  121. parsed_items = parsed_items[:feed.max_items]
  122. # 转换为 RSSItem(使用配置的时区)
  123. now = get_configured_time(self.timezone)
  124. crawl_time = now.strftime("%H:%M")
  125. items = []
  126. for parsed in parsed_items:
  127. item = RSSItem(
  128. title=parsed.title,
  129. feed_id=feed.id,
  130. feed_name=feed.name,
  131. url=parsed.url,
  132. guid=parsed.guid or "",
  133. published_at=parsed.published_at or "",
  134. summary=parsed.summary or "",
  135. author=parsed.author or "",
  136. crawl_time=crawl_time,
  137. first_time=crawl_time,
  138. last_time=crawl_time,
  139. count=1,
  140. )
  141. items.append(item)
  142. # 注意:新鲜度过滤已移至推送阶段(_convert_rss_items_to_list)
  143. # 这样所有文章都会存入数据库,但旧文章不会推送
  144. print(f"[RSS] {feed.name}: 获取 {len(items)} 条")
  145. return items, None
  146. except requests.Timeout:
  147. error = f"请求超时 ({self.timeout}s)"
  148. print(f"[RSS] {feed.name}: {error}")
  149. return [], error
  150. except requests.RequestException as e:
  151. error = f"请求失败: {e}"
  152. print(f"[RSS] {feed.name}: {error}")
  153. return [], error
  154. except ValueError as e:
  155. error = f"解析失败: {e}"
  156. print(f"[RSS] {feed.name}: {error}")
  157. return [], error
  158. except Exception as e:
  159. error = f"未知错误: {e}"
  160. print(f"[RSS] {feed.name}: {error}")
  161. return [], error
  162. def fetch_all(self) -> RSSData:
  163. """
  164. 抓取所有 RSS 源
  165. Returns:
  166. RSSData 对象
  167. """
  168. all_items: Dict[str, List[RSSItem]] = {}
  169. id_to_name: Dict[str, str] = {}
  170. failed_ids: List[str] = []
  171. # 使用配置的时区
  172. now = get_configured_time(self.timezone)
  173. crawl_time = now.strftime("%H:%M")
  174. crawl_date = now.strftime("%Y-%m-%d")
  175. print(f"[RSS] 开始抓取 {len(self.feeds)} 个 RSS 源...")
  176. for i, feed in enumerate(self.feeds):
  177. # 请求间隔(带随机波动)
  178. if i > 0:
  179. interval = self.request_interval / 1000
  180. jitter = random.uniform(-0.2, 0.2) * interval
  181. time.sleep(interval + jitter)
  182. items, error = self.fetch_feed(feed)
  183. id_to_name[feed.id] = feed.name
  184. if error:
  185. failed_ids.append(feed.id)
  186. else:
  187. all_items[feed.id] = items
  188. total_items = sum(len(items) for items in all_items.values())
  189. print(f"[RSS] 抓取完成: {len(all_items)} 个源成功, {len(failed_ids)} 个失败, 共 {total_items} 条")
  190. return RSSData(
  191. date=crawl_date,
  192. crawl_time=crawl_time,
  193. items=all_items,
  194. id_to_name=id_to_name,
  195. failed_ids=failed_ids,
  196. )
  197. @classmethod
  198. def from_config(cls, config: Dict) -> "RSSFetcher":
  199. """
  200. 从配置字典创建抓取器
  201. Args:
  202. config: 配置字典,格式如下:
  203. {
  204. "enabled": true,
  205. "request_interval": 2000,
  206. "freshness_filter": {
  207. "enabled": true,
  208. "max_age_days": 3
  209. },
  210. "feeds": [
  211. {"id": "hacker-news", "name": "Hacker News", "url": "...", "max_age_days": 1}
  212. ]
  213. }
  214. Returns:
  215. RSSFetcher 实例
  216. """
  217. # 读取新鲜度过滤配置
  218. freshness_config = config.get("freshness_filter", {})
  219. freshness_enabled = freshness_config.get("enabled", True) # 默认启用
  220. default_max_age_days = freshness_config.get("max_age_days", 3) # 默认3天
  221. feeds = []
  222. for feed_config in config.get("feeds", []):
  223. # 读取并验证单个 feed 的 max_age_days(可选)
  224. max_age_days_raw = feed_config.get("max_age_days")
  225. max_age_days = None
  226. if max_age_days_raw is not None:
  227. try:
  228. max_age_days = int(max_age_days_raw)
  229. if max_age_days < 0:
  230. feed_id = feed_config.get("id", "unknown")
  231. print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 为负数,将使用全局默认值")
  232. max_age_days = None
  233. except (ValueError, TypeError):
  234. feed_id = feed_config.get("id", "unknown")
  235. print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 格式错误:{max_age_days_raw}")
  236. max_age_days = None
  237. feed = RSSFeedConfig(
  238. id=feed_config.get("id", ""),
  239. name=feed_config.get("name", ""),
  240. url=feed_config.get("url", ""),
  241. max_items=feed_config.get("max_items", 0), # 0=不限制
  242. enabled=feed_config.get("enabled", True),
  243. max_age_days=max_age_days, # None=使用全局,0=禁用,>0=覆盖
  244. )
  245. if feed.id and feed.url:
  246. feeds.append(feed)
  247. return cls(
  248. feeds=feeds,
  249. request_interval=config.get("request_interval", 2000),
  250. timeout=config.get("timeout", 15),
  251. use_proxy=config.get("use_proxy", False),
  252. proxy_url=config.get("proxy_url", ""),
  253. timezone=config.get("timezone", DEFAULT_TIMEZONE),
  254. freshness_enabled=freshness_enabled,
  255. default_max_age_days=default_max_age_days,
  256. )