| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- # coding=utf-8
- """
- 数据处理模块
- 提供数据读取和检测功能:
- - read_all_today_titles: 从存储后端读取当天所有标题
- - detect_latest_new_titles: 检测最新批次的新增标题
- Author: TrendRadar Team
- """
- from typing import Dict, List, Tuple, Optional
def read_all_today_titles_from_storage(
    storage_manager,
    current_platform_ids: Optional[List[str]] = None,
) -> Tuple[Dict, Dict, Dict]:
    """
    Read every title stored for today from the storage backend.

    Args:
        storage_manager: Storage manager instance. It must expose
            ``get_today_all_data()``, whose result provides ``.items``
            (a mapping of source id -> list of news records) and
            ``.id_to_name`` (a mapping of source id -> display name).
        current_platform_ids: Source ids to keep. ``None`` keeps every
            source; otherwise only ids contained in it are kept.

    Returns:
        Tuple[Dict, Dict, Dict]: ``(all_results, id_to_name, title_info)``
            - all_results: ``{source_id: {title: {"ranks", "url", "mobileUrl"}}}``
            - id_to_name: ``{source_id: display name}`` (falls back to the id)
            - title_info: ``{source_id: {title: {"first_time", "last_time",
              "count", "ranks", "url", "mobileUrl", "rank_timeline"}}}``
        Three empty dicts are returned when there is no data, or when any
        exception is raised (the error is printed, not re-raised).
    """

    def _is_selected(platform_id) -> bool:
        return current_platform_ids is None or platform_id in current_platform_ids

    try:
        snapshot = storage_manager.get_today_all_data()
        if not snapshot or not snapshot.items:
            return {}, {}, {}

        results_by_source: Dict = {}
        names_by_source: Dict = {}
        info_by_source: Dict = {}

        for platform_id, records in snapshot.items.items():
            if not _is_selected(platform_id):
                continue

            names_by_source[platform_id] = snapshot.id_to_name.get(platform_id, platform_id)
            source_results = results_by_source.setdefault(platform_id, {})
            source_info = info_by_source.setdefault(platform_id, {})

            for record in records:
                title = record.title
                # A record without a rank history falls back to its single rank.
                rank_list = record.ranks or [record.rank]
                # Missing first/last-seen times fall back to the crawl time.
                first_seen = record.first_time or record.crawl_time
                last_seen = record.last_time or record.crawl_time
                hit_count = record.count
                timeline = record.rank_timeline
                url = record.url or ""
                mobile_url = record.mobile_url or ""

                # A repeated title within one source replaces the earlier entry.
                # Both dicts reference the same ``rank_list`` object.
                source_results[title] = {
                    "ranks": rank_list,
                    "url": url,
                    "mobileUrl": mobile_url,
                }
                source_info[title] = {
                    "first_time": first_seen,
                    "last_time": last_seen,
                    "count": hit_count,
                    "ranks": rank_list,
                    "url": url,
                    "mobileUrl": mobile_url,
                    "rank_timeline": timeline,
                }

        return results_by_source, names_by_source, info_by_source
    except Exception as e:
        print(f"[存储] 从存储后端读取数据失败: {e}")
        return {}, {}, {}
def read_all_today_titles(
    storage_manager,
    current_platform_ids: Optional[List[str]] = None,
    quiet: bool = False,
) -> Tuple[Dict, Dict, Dict]:
    """
    Read all of today's titles from the storage backend, optionally logging a summary.

    Delegates to ``read_all_today_titles_from_storage`` and, unless ``quiet``
    is set, prints either the total number of titles read or a "no data today"
    message.

    Args:
        storage_manager: Storage manager instance passed through to the reader.
        current_platform_ids: Source ids to keep (``None`` keeps all).
        quiet: When True, suppress the summary log line.

    Returns:
        Tuple[Dict, Dict, Dict]: ``(all_results, id_to_name, title_info)``
        exactly as produced by ``read_all_today_titles_from_storage``.
    """
    results, names, info = read_all_today_titles_from_storage(
        storage_manager, current_platform_ids
    )

    if not quiet:
        if not results:
            print("[存储] 当天暂无数据")
        else:
            # Total titles across every source that survived the filter.
            title_total = sum(map(len, results.values()))
            print(f"[存储] 已从存储后端读取 {title_total} 条标题")

    return results, names, info
