data.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. # coding=utf-8
  2. """
  3. 数据处理模块
  4. 提供数据读取和检测功能:
  5. - read_all_today_titles: 从存储后端读取当天所有标题
  6. - detect_latest_new_titles: 检测最新批次的新增标题
  7. Author: TrendRadar Team
  8. """
  9. from typing import Dict, List, Tuple, Optional
  10. def read_all_today_titles_from_storage(
  11. storage_manager,
  12. current_platform_ids: Optional[List[str]] = None,
  13. ) -> Tuple[Dict, Dict, Dict]:
  14. """
  15. 从存储后端读取当天所有标题(SQLite 数据)
  16. Args:
  17. storage_manager: 存储管理器实例
  18. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  19. Returns:
  20. Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
  21. """
  22. try:
  23. news_data = storage_manager.get_today_all_data()
  24. if not news_data or not news_data.items:
  25. return {}, {}, {}
  26. all_results = {}
  27. final_id_to_name = {}
  28. title_info = {}
  29. for source_id, news_list in news_data.items.items():
  30. # 按平台过滤
  31. if current_platform_ids is not None and source_id not in current_platform_ids:
  32. continue
  33. # 获取来源名称
  34. source_name = news_data.id_to_name.get(source_id, source_id)
  35. final_id_to_name[source_id] = source_name
  36. if source_id not in all_results:
  37. all_results[source_id] = {}
  38. title_info[source_id] = {}
  39. for item in news_list:
  40. title = item.title
  41. ranks = item.ranks or [item.rank]
  42. first_time = item.first_time or item.crawl_time
  43. last_time = item.last_time or item.crawl_time
  44. count = item.count
  45. rank_timeline = item.rank_timeline
  46. all_results[source_id][title] = {
  47. "ranks": ranks,
  48. "url": item.url or "",
  49. "mobileUrl": item.mobile_url or "",
  50. }
  51. title_info[source_id][title] = {
  52. "first_time": first_time,
  53. "last_time": last_time,
  54. "count": count,
  55. "ranks": ranks,
  56. "url": item.url or "",
  57. "mobileUrl": item.mobile_url or "",
  58. "rank_timeline": rank_timeline,
  59. }
  60. return all_results, final_id_to_name, title_info
  61. except Exception as e:
  62. print(f"[存储] 从存储后端读取数据失败: {e}")
  63. return {}, {}, {}
  64. def read_all_today_titles(
  65. storage_manager,
  66. current_platform_ids: Optional[List[str]] = None,
  67. quiet: bool = False,
  68. ) -> Tuple[Dict, Dict, Dict]:
  69. """
  70. 读取当天所有标题(从存储后端)
  71. Args:
  72. storage_manager: 存储管理器实例
  73. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  74. quiet: 是否静默模式(不打印日志)
  75. Returns:
  76. Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
  77. """
  78. all_results, final_id_to_name, title_info = read_all_today_titles_from_storage(
  79. storage_manager, current_platform_ids
  80. )
  81. if not quiet:
  82. if all_results:
  83. total_count = sum(len(titles) for titles in all_results.values())
  84. print(f"[存储] 已从存储后端读取 {total_count} 条标题")
  85. else:
  86. print("[存储] 当天暂无数据")
  87. return all_results, final_id_to_name, title_info
  88. def detect_latest_new_titles_from_storage(
  89. storage_manager,
  90. current_platform_ids: Optional[List[str]] = None,
  91. ) -> Dict:
  92. """
  93. 从存储后端检测最新批次的新增标题
  94. Args:
  95. storage_manager: 存储管理器实例
  96. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  97. Returns:
  98. Dict: 新增标题 {source_id: {title: title_data}}
  99. """
  100. try:
  101. # 获取最新抓取数据
  102. latest_data = storage_manager.get_latest_crawl_data()
  103. if not latest_data or not latest_data.items:
  104. return {}
  105. # 获取所有历史数据
  106. all_data = storage_manager.get_today_all_data()
  107. if not all_data or not all_data.items:
  108. # 没有历史数据(第一次抓取),不应该有"新增"标题
  109. return {}
  110. # 获取最新批次时间
  111. latest_time = latest_data.crawl_time
  112. # 步骤1:收集最新批次的标题(last_crawl_time = latest_time 的标题)
  113. latest_titles = {}
  114. for source_id, news_list in latest_data.items.items():
  115. if current_platform_ids is not None and source_id not in current_platform_ids:
  116. continue
  117. latest_titles[source_id] = {}
  118. for item in news_list:
  119. latest_titles[source_id][item.title] = {
  120. "ranks": [item.rank],
  121. "url": item.url or "",
  122. "mobileUrl": item.mobile_url or "",
  123. }
  124. # 步骤2:收集历史标题
  125. # 关键逻辑:一个标题只要其 first_crawl_time < latest_time,就是历史标题
  126. # 这样即使同一标题有多条记录(URL 不同),只要任何一条是历史的,该标题就算历史
  127. historical_titles = {}
  128. for source_id, news_list in all_data.items.items():
  129. if current_platform_ids is not None and source_id not in current_platform_ids:
  130. continue
  131. historical_titles[source_id] = set()
  132. for item in news_list:
  133. first_time = item.first_time or item.crawl_time
  134. # 如果该记录的首次出现时间早于最新批次,则该标题是历史标题
  135. if first_time < latest_time:
  136. historical_titles[source_id].add(item.title)
  137. # 检查是否是当天第一次抓取(没有任何历史标题)
  138. # 如果所有平台的历史标题集合都为空,说明只有一个抓取批次
  139. # 在这种情况下,将所有最新批次的标题视为"新增"(用于增量模式的第一次推送)
  140. has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
  141. if not has_historical_data:
  142. # 第一次爬取:返回所有最新标题作为"新增"
  143. return latest_titles
  144. # 步骤3:找出新增标题 = 最新批次标题 - 历史标题
  145. new_titles = {}
  146. for source_id, source_latest_titles in latest_titles.items():
  147. historical_set = historical_titles.get(source_id, set())
  148. source_new_titles = {}
  149. for title, title_data in source_latest_titles.items():
  150. if title not in historical_set:
  151. source_new_titles[title] = title_data
  152. if source_new_titles:
  153. new_titles[source_id] = source_new_titles
  154. return new_titles
  155. except Exception as e:
  156. print(f"[存储] 从存储后端检测新标题失败: {e}")
  157. return {}
  158. def detect_latest_new_titles(
  159. storage_manager,
  160. current_platform_ids: Optional[List[str]] = None,
  161. quiet: bool = False,
  162. ) -> Dict:
  163. """
  164. 检测当日最新批次的新增标题(从存储后端)
  165. Args:
  166. storage_manager: 存储管理器实例
  167. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  168. quiet: 是否静默模式(不打印日志)
  169. Returns:
  170. Dict: 新增标题 {source_id: {title: title_data}}
  171. """
  172. new_titles = detect_latest_new_titles_from_storage(storage_manager, current_platform_ids)
  173. if new_titles and not quiet:
  174. total_new = sum(len(titles) for titles in new_titles.values())
  175. print(f"[存储] 从存储后端检测到 {total_new} 条新增标题")
  176. return new_titles