# frequency.py (10 KB)
# coding=utf-8
"""
频率词配置加载模块

负责从配置文件加载频率词规则,支持:
- 普通词组
- 必须词(+前缀)
- 过滤词(!前缀)
- 全局过滤词([GLOBAL_FILTER] 区域)
- 最大显示数量(@前缀)
- 正则表达式(/pattern/ 语法)
- 显示名称(=> 别名 语法)
- 组别名([组别名] 语法,作为词组第一行)
"""
import os
import re
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Union
  18. def _parse_word(word: str) -> Dict:
  19. """
  20. 解析单个词,识别是否为正则表达式,支持显示名称
  21. Args:
  22. word: 原始配置行 (e.g. "/京东|刘强东/ => 京东")
  23. Returns:
  24. Dict: 包含 word, is_regex, pattern, display_name
  25. """
  26. display_name = None
  27. # 1. 优先处理显示名称 (=>)
  28. # 先切分出 "配置内容" 和 "显示名称"
  29. if '=>' in word:
  30. parts = re.split(r'\s*=>\s*', word, 1)
  31. word_config = parts[0].strip()
  32. # 只有当 => 右边有内容时才作为 display_name
  33. if len(parts) > 1 and parts[1].strip():
  34. display_name = parts[1].strip()
  35. else:
  36. word_config = word.strip()
  37. # 2. 解析正则表达式
  38. # 规则:以 / 开头,以 / 结尾(可能跟 flags),中间内容贪婪提取
  39. # [a-z]*$ 表示允许末尾有 flags (如 i, g),但在下面代码中会被忽略
  40. regex_match = re.match(r'^/(.+)/[a-z]*$', word_config)
  41. if regex_match:
  42. pattern_str = regex_match.group(1)
  43. try:
  44. pattern = re.compile(pattern_str, re.IGNORECASE)
  45. return {
  46. "word": pattern_str,
  47. "is_regex": True,
  48. "pattern": pattern,
  49. "display_name": display_name,
  50. }
  51. except re.error as e:
  52. print(f"Warning: Invalid regex pattern '/{pattern_str}/': {e}")
  53. pass
  54. return {
  55. "word": word_config,
  56. "is_regex": False,
  57. "pattern": None,
  58. "display_name": display_name
  59. }
  60. def _word_matches(word_config: Union[str, Dict], title_lower: str) -> bool:
  61. """
  62. 检查词是否在标题中匹配
  63. Args:
  64. word_config: 词配置(字符串或字典)
  65. title_lower: 小写的标题
  66. Returns:
  67. 是否匹配
  68. """
  69. if isinstance(word_config, str):
  70. # 向后兼容:纯字符串
  71. return word_config.lower() in title_lower
  72. if word_config.get("is_regex") and word_config.get("pattern"):
  73. # 正则匹配
  74. return bool(word_config["pattern"].search(title_lower))
  75. else:
  76. # 子字符串匹配
  77. return word_config["word"].lower() in title_lower
  78. def load_frequency_words(
  79. frequency_file: Optional[str] = None,
  80. ) -> Tuple[List[Dict], List[str], List[str]]:
  81. """
  82. 加载频率词配置
  83. 配置文件格式说明:
  84. - 每个词组由空行分隔
  85. - [GLOBAL_FILTER] 区域定义全局过滤词
  86. - [WORD_GROUPS] 区域定义词组(默认)
  87. 词组语法:
  88. - 普通词:直接写入,任意匹配即可
  89. - +词:必须词,所有必须词都要匹配
  90. - !词:过滤词,匹配则排除
  91. - @数字:该词组最多显示的条数
  92. Args:
  93. frequency_file: 频率词配置文件路径,默认从环境变量 FREQUENCY_WORDS_PATH 获取或使用 config/frequency_words.txt,短文件名从 config/custom/keyword/ 查找
  94. Returns:
  95. (词组列表, 词组内过滤词, 全局过滤词)
  96. Raises:
  97. FileNotFoundError: 频率词文件不存在
  98. """
  99. if frequency_file is None:
  100. frequency_file = os.environ.get(
  101. "FREQUENCY_WORDS_PATH", "config/frequency_words.txt"
  102. )
  103. frequency_path = Path(frequency_file)
  104. if not frequency_path.exists():
  105. # 尝试作为短文件名,拼接 config/custom/keyword/ 前缀
  106. custom_path = Path("config/custom/keyword") / frequency_file
  107. if custom_path.exists():
  108. frequency_path = custom_path
  109. else:
  110. raise FileNotFoundError(f"频率词文件 {frequency_file} 不存在")
  111. with open(frequency_path, "r", encoding="utf-8") as f:
  112. content = f.read()
  113. word_groups = [group.strip() for group in content.split("\n\n") if group.strip()]
  114. processed_groups = []
  115. filter_words = []
  116. global_filters = []
  117. # 默认区域(向后兼容)
  118. current_section = "WORD_GROUPS"
  119. for group in word_groups:
  120. # 过滤空行和注释行(# 开头)
  121. lines = [line.strip() for line in group.split("\n") if line.strip() and not line.strip().startswith("#")]
  122. if not lines:
  123. continue
  124. # 检查是否为区域标记
  125. if lines[0].startswith("[") and lines[0].endswith("]"):
  126. section_name = lines[0][1:-1].upper()
  127. if section_name in ("GLOBAL_FILTER", "WORD_GROUPS"):
  128. current_section = section_name
  129. lines = lines[1:] # 移除标记行
  130. # 处理全局过滤区域
  131. if current_section == "GLOBAL_FILTER":
  132. # 直接添加所有非空行到全局过滤列表
  133. for line in lines:
  134. # 忽略特殊语法前缀,只提取纯文本
  135. if line.startswith(("!", "+", "@")):
  136. continue # 全局过滤区不支持特殊语法
  137. if line:
  138. global_filters.append(line)
  139. continue
  140. # 处理词组区域
  141. words = lines
  142. group_alias = None # 组别名([别名] 语法)
  143. # 检查第一行是否为组别名(非区域标记)
  144. if words and words[0].startswith("[") and words[0].endswith("]"):
  145. potential_alias = words[0][1:-1].strip()
  146. # 排除区域标记(GLOBAL_FILTER, WORD_GROUPS)
  147. if potential_alias.upper() not in ("GLOBAL_FILTER", "WORD_GROUPS"):
  148. group_alias = potential_alias
  149. words = words[1:] # 移除组别名行
  150. group_required_words = []
  151. group_normal_words = []
  152. group_max_count = 0 # 默认不限制
  153. for word in words:
  154. if word.startswith("@"):
  155. # 解析最大显示数量(只接受正整数)
  156. try:
  157. count = int(word[1:])
  158. if count > 0:
  159. group_max_count = count
  160. except (ValueError, IndexError):
  161. pass # 忽略无效的@数字格式
  162. elif word.startswith("!"):
  163. # 过滤词(支持正则语法)
  164. filter_word = word[1:]
  165. parsed = _parse_word(filter_word)
  166. filter_words.append(parsed)
  167. elif word.startswith("+"):
  168. # 必须词(支持正则语法)
  169. req_word = word[1:]
  170. group_required_words.append(_parse_word(req_word))
  171. else:
  172. # 普通词(支持正则语法)
  173. group_normal_words.append(_parse_word(word))
  174. if group_required_words or group_normal_words:
  175. if group_normal_words:
  176. group_key = " ".join(w["word"] for w in group_normal_words)
  177. else:
  178. group_key = " ".join(w["word"] for w in group_required_words)
  179. # 生成显示名称
  180. # 优先级:组别名 > 行别名拼接 > 关键词拼接
  181. if group_alias:
  182. # 有组别名,直接使用
  183. display_name = group_alias
  184. else:
  185. # 没有组别名,拼接每行的显示名(行别名或关键词本身)
  186. all_words = group_normal_words + group_required_words
  187. display_parts = []
  188. for w in all_words:
  189. # 优先使用行别名,否则使用关键词本身
  190. part = w.get("display_name") or w["word"]
  191. display_parts.append(part)
  192. # 用 " / " 拼接多个词
  193. display_name = " / ".join(display_parts) if display_parts else None
  194. processed_groups.append(
  195. {
  196. "required": group_required_words,
  197. "normal": group_normal_words,
  198. "group_key": group_key,
  199. "display_name": display_name, # 可能为 None
  200. "max_count": group_max_count,
  201. }
  202. )
  203. return processed_groups, filter_words, global_filters
  204. def matches_word_groups(
  205. title: str,
  206. word_groups: List[Dict],
  207. filter_words: List,
  208. global_filters: Optional[List[str]] = None
  209. ) -> bool:
  210. """
  211. 检查标题是否匹配词组规则
  212. Args:
  213. title: 标题文本
  214. word_groups: 词组列表
  215. filter_words: 过滤词列表(可以是字符串列表或字典列表)
  216. global_filters: 全局过滤词列表
  217. Returns:
  218. 是否匹配
  219. """
  220. # 防御性类型检查:确保 title 是有效字符串
  221. if not isinstance(title, str):
  222. title = str(title) if title is not None else ""
  223. if not title.strip():
  224. return False
  225. title_lower = title.lower()
  226. # 全局过滤检查(优先级最高)
  227. if global_filters:
  228. if any(global_word.lower() in title_lower for global_word in global_filters):
  229. return False
  230. # 如果没有配置词组,则匹配所有标题(支持显示全部新闻)
  231. if not word_groups:
  232. return True
  233. # 过滤词检查(兼容新旧格式)
  234. for filter_item in filter_words:
  235. if _word_matches(filter_item, title_lower):
  236. return False
  237. # 词组匹配检查
  238. for group in word_groups:
  239. required_words = group["required"]
  240. normal_words = group["normal"]
  241. # 必须词检查
  242. if required_words:
  243. all_required_present = all(
  244. _word_matches(req_item, title_lower) for req_item in required_words
  245. )
  246. if not all_required_present:
  247. continue
  248. # 普通词检查
  249. if normal_words:
  250. any_normal_present = any(
  251. _word_matches(normal_item, title_lower) for normal_item in normal_words
  252. )
  253. if not any_normal_present:
  254. continue
  255. return True
  256. return False