data.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. # coding=utf-8
  2. """
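Data processing module.

Provides helpers for saving, reading and diffing crawled titles:
- save_titles_to_file: write titles to a TXT file
- read_all_today_titles: read all of today's titles from the storage backend
- detect_latest_new_titles: detect titles newly added in the latest crawl batch
- is_first_crawl_today: check whether at most one TXT snapshot exists for the day
Author: TrendRadar Team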
  9. """
  10. from pathlib import Path
  11. from typing import Dict, List, Tuple, Optional, Callable
  12. def save_titles_to_file(
  13. results: Dict,
  14. id_to_name: Dict,
  15. failed_ids: List,
  16. output_path: str,
  17. clean_title_func: Callable[[str], str],
  18. ) -> str:
  19. """
  20. 保存标题到 TXT 文件
  21. Args:
  22. results: 抓取结果 {source_id: {title: title_data}}
  23. id_to_name: ID 到名称的映射
  24. failed_ids: 失败的 ID 列表
  25. output_path: 输出文件路径
  26. clean_title_func: 标题清理函数
  27. Returns:
  28. str: 保存的文件路径
  29. """
  30. # 确保目录存在
  31. Path(output_path).parent.mkdir(parents=True, exist_ok=True)
  32. with open(output_path, "w", encoding="utf-8") as f:
  33. for id_value, title_data in results.items():
  34. # id | name 或 id
  35. name = id_to_name.get(id_value)
  36. if name and name != id_value:
  37. f.write(f"{id_value} | {name}\n")
  38. else:
  39. f.write(f"{id_value}\n")
  40. # 按排名排序标题
  41. sorted_titles = []
  42. for title, info in title_data.items():
  43. cleaned_title = clean_title_func(title)
  44. if isinstance(info, dict):
  45. ranks = info.get("ranks", [])
  46. url = info.get("url", "")
  47. mobile_url = info.get("mobileUrl", "")
  48. else:
  49. ranks = info if isinstance(info, list) else []
  50. url = ""
  51. mobile_url = ""
  52. rank = ranks[0] if ranks else 1
  53. sorted_titles.append((rank, cleaned_title, url, mobile_url))
  54. sorted_titles.sort(key=lambda x: x[0])
  55. for rank, cleaned_title, url, mobile_url in sorted_titles:
  56. line = f"{rank}. {cleaned_title}"
  57. if url:
  58. line += f" [URL:{url}]"
  59. if mobile_url:
  60. line += f" [MOBILE:{mobile_url}]"
  61. f.write(line + "\n")
  62. f.write("\n")
  63. if failed_ids:
  64. f.write("==== 以下ID请求失败 ====\n")
  65. for id_value in failed_ids:
  66. f.write(f"{id_value}\n")
  67. return output_path
  68. def read_all_today_titles_from_storage(
  69. storage_manager,
  70. current_platform_ids: Optional[List[str]] = None,
  71. ) -> Tuple[Dict, Dict, Dict]:
  72. """
  73. 从存储后端读取当天所有标题(SQLite 数据)
  74. Args:
  75. storage_manager: 存储管理器实例
  76. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  77. Returns:
  78. Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
  79. """
  80. try:
  81. news_data = storage_manager.get_today_all_data()
  82. if not news_data or not news_data.items:
  83. return {}, {}, {}
  84. all_results = {}
  85. final_id_to_name = {}
  86. title_info = {}
  87. for source_id, news_list in news_data.items.items():
  88. # 按平台过滤
  89. if current_platform_ids is not None and source_id not in current_platform_ids:
  90. continue
  91. # 获取来源名称
  92. source_name = news_data.id_to_name.get(source_id, source_id)
  93. final_id_to_name[source_id] = source_name
  94. if source_id not in all_results:
  95. all_results[source_id] = {}
  96. title_info[source_id] = {}
  97. for item in news_list:
  98. title = item.title
  99. ranks = getattr(item, 'ranks', [item.rank])
  100. first_time = getattr(item, 'first_time', item.crawl_time)
  101. last_time = getattr(item, 'last_time', item.crawl_time)
  102. count = getattr(item, 'count', 1)
  103. all_results[source_id][title] = {
  104. "ranks": ranks,
  105. "url": item.url or "",
  106. "mobileUrl": item.mobile_url or "",
  107. }
  108. title_info[source_id][title] = {
  109. "first_time": first_time,
  110. "last_time": last_time,
  111. "count": count,
  112. "ranks": ranks,
  113. "url": item.url or "",
  114. "mobileUrl": item.mobile_url or "",
  115. }
  116. return all_results, final_id_to_name, title_info
  117. except Exception as e:
  118. print(f"[存储] 从存储后端读取数据失败: {e}")
  119. return {}, {}, {}
  120. def read_all_today_titles(
  121. storage_manager,
  122. current_platform_ids: Optional[List[str]] = None,
  123. quiet: bool = False,
  124. ) -> Tuple[Dict, Dict, Dict]:
  125. """
  126. 读取当天所有标题(从存储后端)
  127. Args:
  128. storage_manager: 存储管理器实例
  129. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  130. quiet: 是否静默模式(不打印日志)
  131. Returns:
  132. Tuple[Dict, Dict, Dict]: (all_results, id_to_name, title_info)
  133. """
  134. all_results, final_id_to_name, title_info = read_all_today_titles_from_storage(
  135. storage_manager, current_platform_ids
  136. )
  137. if not quiet:
  138. if all_results:
  139. total_count = sum(len(titles) for titles in all_results.values())
  140. print(f"[存储] 已从存储后端读取 {total_count} 条标题")
  141. else:
  142. print("[存储] 当天暂无数据")
  143. return all_results, final_id_to_name, title_info
  144. def detect_latest_new_titles_from_storage(
  145. storage_manager,
  146. current_platform_ids: Optional[List[str]] = None,
  147. ) -> Dict:
  148. """
  149. 从存储后端检测最新批次的新增标题
  150. Args:
  151. storage_manager: 存储管理器实例
  152. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  153. Returns:
  154. Dict: 新增标题 {source_id: {title: title_data}}
  155. """
  156. try:
  157. # 获取最新抓取数据
  158. latest_data = storage_manager.get_latest_crawl_data()
  159. if not latest_data or not latest_data.items:
  160. return {}
  161. # 获取所有历史数据
  162. all_data = storage_manager.get_today_all_data()
  163. if not all_data or not all_data.items:
  164. # 没有历史数据(第一次抓取),不应该有"新增"标题
  165. return {}
  166. # 获取最新批次时间
  167. latest_time = latest_data.crawl_time
  168. # 步骤1:收集最新批次的标题(last_crawl_time = latest_time 的标题)
  169. latest_titles = {}
  170. for source_id, news_list in latest_data.items.items():
  171. if current_platform_ids is not None and source_id not in current_platform_ids:
  172. continue
  173. latest_titles[source_id] = {}
  174. for item in news_list:
  175. latest_titles[source_id][item.title] = {
  176. "ranks": [item.rank],
  177. "url": item.url or "",
  178. "mobileUrl": item.mobile_url or "",
  179. }
  180. # 步骤2:收集历史标题
  181. # 关键逻辑:一个标题只要其 first_crawl_time < latest_time,就是历史标题
  182. # 这样即使同一标题有多条记录(URL 不同),只要任何一条是历史的,该标题就算历史
  183. historical_titles = {}
  184. for source_id, news_list in all_data.items.items():
  185. if current_platform_ids is not None and source_id not in current_platform_ids:
  186. continue
  187. historical_titles[source_id] = set()
  188. for item in news_list:
  189. first_time = getattr(item, 'first_time', item.crawl_time)
  190. # 如果该记录的首次出现时间早于最新批次,则该标题是历史标题
  191. if first_time < latest_time:
  192. historical_titles[source_id].add(item.title)
  193. # 检查是否是当天第一次抓取(没有任何历史标题)
  194. # 如果所有平台的历史标题集合都为空,说明只有一个抓取批次,不应该有"新增"标题
  195. has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
  196. if not has_historical_data:
  197. return {}
  198. # 步骤3:找出新增标题 = 最新批次标题 - 历史标题
  199. new_titles = {}
  200. for source_id, source_latest_titles in latest_titles.items():
  201. historical_set = historical_titles.get(source_id, set())
  202. source_new_titles = {}
  203. for title, title_data in source_latest_titles.items():
  204. if title not in historical_set:
  205. source_new_titles[title] = title_data
  206. if source_new_titles:
  207. new_titles[source_id] = source_new_titles
  208. return new_titles
  209. except Exception as e:
  210. print(f"[存储] 从存储后端检测新标题失败: {e}")
  211. return {}
  212. def detect_latest_new_titles(
  213. storage_manager,
  214. current_platform_ids: Optional[List[str]] = None,
  215. quiet: bool = False,
  216. ) -> Dict:
  217. """
  218. 检测当日最新批次的新增标题(从存储后端)
  219. Args:
  220. storage_manager: 存储管理器实例
  221. current_platform_ids: 当前监控的平台 ID 列表(用于过滤)
  222. quiet: 是否静默模式(不打印日志)
  223. Returns:
  224. Dict: 新增标题 {source_id: {title: title_data}}
  225. """
  226. new_titles = detect_latest_new_titles_from_storage(storage_manager, current_platform_ids)
  227. if new_titles and not quiet:
  228. total_new = sum(len(titles) for titles in new_titles.values())
  229. print(f"[存储] 从存储后端检测到 {total_new} 条新增标题")
  230. return new_titles
  231. def is_first_crawl_today(output_dir: str, date_folder: str) -> bool:
  232. """
  233. 检测是否是当天第一次爬取
  234. Args:
  235. output_dir: 输出目录
  236. date_folder: 日期文件夹名称
  237. Returns:
  238. bool: 是否是当天第一次爬取
  239. """
  240. txt_dir = Path(output_dir) / date_folder / "txt"
  241. if not txt_dir.exists():
  242. return True
  243. files = sorted([f for f in txt_dir.iterdir() if f.suffix == ".txt"])
  244. return len(files) <= 1