# parser_service.py
"""
File parsing service.

Provides parsing of TXT-format news data and YAML configuration files.
Supports reading from two data sources: SQLite databases and TXT files.
"""
import re
import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import yaml

from ..utils.errors import FileParseError, DataNotFoundError
from .cache_service import get_cache


  14. class ParserService:
  15. """文件解析服务类"""
  16. def __init__(self, project_root: str = None):
  17. """
  18. 初始化解析服务
  19. Args:
  20. project_root: 项目根目录,默认为当前目录的父目录
  21. """
  22. if project_root is None:
  23. # 获取当前文件所在目录的父目录的父目录
  24. current_file = Path(__file__)
  25. self.project_root = current_file.parent.parent.parent
  26. else:
  27. self.project_root = Path(project_root)
  28. # 初始化缓存服务
  29. self.cache = get_cache()
  30. @staticmethod
  31. def clean_title(title: str) -> str:
  32. """
  33. 清理标题文本
  34. Args:
  35. title: 原始标题
  36. Returns:
  37. 清理后的标题
  38. """
  39. # 移除多余空白
  40. title = re.sub(r'\s+', ' ', title)
  41. # 移除特殊字符
  42. title = title.strip()
  43. return title
  44. def parse_txt_file(self, file_path: Path) -> Tuple[Dict, Dict]:
  45. """
  46. 解析单个txt文件的标题数据
  47. Args:
  48. file_path: txt文件路径
  49. Returns:
  50. (titles_by_id, id_to_name) 元组
  51. - titles_by_id: {platform_id: {title: {ranks, url, mobileUrl}}}
  52. - id_to_name: {platform_id: platform_name}
  53. Raises:
  54. FileParseError: 文件解析错误
  55. """
  56. if not file_path.exists():
  57. raise FileParseError(str(file_path), "文件不存在")
  58. titles_by_id = {}
  59. id_to_name = {}
  60. try:
  61. with open(file_path, "r", encoding="utf-8") as f:
  62. content = f.read()
  63. sections = content.split("\n\n")
  64. for section in sections:
  65. if not section.strip() or "==== 以下ID请求失败 ====" in section:
  66. continue
  67. lines = section.strip().split("\n")
  68. if len(lines) < 2:
  69. continue
  70. # 解析header: id | name 或 id
  71. header_line = lines[0].strip()
  72. if " | " in header_line:
  73. parts = header_line.split(" | ", 1)
  74. source_id = parts[0].strip()
  75. name = parts[1].strip()
  76. id_to_name[source_id] = name
  77. else:
  78. source_id = header_line
  79. id_to_name[source_id] = source_id
  80. titles_by_id[source_id] = {}
  81. # 解析标题行
  82. for line in lines[1:]:
  83. if line.strip():
  84. try:
  85. title_part = line.strip()
  86. rank = None
  87. # 提取排名
  88. if ". " in title_part and title_part.split(". ")[0].isdigit():
  89. rank_str, title_part = title_part.split(". ", 1)
  90. rank = int(rank_str)
  91. # 提取 MOBILE URL
  92. mobile_url = ""
  93. if " [MOBILE:" in title_part:
  94. title_part, mobile_part = title_part.rsplit(" [MOBILE:", 1)
  95. if mobile_part.endswith("]"):
  96. mobile_url = mobile_part[:-1]
  97. # 提取 URL
  98. url = ""
  99. if " [URL:" in title_part:
  100. title_part, url_part = title_part.rsplit(" [URL:", 1)
  101. if url_part.endswith("]"):
  102. url = url_part[:-1]
  103. title = self.clean_title(title_part.strip())
  104. ranks = [rank] if rank is not None else [1]
  105. titles_by_id[source_id][title] = {
  106. "ranks": ranks,
  107. "url": url,
  108. "mobileUrl": mobile_url,
  109. }
  110. except Exception as e:
  111. # 忽略单行解析错误
  112. continue
  113. except Exception as e:
  114. raise FileParseError(str(file_path), str(e))
  115. return titles_by_id, id_to_name
  116. def get_date_folder_name(self, date: datetime = None) -> str:
  117. """
  118. 获取日期文件夹名称(兼容中文和ISO格式)
  119. Args:
  120. date: 日期对象,默认为今天
  121. Returns:
  122. 实际存在的文件夹名称,优先返回中文格式(YYYY年MM月DD日),
  123. 若不存在则返回 ISO 格式(YYYY-MM-DD)
  124. """
  125. if date is None:
  126. date = datetime.now()
  127. return self._find_date_folder(date)
  128. def _get_date_folder_name(self, date: datetime = None) -> str:
  129. """
  130. 获取日期文件夹名称(兼容中文和ISO格式)
  131. Args:
  132. date: 日期对象,默认为今天
  133. Returns:
  134. 实际存在的文件夹名称,优先返回中文格式(YYYY年MM月DD日),
  135. 若不存在则返回 ISO 格式(YYYY-MM-DD)
  136. """
  137. if date is None:
  138. date = datetime.now()
  139. return self._find_date_folder(date)
  140. def _find_date_folder(self, date: datetime) -> str:
  141. """
  142. 查找实际存在的日期文件夹
  143. 支持两种格式:
  144. - 中文格式:YYYY年MM月DD日(优先)
  145. - ISO格式:YYYY-MM-DD
  146. Args:
  147. date: 日期对象
  148. Returns:
  149. 实际存在的文件夹名称,若都不存在则返回中文格式
  150. """
  151. output_dir = self.project_root / "output"
  152. # 中文格式:YYYY年MM月DD日
  153. chinese_format = date.strftime("%Y年%m月%d日")
  154. # ISO格式:YYYY-MM-DD
  155. iso_format = date.strftime("%Y-%m-%d")
  156. # 优先检查中文格式
  157. if (output_dir / chinese_format).exists():
  158. return chinese_format
  159. # 其次检查 ISO 格式
  160. if (output_dir / iso_format).exists():
  161. return iso_format
  162. # 都不存在,返回中文格式(与项目现有风格一致)
  163. return chinese_format
  164. def _get_sqlite_db_path(self, date: datetime = None) -> Optional[Path]:
  165. """
  166. 获取 SQLite 数据库文件路径
  167. Args:
  168. date: 日期对象,默认为今天
  169. Returns:
  170. 数据库文件路径,如果不存在则返回 None
  171. """
  172. date_folder = self._get_date_folder_name(date)
  173. db_path = self.project_root / "output" / date_folder / "news.db"
  174. if db_path.exists():
  175. return db_path
  176. return None
  177. def _get_txt_folder_path(self, date: datetime = None) -> Optional[Path]:
  178. """
  179. 获取 TXT 文件夹路径
  180. Args:
  181. date: 日期对象,默认为今天
  182. Returns:
  183. TXT 文件夹路径,如果不存在则返回 None
  184. """
  185. date_folder = self._get_date_folder_name(date)
  186. txt_path = self.project_root / "output" / date_folder / "txt"
  187. if txt_path.exists() and txt_path.is_dir():
  188. return txt_path
  189. return None
  190. def _read_from_txt(
  191. self,
  192. date: datetime = None,
  193. platform_ids: Optional[List[str]] = None
  194. ) -> Optional[Tuple[Dict, Dict, Dict]]:
  195. """
  196. 从 TXT 文件夹读取新闻数据
  197. Args:
  198. date: 日期对象,默认为今天
  199. platform_ids: 平台ID列表,None表示所有平台
  200. Returns:
  201. (all_titles, id_to_name, all_timestamps) 元组,如果不存在返回 None
  202. """
  203. txt_folder = self._get_txt_folder_path(date)
  204. if txt_folder is None:
  205. return None
  206. # 获取所有 TXT 文件并按时间排序
  207. txt_files = sorted(txt_folder.glob("*.txt"))
  208. if not txt_files:
  209. return None
  210. all_titles = {}
  211. id_to_name = {}
  212. all_timestamps = {}
  213. for txt_file in txt_files:
  214. try:
  215. titles_by_id, file_id_to_name = self.parse_txt_file(txt_file)
  216. # 记录时间戳
  217. all_timestamps[txt_file.name] = txt_file.stat().st_mtime
  218. # 合并 id_to_name
  219. id_to_name.update(file_id_to_name)
  220. # 合并标题数据
  221. for source_id, titles in titles_by_id.items():
  222. # 如果指定了 platform_ids,过滤
  223. if platform_ids and source_id not in platform_ids:
  224. continue
  225. if source_id not in all_titles:
  226. all_titles[source_id] = {}
  227. for title, data in titles.items():
  228. if title not in all_titles[source_id]:
  229. # 新标题
  230. all_titles[source_id][title] = {
  231. "ranks": data.get("ranks", []),
  232. "url": data.get("url", ""),
  233. "mobileUrl": data.get("mobileUrl", ""),
  234. "first_time": txt_file.stem, # 使用文件名作为时间
  235. "last_time": txt_file.stem,
  236. "count": 1,
  237. }
  238. else:
  239. # 合并已存在的标题
  240. existing = all_titles[source_id][title]
  241. # 合并排名
  242. for rank in data.get("ranks", []):
  243. if rank not in existing["ranks"]:
  244. existing["ranks"].append(rank)
  245. # 更新 last_time
  246. existing["last_time"] = txt_file.stem
  247. existing["count"] += 1
  248. # 保留 URL
  249. if not existing["url"] and data.get("url"):
  250. existing["url"] = data["url"]
  251. if not existing["mobileUrl"] and data.get("mobileUrl"):
  252. existing["mobileUrl"] = data["mobileUrl"]
  253. except Exception as e:
  254. print(f"Warning: 解析 TXT 文件失败 {txt_file}: {e}")
  255. continue
  256. if not all_titles:
  257. return None
  258. return (all_titles, id_to_name, all_timestamps)
  259. def _read_from_sqlite(
  260. self,
  261. date: datetime = None,
  262. platform_ids: Optional[List[str]] = None
  263. ) -> Optional[Tuple[Dict, Dict, Dict]]:
  264. """
  265. 从 SQLite 数据库读取新闻数据
  266. 新表结构数据已按 URL 去重,包含:
  267. - first_crawl_time: 首次抓取时间
  268. - last_crawl_time: 最后抓取时间
  269. - crawl_count: 抓取次数
  270. Args:
  271. date: 日期对象,默认为今天
  272. platform_ids: 平台ID列表,None表示所有平台
  273. Returns:
  274. (all_titles, id_to_name, all_timestamps) 元组,如果数据库不存在返回 None
  275. """
  276. db_path = self._get_sqlite_db_path(date)
  277. if db_path is None:
  278. return None
  279. all_titles = {}
  280. id_to_name = {}
  281. all_timestamps = {}
  282. try:
  283. conn = sqlite3.connect(str(db_path))
  284. conn.row_factory = sqlite3.Row
  285. cursor = conn.cursor()
  286. # 检查表是否存在
  287. cursor.execute("""
  288. SELECT name FROM sqlite_master
  289. WHERE type='table' AND name='news_items'
  290. """)
  291. if not cursor.fetchone():
  292. conn.close()
  293. return None
  294. # 构建查询
  295. if platform_ids:
  296. placeholders = ','.join(['?' for _ in platform_ids])
  297. query = f"""
  298. SELECT n.id, n.platform_id, p.name as platform_name, n.title,
  299. n.rank, n.url, n.mobile_url,
  300. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  301. FROM news_items n
  302. LEFT JOIN platforms p ON n.platform_id = p.id
  303. WHERE n.platform_id IN ({placeholders})
  304. """
  305. cursor.execute(query, platform_ids)
  306. else:
  307. cursor.execute("""
  308. SELECT n.id, n.platform_id, p.name as platform_name, n.title,
  309. n.rank, n.url, n.mobile_url,
  310. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  311. FROM news_items n
  312. LEFT JOIN platforms p ON n.platform_id = p.id
  313. """)
  314. rows = cursor.fetchall()
  315. # 收集所有 news_item_id 用于查询历史排名
  316. news_ids = [row['id'] for row in rows]
  317. rank_history_map = {}
  318. if news_ids:
  319. placeholders = ",".join("?" * len(news_ids))
  320. cursor.execute(f"""
  321. SELECT news_item_id, rank FROM rank_history
  322. WHERE news_item_id IN ({placeholders})
  323. ORDER BY news_item_id, crawl_time
  324. """, news_ids)
  325. for rh_row in cursor.fetchall():
  326. news_id = rh_row['news_item_id']
  327. rank = rh_row['rank']
  328. if news_id not in rank_history_map:
  329. rank_history_map[news_id] = []
  330. rank_history_map[news_id].append(rank)
  331. for row in rows:
  332. news_id = row['id']
  333. platform_id = row['platform_id']
  334. platform_name = row['platform_name'] or platform_id
  335. title = row['title']
  336. # 更新 id_to_name
  337. if platform_id not in id_to_name:
  338. id_to_name[platform_id] = platform_name
  339. # 初始化平台字典
  340. if platform_id not in all_titles:
  341. all_titles[platform_id] = {}
  342. # 获取排名历史,如果为空则使用当前排名
  343. ranks = rank_history_map.get(news_id, [row['rank']])
  344. # 直接使用数据(已去重)
  345. all_titles[platform_id][title] = {
  346. "ranks": ranks,
  347. "url": row['url'] or "",
  348. "mobileUrl": row['mobile_url'] or "",
  349. "first_time": row['first_crawl_time'] or "",
  350. "last_time": row['last_crawl_time'] or "",
  351. "count": row['crawl_count'] or 1,
  352. }
  353. # 获取抓取时间作为 timestamps
  354. cursor.execute("""
  355. SELECT crawl_time, created_at FROM crawl_records
  356. ORDER BY crawl_time
  357. """)
  358. for row in cursor.fetchall():
  359. crawl_time = row['crawl_time']
  360. created_at = row['created_at']
  361. # 将 created_at 转换为 Unix 时间戳
  362. try:
  363. ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").timestamp()
  364. except (ValueError, TypeError):
  365. ts = datetime.now().timestamp()
  366. all_timestamps[f"{crawl_time}.db"] = ts
  367. conn.close()
  368. if not all_titles:
  369. return None
  370. return (all_titles, id_to_name, all_timestamps)
  371. except Exception as e:
  372. print(f"Warning: 从 SQLite 读取数据失败: {e}")
  373. return None
  374. def read_all_titles_for_date(
  375. self,
  376. date: datetime = None,
  377. platform_ids: Optional[List[str]] = None
  378. ) -> Tuple[Dict, Dict, Dict]:
  379. """
  380. 读取指定日期的所有标题(带缓存)
  381. Args:
  382. date: 日期对象,默认为今天
  383. platform_ids: 平台ID列表,None表示所有平台
  384. Returns:
  385. (all_titles, id_to_name, all_timestamps) 元组
  386. - all_titles: {platform_id: {title: {ranks, url, mobileUrl, ...}}}
  387. - id_to_name: {platform_id: platform_name}
  388. - all_timestamps: {filename: timestamp}
  389. Raises:
  390. DataNotFoundError: 数据不存在
  391. """
  392. # 生成缓存键
  393. date_str = self.get_date_folder_name(date)
  394. platform_key = ','.join(sorted(platform_ids)) if platform_ids else 'all'
  395. cache_key = f"read_all_titles:{date_str}:{platform_key}"
  396. # 尝试从缓存获取
  397. # 对于历史数据(非今天),使用更长的缓存时间(1小时)
  398. # 对于今天的数据,使用较短的缓存时间(15分钟),因为可能有新数据
  399. is_today = (date is None) or (date.date() == datetime.now().date())
  400. ttl = 900 if is_today else 3600 # 15分钟 vs 1小时
  401. cached = self.cache.get(cache_key, ttl=ttl)
  402. if cached:
  403. return cached
  404. # 优先从 SQLite 读取
  405. sqlite_result = self._read_from_sqlite(date, platform_ids)
  406. if sqlite_result:
  407. self.cache.set(cache_key, sqlite_result)
  408. return sqlite_result
  409. # SQLite 不存在,尝试从 TXT 读取
  410. txt_result = self._read_from_txt(date, platform_ids)
  411. if txt_result:
  412. self.cache.set(cache_key, txt_result)
  413. return txt_result
  414. # 两种数据源都不存在
  415. raise DataNotFoundError(
  416. f"未找到 {date_str} 的数据",
  417. suggestion="请先运行爬虫或检查日期是否正确"
  418. )
  419. def parse_yaml_config(self, config_path: str = None) -> dict:
  420. """
  421. 解析YAML配置文件
  422. Args:
  423. config_path: 配置文件路径,默认为 config/config.yaml
  424. Returns:
  425. 配置字典
  426. Raises:
  427. FileParseError: 配置文件解析错误
  428. """
  429. if config_path is None:
  430. config_path = self.project_root / "config" / "config.yaml"
  431. else:
  432. config_path = Path(config_path)
  433. if not config_path.exists():
  434. raise FileParseError(str(config_path), "配置文件不存在")
  435. try:
  436. with open(config_path, "r", encoding="utf-8") as f:
  437. config_data = yaml.safe_load(f)
  438. return config_data
  439. except Exception as e:
  440. raise FileParseError(str(config_path), str(e))
  441. def parse_frequency_words(self, words_file: str = None) -> List[Dict]:
  442. """
  443. 解析关键词配置文件
  444. Args:
  445. words_file: 关键词文件路径,默认为 config/frequency_words.txt
  446. Returns:
  447. 词组列表
  448. Raises:
  449. FileParseError: 文件解析错误
  450. """
  451. if words_file is None:
  452. words_file = self.project_root / "config" / "frequency_words.txt"
  453. else:
  454. words_file = Path(words_file)
  455. if not words_file.exists():
  456. return []
  457. word_groups = []
  458. try:
  459. with open(words_file, "r", encoding="utf-8") as f:
  460. for line in f:
  461. line = line.strip()
  462. if not line or line.startswith("#"):
  463. continue
  464. # 使用 | 分隔符
  465. parts = [p.strip() for p in line.split("|")]
  466. if not parts:
  467. continue
  468. group = {
  469. "required": [],
  470. "normal": [],
  471. "filter_words": []
  472. }
  473. for part in parts:
  474. if not part:
  475. continue
  476. words = [w.strip() for w in part.split(",")]
  477. for word in words:
  478. if not word:
  479. continue
  480. if word.endswith("+"):
  481. # 必须词
  482. group["required"].append(word[:-1])
  483. elif word.endswith("!"):
  484. # 过滤词
  485. group["filter_words"].append(word[:-1])
  486. else:
  487. # 普通词
  488. group["normal"].append(word)
  489. if group["required"] or group["normal"]:
  490. word_groups.append(group)
  491. except Exception as e:
  492. raise FileParseError(str(words_file), str(e))
  493. return word_groups