parser_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. """
  2. 文件解析服务
  3. 提供txt格式新闻数据和YAML配置文件的解析功能。
  4. """
  5. import re
  6. from pathlib import Path
  7. from typing import Dict, List, Tuple, Optional
  8. from datetime import datetime
  9. import yaml
  10. from ..utils.errors import FileParseError, DataNotFoundError
  11. from .cache_service import get_cache
  class ParserService:
      """Parse crawler output files and project configuration files.

      The service reads three kinds of files, all located relative to
      ``project_root`` unless an explicit path is given:

      * per-crawl ``*.txt`` title dumps under ``output/<date folder>/txt``;
      * the YAML configuration file ``config/config.yaml``;
      * the keyword file ``config/frequency_words.txt``.
      """

      def __init__(self, project_root: Optional[str] = None):
          """Initialise the parser service.

          Args:
              project_root: Project root directory. When omitted, the
                  directory two levels above the directory containing this
                  source file is used.
          """
          if project_root is None:
              self.project_root = Path(__file__).parent.parent.parent
          else:
              self.project_root = Path(project_root)

          # Cache used by read_all_titles_for_date.
          self.cache = get_cache()

      @staticmethod
      def clean_title(title: str) -> str:
          """Normalise the whitespace of a title.

          Args:
              title: Raw title text.

          Returns:
              The title with every run of whitespace collapsed to one space
              and leading/trailing whitespace removed.
          """
          return re.sub(r'\s+', ' ', title).strip()

      @staticmethod
      def _pop_bracket_suffix(text: str, marker: str) -> Tuple[str, str]:
          """Split the last ``<marker>value]`` suffix off ``text``.

          Returns ``(remaining_text, value)``. When the marker is absent the
          text is returned unchanged with an empty value. When the marker is
          present but the tail does not end with ``]`` the tail is still cut
          off and the value is empty.
          """
          if marker not in text:
              return text, ""
          head, tail = text.rsplit(marker, 1)
          return head, (tail[:-1] if tail.endswith("]") else "")

      def _parse_title_line(self, line: str) -> Tuple[str, Dict]:
          """Parse one ``[rank. ]title[ [URL:..]][ [MOBILE:..]]`` line.

          Raises:
              ValueError: the rank prefix passes ``str.isdigit`` but cannot be
                  converted by ``int`` (e.g. superscript digits).
          """
          text = line.strip()

          # Optional leading "<digits>. " rank prefix.
          rank = None
          prefix, sep, rest = text.partition(". ")
          if sep and prefix.isdigit():
              rank = int(prefix)
              text = rest

          # The mobile URL is written after the URL, so peel it off first.
          text, mobile_url = self._pop_bracket_suffix(text, " [MOBILE:")
          text, url = self._pop_bracket_suffix(text, " [URL:")

          info = {
              # Lines without an explicit rank are recorded as rank 1.
              "ranks": [rank] if rank is not None else [1],
              "url": url,
              "mobileUrl": mobile_url,
          }
          return self.clean_title(text.strip()), info

      def parse_txt_file(self, file_path: Path) -> Tuple[Dict, Dict]:
          """Parse the title data of a single txt file.

          The file is a sequence of sections separated by blank lines. The
          first line of a section is ``id | name`` or just ``id``; every
          following non-empty line is one title. Sections containing the
          failed-request marker and sections with fewer than two lines are
          skipped.

          Args:
              file_path: Path of the txt file.

          Returns:
              A ``(titles_by_id, id_to_name)`` tuple:

              - titles_by_id: ``{platform_id: {title: {ranks, url, mobileUrl}}}``
              - id_to_name: ``{platform_id: platform_name}``

          Raises:
              FileParseError: the file does not exist or cannot be read/parsed.
          """
          if not file_path.exists():
              raise FileParseError(str(file_path), "文件不存在")

          titles_by_id = {}
          id_to_name = {}

          try:
              with open(file_path, "r", encoding="utf-8") as f:
                  content = f.read()

              for section in content.split("\n\n"):
                  if not section.strip() or "==== 以下ID请求失败 ====" in section:
                      continue

                  lines = section.strip().split("\n")
                  if len(lines) < 2:
                      continue

                  # Header: "id | name", or a bare id that doubles as the name.
                  head, sep, tail = lines[0].strip().partition(" | ")
                  if sep:
                      source_id = head.strip()
                      id_to_name[source_id] = tail.strip()
                  else:
                      source_id = head
                      id_to_name[source_id] = source_id

                  titles = titles_by_id[source_id] = {}

                  for line in lines[1:]:
                      if not line.strip():
                          continue
                      try:
                          title, info = self._parse_title_line(line)
                      except ValueError:
                          # Best effort: a malformed line is skipped, not fatal.
                          continue
                      titles[title] = info

          except Exception as e:
              raise FileParseError(str(file_path), str(e)) from e

          return titles_by_id, id_to_name

      def get_date_folder_name(self, date: datetime = None) -> str:
          """Return the name of the output folder for a date.

          Args:
              date: Date to format; defaults to the current local time.

          Returns:
              The folder name formatted as ``YYYY年MM月DD日``.
          """
          if date is None:
              date = datetime.now()
          return date.strftime("%Y年%m月%d日")

      def read_all_titles_for_date(
          self,
          date: datetime = None,
          platform_ids: Optional[List[str]] = None
      ) -> Tuple[Dict, Dict, Dict]:
          """Read and merge every title file of one date (cached).

          Args:
              date: Date to read; defaults to today.
              platform_ids: Platform ids to keep; ``None`` or an empty list
                  keeps every platform.

          Returns:
              An ``(all_titles, id_to_name, all_timestamps)`` tuple:

              - all_titles: ``{platform_id: {title: {ranks, url, mobileUrl}}}``;
                the ranks of a title seen in several files are concatenated
                in file-name order.
              - id_to_name: ``{platform_id: platform_name}``; not restricted by
                ``platform_ids``.
              - all_timestamps: ``{file name: modification time}``.

          Raises:
              DataNotFoundError: the date directory is missing, contains no
                  txt files, or yields no titles.
          """
          # Resolve "today" exactly once so the cache key, the TTL decision and
          # the directory that is read can never disagree around midnight.
          now = datetime.now()
          target = now if date is None else date
          date_folder = self.get_date_folder_name(target)

          platform_key = ','.join(sorted(platform_ids)) if platform_ids else 'all'
          cache_key = f"read_all_titles:{date_folder}:{platform_key}"

          # Today's data may still grow, so it is cached for 15 minutes;
          # historical data is cached for 1 hour.
          is_today = target.date() == now.date()
          ttl = 900 if is_today else 3600
          cached = self.cache.get(cache_key, ttl=ttl)
          if cached:
              return cached

          # Cache miss: read the files.
          txt_dir = self.project_root / "output" / date_folder / "txt"
          if not txt_dir.exists():
              raise DataNotFoundError(
                  f"未找到 {date_folder} 的数据目录",
                  suggestion="请先运行爬虫或检查日期是否正确"
              )

          txt_files = sorted(txt_dir.glob("*.txt"))
          if not txt_files:
              raise DataNotFoundError(
                  f"{date_folder} 没有数据文件",
                  suggestion="请等待爬虫任务完成"
              )

          # Build the filter once; set membership is O(1) per platform.
          wanted = set(platform_ids) if platform_ids else None

          all_titles = {}
          id_to_name = {}
          all_timestamps = {}

          for txt_file in txt_files:
              try:
                  titles_by_id, file_id_to_name = self.parse_txt_file(txt_file)
                  id_to_name.update(file_id_to_name)

                  for platform_id, titles in titles_by_id.items():
                      if wanted is not None and platform_id not in wanted:
                          continue

                      merged = all_titles.setdefault(platform_id, {})
                      for title, info in titles.items():
                          if title in merged:
                              # Same title seen again: accumulate its ranks.
                              merged[title]["ranks"].extend(info["ranks"])
                          else:
                              merged[title] = info.copy()

                  all_timestamps[txt_file.name] = txt_file.stat().st_mtime

              except Exception as e:
                  # One bad file must not abort the whole date; report and go on.
                  print(f"Warning: 解析文件 {txt_file} 失败: {e}")
                  continue

          if not all_titles:
              raise DataNotFoundError(
                  f"{date_folder} 没有有效的数据",
                  suggestion="请检查数据文件格式或重新运行爬虫"
              )

          result = (all_titles, id_to_name, all_timestamps)
          self.cache.set(cache_key, result)
          return result

      def parse_yaml_config(self, config_path: str = None) -> dict:
          """Parse the YAML configuration file.

          Args:
              config_path: Path of the configuration file; defaults to
                  ``config/config.yaml`` under the project root.

          Returns:
              Whatever ``yaml.safe_load`` produces for the file.
              NOTE(review): an empty document presumably yields ``None``
              rather than a dict - confirm callers handle that.

          Raises:
              FileParseError: the file does not exist or cannot be parsed.
          """
          if config_path is None:
              config_path = self.project_root / "config" / "config.yaml"
          else:
              config_path = Path(config_path)

          if not config_path.exists():
              raise FileParseError(str(config_path), "配置文件不存在")

          try:
              with open(config_path, "r", encoding="utf-8") as f:
                  return yaml.safe_load(f)
          except Exception as e:
              raise FileParseError(str(config_path), str(e)) from e

      def parse_frequency_words(self, words_file: str = None) -> List[Dict]:
          """Parse the keyword configuration file.

          Each non-empty line that does not start with ``#`` describes one
          word group. Words are separated by ``|`` or ``,``. A trailing ``+``
          marks a required word, a trailing ``!`` marks a filter word, and
          anything else is a normal word. Groups with neither required nor
          normal words are dropped.

          Args:
              words_file: Path of the keyword file; defaults to
                  ``config/frequency_words.txt`` under the project root.

          Returns:
              A list of ``{"required": [...], "normal": [...],
              "filter_words": [...]}`` dicts, or an empty list when the file
              does not exist.

          Raises:
              FileParseError: the file exists but cannot be read.
          """
          if words_file is None:
              words_file = self.project_root / "config" / "frequency_words.txt"
          else:
              words_file = Path(words_file)

          if not words_file.exists():
              return []

          word_groups = []

          try:
              with open(words_file, "r", encoding="utf-8") as f:
                  for line in f:
                      line = line.strip()
                      if not line or line.startswith("#"):
                          continue

                      group = {
                          "required": [],
                          "normal": [],
                          "filter_words": []
                      }

                      for part in line.split("|"):
                          for raw in part.split(","):
                              word = raw.strip()
                              if not word:
                                  continue

                              if word.endswith("+"):
                                  bucket = "required"
                                  word = word[:-1].strip()
                              elif word.endswith("!"):
                                  bucket = "filter_words"
                                  word = word[:-1].strip()
                              else:
                                  bucket = "normal"

                              # A bare "+" or "!" carries no word; an empty
                              # keyword would match every title.
                              if word:
                                  group[bucket].append(word)

                      if group["required"] or group["normal"]:
                          word_groups.append(group)

          except Exception as e:
              raise FileParseError(str(words_file), str(e)) from e

          return word_groups