| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840 |
- """
- 数据访问服务
- 提供统一的数据查询接口,封装数据访问逻辑。
- """
- import re
- from collections import Counter
- from datetime import datetime, timedelta
- from typing import Dict, List, Optional, Tuple
- from .cache_service import get_cache
- from .parser_service import ParserService
- from ..utils.errors import DataNotFoundError
- class DataService:
- """数据访问服务类"""
- # 中文停用词列表(用于 auto_extract 模式)
- STOPWORDS = {
- '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一',
- '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有',
- '看', '好', '自己', '这', '那', '来', '被', '与', '为', '对', '将', '从',
- '以', '及', '等', '但', '或', '而', '于', '中', '由', '可', '可以', '已',
- '已经', '还', '更', '最', '再', '因为', '所以', '如果', '虽然', '然而',
- '什么', '怎么', '如何', '哪', '哪些', '多少', '几', '这个', '那个',
- '他', '她', '它', '他们', '她们', '我们', '你们', '大家', '自己',
- '这样', '那样', '怎样', '这么', '那么', '多么', '非常', '特别',
- '应该', '可能', '能够', '需要', '必须', '一定', '肯定', '确实',
- '正在', '已经', '曾经', '将要', '即将', '刚刚', '马上', '立刻',
- '回应', '发布', '表示', '称', '曝', '官方', '最新', '重磅', '突发',
- '热搜', '刷屏', '引发', '关注', '网友', '评论', '转发', '点赞'
- }
- def __init__(self, project_root: str = None):
- """
- 初始化数据服务
- Args:
- project_root: 项目根目录
- """
- self.parser = ParserService(project_root)
- self.cache = get_cache()
- def get_latest_news(
- self,
- platforms: Optional[List[str]] = None,
- limit: int = 50,
- include_url: bool = False
- ) -> List[Dict]:
- """
- 获取最新一批爬取的新闻数据
- Args:
- platforms: 平台ID列表,None表示所有平台
- limit: 返回条数限制
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- 新闻列表
- Raises:
- DataNotFoundError: 数据不存在
- """
- # 尝试从缓存获取
- cache_key = f"latest_news:{','.join(platforms or [])}:{limit}:{include_url}"
- cached = self.cache.get(cache_key, ttl=900) # 15分钟缓存
- if cached:
- return cached
- # 读取今天的数据
- all_titles, id_to_name, timestamps = self.parser.read_all_titles_for_date(
- date=None,
- platform_ids=platforms
- )
- # 获取最新的文件时间
- if timestamps:
- latest_timestamp = max(timestamps.values())
- fetch_time = datetime.fromtimestamp(latest_timestamp)
- else:
- fetch_time = datetime.now()
- # 转换为新闻列表
- news_list = []
- for platform_id, titles in all_titles.items():
- platform_name = id_to_name.get(platform_id, platform_id)
- for title, info in titles.items():
- # 取第一个排名
- rank = info["ranks"][0] if info["ranks"] else 0
- news_item = {
- "title": title,
- "platform": platform_id,
- "platform_name": platform_name,
- "rank": rank,
- "timestamp": fetch_time.strftime("%Y-%m-%d %H:%M:%S")
- }
- # 条件性添加 URL 字段
- if include_url:
- news_item["url"] = info.get("url", "")
- news_item["mobileUrl"] = info.get("mobileUrl", "")
- news_list.append(news_item)
- # 按排名排序
- news_list.sort(key=lambda x: x["rank"])
- # 限制返回数量
- result = news_list[:limit]
- # 缓存结果
- self.cache.set(cache_key, result)
- return result
- def get_news_by_date(
- self,
- target_date: datetime,
- platforms: Optional[List[str]] = None,
- limit: int = 50,
- include_url: bool = False
- ) -> List[Dict]:
- """
- 按指定日期获取新闻
- Args:
- target_date: 目标日期
- platforms: 平台ID列表,None表示所有平台
- limit: 返回条数限制
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- 新闻列表
- Raises:
- DataNotFoundError: 数据不存在
- Examples:
- >>> service = DataService()
- >>> news = service.get_news_by_date(
- ... target_date=datetime(2025, 10, 10),
- ... platforms=['zhihu'],
- ... limit=20
- ... )
- """
- # 尝试从缓存获取
- date_str = target_date.strftime("%Y-%m-%d")
- cache_key = f"news_by_date:{date_str}:{','.join(platforms or [])}:{limit}:{include_url}"
- cached = self.cache.get(cache_key, ttl=900) # 15分钟缓存
- if cached:
- return cached
- # 读取指定日期的数据
- all_titles, id_to_name, timestamps = self.parser.read_all_titles_for_date(
- date=target_date,
- platform_ids=platforms
- )
- # 转换为新闻列表
- news_list = []
- for platform_id, titles in all_titles.items():
- platform_name = id_to_name.get(platform_id, platform_id)
- for title, info in titles.items():
- # 计算平均排名
- avg_rank = sum(info["ranks"]) / len(info["ranks"]) if info["ranks"] else 0
- news_item = {
- "title": title,
- "platform": platform_id,
- "platform_name": platform_name,
- "rank": info["ranks"][0] if info["ranks"] else 0,
- "avg_rank": round(avg_rank, 2),
- "count": len(info["ranks"]),
- "date": date_str
- }
- # 条件性添加 URL 字段
- if include_url:
- news_item["url"] = info.get("url", "")
- news_item["mobileUrl"] = info.get("mobileUrl", "")
- news_list.append(news_item)
- # 按排名排序
- news_list.sort(key=lambda x: x["rank"])
- # 限制返回数量
- result = news_list[:limit]
- # 缓存结果(历史数据缓存更久)
- self.cache.set(cache_key, result)
- return result
- def search_news_by_keyword(
- self,
- keyword: str,
- date_range: Optional[Tuple[datetime, datetime]] = None,
- platforms: Optional[List[str]] = None,
- limit: Optional[int] = None
- ) -> Dict:
- """
- 按关键词搜索新闻
- Args:
- keyword: 搜索关键词
- date_range: 日期范围 (start_date, end_date)
- platforms: 平台过滤列表
- limit: 返回条数限制(可选)
- Returns:
- 搜索结果字典
- Raises:
- DataNotFoundError: 数据不存在
- """
- # 确定搜索日期范围
- if date_range:
- start_date, end_date = date_range
- else:
- # 默认搜索今天
- start_date = end_date = datetime.now()
- # 收集所有匹配的新闻
- results = []
- platform_distribution = Counter()
- # 遍历日期范围
- current_date = start_date
- while current_date <= end_date:
- try:
- all_titles, id_to_name, _ = self.parser.read_all_titles_for_date(
- date=current_date,
- platform_ids=platforms
- )
- # 搜索包含关键词的标题
- for platform_id, titles in all_titles.items():
- platform_name = id_to_name.get(platform_id, platform_id)
- for title, info in titles.items():
- if keyword.lower() in title.lower():
- # 计算平均排名
- avg_rank = sum(info["ranks"]) / len(info["ranks"]) if info["ranks"] else 0
- results.append({
- "title": title,
- "platform": platform_id,
- "platform_name": platform_name,
- "ranks": info["ranks"],
- "count": len(info["ranks"]),
- "avg_rank": round(avg_rank, 2),
- "url": info.get("url", ""),
- "mobileUrl": info.get("mobileUrl", ""),
- "date": current_date.strftime("%Y-%m-%d")
- })
- platform_distribution[platform_id] += 1
- except DataNotFoundError:
- # 该日期没有数据,继续下一天
- pass
- # 下一天
- current_date += timedelta(days=1)
- if not results:
- raise DataNotFoundError(
- f"未找到包含关键词 '{keyword}' 的新闻",
- suggestion="请尝试其他关键词或扩大日期范围"
- )
- # 计算统计信息
- total_ranks = []
- for item in results:
- total_ranks.extend(item["ranks"])
- avg_rank = sum(total_ranks) / len(total_ranks) if total_ranks else 0
- # 限制返回数量(如果指定)
- total_found = len(results)
- if limit is not None and limit > 0:
- results = results[:limit]
- return {
- "results": results,
- "total": len(results),
- "total_found": total_found,
- "statistics": {
- "platform_distribution": dict(platform_distribution),
- "avg_rank": round(avg_rank, 2),
- "keyword": keyword
- }
- }
- def _extract_words_from_title(self, title: str, min_length: int = 2) -> List[str]:
- """
- 从标题中提取有意义的词语(用于 auto_extract 模式)
- Args:
- title: 新闻标题
- min_length: 最小词长
- Returns:
- 关键词列表
- """
- # 移除URL和特殊字符
- title = re.sub(r'http[s]?://\S+', '', title)
- title = re.sub(r'\[.*?\]', '', title) # 移除方括号内容
- title = re.sub(r'[【】《》「」『』""''・·•]', '', title) # 移除中文标点
- # 使用正则表达式分词(中文和英文)
- # 匹配连续的中文字符或英文单词
- words = re.findall(r'[\u4e00-\u9fff]{2,}|[a-zA-Z]{2,}[a-zA-Z0-9]*', title)
- # 过滤停用词和短词
- keywords = [
- word for word in words
- if word and len(word) >= min_length and word.lower() not in self.STOPWORDS
- and word not in self.STOPWORDS
- ]
- return keywords
- def get_trending_topics(
- self,
- top_n: int = 10,
- mode: str = "current",
- extract_mode: str = "keywords"
- ) -> Dict:
- """
- 获取热点话题统计
- Args:
- top_n: 返回TOP N话题
- mode: 时间模式
- - "daily": 当日累计数据统计
- - "current": 最新一批数据统计(默认)
- extract_mode: 提取模式
- - "keywords": 统计预设关注词(基于 config/frequency_words.txt)
- - "auto_extract": 自动从新闻标题提取高频词
- Returns:
- 话题频率统计字典
- Raises:
- DataNotFoundError: 数据不存在
- """
- # 尝试从缓存获取
- cache_key = f"trending_topics:{top_n}:{mode}:{extract_mode}"
- cached = self.cache.get(cache_key, ttl=900) # 15分钟缓存
- if cached:
- return cached
- # 读取今天的数据
- all_titles, id_to_name, timestamps = self.parser.read_all_titles_for_date()
- if not all_titles:
- raise DataNotFoundError(
- "未找到今天的新闻数据",
- suggestion="请确保爬虫已经运行并生成了数据"
- )
- # 根据 mode 选择要处理的标题数据
- if mode == "daily":
- titles_to_process = all_titles
- elif mode == "current":
- titles_to_process = all_titles # 简化实现
- else:
- raise ValueError(f"不支持的模式: {mode}。支持的模式: daily, current")
- # 统计词频
- word_frequency = Counter()
- keyword_to_news = {}
- # 预加载关键词数据(避免在循环内重复调用)
- if extract_mode == "keywords":
- from trendradar.core.frequency import _word_matches
- word_groups = self.parser.parse_frequency_words()
- # 遍历要处理的标题
- for platform_id, titles in titles_to_process.items():
- for title in titles.keys():
- if extract_mode == "keywords":
- # 基于预设关键词统计(支持正则匹配)
- title_lower = title.lower()
- for group in word_groups:
- all_words = group.get("required", []) + group.get("normal", [])
- # 检查是否匹配词组中的任意一个词
- matched = any(_word_matches(word_config, title_lower) for word_config in all_words)
- if matched:
- # 使用组的 display_name(组别名或行别名拼接)
- display_key = group.get("display_name") or group.get("group_key", "")
- word_frequency[display_key] += 1
- if display_key not in keyword_to_news:
- keyword_to_news[display_key] = []
- keyword_to_news[display_key].append(title)
- break # 每个标题只计入第一个匹配的词组
- elif extract_mode == "auto_extract":
- # 自动提取关键词
- extracted_words = self._extract_words_from_title(title)
- for word in extracted_words:
- word_frequency[word] += 1
- if word not in keyword_to_news:
- keyword_to_news[word] = []
- keyword_to_news[word].append(title)
- # 获取TOP N关键词
- top_keywords = word_frequency.most_common(top_n)
- # 构建话题列表
- topics = []
- for keyword, frequency in top_keywords:
- matched_news = keyword_to_news.get(keyword, [])
- topics.append({
- "keyword": keyword,
- "frequency": frequency,
- "matched_news": len(set(matched_news)), # 去重后的新闻数量
- "trend": "stable",
- "weight_score": 0.0
- })
- # 构建结果
- result = {
- "topics": topics,
- "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- "mode": mode,
- "extract_mode": extract_mode,
- "total_keywords": len(word_frequency),
- "description": self._get_mode_description(mode, extract_mode)
- }
- # 缓存结果
- self.cache.set(cache_key, result)
- return result
- def _get_mode_description(self, mode: str, extract_mode: str = "keywords") -> str:
- """获取模式描述"""
- mode_desc = {
- "daily": "当日累计统计",
- "current": "最新一批统计"
- }.get(mode, "未知时间模式")
- extract_desc = {
- "keywords": "基于预设关注词",
- "auto_extract": "自动提取高频词"
- }.get(extract_mode, "未知提取模式")
- return f"{mode_desc} - {extract_desc}"
- def get_current_config(self, section: str = "all") -> Dict:
- """
- 获取当前系统配置
- Args:
- section: 配置节 - all/crawler/push/keywords/weights
- Returns:
- 配置字典
- Raises:
- FileParseError: 配置文件解析错误
- """
- # 解析配置文件
- config_data = self.parser.parse_yaml_config()
- word_groups = self.parser.parse_frequency_words()
- # 根据section返回对应配置
- advanced = config_data.get("advanced", {})
- advanced_crawler = advanced.get("crawler", {})
- platforms_config = config_data.get("platforms", {})
- if section == "all" or section == "crawler":
- crawler_config = {
- "enable_crawler": platforms_config.get("enabled", True),
- "use_proxy": advanced_crawler.get("use_proxy", False),
- "request_interval": advanced_crawler.get("request_interval", 1),
- "retry_times": 3,
- "platforms": [p["id"] for p in platforms_config.get("sources", [])]
- }
- if section == "all" or section == "push":
- notification = config_data.get("notification", {})
- batch_size = advanced.get("batch_size", {})
- push_config = {
- "enable_notification": notification.get("enabled", True),
- "enabled_channels": [],
- "message_batch_size": batch_size.get("default", 4000),
- "push_window": {} # 已迁移至调度系统(schedule + timeline.yaml)
- }
- # 检测已配置的通知渠道(合并 config.yaml + .env)
- from trendradar.core.loader import _load_webhook_config
- webhook_config = _load_webhook_config(config_data)
- channel_checks = {
- "feishu": [webhook_config.get("FEISHU_WEBHOOK_URL")],
- "dingtalk": [webhook_config.get("DINGTALK_WEBHOOK_URL")],
- "wework": [webhook_config.get("WEWORK_WEBHOOK_URL")],
- "telegram": [webhook_config.get("TELEGRAM_BOT_TOKEN"), webhook_config.get("TELEGRAM_CHAT_ID")],
- "email": [webhook_config.get("EMAIL_FROM"), webhook_config.get("EMAIL_PASSWORD"), webhook_config.get("EMAIL_TO")],
- "ntfy": [webhook_config.get("NTFY_SERVER_URL"), webhook_config.get("NTFY_TOPIC")],
- "bark": [webhook_config.get("BARK_URL")],
- "slack": [webhook_config.get("SLACK_WEBHOOK_URL")],
- "generic_webhook": [webhook_config.get("GENERIC_WEBHOOK_URL")],
- }
- for ch_id, required_values in channel_checks.items():
- if all(required_values):
- push_config["enabled_channels"].append(ch_id)
- if section == "all" or section == "keywords":
- keywords_config = {
- "word_groups": word_groups,
- "total_groups": len(word_groups)
- }
- if section == "all" or section == "weights":
- weight = advanced.get("weight", {})
- weights_config = {
- "rank_weight": weight.get("rank", 0.6),
- "frequency_weight": weight.get("frequency", 0.3),
- "hotness_weight": weight.get("hotness", 0.1)
- }
- # 组装结果
- if section == "all":
- result = {
- "crawler": crawler_config,
- "push": push_config,
- "keywords": keywords_config,
- "weights": weights_config
- }
- elif section == "crawler":
- result = crawler_config
- elif section == "push":
- result = push_config
- elif section == "keywords":
- result = keywords_config
- elif section == "weights":
- result = weights_config
- else:
- result = {}
- return result
- def get_available_date_range(self, db_type: str = "news") -> Tuple[Optional[datetime], Optional[datetime]]:
- """
- 扫描 output 目录,返回实际可用的日期范围
- Args:
- db_type: 数据库类型 ("news" 或 "rss")
- Returns:
- (最早日期, 最新日期) 元组,如果没有数据则返回 (None, None)
- Examples:
- >>> service = DataService()
- >>> earliest, latest = service.get_available_date_range()
- >>> print(f"可用日期范围:{earliest} 至 {latest}")
- """
- return self.parser.get_available_date_range(db_type)
- def get_system_status(self) -> Dict:
- """
- 获取系统运行状态
- Returns:
- 系统状态字典
- """
- # 获取数据统计
- output_dir = self.parser.project_root / "output"
- total_storage = 0
- # 使用 parser 的方法获取日期范围
- oldest_record, latest_record = self.get_available_date_range(db_type="news")
- # 计算 output 目录总存储大小
- if output_dir.exists():
- for item in output_dir.rglob("*"):
- if item.is_file():
- total_storage += item.stat().st_size
- # 读取版本信息
- version_file = self.parser.project_root / "version"
- version = "unknown"
- if version_file.exists():
- try:
- with open(version_file, "r") as f:
- version = f.read().strip()
- except:
- pass
- return {
- "system": {
- "version": version,
- "project_root": str(self.parser.project_root)
- },
- "data": {
- "total_storage": f"{total_storage / 1024 / 1024:.2f} MB",
- "oldest_record": oldest_record.strftime("%Y-%m-%d") if oldest_record else None,
- "latest_record": latest_record.strftime("%Y-%m-%d") if latest_record else None,
- },
- "cache": self.cache.get_stats(),
- "health": "healthy"
- }
- # ========================================
- # RSS 数据查询方法
- # ========================================
- def get_latest_rss(
- self,
- feeds: Optional[List[str]] = None,
- days: int = 1,
- limit: int = 50,
- include_summary: bool = False
- ) -> List[Dict]:
- """
- 获取最新的 RSS 数据(支持多日查询)
- Args:
- feeds: RSS 源 ID 列表,None 表示所有源
- days: 获取最近 N 天的数据,默认 1(仅今天),最大 30 天
- limit: 返回条数限制
- include_summary: 是否包含摘要,默认 False(节省 token)
- Returns:
- RSS 条目列表(按 URL 去重)
- Raises:
- DataNotFoundError: 数据不存在
- """
- days = min(max(days, 1), 30) # 限制 1-30 天
- cache_key = f"latest_rss:{','.join(feeds or [])}:{days}:{limit}:{include_summary}"
- cached = self.cache.get(cache_key, ttl=900)
- if cached:
- return cached
- rss_list = []
- seen_urls = set() # 跨日期 URL 去重
- today = datetime.now()
- for i in range(days):
- target_date = today - timedelta(days=i)
- try:
- all_items, id_to_name, timestamps = self.parser.read_all_titles_for_date(
- date=target_date,
- platform_ids=feeds,
- db_type="rss"
- )
- # 获取抓取时间
- if timestamps:
- latest_timestamp = max(timestamps.values())
- fetch_time = datetime.fromtimestamp(latest_timestamp)
- else:
- fetch_time = target_date
- # 转换为列表
- for feed_id, items in all_items.items():
- feed_name = id_to_name.get(feed_id, feed_id)
- for title, info in items.items():
- # 跨日期 URL 去重
- url = info.get("url", "")
- if url and url in seen_urls:
- continue
- if url:
- seen_urls.add(url)
- rss_item = {
- "title": title,
- "feed_id": feed_id,
- "feed_name": feed_name,
- "url": url,
- "published_at": info.get("published_at", ""),
- "author": info.get("author", ""),
- "date": target_date.strftime("%Y-%m-%d"),
- "fetch_time": fetch_time.strftime("%Y-%m-%d %H:%M:%S") if isinstance(fetch_time, datetime) else target_date.strftime("%Y-%m-%d")
- }
- if include_summary:
- rss_item["summary"] = info.get("summary", "")
- rss_list.append(rss_item)
- except DataNotFoundError:
- continue
- # 按发布时间排序(最新的在前)
- rss_list.sort(key=lambda x: x.get("published_at", ""), reverse=True)
- # 限制返回数量
- result = rss_list[:limit]
- # 缓存结果
- self.cache.set(cache_key, result)
- return result
- def search_rss(
- self,
- keyword: str,
- feeds: Optional[List[str]] = None,
- days: int = 7,
- limit: int = 50,
- include_summary: bool = False
- ) -> List[Dict]:
- """
- 搜索 RSS 数据(跨日期自动去重)
- Args:
- keyword: 搜索关键词
- feeds: RSS 源 ID 列表,None 表示所有源
- days: 搜索最近 N 天的数据
- limit: 返回条数限制
- include_summary: 是否包含摘要
- Returns:
- 匹配的 RSS 条目列表(按 URL 去重)
- """
- cache_key = f"search_rss:{keyword}:{','.join(feeds or [])}:{days}:{limit}:{include_summary}"
- cached = self.cache.get(cache_key, ttl=900)
- if cached:
- return cached
- results = []
- seen_urls = set() # 用于 URL 去重
- today = datetime.now()
- for i in range(days):
- target_date = today - timedelta(days=i)
- try:
- all_items, id_to_name, _ = self.parser.read_all_titles_for_date(
- date=target_date,
- platform_ids=feeds,
- db_type="rss"
- )
- for feed_id, items in all_items.items():
- feed_name = id_to_name.get(feed_id, feed_id)
- for title, info in items.items():
- # 跨日期去重:如果 URL 已出现过则跳过
- url = info.get("url", "")
- if url and url in seen_urls:
- continue
- if url:
- seen_urls.add(url)
- # 关键词匹配(标题或摘要)
- summary = info.get("summary", "")
- if keyword.lower() in title.lower() or keyword.lower() in summary.lower():
- rss_item = {
- "title": title,
- "feed_id": feed_id,
- "feed_name": feed_name,
- "url": url,
- "published_at": info.get("published_at", ""),
- "author": info.get("author", ""),
- "date": target_date.strftime("%Y-%m-%d")
- }
- if include_summary:
- rss_item["summary"] = summary
- results.append(rss_item)
- except DataNotFoundError:
- continue
- # 按发布时间排序
- results.sort(key=lambda x: x.get("published_at", ""), reverse=True)
- # 限制返回数量
- result = results[:limit]
- # 缓存结果
- self.cache.set(cache_key, result)
- return result
- def get_rss_feeds_status(self) -> Dict:
- """
- 获取 RSS 源状态
- Returns:
- RSS 源状态信息
- """
- cache_key = "rss_feeds_status"
- cached = self.cache.get(cache_key, ttl=900)
- if cached:
- return cached
- # 获取可用的 RSS 日期
- available_dates = self.parser.get_available_dates(db_type="rss")
- # 获取今天的 RSS 数据统计
- today_stats = {}
- try:
- all_items, id_to_name, _ = self.parser.read_all_titles_for_date(
- date=None,
- platform_ids=None,
- db_type="rss"
- )
- for feed_id, items in all_items.items():
- today_stats[feed_id] = {
- "name": id_to_name.get(feed_id, feed_id),
- "item_count": len(items)
- }
- except DataNotFoundError:
- pass
- result = {
- "available_dates": available_dates[:10], # 最近 10 天
- "total_dates": len(available_dates),
- "today_feeds": today_stats,
- "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- }
- self.cache.set(cache_key, result)
- return result
|