data.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. # coding=utf-8
  2. """
  3. 数据处理模块
  4. 提供数据读取、保存和检测功能:
  5. - save_titles_to_file: 保存标题到 TXT 文件
  6. - read_all_today_titles: 从存储后端读取当天所有标题
  7. - detect_latest_new_titles: 检测最新批次的新增标题
  8. Author: TrendRadar Team
  9. """
  10. from pathlib import Path
  11. from typing import Dict, List, Tuple, Optional, Callable
  12. def save_titles_to_file(
  13. results: Dict,
  14. id_to_name: Dict,
  15. failed_ids: List,
  16. output_path: str,
  17. clean_title_func: Callable[[str], str],
  18. ) -> str:
  19. """
  20. 保存标题到 TXT 文件
  21. Args:
  22. results: 抓取结果 {source_id: {title: title_data}}
  23. id_to_name: ID 到名称的映射
  24. failed_ids: 失败的 ID 列表
  25. output_path: 输出文件路径
  26. clean_title_func: 标题清理函数
  27. Returns:
  28. str: 保存的文件路径
  29. """
  30. # 确保目录存在
  31. Path(output_path).parent.mkdir(parents=True, exist_ok=True)
  32. with open(output_path, "w", encoding="utf-8") as f:
  33. for id_value, title_data in results.items():
  34. # id | name 或 id
  35. name = id_to_name.get(id_value)
  36. if name and name != id_value:
  37. f.write(f"{id_value} | {name}\n")
  38. else:
  39. f.write(f"{id_value}\n")
  40. # 按排名排序标题
  41. sorted_titles = []
  42. for title, info in title_data.items():
  43. cleaned_title = clean_title_func(title)
  44. if isinstance(info, dict):
  45. ranks = info.get("ranks", [])
  46. url = info.get("url", "")
  47. mobile_url = info.get("mobileUrl", "")
  48. else:
  49. ranks = info if isinstance(info, list) else []
  50. url = ""
  51. mobile_url = ""
  52. rank = ranks[0] if ranks else 1
  53. sorted_titles.append((rank, cleaned_title, url, mobile_url))
  54. sorted_titles.sort(key=lambda x: x[0])
  55. for rank, cleaned_title, url, mobile_url in sorted_titles:
  56. line = f"{rank}. {cleaned_title}"
  57. if url:
  58. line += f" [URL:{url}]"
  59. if mobile_url:
  60. line += f" [MOBILE:{mobile_url}]"
  61. f.write(line + "\n")
  62. f.write("\n")
  63. if failed_ids:
  64. f.write("==== 以下ID请求失败 ====\n")
  65. for id_value in failed_ids:
  66. f.write(f"{id_value}\n")
  67. return output_path
  68. def read_all_today_titles_from_storage(
  69. storage_manager,
  70. current_platform_ids: Optional[List[str]] = None,
  71. ) -> Tuple[Dict, Dict, Dict]:
  72. """
  73. 从存储后端读取当天所有标题(SQLite 数据)
  74. Args:
  75. storage_manager: 存储管理器实例
  76. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  77. Returns:
  78. Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
  79. """
  80. try:
  81. news_data = storage_manager.get_today_all_data()
  82. if not news_data or not news_data.items:
  83. return {}, {}, {}
  84. all_results = {}
  85. final_id_to_name = {}
  86. title_info = {}
  87. for source_id, news_list in news_data.items.items():
  88. # 按平台过滤
  89. if current_platform_ids is not None and source_id not in current_platform_ids:
  90. continue
  91. # 获取来源名称
  92. source_name = news_data.id_to_name.get(source_id, source_id)
  93. final_id_to_name[source_id] = source_name
  94. if source_id not in all_results:
  95. all_results[source_id] = {}
  96. title_info[source_id] = {}
  97. for item in news_list:
  98. title = item.title
  99. ranks = getattr(item, 'ranks', [item.rank])
  100. first_time = getattr(item, 'first_time', item.crawl_time)
  101. last_time = getattr(item, 'last_time', item.crawl_time)
  102. count = getattr(item, 'count', 1)
  103. rank_timeline = getattr(item, 'rank_timeline', [])
  104. all_results[source_id][title] = {
  105. "ranks": ranks,
  106. "url": item.url or "",
  107. "mobileUrl": item.mobile_url or "",
  108. }
  109. title_info[source_id][title] = {
  110. "first_time": first_time,
  111. "last_time": last_time,
  112. "count": count,
  113. "ranks": ranks,
  114. "url": item.url or "",
  115. "mobileUrl": item.mobile_url or "",
  116. "rank_timeline": rank_timeline,
  117. }
  118. return all_results, final_id_to_name, title_info
  119. except Exception as e:
  120. print(f"[存储] 从存储后端读取数据失败: {e}")
  121. return {}, {}, {}
  122. def read_all_today_titles(
  123. storage_manager,
  124. current_platform_ids: Optional[List[str]] = None,
  125. quiet: bool = False,
  126. ) -> Tuple[Dict, Dict, Dict]:
  127. """
  128. 读取当天所有标题(从存储后端)
  129. Args:
  130. storage_manager: 存储管理器实例
  131. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  132. quiet: 是否静默模式(不打印日志)
  133. Returns:
  134. Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
  135. """
  136. all_results, final_id_to_name, title_info = read_all_today_titles_from_storage(
  137. storage_manager, current_platform_ids
  138. )
  139. if not quiet:
  140. if all_results:
  141. total_count = sum(len(titles) for titles in all_results.values())
  142. print(f"[存储] 已从存储后端读取 {total_count} 条标题")
  143. else:
  144. print("[存储] 当天暂无数据")
  145. return all_results, final_id_to_name, title_info
  146. def detect_latest_new_titles_from_storage(
  147. storage_manager,
  148. current_platform_ids: Optional[List[str]] = None,
  149. ) -> Dict:
  150. """
  151. 从存储后端检测最新批次的新增标题
  152. Args:
  153. storage_manager: 存储管理器实例
  154. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  155. Returns:
  156. Dict: 新增标题 {source_id: {title: title_data}}
  157. """
  158. try:
  159. # 获取最新抓取数据
  160. latest_data = storage_manager.get_latest_crawl_data()
  161. if not latest_data or not latest_data.items:
  162. return {}
  163. # 获取所有历史数据
  164. all_data = storage_manager.get_today_all_data()
  165. if not all_data or not all_data.items:
  166. # 没有历史数据(第一次抓取),不应该有"新增"标题
  167. return {}
  168. # 获取最新批次时间
  169. latest_time = latest_data.crawl_time
  170. # 步骤1:收集最新批次的标题(last_crawl_time = latest_time 的标题)
  171. latest_titles = {}
  172. for source_id, news_list in latest_data.items.items():
  173. if current_platform_ids is not None and source_id not in current_platform_ids:
  174. continue
  175. latest_titles[source_id] = {}
  176. for item in news_list:
  177. latest_titles[source_id][item.title] = {
  178. "ranks": [item.rank],
  179. "url": item.url or "",
  180. "mobileUrl": item.mobile_url or "",
  181. }
  182. # 步骤2:收集历史标题
  183. # 关键逻辑:一个标题只要其 first_crawl_time < latest_time,就是历史标题
  184. # 这样即使同一标题有多条记录(URL 不同),只要任何一条是历史的,该标题就算历史
  185. historical_titles = {}
  186. for source_id, news_list in all_data.items.items():
  187. if current_platform_ids is not None and source_id not in current_platform_ids:
  188. continue
  189. historical_titles[source_id] = set()
  190. for item in news_list:
  191. first_time = getattr(item, 'first_time', item.crawl_time)
  192. # 如果该记录的首次出现时间早于最新批次,则该标题是历史标题
  193. if first_time < latest_time:
  194. historical_titles[source_id].add(item.title)
  195. # 检查是否是当天第一次抓取(没有任何历史标题)
  196. # 如果所有平台的历史标题集合都为空,说明只有一个抓取批次,不应该有"新增"标题
  197. has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
  198. if not has_historical_data:
  199. return {}
  200. # 步骤3:找出新增标题 = 最新批次标题 - 历史标题
  201. new_titles = {}
  202. for source_id, source_latest_titles in latest_titles.items():
  203. historical_set = historical_titles.get(source_id, set())
  204. source_new_titles = {}
  205. for title, title_data in source_latest_titles.items():
  206. if title not in historical_set:
  207. source_new_titles[title] = title_data
  208. if source_new_titles:
  209. new_titles[source_id] = source_new_titles
  210. return new_titles
  211. except Exception as e:
  212. print(f"[存储] 从存储后端检测新标题失败: {e}")
  213. return {}
  214. def detect_latest_new_titles(
  215. storage_manager,
  216. current_platform_ids: Optional[List[str]] = None,
  217. quiet: bool = False,
  218. ) -> Dict:
  219. """
  220. 检测当日最新批次的新增标题(从存储后端)
  221. Args:
  222. storage_manager: 存储管理器实例
  223. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  224. quiet: 是否静默模式(不打印日志)
  225. Returns:
  226. Dict: 新增标题 {source_id: {title: title_data}}
  227. """
  228. new_titles = detect_latest_new_titles_from_storage(storage_manager, current_platform_ids)
  229. if new_titles and not quiet:
  230. total_new = sum(len(titles) for titles in new_titles.values())
  231. print(f"[存储] 从存储后端检测到 {total_new} 条新增标题")
  232. return new_titles