frequency.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. # coding=utf-8
  2. """
  3. 频率词配置加载模块
  4. 负责从配置文件加载频率词规则,支持:
  5. - 普通词组
  6. - 必须词(+前缀)
  7. - 过滤词(!前缀)
  8. - 全局过滤词([GLOBAL_FILTER] 区域)
  9. - 最大显示数量(@前缀)
  10. """
  11. import os
  12. from pathlib import Path
  13. from typing import Dict, List, Tuple, Optional
  14. def load_frequency_words(
  15. frequency_file: Optional[str] = None,
  16. ) -> Tuple[List[Dict], List[str], List[str]]:
  17. """
  18. 加载频率词配置
  19. 配置文件格式说明:
  20. - 每个词组由空行分隔
  21. - [GLOBAL_FILTER] 区域定义全局过滤词
  22. - [WORD_GROUPS] 区域定义词组(默认)
  23. 词组语法:
  24. - 普通词:直接写入,任意匹配即可
  25. - +词:必须词,所有必须词都要匹配
  26. - !词:过滤词,匹配则排除
  27. - @数字:该词组最多显示的条数
  28. Args:
  29. frequency_file: 频率词配置文件路径,默认从环境变量 FREQUENCY_WORDS_PATH 获取或使用 config/frequency_words.txt
  30. Returns:
  31. (词组列表, 词组内过滤词, 全局过滤词)
  32. Raises:
  33. FileNotFoundError: 频率词文件不存在
  34. """
  35. if frequency_file is None:
  36. frequency_file = os.environ.get(
  37. "FREQUENCY_WORDS_PATH", "config/frequency_words.txt"
  38. )
  39. frequency_path = Path(frequency_file)
  40. if not frequency_path.exists():
  41. raise FileNotFoundError(f"频率词文件 {frequency_file} 不存在")
  42. with open(frequency_path, "r", encoding="utf-8") as f:
  43. content = f.read()
  44. word_groups = [group.strip() for group in content.split("\n\n") if group.strip()]
  45. processed_groups = []
  46. filter_words = []
  47. global_filters = []
  48. # 默认区域(向后兼容)
  49. current_section = "WORD_GROUPS"
  50. for group in word_groups:
  51. lines = [line.strip() for line in group.split("\n") if line.strip()]
  52. if not lines:
  53. continue
  54. # 检查是否为区域标记
  55. if lines[0].startswith("[") and lines[0].endswith("]"):
  56. section_name = lines[0][1:-1].upper()
  57. if section_name in ("GLOBAL_FILTER", "WORD_GROUPS"):
  58. current_section = section_name
  59. lines = lines[1:] # 移除标记行
  60. # 处理全局过滤区域
  61. if current_section == "GLOBAL_FILTER":
  62. # 直接添加所有非空行到全局过滤列表
  63. for line in lines:
  64. # 忽略特殊语法前缀,只提取纯文本
  65. if line.startswith(("!", "+", "@")):
  66. continue # 全局过滤区不支持特殊语法
  67. if line:
  68. global_filters.append(line)
  69. continue
  70. # 处理词组区域
  71. words = lines
  72. group_required_words = []
  73. group_normal_words = []
  74. group_filter_words = []
  75. group_max_count = 0 # 默认不限制
  76. for word in words:
  77. if word.startswith("@"):
  78. # 解析最大显示数量(只接受正整数)
  79. try:
  80. count = int(word[1:])
  81. if count > 0:
  82. group_max_count = count
  83. except (ValueError, IndexError):
  84. pass # 忽略无效的@数字格式
  85. elif word.startswith("!"):
  86. filter_words.append(word[1:])
  87. group_filter_words.append(word[1:])
  88. elif word.startswith("+"):
  89. group_required_words.append(word[1:])
  90. else:
  91. group_normal_words.append(word)
  92. if group_required_words or group_normal_words:
  93. if group_normal_words:
  94. group_key = " ".join(group_normal_words)
  95. else:
  96. group_key = " ".join(group_required_words)
  97. processed_groups.append(
  98. {
  99. "required": group_required_words,
  100. "normal": group_normal_words,
  101. "group_key": group_key,
  102. "max_count": group_max_count,
  103. }
  104. )
  105. return processed_groups, filter_words, global_filters
  106. def matches_word_groups(
  107. title: str,
  108. word_groups: List[Dict],
  109. filter_words: List[str],
  110. global_filters: Optional[List[str]] = None
  111. ) -> bool:
  112. """
  113. 检查标题是否匹配词组规则
  114. Args:
  115. title: 标题文本
  116. word_groups: 词组列表
  117. filter_words: 过滤词列表
  118. global_filters: 全局过滤词列表
  119. Returns:
  120. 是否匹配
  121. """
  122. # 防御性类型检查:确保 title 是有效字符串
  123. if not isinstance(title, str):
  124. title = str(title) if title is not None else ""
  125. if not title.strip():
  126. return False
  127. title_lower = title.lower()
  128. # 全局过滤检查(优先级最高)
  129. if global_filters:
  130. if any(global_word.lower() in title_lower for global_word in global_filters):
  131. return False
  132. # 如果没有配置词组,则匹配所有标题(支持显示全部新闻)
  133. if not word_groups:
  134. return True
  135. # 过滤词检查
  136. if any(filter_word.lower() in title_lower for filter_word in filter_words):
  137. return False
  138. # 词组匹配检查
  139. for group in word_groups:
  140. required_words = group["required"]
  141. normal_words = group["normal"]
  142. # 必须词检查
  143. if required_words:
  144. all_required_present = all(
  145. req_word.lower() in title_lower for req_word in required_words
  146. )
  147. if not all_required_present:
  148. continue
  149. # 普通词检查
  150. if normal_words:
  151. any_normal_present = any(
  152. normal_word.lower() in title_lower for normal_word in normal_words
  153. )
  154. if not any_normal_present:
  155. continue
  156. return True
  157. return False