# coding=utf-8 """ 统计分析模块 提供新闻统计和分析功能: - calculate_news_weight: 计算新闻权重 - format_time_display: 格式化时间显示 - count_word_frequency: 统计词频 """ from typing import Dict, List, Tuple, Optional, Callable from trendradar.core.frequency import matches_word_groups, _word_matches from trendradar.utils.time import DEFAULT_TIMEZONE def calculate_news_weight( title_data: Dict, rank_threshold: int, weight_config: Dict, ) -> float: """ 计算新闻权重,用于排序 Args: title_data: 标题数据,包含 ranks 和 count rank_threshold: 排名阈值 weight_config: 权重配置 {RANK_WEIGHT, FREQUENCY_WEIGHT, HOTNESS_WEIGHT} Returns: float: 计算出的权重值 """ ranks = title_data.get("ranks", []) if not ranks: return 0.0 count = title_data.get("count", len(ranks)) # 排名权重:Σ(11 - min(rank, 10)) / 出现次数 rank_scores = [] for rank in ranks: score = 11 - min(rank, 10) rank_scores.append(score) rank_weight = sum(rank_scores) / len(ranks) if ranks else 0 # 频次权重:min(出现次数, 10) × 10 frequency_weight = min(count, 10) * 10 # 热度加成:高排名次数 / 总出现次数 × 100 high_rank_count = sum(1 for rank in ranks if rank <= rank_threshold) hotness_ratio = high_rank_count / len(ranks) if ranks else 0 hotness_weight = hotness_ratio * 100 total_weight = ( rank_weight * weight_config["RANK_WEIGHT"] + frequency_weight * weight_config["FREQUENCY_WEIGHT"] + hotness_weight * weight_config["HOTNESS_WEIGHT"] ) return total_weight def format_time_display( first_time: str, last_time: str, convert_time_func: Callable[[str], str], ) -> str: """ 格式化时间显示(将 HH-MM 转换为 HH:MM) Args: first_time: 首次出现时间 last_time: 最后出现时间 convert_time_func: 时间格式转换函数 Returns: str: 格式化后的时间显示字符串 """ if not first_time: return "" # 转换为显示格式 first_display = convert_time_func(first_time) last_display = convert_time_func(last_time) if first_display == last_display or not last_display: return first_display else: return f"[{first_display} ~ {last_display}]" def count_word_frequency( results: Dict, word_groups: List[Dict], filter_words: List[str], id_to_name: Dict, title_info: Optional[Dict] = None, rank_threshold: int = 3, new_titles: Optional[Dict] = None, mode: str = "daily", global_filters: Optional[List[str]] = None, weight_config: Optional[Dict] = None, max_news_per_keyword: int = 0, sort_by_position_first: bool = False, is_first_crawl_func: Optional[Callable[[], bool]] = None, convert_time_func: Optional[Callable[[str], str]] = None, quiet: bool = False, ) -> Tuple[List[Dict], int]: """ 统计词频,支持必须词、频率词、过滤词、全局过滤词,并标记新增标题 Args: results: 抓取结果 {source_id: {title: title_data}} word_groups: 词组配置列表 filter_words: 过滤词列表 id_to_name: ID 到名称的映射 title_info: 标题统计信息(可选) rank_threshold: 排名阈值 new_titles: 新增标题(可选) mode: 报告模式 (daily/incremental/current) global_filters: 全局过滤词(可选) weight_config: 权重配置 max_news_per_keyword: 每个关键词最大显示数量 sort_by_position_first: 是否优先按配置位置排序 is_first_crawl_func: 检测是否是当天第一次爬取的函数 convert_time_func: 时间格式转换函数 quiet: 是否静默模式(不打印日志) Returns: Tuple[List[Dict], int]: (统计结果列表, 总标题数) """ # 默认权重配置 if weight_config is None: weight_config = { "RANK_WEIGHT": 0.4, "FREQUENCY_WEIGHT": 0.3, "HOTNESS_WEIGHT": 0.3, } # 默认时间转换函数 if convert_time_func is None: convert_time_func = lambda x: x # 默认首次爬取检测函数 if is_first_crawl_func is None: is_first_crawl_func = lambda: True # 如果没有配置词组,创建一个包含所有新闻的虚拟词组 if not word_groups: print("频率词配置为空,将显示所有新闻") word_groups = [{"required": [], "normal": [], "group_key": "全部新闻"}] filter_words = [] # 清空过滤词,显示所有新闻 is_first_today = is_first_crawl_func() # 确定处理的数据源和新增标记逻辑 if mode == "incremental": if is_first_today: # 增量模式 + 当天第一次:处理所有新闻,都标记为新增 results_to_process = results all_news_are_new = True else: # 增量模式 + 当天非第一次:只处理新增的新闻 results_to_process = new_titles if new_titles else {} all_news_are_new = True elif mode == "current": # current 模式:只处理当前时间批次的新闻,但统计信息来自全部历史 if title_info: latest_time = None for source_titles in title_info.values(): for title_data in source_titles.values(): last_time = title_data.get("last_time", "") if last_time: if latest_time is None or last_time > latest_time: latest_time = last_time # 只处理 last_time 等于最新时间的新闻 if latest_time: results_to_process = {} for source_id, source_titles in results.items(): if source_id in title_info: filtered_titles = {} for title, title_data in source_titles.items(): if title in title_info[source_id]: info = title_info[source_id][title] if info.get("last_time") == latest_time: filtered_titles[title] = title_data if filtered_titles: results_to_process[source_id] = filtered_titles if not quiet: print( f"当前榜单模式:最新时间 {latest_time},筛选出 {sum(len(titles) for titles in results_to_process.values())} 条当前榜单新闻" ) else: results_to_process = results else: results_to_process = results all_news_are_new = False else: # 当日汇总模式:处理所有新闻 results_to_process = results all_news_are_new = False total_input_news = sum(len(titles) for titles in results.values()) filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "频率词过滤" ) print(f"当日汇总模式:处理 {total_input_news} 条新闻,模式:{filter_status}") word_stats = {} total_titles = 0 processed_titles = {} matched_new_count = 0 if title_info is None: title_info = {} if new_titles is None: new_titles = {} for group in word_groups: group_key = group["group_key"] word_stats[group_key] = {"count": 0, "titles": {}} for source_id, titles_data in results_to_process.items(): total_titles += len(titles_data) if source_id not in processed_titles: processed_titles[source_id] = {} for title, title_data in titles_data.items(): if title in processed_titles.get(source_id, {}): continue # 使用统一的匹配逻辑 matches_frequency_words = matches_word_groups( title, word_groups, filter_words, global_filters ) if not matches_frequency_words: continue # 如果是增量模式或 current 模式第一次,统计匹配的新增新闻数量 if (mode == "incremental" and all_news_are_new) or ( mode == "current" and is_first_today ): matched_new_count += 1 source_ranks = title_data.get("ranks", []) source_url = title_data.get("url", "") source_mobile_url = title_data.get("mobileUrl", "") # 找到匹配的词组(防御性转换确保类型安全) title_lower = str(title).lower() if not isinstance(title, str) else title.lower() for group in word_groups: required_words = group["required"] normal_words = group["normal"] # 如果是"全部新闻"模式,所有标题都匹配第一个(唯一的)词组 if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻": group_key = group["group_key"] word_stats[group_key]["count"] += 1 if source_id not in word_stats[group_key]["titles"]: word_stats[group_key]["titles"][source_id] = [] else: # 原有的匹配逻辑(支持正则语法) if required_words: all_required_present = all( _word_matches(req_item, title_lower) for req_item in required_words ) if not all_required_present: continue if normal_words: any_normal_present = any( _word_matches(normal_item, title_lower) for normal_item in normal_words ) if not any_normal_present: continue group_key = group["group_key"] word_stats[group_key]["count"] += 1 if source_id not in word_stats[group_key]["titles"]: word_stats[group_key]["titles"][source_id] = [] first_time = "" last_time = "" count_info = 1 ranks = source_ranks if source_ranks else [] url = source_url mobile_url = source_mobile_url rank_timeline = [] # 对于 current 模式,从历史统计信息中获取完整数据 if ( mode == "current" and title_info and source_id in title_info and title in title_info[source_id] ): info = title_info[source_id][title] first_time = info.get("first_time", "") last_time = info.get("last_time", "") count_info = info.get("count", 1) if "ranks" in info and info["ranks"]: ranks = info["ranks"] url = info.get("url", source_url) mobile_url = info.get("mobileUrl", source_mobile_url) rank_timeline = info.get("rank_timeline", []) elif ( title_info and source_id in title_info and title in title_info[source_id] ): info = title_info[source_id][title] first_time = info.get("first_time", "") last_time = info.get("last_time", "") count_info = info.get("count", 1) if "ranks" in info and info["ranks"]: ranks = info["ranks"] url = info.get("url", source_url) mobile_url = info.get("mobileUrl", source_mobile_url) rank_timeline = info.get("rank_timeline", []) if not ranks: ranks = [99] time_display = format_time_display(first_time, last_time, convert_time_func) source_name = id_to_name.get(source_id, source_id) # 判断是否为新增 is_new = False if all_news_are_new: # 增量模式下所有处理的新闻都是新增,或者当天第一次的所有新闻都是新增 is_new = True elif new_titles and source_id in new_titles: # 检查是否在新增列表中 new_titles_for_source = new_titles[source_id] is_new = title in new_titles_for_source word_stats[group_key]["titles"][source_id].append( { "title": title, "source_name": source_name, "first_time": first_time, "last_time": last_time, "time_display": time_display, "count": count_info, "ranks": ranks, "rank_threshold": rank_threshold, "url": url, "mobileUrl": mobile_url, "is_new": is_new, "rank_timeline": rank_timeline, } ) if source_id not in processed_titles: processed_titles[source_id] = {} processed_titles[source_id][title] = True break # 最后统一打印汇总信息 if mode == "incremental": if is_first_today: total_input_news = sum(len(titles) for titles in results.values()) filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "频率词匹配" ) if not quiet: print( f"增量模式:当天第一次爬取,{total_input_news} 条新闻中有 {matched_new_count} 条{filter_status}" ) else: if new_titles: total_new_count = sum(len(titles) for titles in new_titles.values()) filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "匹配频率词" ) if not quiet: print( f"增量模式:{total_new_count} 条新增新闻中,有 {matched_new_count} 条{filter_status}" ) if matched_new_count == 0 and len(word_groups) > 1: print("增量模式:没有新增新闻匹配频率词,将不会发送通知") else: if not quiet: print("增量模式:未检测到新增新闻") elif mode == "current": total_input_news = sum(len(titles) for titles in results_to_process.values()) if is_first_today: filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "频率词匹配" ) if not quiet: print( f"当前榜单模式:当天第一次爬取,{total_input_news} 条当前榜单新闻中有 {matched_new_count} 条{filter_status}" ) else: matched_count = sum(stat["count"] for stat in word_stats.values()) filter_status = ( "全部显示" if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部新闻" else "频率词匹配" ) if not quiet: print( f"当前榜单模式:{total_input_news} 条当前榜单新闻中有 {matched_count} 条{filter_status}" ) stats = [] # 创建 group_key 到位置、最大数量、显示名称的映射 group_key_to_position = { group["group_key"]: idx for idx, group in enumerate(word_groups) } group_key_to_max_count = { group["group_key"]: group.get("max_count", 0) for group in word_groups } group_key_to_display_name = { group["group_key"]: group.get("display_name") for group in word_groups } for group_key, data in word_stats.items(): all_titles = [] for source_id, title_list in data["titles"].items(): all_titles.extend(title_list) # 按权重排序 sorted_titles = sorted( all_titles, key=lambda x: ( -calculate_news_weight(x, rank_threshold, weight_config), min(x["ranks"]) if x["ranks"] else 999, -x["count"], ), ) # 应用最大显示数量限制(优先级:单独配置 > 全局配置) group_max_count = group_key_to_max_count.get(group_key, 0) if group_max_count == 0: # 使用全局配置 group_max_count = max_news_per_keyword if group_max_count > 0: sorted_titles = sorted_titles[:group_max_count] # 优先使用 display_name,否则使用 group_key display_word = group_key_to_display_name.get(group_key) or group_key stats.append( { "word": display_word, "count": data["count"], "position": group_key_to_position.get(group_key, 999), "titles": sorted_titles, "percentage": ( round(data["count"] / total_titles * 100, 2) if total_titles > 0 else 0 ), } ) # 根据配置选择排序优先级 if sort_by_position_first: # 先按配置位置,再按热点条数 stats.sort(key=lambda x: (x["position"], -x["count"])) else: # 先按热点条数,再按配置位置(原逻辑) stats.sort(key=lambda x: (-x["count"], x["position"])) # 打印过滤后的匹配新闻数 matched_news_count = sum(len(stat["titles"]) for stat in stats if stat["count"] > 0) if not quiet and mode == "daily": print(f"当日汇总模式:处理 {total_titles} 条新闻,模式:频率词过滤") print(f"频率词过滤后:{matched_news_count} 条新闻匹配") return stats, total_titles def count_rss_frequency( rss_items: List[Dict], word_groups: List[Dict], filter_words: List[str], global_filters: Optional[List[str]] = None, new_items: Optional[List[Dict]] = None, max_news_per_keyword: int = 0, sort_by_position_first: bool = False, timezone: str = DEFAULT_TIMEZONE, rank_threshold: int = 5, quiet: bool = False, ) -> Tuple[List[Dict], int]: """ 按关键词分组统计 RSS 条目(与热榜统计格式一致) Args: rss_items: RSS 条目列表,每个条目包含: - title: 标题 - feed_id: RSS 源 ID - feed_name: RSS 源名称 - url: 文章链接 - published_at: 发布时间(ISO 格式) word_groups: 词组配置列表 filter_words: 过滤词列表 global_filters: 全局过滤词(可选) new_items: 新增条目列表(可选,用于标记 is_new) max_news_per_keyword: 每个关键词最大显示数量 sort_by_position_first: 是否优先按配置位置排序 timezone: 时区名称(用于时间格式化) quiet: 是否静默模式 Returns: Tuple[List[Dict], int]: (统计结果列表, 总条目数) 统计结果格式与热榜一致: [ { "word": "关键词", "count": 5, "position": 0, "titles": [ { "title": "标题", "source_name": "Hacker News", "time_display": "12-29 08:20", "count": 1, "ranks": [1], # RSS 用发布时间顺序作为排名 "rank_threshold": 50, "url": "...", "mobile_url": "", "is_new": True/False } ], "percentage": 10.0 } ] """ from trendradar.utils.time import format_iso_time_friendly if not rss_items: return [], 0 # 如果没有配置词组,创建一个包含所有条目的虚拟词组 if not word_groups: if not quiet: print("[RSS] 频率词配置为空,将显示所有 RSS 条目") word_groups = [{"required": [], "normal": [], "group_key": "全部 RSS"}] filter_words = [] # 创建新增条目的 URL 集合,用于快速查找 new_urls = set() if new_items: for item in new_items: if item.get("url"): new_urls.add(item["url"]) # 初始化词组统计 word_stats = {} for group in word_groups: group_key = group["group_key"] word_stats[group_key] = {"count": 0, "titles": []} total_items = len(rss_items) processed_urls = set() # 用于去重 # 为每个条目分配一个基于发布时间的"排名" # 按发布时间排序,最新的排在前面 sorted_items = sorted( rss_items, key=lambda x: x.get("published_at", ""), reverse=True ) url_to_rank = {item.get("url", ""): idx + 1 for idx, item in enumerate(sorted_items)} for item in rss_items: title = item.get("title", "") url = item.get("url", "") # 去重 if url and url in processed_urls: continue if url: processed_urls.add(url) # 使用统一的匹配逻辑 if not matches_word_groups(title, word_groups, filter_words, global_filters): continue # 找到匹配的词组 title_lower = title.lower() for group in word_groups: required_words = group["required"] normal_words = group["normal"] group_key = group["group_key"] # "全部 RSS" 模式:所有条目都匹配 if len(word_groups) == 1 and word_groups[0]["group_key"] == "全部 RSS": matched = True else: # 检查必须词(支持正则语法) if required_words: all_required_present = all( _word_matches(req_item, title_lower) for req_item in required_words ) if not all_required_present: continue # 检查普通词(支持正则语法) if normal_words: any_normal_present = any( _word_matches(normal_item, title_lower) for normal_item in normal_words ) if not any_normal_present: continue matched = True if matched: word_stats[group_key]["count"] += 1 # 格式化时间显示 published_at = item.get("published_at", "") time_display = format_iso_time_friendly(published_at, timezone, include_date=True) if published_at else "" # 判断是否为新增 is_new = url in new_urls if url else False # 获取排名(基于发布时间顺序) rank = url_to_rank.get(url, 99) if url else 99 title_data = { "title": title, "source_name": item.get("feed_name", item.get("feed_id", "RSS")), "time_display": time_display, "count": 1, # RSS 条目通常只出现一次 "ranks": [rank], "rank_threshold": rank_threshold, "url": url, "mobile_url": "", "is_new": is_new, } word_stats[group_key]["titles"].append(title_data) break # 一个条目只匹配第一个词组 # 构建统计结果 stats = [] group_key_to_position = { group["group_key"]: idx for idx, group in enumerate(word_groups) } group_key_to_max_count = { group["group_key"]: group.get("max_count", 0) for group in word_groups } group_key_to_display_name = { group["group_key"]: group.get("display_name") for group in word_groups } for group_key, data in word_stats.items(): if data["count"] == 0: continue # 按发布时间排序(最新在前) sorted_titles = sorted( data["titles"], key=lambda x: x["ranks"][0] if x["ranks"] else 999 ) # 应用最大显示数量限制 group_max_count = group_key_to_max_count.get(group_key, 0) if group_max_count == 0: group_max_count = max_news_per_keyword if group_max_count > 0: sorted_titles = sorted_titles[:group_max_count] # 优先使用 display_name,否则使用 group_key display_word = group_key_to_display_name.get(group_key) or group_key stats.append({ "word": display_word, "count": data["count"], "position": group_key_to_position.get(group_key, 999), "titles": sorted_titles, "percentage": round(data["count"] / total_items * 100, 2) if total_items > 0 else 0, }) # 排序 if sort_by_position_first: stats.sort(key=lambda x: (x["position"], -x["count"])) else: stats.sort(key=lambda x: (-x["count"], x["position"])) matched_count = sum(stat["count"] for stat in stats) if not quiet: print(f"[RSS] 关键词分组统计:{matched_count}/{total_items} 条匹配") return stats, total_items def convert_keyword_stats_to_platform_stats( keyword_stats: List[Dict], weight_config: Dict, rank_threshold: int = 5, ) -> List[Dict]: """ 将按关键词分组的统计数据转换为按平台分组的统计数据 Args: keyword_stats: 原始按关键词分组的统计数据 weight_config: 权重配置 rank_threshold: 排名阈值 Returns: 按平台分组的统计数据,格式与原 stats 一致 """ # 1. 收集所有新闻,按平台分组 platform_map: Dict[str, List[Dict]] = {} for stat in keyword_stats: keyword = stat["word"] for title_data in stat["titles"]: source_name = title_data["source_name"] if source_name not in platform_map: platform_map[source_name] = [] # 复制 title_data 并添加匹配的关键词 title_with_keyword = title_data.copy() title_with_keyword["matched_keyword"] = keyword platform_map[source_name].append(title_with_keyword) # 2. 去重(同一平台下相同标题只保留一条,保留第一个匹配的关键词) for source_name, titles in platform_map.items(): seen_titles: Dict[str, bool] = {} unique_titles = [] for title_data in titles: title_text = title_data["title"] if title_text not in seen_titles: seen_titles[title_text] = True unique_titles.append(title_data) platform_map[source_name] = unique_titles # 3. 按权重排序每个平台内的新闻 for source_name, titles in platform_map.items(): platform_map[source_name] = sorted( titles, key=lambda x: ( -calculate_news_weight(x, rank_threshold, weight_config), min(x["ranks"]) if x["ranks"] else 999, -x["count"], ), ) # 4. 构建平台统计结果 platform_stats = [] for source_name, titles in platform_map.items(): platform_stats.append({ "word": source_name, # 平台名作为分组标识 "count": len(titles), "titles": titles, "percentage": 0, # 可后续计算 }) # 5. 按新闻条数排序平台 platform_stats.sort(key=lambda x: -x["count"]) return platform_stats