| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- # coding=utf-8
- """
- RSS 抓取器
- 负责从配置的 RSS 源抓取数据并转换为标准格式
- """
- import time
- import random
- from dataclasses import dataclass
- from datetime import datetime
- from typing import List, Dict, Optional, Tuple, Callable
- import requests
- from .parser import RSSParser, ParsedRSSItem
- from trendradar.storage.base import RSSItem, RSSData
- from trendradar.utils.time import get_configured_time, is_within_days, DEFAULT_TIMEZONE
@dataclass
class RSSFeedConfig:
    """Configuration of a single RSS feed.

    Fields:
        id: feed identifier.
        name: display name.
        url: URL of the feed document.
        max_items: maximum number of entries kept per fetch; 0 means no limit.
        enabled: whether the feed is active (RSSFetcher drops disabled feeds).
        max_age_days: per-feed maximum article age in days, overriding the
            global setting; None = use the global value, 0 = disable the
            freshness filter for this feed.
    """

    # Identity of the feed
    id: str
    name: str
    url: str

    # Optional per-feed tuning
    max_items: int = 0
    enabled: bool = True
    max_age_days: Optional[int] = None
class RSSFetcher:
    """Fetch configured RSS feeds and convert their entries into ``RSSItem`` objects.

    All requests go through one shared ``requests.Session``; call ``close()``
    (or use the fetcher in a ``with`` statement) to release its connections.
    """

    def __init__(
        self,
        feeds: List[RSSFeedConfig],
        request_interval: int = 2000,
        timeout: int = 15,
        use_proxy: bool = False,
        proxy_url: str = "",
        timezone: str = DEFAULT_TIMEZONE,
        freshness_enabled: bool = True,
        default_max_age_days: int = 3,
    ):
        """
        Initialize the fetcher.

        Args:
            feeds: feed configurations; entries whose ``enabled`` is false are dropped.
            request_interval: pause between consecutive feed requests (milliseconds).
            timeout: per-request timeout (seconds).
            use_proxy: whether to route requests through ``proxy_url``.
            proxy_url: proxy URL applied to both http and https traffic.
            timezone: timezone name (e.g. 'Asia/Shanghai') used for crawl
                timestamps and freshness checks.
            freshness_enabled: global switch for freshness filtering.
            default_max_age_days: maximum article age in days for feeds that do
                not set their own ``max_age_days``.
        """
        self.feeds = [f for f in feeds if f.enabled]
        self.request_interval = request_interval
        self.timeout = timeout
        self.use_proxy = use_proxy
        self.proxy_url = proxy_url
        self.timezone = timezone
        self.freshness_enabled = freshness_enabled
        self.default_max_age_days = default_max_age_days
        self.parser = RSSParser()
        self.session = self._create_session()

    def close(self) -> None:
        """Close the shared HTTP session and release its pooled connections."""
        self.session.close()

    def __enter__(self) -> "RSSFetcher":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def _create_session(self) -> requests.Session:
        """Create the HTTP session with reader headers and the optional proxy."""
        session = requests.Session()
        session.headers.update({
            "User-Agent": "TrendRadar/2.0 RSS Reader (https://github.com/trendradar)",
            "Accept": "application/feed+json, application/json, application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        })
        if self.use_proxy and self.proxy_url:
            session.proxies = {
                "http": self.proxy_url,
                "https": self.proxy_url,
            }
        return session

    def _filter_by_freshness(
        self,
        items: List[RSSItem],
        feed: RSSFeedConfig,
    ) -> Tuple[List[RSSItem], int]:
        """
        Drop articles older than the feed's maximum age.

        Items without ``published_at`` are always kept. The limit is
        ``feed.max_age_days`` when set, otherwise ``self.default_max_age_days``;
        a limit of 0, or freshness filtering disabled globally, keeps everything.

        Args:
            items: articles to filter.
            feed: configuration of the feed the articles came from.

        Returns:
            (kept articles, number of articles removed)
        """
        if not self.freshness_enabled:
            return items, 0

        max_days = feed.max_age_days
        if max_days is None:
            max_days = self.default_max_age_days
        if max_days == 0:
            return items, 0

        filtered = [
            item
            for item in items
            if not item.published_at
            or is_within_days(item.published_at, max_days, self.timezone)
        ]
        return filtered, len(items) - len(filtered)

    @staticmethod
    def _response_text(response: requests.Response) -> str:
        """Return the response body decoded as text.

        For ``text/*`` responses without a charset, requests falls back to
        ISO-8859-1, which garbles UTF-8/GBK feeds served as ``text/xml``.
        Only in that case is the detected (apparent) encoding used instead;
        an explicitly declared charset is always respected.
        """
        content_type = response.headers.get("Content-Type", "")
        if (
            "charset" not in content_type.lower()
            and (response.encoding or "").lower() == "iso-8859-1"
        ):
            response.encoding = response.apparent_encoding or "utf-8"
        return response.text

    @staticmethod
    def _report_failure(
        feed: RSSFeedConfig, error: str
    ) -> Tuple[List[RSSItem], Optional[str]]:
        """Log a per-feed failure and return the ``([], error)`` result tuple."""
        print(f"[RSS] {feed.name}: {error}")
        return [], error

    def fetch_feed(self, feed: RSSFeedConfig) -> Tuple[List[RSSItem], Optional[str]]:
        """
        Fetch and parse a single feed.

        Request and parse errors are caught and reported, not raised.

        Args:
            feed: feed configuration.

        Returns:
            (items, None) on success, or ([], error message) on failure.
        """
        try:
            response = self.session.get(feed.url, timeout=self.timeout)
            response.raise_for_status()
            parsed_items = self.parser.parse(self._response_text(response), feed.url)

            # Limit the number of entries (0 = unlimited)
            if feed.max_items > 0:
                parsed_items = parsed_items[:feed.max_items]

            # Convert to RSSItem, stamping the crawl time in the configured timezone
            crawl_time = get_configured_time(self.timezone).strftime("%H:%M")
            items = [
                RSSItem(
                    title=parsed.title,
                    feed_id=feed.id,
                    feed_name=feed.name,
                    url=parsed.url,
                    published_at=parsed.published_at or "",
                    summary=parsed.summary or "",
                    author=parsed.author or "",
                    crawl_time=crawl_time,
                    first_time=crawl_time,
                    last_time=crawl_time,
                    count=1,
                )
                for parsed in parsed_items
            ]

            # Freshness filtering is intentionally NOT applied here: per the
            # original note it happens at push time (_convert_rss_items_to_list),
            # so every article is stored but old ones are not pushed.
            print(f"[RSS] {feed.name}: 获取 {len(items)} 条")
            return items, None

        except requests.Timeout:
            return self._report_failure(feed, f"请求超时 ({self.timeout}s)")
        except requests.RequestException as e:
            return self._report_failure(feed, f"请求失败: {e}")
        except ValueError as e:
            return self._report_failure(feed, f"解析失败: {e}")
        except Exception as e:  # best-effort boundary: one bad feed must not abort fetch_all
            return self._report_failure(feed, f"未知错误: {e}")

    def fetch_all(self) -> RSSData:
        """
        Fetch every enabled feed sequentially.

        Consecutive requests are spaced by ``request_interval`` ms with ±20%
        random jitter.

        Returns:
            RSSData with successful items keyed by feed id, the id -> name map
            for all attempted feeds, and the ids of failed feeds.
        """
        all_items: Dict[str, List[RSSItem]] = {}
        id_to_name: Dict[str, str] = {}
        failed_ids: List[str] = []

        # Timestamps use the configured timezone
        now = get_configured_time(self.timezone)
        crawl_time = now.strftime("%H:%M")
        crawl_date = now.strftime("%Y-%m-%d")

        print(f"[RSS] 开始抓取 {len(self.feeds)} 个 RSS 源...")
        for i, feed in enumerate(self.feeds):
            # Pause between requests (with random jitter); clamp at 0 so a
            # negative configured interval cannot make time.sleep raise.
            if i > 0:
                interval = self.request_interval / 1000
                jitter = random.uniform(-0.2, 0.2) * interval
                time.sleep(max(0.0, interval + jitter))

            items, error = self.fetch_feed(feed)
            id_to_name[feed.id] = feed.name
            if error:
                failed_ids.append(feed.id)
            else:
                all_items[feed.id] = items

        total_items = sum(len(items) for items in all_items.values())
        print(f"[RSS] 抓取完成: {len(all_items)} 个源成功, {len(failed_ids)} 个失败, 共 {total_items} 条")

        return RSSData(
            date=crawl_date,
            crawl_time=crawl_time,
            items=all_items,
            id_to_name=id_to_name,
            failed_ids=failed_ids,
        )

    @staticmethod
    def _parse_default_max_age_days(raw) -> int:
        """Validate the global freshness_filter.max_age_days; fall back to 3 days."""
        if raw is None:
            return 3
        try:
            days = int(raw)
        except (ValueError, TypeError):
            days = -1
        if days < 0:
            print(f"[警告] freshness_filter.max_age_days 无效:{raw},将使用默认值 3")
            return 3
        return days

    @staticmethod
    def _parse_feed_config(feed_config: Dict) -> RSSFeedConfig:
        """Build one RSSFeedConfig from its config dict, validating numeric fields."""
        feed_id = feed_config.get("id", "unknown")  # only used in warnings

        # max_age_days (optional): None = use global, 0 = disable, >0 = override
        max_age_days_raw = feed_config.get("max_age_days")
        max_age_days = None
        if max_age_days_raw is not None:
            try:
                max_age_days = int(max_age_days_raw)
                if max_age_days < 0:
                    print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 为负数,将使用全局默认值")
                    max_age_days = None
            except (ValueError, TypeError):
                print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 格式错误:{max_age_days_raw}")
                max_age_days = None

        # max_items: 0 = unlimited. Coerce here so a None/string value cannot
        # raise TypeError at `max_items > 0` in fetch_feed and fail the feed.
        max_items_raw = feed_config.get("max_items", 0)
        try:
            max_items = max(0, int(max_items_raw or 0))
        except (ValueError, TypeError):
            print(f"[警告] RSS feed '{feed_id}' 的 max_items 格式错误:{max_items_raw}")
            max_items = 0

        return RSSFeedConfig(
            id=feed_config.get("id", ""),
            name=feed_config.get("name", ""),
            url=feed_config.get("url", ""),
            max_items=max_items,
            enabled=feed_config.get("enabled", True),
            max_age_days=max_age_days,
        )

    @classmethod
    def from_config(cls, config: Dict) -> "RSSFetcher":
        """
        Build a fetcher from a configuration dict.

        Args:
            config: configuration dict, for example:
                {
                    "enabled": true,
                    "request_interval": 2000,
                    "freshness_filter": {
                        "enabled": true,
                        "max_age_days": 3
                    },
                    "feeds": [
                        {"id": "hacker-news", "name": "Hacker News", "url": "...", "max_age_days": 1}
                    ]
                }
                The optional top-level keys "timeout", "use_proxy", "proxy_url"
                and "timezone" are read as well.

        Returns:
            RSSFetcher instance. Feeds lacking "id" or "url" are dropped.
        """
        # `or {}` / `or []`: a key that is present but empty (e.g. YAML `feeds:`)
        # yields None, which .get() / iteration would crash on.
        freshness_config = config.get("freshness_filter") or {}
        freshness_enabled = freshness_config.get("enabled", True)  # enabled by default
        default_max_age_days = cls._parse_default_max_age_days(
            freshness_config.get("max_age_days", 3)
        )

        feeds = []
        for feed_config in config.get("feeds") or []:
            feed = cls._parse_feed_config(feed_config)
            if feed.id and feed.url:
                feeds.append(feed)

        return cls(
            feeds=feeds,
            request_interval=config.get("request_interval", 2000),
            timeout=config.get("timeout", 15),
            use_proxy=config.get("use_proxy", False),
            proxy_url=config.get("proxy_url", ""),
            timezone=config.get("timezone", DEFAULT_TIMEZONE),
            freshness_enabled=freshness_enabled,
            default_max_age_days=default_max_age_days,
        )
|