"""
Data access service (data_service.py).

Provides a unified data query interface and encapsulates the data access logic.
"""
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

from .cache_service import get_cache
from .parser_service import ParserService
from ..utils.errors import DataNotFoundError


  12. class DataService:
  13. """数据访问服务类"""
  14. # 中文停用词列表(用于 auto_extract 模式)
  15. STOPWORDS = {
  16. '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一',
  17. '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有',
  18. '看', '好', '自己', '这', '那', '来', '被', '与', '为', '对', '将', '从',
  19. '以', '及', '等', '但', '或', '而', '于', '中', '由', '可', '可以', '已',
  20. '已经', '还', '更', '最', '再', '因为', '所以', '如果', '虽然', '然而',
  21. '什么', '怎么', '如何', '哪', '哪些', '多少', '几', '这个', '那个',
  22. '他', '她', '它', '他们', '她们', '我们', '你们', '大家', '自己',
  23. '这样', '那样', '怎样', '这么', '那么', '多么', '非常', '特别',
  24. '应该', '可能', '能够', '需要', '必须', '一定', '肯定', '确实',
  25. '正在', '已经', '曾经', '将要', '即将', '刚刚', '马上', '立刻',
  26. '回应', '发布', '表示', '称', '曝', '官方', '最新', '重磅', '突发',
  27. '热搜', '刷屏', '引发', '关注', '网友', '评论', '转发', '点赞'
  28. }
  29. def __init__(self, project_root: str = None):
  30. """
  31. 初始化数据服务
  32. Args:
  33. project_root: 项目根目录
  34. """
  35. self.parser = ParserService(project_root)
  36. self.cache = get_cache()
  37. def get_latest_news(
  38. self,
  39. platforms: Optional[List[str]] = None,
  40. limit: int = 50,
  41. include_url: bool = False
  42. ) -> List[Dict]:
  43. """
  44. 获取最新一批爬取的新闻数据
  45. Args:
  46. platforms: 平台ID列表,None表示所有平台
  47. limit: 返回条数限制
  48. include_url: 是否包含URL链接,默认False(节省token)
  49. Returns:
  50. 新闻列表
  51. Raises:
  52. DataNotFoundError: 数据不存在
  53. """
  54. # 尝试从缓存获取
  55. cache_key = f"latest_news:{','.join(platforms or [])}:{limit}:{include_url}"
  56. cached = self.cache.get(cache_key, ttl=900) # 15分钟缓存
  57. if cached:
  58. return cached
  59. # 读取今天的数据
  60. all_titles, id_to_name, timestamps = self.parser.read_all_titles_for_date(
  61. date=None,
  62. platform_ids=platforms
  63. )
  64. # 获取最新的文件时间
  65. if timestamps:
  66. latest_timestamp = max(timestamps.values())
  67. fetch_time = datetime.fromtimestamp(latest_timestamp)
  68. else:
  69. fetch_time = datetime.now()
  70. # 转换为新闻列表
  71. news_list = []
  72. for platform_id, titles in all_titles.items():
  73. platform_name = id_to_name.get(platform_id, platform_id)
  74. for title, info in titles.items():
  75. # 取第一个排名
  76. rank = info["ranks"][0] if info["ranks"] else 0
  77. news_item = {
  78. "title": title,
  79. "platform": platform_id,
  80. "platform_name": platform_name,
  81. "rank": rank,
  82. "timestamp": fetch_time.strftime("%Y-%m-%d %H:%M:%S")
  83. }
  84. # 条件性添加 URL 字段
  85. if include_url:
  86. news_item["url"] = info.get("url", "")
  87. news_item["mobileUrl"] = info.get("mobileUrl", "")
  88. news_list.append(news_item)
  89. # 按排名排序
  90. news_list.sort(key=lambda x: x["rank"])
  91. # 限制返回数量
  92. result = news_list[:limit]
  93. # 缓存结果
  94. self.cache.set(cache_key, result)
  95. return result
  96. def get_news_by_date(
  97. self,
  98. target_date: datetime,
  99. platforms: Optional[List[str]] = None,
  100. limit: int = 50,
  101. include_url: bool = False
  102. ) -> List[Dict]:
  103. """
  104. 按指定日期获取新闻
  105. Args:
  106. target_date: 目标日期
  107. platforms: 平台ID列表,None表示所有平台
  108. limit: 返回条数限制
  109. include_url: 是否包含URL链接,默认False(节省token)
  110. Returns:
  111. 新闻列表
  112. Raises:
  113. DataNotFoundError: 数据不存在
  114. Examples:
  115. >>> service = DataService()
  116. >>> news = service.get_news_by_date(
  117. ... target_date=datetime(2025, 10, 10),
  118. ... platforms=['zhihu'],
  119. ... limit=20
  120. ... )
  121. """
  122. # 尝试从缓存获取
  123. date_str = target_date.strftime("%Y-%m-%d")
  124. cache_key = f"news_by_date:{date_str}:{','.join(platforms or [])}:{limit}:{include_url}"
  125. cached = self.cache.get(cache_key, ttl=900) # 15分钟缓存
  126. if cached:
  127. return cached
  128. # 读取指定日期的数据
  129. all_titles, id_to_name, timestamps = self.parser.read_all_titles_for_date(
  130. date=target_date,
  131. platform_ids=platforms
  132. )
  133. # 转换为新闻列表
  134. news_list = []
  135. for platform_id, titles in all_titles.items():
  136. platform_name = id_to_name.get(platform_id, platform_id)
  137. for title, info in titles.items():
  138. # 计算平均排名
  139. avg_rank = sum(info["ranks"]) / len(info["ranks"]) if info["ranks"] else 0
  140. news_item = {
  141. "title": title,
  142. "platform": platform_id,
  143. "platform_name": platform_name,
  144. "rank": info["ranks"][0] if info["ranks"] else 0,
  145. "avg_rank": round(avg_rank, 2),
  146. "count": len(info["ranks"]),
  147. "date": date_str
  148. }
  149. # 条件性添加 URL 字段
  150. if include_url:
  151. news_item["url"] = info.get("url", "")
  152. news_item["mobileUrl"] = info.get("mobileUrl", "")
  153. news_list.append(news_item)
  154. # 按排名排序
  155. news_list.sort(key=lambda x: x["rank"])
  156. # 限制返回数量
  157. result = news_list[:limit]
  158. # 缓存结果(历史数据缓存更久)
  159. self.cache.set(cache_key, result)
  160. return result
  161. def search_news_by_keyword(
  162. self,
  163. keyword: str,
  164. date_range: Optional[Tuple[datetime, datetime]] = None,
  165. platforms: Optional[List[str]] = None,
  166. limit: Optional[int] = None
  167. ) -> Dict:
  168. """
  169. 按关键词搜索新闻
  170. Args:
  171. keyword: 搜索关键词
  172. date_range: 日期范围 (start_date, end_date)
  173. platforms: 平台过滤列表
  174. limit: 返回条数限制(可选)
  175. Returns:
  176. 搜索结果字典
  177. Raises:
  178. DataNotFoundError: 数据不存在
  179. """
  180. # 确定搜索日期范围
  181. if date_range:
  182. start_date, end_date = date_range
  183. else:
  184. # 默认搜索今天
  185. start_date = end_date = datetime.now()
  186. # 收集所有匹配的新闻
  187. results = []
  188. platform_distribution = Counter()
  189. # 遍历日期范围
  190. current_date = start_date
  191. while current_date <= end_date:
  192. try:
  193. all_titles, id_to_name, _ = self.parser.read_all_titles_for_date(
  194. date=current_date,
  195. platform_ids=platforms
  196. )
  197. # 搜索包含关键词的标题
  198. for platform_id, titles in all_titles.items():
  199. platform_name = id_to_name.get(platform_id, platform_id)
  200. for title, info in titles.items():
  201. if keyword.lower() in title.lower():
  202. # 计算平均排名
  203. avg_rank = sum(info["ranks"]) / len(info["ranks"]) if info["ranks"] else 0
  204. results.append({
  205. "title": title,
  206. "platform": platform_id,
  207. "platform_name": platform_name,
  208. "ranks": info["ranks"],
  209. "count": len(info["ranks"]),
  210. "avg_rank": round(avg_rank, 2),
  211. "url": info.get("url", ""),
  212. "mobileUrl": info.get("mobileUrl", ""),
  213. "date": current_date.strftime("%Y-%m-%d")
  214. })
  215. platform_distribution[platform_id] += 1
  216. except DataNotFoundError:
  217. # 该日期没有数据,继续下一天
  218. pass
  219. # 下一天
  220. current_date += timedelta(days=1)
  221. if not results:
  222. raise DataNotFoundError(
  223. f"未找到包含关键词 '{keyword}' 的新闻",
  224. suggestion="请尝试其他关键词或扩大日期范围"
  225. )
  226. # 计算统计信息
  227. total_ranks = []
  228. for item in results:
  229. total_ranks.extend(item["ranks"])
  230. avg_rank = sum(total_ranks) / len(total_ranks) if total_ranks else 0
  231. # 限制返回数量(如果指定)
  232. total_found = len(results)
  233. if limit is not None and limit > 0:
  234. results = results[:limit]
  235. return {
  236. "results": results,
  237. "total": len(results),
  238. "total_found": total_found,
  239. "statistics": {
  240. "platform_distribution": dict(platform_distribution),
  241. "avg_rank": round(avg_rank, 2),
  242. "keyword": keyword
  243. }
  244. }
  245. def _extract_words_from_title(self, title: str, min_length: int = 2) -> List[str]:
  246. """
  247. 从标题中提取有意义的词语(用于 auto_extract 模式)
  248. Args:
  249. title: 新闻标题
  250. min_length: 最小词长
  251. Returns:
  252. 关键词列表
  253. """
  254. # 移除URL和特殊字符
  255. title = re.sub(r'http[s]?://\S+', '', title)
  256. title = re.sub(r'\[.*?\]', '', title) # 移除方括号内容
  257. title = re.sub(r'[【】《》「」『』""''・·•]', '', title) # 移除中文标点
  258. # 使用正则表达式分词(中文和英文)
  259. # 匹配连续的中文字符或英文单词
  260. words = re.findall(r'[\u4e00-\u9fff]{2,}|[a-zA-Z]{2,}[a-zA-Z0-9]*', title)
  261. # 过滤停用词和短词
  262. keywords = [
  263. word for word in words
  264. if word and len(word) >= min_length and word.lower() not in self.STOPWORDS
  265. and word not in self.STOPWORDS
  266. ]
  267. return keywords
  268. def get_trending_topics(
  269. self,
  270. top_n: int = 10,
  271. mode: str = "current",
  272. extract_mode: str = "keywords"
  273. ) -> Dict:
  274. """
  275. 获取热点话题统计
  276. Args:
  277. top_n: 返回TOP N话题
  278. mode: 时间模式
  279. - "daily": 当日累计数据统计
  280. - "current": 最新一批数据统计(默认)
  281. extract_mode: 提取模式
  282. - "keywords": 统计预设关注词(基于 config/frequency_words.txt)
  283. - "auto_extract": 自动从新闻标题提取高频词
  284. Returns:
  285. 话题频率统计字典
  286. Raises:
  287. DataNotFoundError: 数据不存在
  288. """
  289. # 尝试从缓存获取
  290. cache_key = f"trending_topics:{top_n}:{mode}:{extract_mode}"
  291. cached = self.cache.get(cache_key, ttl=900) # 15分钟缓存
  292. if cached:
  293. return cached
  294. # 读取今天的数据
  295. all_titles, id_to_name, timestamps = self.parser.read_all_titles_for_date()
  296. if not all_titles:
  297. raise DataNotFoundError(
  298. "未找到今天的新闻数据",
  299. suggestion="请确保爬虫已经运行并生成了数据"
  300. )
  301. # 根据 mode 选择要处理的标题数据
  302. if mode == "daily":
  303. titles_to_process = all_titles
  304. elif mode == "current":
  305. titles_to_process = all_titles # 简化实现
  306. else:
  307. raise ValueError(f"不支持的模式: {mode}。支持的模式: daily, current")
  308. # 统计词频
  309. word_frequency = Counter()
  310. keyword_to_news = {}
  311. # 遍历要处理的标题
  312. for platform_id, titles in titles_to_process.items():
  313. for title in titles.keys():
  314. if extract_mode == "keywords":
  315. # 基于预设关键词统计(支持正则匹配)
  316. from trendradar.core.frequency import _word_matches
  317. word_groups = self.parser.parse_frequency_words()
  318. title_lower = title.lower()
  319. for group in word_groups:
  320. all_words = group.get("required", []) + group.get("normal", [])
  321. # 检查是否匹配词组中的任意一个词
  322. matched = any(_word_matches(word_config, title_lower) for word_config in all_words)
  323. if matched:
  324. # 使用组的 display_name(组别名或行别名拼接)
  325. display_key = group.get("display_name") or group.get("group_key", "")
  326. word_frequency[display_key] += 1
  327. if display_key not in keyword_to_news:
  328. keyword_to_news[display_key] = []
  329. keyword_to_news[display_key].append(title)
  330. break # 每个标题只计入第一个匹配的词组
  331. elif extract_mode == "auto_extract":
  332. # 自动提取关键词
  333. extracted_words = self._extract_words_from_title(title)
  334. for word in extracted_words:
  335. word_frequency[word] += 1
  336. if word not in keyword_to_news:
  337. keyword_to_news[word] = []
  338. keyword_to_news[word].append(title)
  339. # 获取TOP N关键词
  340. top_keywords = word_frequency.most_common(top_n)
  341. # 构建话题列表
  342. topics = []
  343. for keyword, frequency in top_keywords:
  344. matched_news = keyword_to_news.get(keyword, [])
  345. topics.append({
  346. "keyword": keyword,
  347. "frequency": frequency,
  348. "matched_news": len(set(matched_news)), # 去重后的新闻数量
  349. "trend": "stable",
  350. "weight_score": 0.0
  351. })
  352. # 构建结果
  353. result = {
  354. "topics": topics,
  355. "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  356. "mode": mode,
  357. "extract_mode": extract_mode,
  358. "total_keywords": len(word_frequency),
  359. "description": self._get_mode_description(mode, extract_mode)
  360. }
  361. # 缓存结果
  362. self.cache.set(cache_key, result)
  363. return result
  364. def _get_mode_description(self, mode: str, extract_mode: str = "keywords") -> str:
  365. """获取模式描述"""
  366. mode_desc = {
  367. "daily": "当日累计统计",
  368. "current": "最新一批统计"
  369. }.get(mode, "未知时间模式")
  370. extract_desc = {
  371. "keywords": "基于预设关注词",
  372. "auto_extract": "自动提取高频词"
  373. }.get(extract_mode, "未知提取模式")
  374. return f"{mode_desc} - {extract_desc}"
  375. def get_current_config(self, section: str = "all") -> Dict:
  376. """
  377. 获取当前系统配置
  378. Args:
  379. section: 配置节 - all/crawler/push/keywords/weights
  380. Returns:
  381. 配置字典
  382. Raises:
  383. FileParseError: 配置文件解析错误
  384. """
  385. # 解析配置文件
  386. config_data = self.parser.parse_yaml_config()
  387. word_groups = self.parser.parse_frequency_words()
  388. # 根据section返回对应配置
  389. advanced = config_data.get("advanced", {})
  390. advanced_crawler = advanced.get("crawler", {})
  391. platforms_config = config_data.get("platforms", {})
  392. if section == "all" or section == "crawler":
  393. crawler_config = {
  394. "enable_crawler": platforms_config.get("enabled", True),
  395. "use_proxy": advanced_crawler.get("use_proxy", False),
  396. "request_interval": advanced_crawler.get("request_interval", 1),
  397. "retry_times": 3,
  398. "platforms": [p["id"] for p in platforms_config.get("sources", [])]
  399. }
  400. if section == "all" or section == "push":
  401. notification = config_data.get("notification", {})
  402. batch_size = advanced.get("batch_size", {})
  403. push_config = {
  404. "enable_notification": notification.get("enabled", True),
  405. "enabled_channels": [],
  406. "message_batch_size": batch_size.get("default", 4000),
  407. "push_window": notification.get("push_window", {})
  408. }
  409. # 检测已配置的通知渠道
  410. channels = notification.get("channels", {})
  411. if channels.get("feishu", {}).get("webhook_url"):
  412. push_config["enabled_channels"].append("feishu")
  413. if channels.get("dingtalk", {}).get("webhook_url"):
  414. push_config["enabled_channels"].append("dingtalk")
  415. if channels.get("wework", {}).get("webhook_url"):
  416. push_config["enabled_channels"].append("wework")
  417. if section == "all" or section == "keywords":
  418. keywords_config = {
  419. "word_groups": word_groups,
  420. "total_groups": len(word_groups)
  421. }
  422. if section == "all" or section == "weights":
  423. weight = advanced.get("weight", {})
  424. weights_config = {
  425. "rank_weight": weight.get("rank", 0.6),
  426. "frequency_weight": weight.get("frequency", 0.3),
  427. "hotness_weight": weight.get("hotness", 0.1)
  428. }
  429. # 组装结果
  430. if section == "all":
  431. result = {
  432. "crawler": crawler_config,
  433. "push": push_config,
  434. "keywords": keywords_config,
  435. "weights": weights_config
  436. }
  437. elif section == "crawler":
  438. result = crawler_config
  439. elif section == "push":
  440. result = push_config
  441. elif section == "keywords":
  442. result = keywords_config
  443. elif section == "weights":
  444. result = weights_config
  445. else:
  446. result = {}
  447. return result
  448. def get_available_date_range(self) -> Tuple[Optional[datetime], Optional[datetime]]:
  449. """
  450. 扫描 output 目录,返回实际可用的日期范围
  451. Returns:
  452. (最早日期, 最新日期) 元组,如果没有数据则返回 (None, None)
  453. Examples:
  454. >>> service = DataService()
  455. >>> earliest, latest = service.get_available_date_range()
  456. >>> print(f"可用日期范围:{earliest} 至 {latest}")
  457. """
  458. output_dir = self.parser.project_root / "output"
  459. if not output_dir.exists():
  460. return (None, None)
  461. available_dates = []
  462. # 遍历日期文件夹
  463. for date_folder in output_dir.iterdir():
  464. if date_folder.is_dir() and not date_folder.name.startswith('.'):
  465. folder_date = self._parse_date_folder_name(date_folder.name)
  466. if folder_date:
  467. available_dates.append(folder_date)
  468. if not available_dates:
  469. return (None, None)
  470. return (min(available_dates), max(available_dates))
  471. def _parse_date_folder_name(self, folder_name: str) -> Optional[datetime]:
  472. """
  473. 解析日期文件夹名称(兼容中文和ISO格式)
  474. 支持两种格式:
  475. - 中文格式:YYYY年MM月DD日
  476. - ISO格式:YYYY-MM-DD
  477. Args:
  478. folder_name: 文件夹名称
  479. Returns:
  480. datetime 对象,解析失败返回 None
  481. """
  482. # 尝试中文格式:YYYY年MM月DD日
  483. chinese_match = re.match(r'(\d{4})年(\d{2})月(\d{2})日', folder_name)
  484. if chinese_match:
  485. try:
  486. return datetime(
  487. int(chinese_match.group(1)),
  488. int(chinese_match.group(2)),
  489. int(chinese_match.group(3))
  490. )
  491. except ValueError:
  492. pass
  493. # 尝试 ISO 格式:YYYY-MM-DD
  494. iso_match = re.match(r'(\d{4})-(\d{2})-(\d{2})', folder_name)
  495. if iso_match:
  496. try:
  497. return datetime(
  498. int(iso_match.group(1)),
  499. int(iso_match.group(2)),
  500. int(iso_match.group(3))
  501. )
  502. except ValueError:
  503. pass
  504. return None
  505. def get_system_status(self) -> Dict:
  506. """
  507. 获取系统运行状态
  508. Returns:
  509. 系统状态字典
  510. """
  511. # 获取数据统计
  512. output_dir = self.parser.project_root / "output"
  513. total_storage = 0
  514. oldest_record = None
  515. latest_record = None
  516. total_news = 0
  517. if output_dir.exists():
  518. # 遍历日期文件夹
  519. for date_folder in output_dir.iterdir():
  520. if date_folder.is_dir() and not date_folder.name.startswith('.'):
  521. # 解析日期(兼容中文和ISO格式)
  522. folder_date = self._parse_date_folder_name(date_folder.name)
  523. if folder_date:
  524. if oldest_record is None or folder_date < oldest_record:
  525. oldest_record = folder_date
  526. if latest_record is None or folder_date > latest_record:
  527. latest_record = folder_date
  528. # 计算存储大小
  529. for item in date_folder.rglob("*"):
  530. if item.is_file():
  531. total_storage += item.stat().st_size
  532. # 读取版本信息
  533. version_file = self.parser.project_root / "version"
  534. version = "unknown"
  535. if version_file.exists():
  536. try:
  537. with open(version_file, "r") as f:
  538. version = f.read().strip()
  539. except:
  540. pass
  541. return {
  542. "system": {
  543. "version": version,
  544. "project_root": str(self.parser.project_root)
  545. },
  546. "data": {
  547. "total_storage": f"{total_storage / 1024 / 1024:.2f} MB",
  548. "oldest_record": oldest_record.strftime("%Y-%m-%d") if oldest_record else None,
  549. "latest_record": latest_record.strftime("%Y-%m-%d") if latest_record else None,
  550. },
  551. "cache": self.cache.get_stats(),
  552. "health": "healthy"
  553. }
  554. # ========================================
  555. # RSS 数据查询方法
  556. # ========================================
  557. def get_latest_rss(
  558. self,
  559. feeds: Optional[List[str]] = None,
  560. days: int = 1,
  561. limit: int = 50,
  562. include_summary: bool = False
  563. ) -> List[Dict]:
  564. """
  565. 获取最新的 RSS 数据(支持多日查询)
  566. Args:
  567. feeds: RSS 源 ID 列表,None 表示所有源
  568. days: 获取最近 N 天的数据,默认 1(仅今天),最大 30 天
  569. limit: 返回条数限制
  570. include_summary: 是否包含摘要,默认 False(节省 token)
  571. Returns:
  572. RSS 条目列表(按 URL 去重)
  573. Raises:
  574. DataNotFoundError: 数据不存在
  575. """
  576. days = min(max(days, 1), 30) # 限制 1-30 天
  577. cache_key = f"latest_rss:{','.join(feeds or [])}:{days}:{limit}:{include_summary}"
  578. cached = self.cache.get(cache_key, ttl=900)
  579. if cached:
  580. return cached
  581. rss_list = []
  582. seen_urls = set() # 跨日期 URL 去重
  583. today = datetime.now()
  584. for i in range(days):
  585. target_date = today - timedelta(days=i)
  586. try:
  587. all_items, id_to_name, timestamps = self.parser.read_all_titles_for_date(
  588. date=target_date,
  589. platform_ids=feeds,
  590. db_type="rss"
  591. )
  592. # 获取抓取时间
  593. if timestamps:
  594. latest_timestamp = max(timestamps.values())
  595. fetch_time = datetime.fromtimestamp(latest_timestamp)
  596. else:
  597. fetch_time = target_date
  598. # 转换为列表
  599. for feed_id, items in all_items.items():
  600. feed_name = id_to_name.get(feed_id, feed_id)
  601. for title, info in items.items():
  602. # 跨日期 URL 去重
  603. url = info.get("url", "")
  604. if url and url in seen_urls:
  605. continue
  606. if url:
  607. seen_urls.add(url)
  608. rss_item = {
  609. "title": title,
  610. "feed_id": feed_id,
  611. "feed_name": feed_name,
  612. "url": url,
  613. "published_at": info.get("published_at", ""),
  614. "author": info.get("author", ""),
  615. "date": target_date.strftime("%Y-%m-%d"),
  616. "fetch_time": fetch_time.strftime("%Y-%m-%d %H:%M:%S") if isinstance(fetch_time, datetime) else target_date.strftime("%Y-%m-%d")
  617. }
  618. if include_summary:
  619. rss_item["summary"] = info.get("summary", "")
  620. rss_list.append(rss_item)
  621. except DataNotFoundError:
  622. continue
  623. # 按发布时间排序(最新的在前)
  624. rss_list.sort(key=lambda x: x.get("published_at", ""), reverse=True)
  625. # 限制返回数量
  626. result = rss_list[:limit]
  627. # 缓存结果
  628. self.cache.set(cache_key, result)
  629. return result
  630. def search_rss(
  631. self,
  632. keyword: str,
  633. feeds: Optional[List[str]] = None,
  634. days: int = 7,
  635. limit: int = 50,
  636. include_summary: bool = False
  637. ) -> List[Dict]:
  638. """
  639. 搜索 RSS 数据(跨日期自动去重)
  640. Args:
  641. keyword: 搜索关键词
  642. feeds: RSS 源 ID 列表,None 表示所有源
  643. days: 搜索最近 N 天的数据
  644. limit: 返回条数限制
  645. include_summary: 是否包含摘要
  646. Returns:
  647. 匹配的 RSS 条目列表(按 URL 去重)
  648. """
  649. cache_key = f"search_rss:{keyword}:{','.join(feeds or [])}:{days}:{limit}:{include_summary}"
  650. cached = self.cache.get(cache_key, ttl=900)
  651. if cached:
  652. return cached
  653. results = []
  654. seen_urls = set() # 用于 URL 去重
  655. today = datetime.now()
  656. for i in range(days):
  657. target_date = today - timedelta(days=i)
  658. try:
  659. all_items, id_to_name, _ = self.parser.read_all_titles_for_date(
  660. date=target_date,
  661. platform_ids=feeds,
  662. db_type="rss"
  663. )
  664. for feed_id, items in all_items.items():
  665. feed_name = id_to_name.get(feed_id, feed_id)
  666. for title, info in items.items():
  667. # 跨日期去重:如果 URL 已出现过则跳过
  668. url = info.get("url", "")
  669. if url and url in seen_urls:
  670. continue
  671. if url:
  672. seen_urls.add(url)
  673. # 关键词匹配(标题或摘要)
  674. summary = info.get("summary", "")
  675. if keyword.lower() in title.lower() or keyword.lower() in summary.lower():
  676. rss_item = {
  677. "title": title,
  678. "feed_id": feed_id,
  679. "feed_name": feed_name,
  680. "url": url,
  681. "published_at": info.get("published_at", ""),
  682. "author": info.get("author", ""),
  683. "date": target_date.strftime("%Y-%m-%d")
  684. }
  685. if include_summary:
  686. rss_item["summary"] = summary
  687. results.append(rss_item)
  688. except DataNotFoundError:
  689. continue
  690. # 按发布时间排序
  691. results.sort(key=lambda x: x.get("published_at", ""), reverse=True)
  692. # 限制返回数量
  693. result = results[:limit]
  694. # 缓存结果
  695. self.cache.set(cache_key, result)
  696. return result
  697. def get_rss_feeds_status(self) -> Dict:
  698. """
  699. 获取 RSS 源状态
  700. Returns:
  701. RSS 源状态信息
  702. """
  703. cache_key = "rss_feeds_status"
  704. cached = self.cache.get(cache_key, ttl=900)
  705. if cached:
  706. return cached
  707. # 获取可用的 RSS 日期
  708. available_dates = self.parser.get_available_dates(db_type="rss")
  709. # 获取今天的 RSS 数据统计
  710. today_stats = {}
  711. try:
  712. all_items, id_to_name, _ = self.parser.read_all_titles_for_date(
  713. date=None,
  714. platform_ids=None,
  715. db_type="rss"
  716. )
  717. for feed_id, items in all_items.items():
  718. today_stats[feed_id] = {
  719. "name": id_to_name.get(feed_id, feed_id),
  720. "item_count": len(items)
  721. }
  722. except DataNotFoundError:
  723. pass
  724. result = {
  725. "available_dates": available_dates[:10], # 最近 10 天
  726. "total_dates": len(available_dates),
  727. "today_feeds": today_stats,
  728. "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  729. }
  730. self.cache.set(cache_key, result)
  731. return result