# frequency.py
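# coding=utf-8
"""
Frequency-word configuration loading module.

Loads frequency-word rules from a configuration file. Supported syntax:
- plain word groups
- required words (``+`` prefix)
- filter words (``!`` prefix)
- global filter words (``[GLOBAL_FILTER]`` section)
- maximum display count (``@`` prefix)
- regular expressions (``/pattern/`` syntax)
- display names (``=> name`` syntax)
"""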
  13. import os
  14. import re
  15. from pathlib import Path
  16. from typing import Dict, List, Tuple, Optional, Union
  17. def _parse_word(word: str) -> Dict:
  18. """
  19. 解析单个词,识别是否为正则表达式,支持显示名称
  20. 语法:
  21. - 普通词:word
  22. - 正则表达式:/pattern/ 或 /pattern/i(flags 会被忽略,默认已启用忽略大小写)
  23. - 带显示名称:word => 显示名称 或 word=>显示名称(=>两边空格可选)
  24. - 正则带显示名称:/pattern/ => 显示名称
  25. Args:
  26. word: 原始词
  27. Returns:
  28. {"word": str, "is_regex": bool, "pattern": Optional[re.Pattern], "display_name": Optional[str]}
  29. """
  30. display_name = None
  31. # 解析 => 显示名称 语法(支持 => 两边有或没有空格)
  32. # 使用正则匹配:空格可选的 =>
  33. display_match = re.search(r'\s*=>\s*', word)
  34. if display_match:
  35. parts = re.split(r'\s*=>\s*', word, 1)
  36. word = parts[0].strip()
  37. display_name = parts[1].strip() if len(parts) > 1 and parts[1].strip() else None
  38. # 解析正则表达式:支持 /pattern/ 或 /pattern/flags(如 /pattern/i)
  39. # flags 会被忽略,因为默认已启用 IGNORECASE
  40. regex_match = re.match(r'^/(.+)/([gimsux]*)$', word)
  41. if regex_match:
  42. pattern_str = regex_match.group(1)
  43. # flags 参数被忽略,统一使用 IGNORECASE
  44. try:
  45. pattern = re.compile(pattern_str, re.IGNORECASE)
  46. return {
  47. "word": pattern_str,
  48. "is_regex": True,
  49. "pattern": pattern,
  50. "display_name": display_name,
  51. }
  52. except re.error:
  53. # 正则表达式无效,当作普通词处理
  54. pass
  55. return {"word": word, "is_regex": False, "pattern": None, "display_name": display_name}
  56. def _word_matches(word_config: Union[str, Dict], title_lower: str) -> bool:
  57. """
  58. 检查词是否在标题中匹配
  59. Args:
  60. word_config: 词配置(字符串或字典)
  61. title_lower: 小写的标题
  62. Returns:
  63. 是否匹配
  64. """
  65. if isinstance(word_config, str):
  66. # 向后兼容:纯字符串
  67. return word_config.lower() in title_lower
  68. if word_config.get("is_regex") and word_config.get("pattern"):
  69. # 正则匹配
  70. return bool(word_config["pattern"].search(title_lower))
  71. else:
  72. # 子字符串匹配
  73. return word_config["word"].lower() in title_lower
  74. def load_frequency_words(
  75. frequency_file: Optional[str] = None,
  76. ) -> Tuple[List[Dict], List[str], List[str]]:
  77. """
  78. 加载频率词配置
  79. 配置文件格式说明:
  80. - 每个词组由空行分隔
  81. - [GLOBAL_FILTER] 区域定义全局过滤词
  82. - [WORD_GROUPS] 区域定义词组(默认)
  83. 词组语法:
  84. - 普通词:直接写入,任意匹配即可
  85. - +词:必须词,所有必须词都要匹配
  86. - !词:过滤词,匹配则排除
  87. - @数字:该词组最多显示的条数
  88. Args:
  89. frequency_file: 频率词配置文件路径,默认从环境变量 FREQUENCY_WORDS_PATH 获取或使用 config/frequency_words.txt
  90. Returns:
  91. (词组列表, 词组内过滤词, 全局过滤词)
  92. Raises:
  93. FileNotFoundError: 频率词文件不存在
  94. """
  95. if frequency_file is None:
  96. frequency_file = os.environ.get(
  97. "FREQUENCY_WORDS_PATH", "config/frequency_words.txt"
  98. )
  99. frequency_path = Path(frequency_file)
  100. if not frequency_path.exists():
  101. raise FileNotFoundError(f"频率词文件 {frequency_file} 不存在")
  102. with open(frequency_path, "r", encoding="utf-8") as f:
  103. content = f.read()
  104. word_groups = [group.strip() for group in content.split("\n\n") if group.strip()]
  105. processed_groups = []
  106. filter_words = []
  107. global_filters = []
  108. # 默认区域(向后兼容)
  109. current_section = "WORD_GROUPS"
  110. for group in word_groups:
  111. lines = [line.strip() for line in group.split("\n") if line.strip()]
  112. if not lines:
  113. continue
  114. # 检查是否为区域标记
  115. if lines[0].startswith("[") and lines[0].endswith("]"):
  116. section_name = lines[0][1:-1].upper()
  117. if section_name in ("GLOBAL_FILTER", "WORD_GROUPS"):
  118. current_section = section_name
  119. lines = lines[1:] # 移除标记行
  120. # 处理全局过滤区域
  121. if current_section == "GLOBAL_FILTER":
  122. # 直接添加所有非空行到全局过滤列表
  123. for line in lines:
  124. # 忽略特殊语法前缀,只提取纯文本
  125. if line.startswith(("!", "+", "@")):
  126. continue # 全局过滤区不支持特殊语法
  127. if line:
  128. global_filters.append(line)
  129. continue
  130. # 处理词组区域
  131. words = lines
  132. group_required_words = []
  133. group_normal_words = []
  134. group_filter_words = []
  135. group_max_count = 0 # 默认不限制
  136. for word in words:
  137. if word.startswith("@"):
  138. # 解析最大显示数量(只接受正整数)
  139. try:
  140. count = int(word[1:])
  141. if count > 0:
  142. group_max_count = count
  143. except (ValueError, IndexError):
  144. pass # 忽略无效的@数字格式
  145. elif word.startswith("!"):
  146. # 过滤词(支持正则语法)
  147. filter_word = word[1:]
  148. parsed = _parse_word(filter_word)
  149. filter_words.append(parsed)
  150. group_filter_words.append(parsed)
  151. elif word.startswith("+"):
  152. # 必须词(支持正则语法)
  153. req_word = word[1:]
  154. group_required_words.append(_parse_word(req_word))
  155. else:
  156. # 普通词(支持正则语法)
  157. group_normal_words.append(_parse_word(word))
  158. if group_required_words or group_normal_words:
  159. if group_normal_words:
  160. group_key = " ".join(w["word"] for w in group_normal_words)
  161. else:
  162. group_key = " ".join(w["word"] for w in group_required_words)
  163. # 提取显示名称:优先使用第一个有 display_name 的词
  164. display_name = None
  165. for w in group_normal_words + group_required_words:
  166. if w.get("display_name"):
  167. display_name = w["display_name"]
  168. break
  169. processed_groups.append(
  170. {
  171. "required": group_required_words,
  172. "normal": group_normal_words,
  173. "group_key": group_key,
  174. "display_name": display_name, # 可能为 None
  175. "max_count": group_max_count,
  176. }
  177. )
  178. return processed_groups, filter_words, global_filters
  179. def matches_word_groups(
  180. title: str,
  181. word_groups: List[Dict],
  182. filter_words: List,
  183. global_filters: Optional[List[str]] = None
  184. ) -> bool:
  185. """
  186. 检查标题是否匹配词组规则
  187. Args:
  188. title: 标题文本
  189. word_groups: 词组列表
  190. filter_words: 过滤词列表(可以是字符串列表或字典列表)
  191. global_filters: 全局过滤词列表
  192. Returns:
  193. 是否匹配
  194. """
  195. # 防御性类型检查:确保 title 是有效字符串
  196. if not isinstance(title, str):
  197. title = str(title) if title is not None else ""
  198. if not title.strip():
  199. return False
  200. title_lower = title.lower()
  201. # 全局过滤检查(优先级最高)
  202. if global_filters:
  203. if any(global_word.lower() in title_lower for global_word in global_filters):
  204. return False
  205. # 如果没有配置词组,则匹配所有标题(支持显示全部新闻)
  206. if not word_groups:
  207. return True
  208. # 过滤词检查(兼容新旧格式)
  209. for filter_item in filter_words:
  210. if _word_matches(filter_item, title_lower):
  211. return False
  212. # 词组匹配检查
  213. for group in word_groups:
  214. required_words = group["required"]
  215. normal_words = group["normal"]
  216. # 必须词检查
  217. if required_words:
  218. all_required_present = all(
  219. _word_matches(req_item, title_lower) for req_item in required_words
  220. )
  221. if not all_required_present:
  222. continue
  223. # 普通词检查
  224. if normal_words:
  225. any_normal_present = any(
  226. _word_matches(normal_item, title_lower) for normal_item in normal_words
  227. )
  228. if not any_normal_present:
  229. continue
  230. return True
  231. return False