def detect_latest_new_titles_from_storage(
    storage_manager,
    current_platform_ids: Optional[List[str]] = None,
) -> Dict:
    """
    Detect titles in the newest crawl batch that had not been seen earlier today.

    Args:
        storage_manager: Storage manager instance. It must expose
            ``get_latest_crawl_data()`` (result has ``.crawl_time`` and
            ``.items``: source id -> list of records) and
            ``get_today_all_data()`` (result has ``.items``).
        current_platform_ids: Source ids to consider. ``None`` considers every
            source; otherwise only ids contained in it are considered.

    Returns:
        Dict: ``{source_id: {title: {"ranks": [rank], "url": str, "mobileUrl": str}}}``.
        - Empty dict if either data set is empty, or if any exception is
          raised (the error is printed, not re-raised).
        - If no considered source has any title first seen before the newest
          batch, every title of the newest batch is returned. This is the
          day's first batch, and it includes sources whose batch is empty.
        - Otherwise only sources with at least one new title appear.
    """

    def _tracked(platform_id) -> bool:
        return current_platform_ids is None or platform_id in current_platform_ids

    try:
        newest = storage_manager.get_latest_crawl_data()
        if not newest or not newest.items:
            return {}

        today = storage_manager.get_today_all_data()
        if not today or not today.items:
            # No stored data for today at all, so there is no baseline to diff against.
            return {}

        cutoff = newest.crawl_time

        # Step 1: every title in the newest batch, grouped by source.
        batch_titles: Dict = {}
        for platform_id, records in newest.items.items():
            if not _tracked(platform_id):
                continue
            bucket: Dict = {}
            batch_titles[platform_id] = bucket
            for record in records:
                entry = {
                    "ranks": [record.rank],
                    "url": record.url or "",
                    "mobileUrl": record.mobile_url or "",
                }
                bucket[record.title] = entry

        # Step 2: titles already known before the newest batch.
        # - A title is historical if ANY of its records was first seen before
        #   the cutoff, so duplicate records (e.g. differing URLs) cannot make
        #   it look new.
        # - NOTE(review): assumes first_time/crawl_time values are mutually
        #   comparable (e.g. same-format strings); confirm against storage.
        seen_before = {
            platform_id: {
                record.title
                for record in records
                if (record.first_time or record.crawl_time) < cutoff
            }
            for platform_id, records in today.items.items()
            if _tracked(platform_id)
        }

        # No historical titles anywhere means this is the first batch of the day.
        # Treat the whole batch as new (first push in incremental mode).
        if not any(seen_before.values()):
            return batch_titles

        # Step 3: new titles = newest batch minus historical titles.
        # Sources left with nothing new are omitted.
        fresh: Dict = {}
        for platform_id, titles in batch_titles.items():
            known = seen_before.get(platform_id, set())
            kept = {name: payload for name, payload in titles.items() if name not in known}
            if kept:
                fresh[platform_id] = kept
        return fresh
    except Exception as e:
        print(f"[存储] 从存储后端检测新标题失败: {e}")
        return {}
def detect_latest_new_titles(
    storage_manager,
    current_platform_ids: Optional[List[str]] = None,
    quiet: bool = False,
) -> Dict:
    """
    Detect new titles in today's newest crawl batch, optionally logging a count.

    Delegates to ``detect_latest_new_titles_from_storage``. A summary line is
    printed only when something was detected and ``quiet`` is False.

    Args:
        storage_manager: Storage manager instance passed through to the detector.
        current_platform_ids: Source ids to consider (``None`` considers all).
        quiet: When True, suppress the summary log line.

    Returns:
        Dict: ``{source_id: {title: title_data}}`` exactly as produced by
        ``detect_latest_new_titles_from_storage``.
    """
    detected = detect_latest_new_titles_from_storage(storage_manager, current_platform_ids)

    # Nothing to report, or logging disabled: hand the result straight back.
    if quiet or not detected:
        return detected

    new_count = sum(map(len, detected.values()))
    print(f"[存储] 从存储后端检测到 {new_count} 条新增标题")
    return detected
|