# coding=utf-8 """ 频率词配置加载模块 负责从配置文件加载频率词规则,支持: - 普通词组 - 必须词(+前缀) - 过滤词(!前缀) - 全局过滤词([GLOBAL_FILTER] 区域) - 最大显示数量(@前缀) - 正则表达式(/pattern/ 语法) - 显示名称(=> 别名 语法) - 组别名([组别名] 语法,作为词组第一行) """ import os import re from pathlib import Path from typing import Dict, List, Tuple, Optional, Union def _parse_word(word: str) -> Dict: """ 解析单个词,识别是否为正则表达式,支持显示名称 Args: word: 原始配置行 (e.g. "/京东|刘强东/ => 京东") Returns: Dict: 包含 word, is_regex, pattern, display_name """ display_name = None # 1. 优先处理显示名称 (=>) # 先切分出 "配置内容" 和 "显示名称" if '=>' in word: parts = re.split(r'\s*=>\s*', word, 1) word_config = parts[0].strip() # 只有当 => 右边有内容时才作为 display_name if len(parts) > 1 and parts[1].strip(): display_name = parts[1].strip() else: word_config = word.strip() # 2. 解析正则表达式 # 规则:以 / 开头,以 / 结尾(可能跟 flags),中间内容贪婪提取 # [a-z]*$ 表示允许末尾有 flags (如 i, g),但在下面代码中会被忽略 regex_match = re.match(r'^/(.+)/[a-z]*$', word_config) if regex_match: pattern_str = regex_match.group(1) try: pattern = re.compile(pattern_str, re.IGNORECASE) return { "word": pattern_str, "is_regex": True, "pattern": pattern, "display_name": display_name, } except re.error as e: print(f"Warning: Invalid regex pattern '/{pattern_str}/': {e}") pass return { "word": word_config, "is_regex": False, "pattern": None, "display_name": display_name } def _word_matches(word_config: Union[str, Dict], title_lower: str) -> bool: """ 检查词是否在标题中匹配 Args: word_config: 词配置(字符串或字典) title_lower: 小写的标题 Returns: 是否匹配 """ if isinstance(word_config, str): # 向后兼容:纯字符串 return word_config.lower() in title_lower if word_config.get("is_regex") and word_config.get("pattern"): # 正则匹配 return bool(word_config["pattern"].search(title_lower)) else: # 子字符串匹配 return word_config["word"].lower() in title_lower def load_frequency_words( frequency_file: Optional[str] = None, ) -> Tuple[List[Dict], List[str], List[str]]: """ 加载频率词配置 配置文件格式说明: - 每个词组由空行分隔 - [GLOBAL_FILTER] 区域定义全局过滤词 - [WORD_GROUPS] 区域定义词组(默认) 词组语法: - 普通词:直接写入,任意匹配即可 - +词:必须词,所有必须词都要匹配 - !词:过滤词,匹配则排除 - @数字:该词组最多显示的条数 Args: frequency_file: 频率词配置文件路径,默认从环境变量 FREQUENCY_WORDS_PATH 获取或使用 config/frequency_words.txt,短文件名从 config/custom/keyword/ 查找 Returns: (词组列表, 词组内过滤词, 全局过滤词) Raises: FileNotFoundError: 频率词文件不存在 """ if frequency_file is None: frequency_file = os.environ.get( "FREQUENCY_WORDS_PATH", "config/frequency_words.txt" ) frequency_path = Path(frequency_file) if not frequency_path.exists(): # 尝试作为短文件名,拼接 config/custom/keyword/ 前缀 custom_path = Path("config/custom/keyword") / frequency_file if custom_path.exists(): frequency_path = custom_path else: raise FileNotFoundError(f"频率词文件 {frequency_file} 不存在") with open(frequency_path, "r", encoding="utf-8") as f: content = f.read() word_groups = [group.strip() for group in content.split("\n\n") if group.strip()] processed_groups = [] filter_words = [] global_filters = [] # 默认区域(向后兼容) current_section = "WORD_GROUPS" for group in word_groups: # 过滤空行和注释行(# 开头) lines = [line.strip() for line in group.split("\n") if line.strip() and not line.strip().startswith("#")] if not lines: continue # 检查是否为区域标记 if lines[0].startswith("[") and lines[0].endswith("]"): section_name = lines[0][1:-1].upper() if section_name in ("GLOBAL_FILTER", "WORD_GROUPS"): current_section = section_name lines = lines[1:] # 移除标记行 # 处理全局过滤区域 if current_section == "GLOBAL_FILTER": # 直接添加所有非空行到全局过滤列表 for line in lines: # 忽略特殊语法前缀,只提取纯文本 if line.startswith(("!", "+", "@")): continue # 全局过滤区不支持特殊语法 if line: global_filters.append(line) continue # 处理词组区域 words = lines group_alias = None # 组别名([别名] 语法) # 检查第一行是否为组别名(非区域标记) if words and words[0].startswith("[") and words[0].endswith("]"): potential_alias = words[0][1:-1].strip() # 排除区域标记(GLOBAL_FILTER, WORD_GROUPS) if potential_alias.upper() not in ("GLOBAL_FILTER", "WORD_GROUPS"): group_alias = potential_alias words = words[1:] # 移除组别名行 group_required_words = [] group_normal_words = [] group_max_count = 0 # 默认不限制 for word in words: if word.startswith("@"): # 解析最大显示数量(只接受正整数) try: count = int(word[1:]) if count > 0: group_max_count = count except (ValueError, IndexError): pass # 忽略无效的@数字格式 elif word.startswith("!"): # 过滤词(支持正则语法) filter_word = word[1:] parsed = _parse_word(filter_word) filter_words.append(parsed) elif word.startswith("+"): # 必须词(支持正则语法) req_word = word[1:] group_required_words.append(_parse_word(req_word)) else: # 普通词(支持正则语法) group_normal_words.append(_parse_word(word)) if group_required_words or group_normal_words: if group_normal_words: group_key = " ".join(w["word"] for w in group_normal_words) else: group_key = " ".join(w["word"] for w in group_required_words) # 生成显示名称 # 优先级:组别名 > 行别名拼接 > 关键词拼接 if group_alias: # 有组别名,直接使用 display_name = group_alias else: # 没有组别名,拼接每行的显示名(行别名或关键词本身) all_words = group_normal_words + group_required_words display_parts = [] for w in all_words: # 优先使用行别名,否则使用关键词本身 part = w.get("display_name") or w["word"] display_parts.append(part) # 用 " / " 拼接多个词 display_name = " / ".join(display_parts) if display_parts else None processed_groups.append( { "required": group_required_words, "normal": group_normal_words, "group_key": group_key, "display_name": display_name, # 可能为 None "max_count": group_max_count, } ) return processed_groups, filter_words, global_filters def matches_word_groups( title: str, word_groups: List[Dict], filter_words: List, global_filters: Optional[List[str]] = None ) -> bool: """ 检查标题是否匹配词组规则 Args: title: 标题文本 word_groups: 词组列表 filter_words: 过滤词列表(可以是字符串列表或字典列表) global_filters: 全局过滤词列表 Returns: 是否匹配 """ # 防御性类型检查:确保 title 是有效字符串 if not isinstance(title, str): title = str(title) if title is not None else "" if not title.strip(): return False title_lower = title.lower() # 全局过滤检查(优先级最高) if global_filters: if any(global_word.lower() in title_lower for global_word in global_filters): return False # 如果没有配置词组,则匹配所有标题(支持显示全部新闻) if not word_groups: return True # 过滤词检查(兼容新旧格式) for filter_item in filter_words: if _word_matches(filter_item, title_lower): return False # 词组匹配检查 for group in word_groups: required_words = group["required"] normal_words = group["normal"] # 必须词检查 if required_words: all_required_present = all( _word_matches(req_item, title_lower) for req_item in required_words ) if not all_required_present: continue # 普通词检查 if normal_words: any_normal_present = any( _word_matches(normal_item, title_lower) for normal_item in normal_words ) if not any_normal_present: continue return True return False