# analyzer.py
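# coding=utf-8
"""
Statistics and analysis module.

Provides news statistics and analysis helpers:
- calculate_news_weight: compute the sort weight of a news title
- format_time_display: format the first/last-seen time for display
- count_word_frequency: count keyword-group matches across news titles
"""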
from typing import Dict, List, Tuple, Optional, Callable

from trendradar.core.frequency import matches_word_groups
  11. def calculate_news_weight(
  12. title_data: Dict,
  13. rank_threshold: int,
  14. weight_config: Dict,
  15. ) -> float:
  16. """
  17. 计算新闻权重,用于排序
  18. Args:
  19. title_data: 标题数据,包含 ranks 和 count
  20. rank_threshold: 排名阈值
  21. weight_config: 权重配置 {RANK_WEIGHT, FREQUENCY_WEIGHT, HOTNESS_WEIGHT}
  22. Returns:
  23. float: 计算出的权重值
  24. """
  25. ranks = title_data.get("ranks", [])
  26. if not ranks:
  27. return 0.0
  28. count = title_data.get("count", len(ranks))
  29. # 排名权重:Σ(11 - min(rank, 10)) / 出现次数
  30. rank_scores = []
  31. for rank in ranks:
  32. score = 11 - min(rank, 10)
  33. rank_scores.append(score)
  34. rank_weight = sum(rank_scores) / len(ranks) if ranks else 0
  35. # 频次权重:min(出现次数, 10) × 10
  36. frequency_weight = min(count, 10) * 10
  37. # 热度加成:高排名次数 / 总出现次数 × 100
  38. high_rank_count = sum(1 for rank in ranks if rank <= rank_threshold)
  39. hotness_ratio = high_rank_count / len(ranks) if ranks else 0
  40. hotness_weight = hotness_ratio * 100
  41. total_weight = (
  42. rank_weight * weight_config["RANK_WEIGHT"]
  43. + frequency_weight * weight_config["FREQUENCY_WEIGHT"]
  44. + hotness_weight * weight_config["HOTNESS_WEIGHT"]
  45. )
  46. return total_weight
  47. def format_time_display(
  48. first_time: str,
  49. last_time: str,
  50. convert_time_func: Callable[[str], str],
  51. ) -> str:
  52. """
  53. 格式化时间显示(将 HH-MM 转换为 HH:MM)
  54. Args:
  55. first_time: 首次出现时间
  56. last_time: 最后出现时间
  57. convert_time_func: 时间格式转换函数
  58. Returns:
  59. str: 格式化后的时间显示字符串
  60. """
  61. if not first_time:
  62. return ""
  63. # 转换为显示格式
  64. first_display = convert_time_func(first_time)
  65. last_display = convert_time_func(last_time)
  66. if first_display == last_display or not last_display:
  67. return first_display
  68. else:
  69. return f"[{first_display} ~ {last_display}]"
  70. def count_word_frequency(
  71. results: Dict,
  72. word_groups: List[Dict],
  73. filter_words: List[str],
  74. id_to_name: Dict,
  75. title_info: Optional[Dict] = None,
  76. rank_threshold: int = 3,
  77. new_titles: Optional[Dict] = None,
  78. mode: str = "daily",
  79. global_filters: Optional[List[str]] = None,
  80. weight_config: Optional[Dict] = None,
  81. max_news_per_keyword: int = 0,
  82. sort_by_position_first: bool = False,
  83. is_first_crawl_func: Optional[Callable[[], bool]] = None,
  84. convert_time_func: Optional[Callable[[str], str]] = None,
  85. ) -> Tuple[List[Dict], int]:
  86. """
  87. 统计词频,支持必须词、频率词、过滤词、全局过滤词,并标记新增标题
  88. Args:
  89. results: 抓取结果 {source_id: {title: title_data}}
  90. word_groups: 词组配置列表
  91. filter_words: 过滤词列表
  92. id_to_name: ID 到名称的映射
  93. title_info: 标题统计信息(可选)
  94. rank_threshold: 排名阈值
  95. new_titles: 新增标题(可选)
  96. mode: 报告模式 (daily/incremental/current)
  97. global_filters: 全局过滤词(可选)
  98. weight_config: 权重配置
  99. max_news_per_keyword: 每个关键词最大显示数量
  100. sort_by_position_first: 是否优先按配置位置排序
  101. is_first_crawl_func: 检测是否是当天第一次爬取的函数
  102. convert_time_func: 时间格式转换函数
  103. Returns:
  104. Tuple[List[Dict], int]: (统计结果列表, 总标题数)
  105. """
  106. # 默认权重配置
  107. if weight_config is None:
  108. weight_config = {
  109. "RANK_WEIGHT": 0.4,
  110. "FREQUENCY_WEIGHT": 0.3,
  111. "HOTNESS_WEIGHT": 0.3,
  112. }
  113. # 默认时间转换函数
  114. if convert_time_func is None:
  115. convert_time_func = lambda x: x
  116. # 默认首次爬取检测函数
  117. if is_first_crawl_func is None:
  118. is_first_crawl_func = lambda: True
  119. # 如果没有配置词组,创建一个包含所有新闻的虚拟词组
  120. if not word_groups:
  121. print("频率词配置为空,将显示所有新闻")
  122. word_groups = [{"required": [], "normal": [], "group_key": "全部新闻"}]
  123. filter_words = [] # 清空过滤词,显示所有新闻
  124. is_first_today = is_first_crawl_func()
  125. # 确定处理的数据源和新增标记逻辑
  126. if mode == "incremental":
  127. if is_first_today:
  128. # 增量模式 + 当天第一次:处理所有新闻,都标记为新增
  129. results_to_process = results
  130. all_news_are_new = True
  131. else:
  132. # 增量模式 + 当天非第一次:只处理新增的新闻
  133. results_to_process = new_titles if new_titles else {}
  134. all_news_are_new = True
  135. elif mode == "current":
  136. # current 模式:只处理当前时间批次的新闻,但统计信息来自全部历史
  137. if title_info:
  138. latest_time = None
  139. for source_titles in title_info.values():
  140. for title_data in source_titles.values():
  141. last_time = title_data.get("last_time", "")
  142. if last_time:
  143. if latest_time is None or last_time > latest_time:
  144. latest_time = last_time
  145. # 只处理 last_time 等于最新时间的新闻
  146. if latest_time:
  147. results_to_process = {}
  148. for source_id, source_titles in results.items():
  149. if source_id in title_info:
  150. filtered_titles = {}
  151. for title, title_data in source_titles.items():
  152. if title in title_info[source_id]:
  153. info = title_info[source_id][title]
  154. if info.get("last_time") == latest_time:
  155. filtered_titles[title] = title_data
  156. if filtered_titles:
  157. results_to_process[source_id] = filtered_titles
  158. print(
  159. f"当前榜单模式:最新时间 {latest_time},筛选出 {sum(len(titles) for titles in results_to_process.values())} 条当前榜单新闻"
  160. )
  161. else:
  162. results_to_process = results
  163. else:
  164. results_to_process = results
  165. all_news_are_new = False
  166. else:
  167. # 当日汇总模式:处理所有新闻
  168. results_to_process = results
  169. all_news_are_new = False
  170. total_input_news = sum(len(titles) for titles in results.values())
  171. filter_status = (
  172. "全部显示"
  173. if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
  174. else "频率词过滤"
  175. )
  176. print(f"当日汇总模式:处理 {total_input_news} 条新闻,模式:{filter_status}")
  177. word_stats = {}
  178. total_titles = 0
  179. processed_titles = {}
  180. matched_new_count = 0
  181. if title_info is None:
  182. title_info = {}
  183. if new_titles is None:
  184. new_titles = {}
  185. for group in word_groups:
  186. group_key = group["group_key"]
  187. word_stats[group_key] = {"count": 0, "titles": {}}
  188. for source_id, titles_data in results_to_process.items():
  189. total_titles += len(titles_data)
  190. if source_id not in processed_titles:
  191. processed_titles[source_id] = {}
  192. for title, title_data in titles_data.items():
  193. if title in processed_titles.get(source_id, {}):
  194. continue
  195. # 使用统一的匹配逻辑
  196. matches_frequency_words = matches_word_groups(
  197. title, word_groups, filter_words, global_filters
  198. )
  199. if not matches_frequency_words:
  200. continue
  201. # 如果是增量模式或 current 模式第一次,统计匹配的新增新闻数量
  202. if (mode == "incremental" and all_news_are_new) or (
  203. mode == "current" and is_first_today
  204. ):
  205. matched_new_count += 1
  206. source_ranks = title_data.get("ranks", [])
  207. source_url = title_data.get("url", "")
  208. source_mobile_url = title_data.get("mobileUrl", "")
  209. # 找到匹配的词组(防御性转换确保类型安全)
  210. title_lower = str(title).lower() if not isinstance(title, str) else title.lower()
  211. for group in word_groups:
  212. required_words = group["required"]
  213. normal_words = group["normal"]
  214. # 如果是"全部新闻"模式,所有标题都匹配第一个(唯一的)词组
  215. if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻":
  216. group_key = group["group_key"]
  217. word_stats[group_key]["count"] += 1
  218. if source_id not in word_stats[group_key]["titles"]:
  219. word_stats[group_key]["titles"][source_id] = []
  220. else:
  221. # 原有的匹配逻辑
  222. if required_words:
  223. all_required_present = all(
  224. req_word.lower() in title_lower
  225. for req_word in required_words
  226. )
  227. if not all_required_present:
  228. continue
  229. if normal_words:
  230. any_normal_present = any(
  231. normal_word.lower() in title_lower
  232. for normal_word in normal_words
  233. )
  234. if not any_normal_present:
  235. continue
  236. group_key = group["group_key"]
  237. word_stats[group_key]["count"] += 1
  238. if source_id not in word_stats[group_key]["titles"]:
  239. word_stats[group_key]["titles"][source_id] = []
  240. first_time = ""
  241. last_time = ""
  242. count_info = 1
  243. ranks = source_ranks if source_ranks else []
  244. url = source_url
  245. mobile_url = source_mobile_url
  246. # 对于 current 模式,从历史统计信息中获取完整数据
  247. if (
  248. mode == "current"
  249. and title_info
  250. and source_id in title_info
  251. and title in title_info[source_id]
  252. ):
  253. info = title_info[source_id][title]
  254. first_time = info.get("first_time", "")
  255. last_time = info.get("last_time", "")
  256. count_info = info.get("count", 1)
  257. if "ranks" in info and info["ranks"]:
  258. ranks = info["ranks"]
  259. url = info.get("url", source_url)
  260. mobile_url = info.get("mobileUrl", source_mobile_url)
  261. elif (
  262. title_info
  263. and source_id in title_info
  264. and title in title_info[source_id]
  265. ):
  266. info = title_info[source_id][title]
  267. first_time = info.get("first_time", "")
  268. last_time = info.get("last_time", "")
  269. count_info = info.get("count", 1)
  270. if "ranks" in info and info["ranks"]:
  271. ranks = info["ranks"]
  272. url = info.get("url", source_url)
  273. mobile_url = info.get("mobileUrl", source_mobile_url)
  274. if not ranks:
  275. ranks = [99]
  276. time_display = format_time_display(first_time, last_time, convert_time_func)
  277. source_name = id_to_name.get(source_id, source_id)
  278. # 判断是否为新增
  279. is_new = False
  280. if all_news_are_new:
  281. # 增量模式下所有处理的新闻都是新增,或者当天第一次的所有新闻都是新增
  282. is_new = True
  283. elif new_titles and source_id in new_titles:
  284. # 检查是否在新增列表中
  285. new_titles_for_source = new_titles[source_id]
  286. is_new = title in new_titles_for_source
  287. word_stats[group_key]["titles"][source_id].append(
  288. {
  289. "title": title,
  290. "source_name": source_name,
  291. "first_time": first_time,
  292. "last_time": last_time,
  293. "time_display": time_display,
  294. "count": count_info,
  295. "ranks": ranks,
  296. "rank_threshold": rank_threshold,
  297. "url": url,
  298. "mobileUrl": mobile_url,
  299. "is_new": is_new,
  300. }
  301. )
  302. if source_id not in processed_titles:
  303. processed_titles[source_id] = {}
  304. processed_titles[source_id][title] = True
  305. break
  306. # 最后统一打印汇总信息
  307. if mode == "incremental":
  308. if is_first_today:
  309. total_input_news = sum(len(titles) for titles in results.values())
  310. filter_status = (
  311. "全部显示"
  312. if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
  313. else "频率词匹配"
  314. )
  315. print(
  316. f"增量模式:当天第一次爬取,{total_input_news} 条新闻中有 {matched_new_count} 条{filter_status}"
  317. )
  318. else:
  319. if new_titles:
  320. total_new_count = sum(len(titles) for titles in new_titles.values())
  321. filter_status = (
  322. "全部显示"
  323. if len(word_groups) == 1
  324. and word_groups[0]["group_key"] == "全部新闻"
  325. else "匹配频率词"
  326. )
  327. print(
  328. f"增量模式:{total_new_count} 条新增新闻中,有 {matched_new_count} 条{filter_status}"
  329. )
  330. if matched_new_count == 0 and len(word_groups) > 1:
  331. print("增量模式:没有新增新闻匹配频率词,将不会发送通知")
  332. else:
  333. print("增量模式:未检测到新增新闻")
  334. elif mode == "current":
  335. total_input_news = sum(len(titles) for titles in results_to_process.values())
  336. if is_first_today:
  337. filter_status = (
  338. "全部显示"
  339. if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
  340. else "频率词匹配"
  341. )
  342. print(
  343. f"当前榜单模式:当天第一次爬取,{total_input_news} 条当前榜单新闻中有 {matched_new_count} 条{filter_status}"
  344. )
  345. else:
  346. matched_count = sum(stat["count"] for stat in word_stats.values())
  347. filter_status = (
  348. "全部显示"
  349. if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻"
  350. else "频率词匹配"
  351. )
  352. print(
  353. f"当前榜单模式:{total_input_news} 条当前榜单新闻中有 {matched_count} 条{filter_status}"
  354. )
  355. stats = []
  356. # 创建 group_key 到位置和最大数量的映射
  357. group_key_to_position = {
  358. group["group_key"]: idx for idx, group in enumerate(word_groups)
  359. }
  360. group_key_to_max_count = {
  361. group["group_key"]: group.get("max_count", 0) for group in word_groups
  362. }
  363. for group_key, data in word_stats.items():
  364. all_titles = []
  365. for source_id, title_list in data["titles"].items():
  366. all_titles.extend(title_list)
  367. # 按权重排序
  368. sorted_titles = sorted(
  369. all_titles,
  370. key=lambda x: (
  371. -calculate_news_weight(x, rank_threshold, weight_config),
  372. min(x["ranks"]) if x["ranks"] else 999,
  373. -x["count"],
  374. ),
  375. )
  376. # 应用最大显示数量限制(优先级:单独配置 > 全局配置)
  377. group_max_count = group_key_to_max_count.get(group_key, 0)
  378. if group_max_count == 0:
  379. # 使用全局配置
  380. group_max_count = max_news_per_keyword
  381. if group_max_count > 0:
  382. sorted_titles = sorted_titles[:group_max_count]
  383. stats.append(
  384. {
  385. "word": group_key,
  386. "count": data["count"],
  387. "position": group_key_to_position.get(group_key, 999),
  388. "titles": sorted_titles,
  389. "percentage": (
  390. round(data["count"] / total_titles * 100, 2)
  391. if total_titles > 0
  392. else 0
  393. ),
  394. }
  395. )
  396. # 根据配置选择排序优先级
  397. if sort_by_position_first:
  398. # 先按配置位置,再按热点条数
  399. stats.sort(key=lambda x: (x["position"], -x["count"]))
  400. else:
  401. # 先按热点条数,再按配置位置(原逻辑)
  402. stats.sort(key=lambda x: (-x["count"], x["position"]))
  403. # 打印过滤后的匹配新闻数(与推送显示一致)
  404. matched_news_count = sum(len(stat["titles"]) for stat in stats if stat["count"] > 0)
  405. if mode == "daily":
  406. print(f"频率词过滤后:{matched_news_count} 条新闻匹配(将显示在推送中)")
  407. return stats, total_titles