# parser_service.py
"""
Data parsing service.

v2.0.0: SQLite databases only; TXT file support has been removed.
New storage layout: output/{type}/{date}.db
"""
import re
import sqlite3
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from datetime import datetime
import yaml
from ..utils.errors import FileParseError, DataNotFoundError
from .cache_service import get_cache
  14. class ParserService:
  15. """数据解析服务类"""
  16. def __init__(self, project_root: str = None):
  17. """
  18. 初始化解析服务
  19. Args:
  20. project_root: 项目根目录,默认为当前目录的父目录
  21. """
  22. if project_root is None:
  23. current_file = Path(__file__)
  24. self.project_root = current_file.parent.parent.parent
  25. else:
  26. self.project_root = Path(project_root)
  27. self.cache = get_cache()
  28. # frequency_words.txt mtime 缓存
  29. self._freq_words_cache: Optional[List[Dict]] = None
  30. self._freq_words_mtime: float = 0.0
  31. @staticmethod
  32. def clean_title(title: str) -> str:
  33. """清理标题文本"""
  34. title = re.sub(r'\s+', ' ', title)
  35. title = title.strip()
  36. return title
  37. def get_date_folder_name(self, date: datetime = None) -> str:
  38. """
  39. 获取日期字符串(ISO 格式)
  40. Args:
  41. date: 日期对象,默认为今天
  42. Returns:
  43. 日期字符串(YYYY-MM-DD)
  44. """
  45. if date is None:
  46. date = datetime.now()
  47. return date.strftime("%Y-%m-%d")
  48. def _get_db_path(self, date: datetime = None, db_type: str = "news") -> Optional[Path]:
  49. """
  50. 获取数据库文件路径
  51. 新结构:output/{type}/{date}.db
  52. Args:
  53. date: 日期对象,默认为今天
  54. db_type: 数据库类型 ("news" 或 "rss")
  55. Returns:
  56. 数据库文件路径,如果不存在则返回 None
  57. """
  58. date_str = self.get_date_folder_name(date)
  59. db_path = self.project_root / "output" / db_type / f"{date_str}.db"
  60. if db_path.exists():
  61. return db_path
  62. return None
  63. def _read_from_sqlite(
  64. self,
  65. date: datetime = None,
  66. platform_ids: Optional[List[str]] = None,
  67. db_type: str = "news"
  68. ) -> Optional[Tuple[Dict, Dict, Dict]]:
  69. """
  70. 从 SQLite 数据库读取数据
  71. Args:
  72. date: 日期对象,默认为今天
  73. platform_ids: 平台ID列表,None表示所有平台
  74. db_type: 数据库类型 ("news" 或 "rss")
  75. Returns:
  76. (all_titles, id_to_name, all_timestamps) 元组,如果数据库不存在返回 None
  77. """
  78. db_path = self._get_db_path(date, db_type)
  79. if db_path is None:
  80. return None
  81. all_titles = {}
  82. id_to_name = {}
  83. all_timestamps = {}
  84. try:
  85. conn = sqlite3.connect(str(db_path))
  86. conn.row_factory = sqlite3.Row
  87. cursor = conn.cursor()
  88. if db_type == "news":
  89. return self._read_news_from_sqlite(cursor, platform_ids, all_titles, id_to_name, all_timestamps)
  90. elif db_type == "rss":
  91. return self._read_rss_from_sqlite(cursor, platform_ids, all_titles, id_to_name, all_timestamps)
  92. except Exception as e:
  93. print(f"Warning: 从 SQLite 读取数据失败: {e}")
  94. return None
  95. finally:
  96. if 'conn' in locals():
  97. conn.close()
  98. def _read_news_from_sqlite(
  99. self,
  100. cursor,
  101. platform_ids: Optional[List[str]],
  102. all_titles: Dict,
  103. id_to_name: Dict,
  104. all_timestamps: Dict
  105. ) -> Optional[Tuple[Dict, Dict, Dict]]:
  106. """从热榜数据库读取数据"""
  107. # 检查表是否存在
  108. cursor.execute("""
  109. SELECT name FROM sqlite_master
  110. WHERE type='table' AND name='news_items'
  111. """)
  112. if not cursor.fetchone():
  113. return None
  114. # 构建查询
  115. if platform_ids:
  116. placeholders = ','.join(['?' for _ in platform_ids])
  117. query = f"""
  118. SELECT n.id, n.platform_id, p.name as platform_name, n.title,
  119. n.rank, n.url, n.mobile_url,
  120. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  121. FROM news_items n
  122. LEFT JOIN platforms p ON n.platform_id = p.id
  123. WHERE n.platform_id IN ({placeholders})
  124. """
  125. cursor.execute(query, platform_ids)
  126. else:
  127. cursor.execute("""
  128. SELECT n.id, n.platform_id, p.name as platform_name, n.title,
  129. n.rank, n.url, n.mobile_url,
  130. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  131. FROM news_items n
  132. LEFT JOIN platforms p ON n.platform_id = p.id
  133. """)
  134. rows = cursor.fetchall()
  135. # 收集所有 news_item_id 用于查询历史排名
  136. news_ids = [row['id'] for row in rows]
  137. rank_history_map = {}
  138. if news_ids:
  139. placeholders = ",".join("?" * len(news_ids))
  140. cursor.execute(f"""
  141. SELECT news_item_id, rank FROM rank_history
  142. WHERE news_item_id IN ({placeholders})
  143. ORDER BY news_item_id, crawl_time
  144. """, news_ids)
  145. for rh_row in cursor.fetchall():
  146. news_id = rh_row['news_item_id']
  147. rank = rh_row['rank']
  148. if news_id not in rank_history_map:
  149. rank_history_map[news_id] = []
  150. rank_history_map[news_id].append(rank)
  151. for row in rows:
  152. news_id = row['id']
  153. platform_id = row['platform_id']
  154. platform_name = row['platform_name'] or platform_id
  155. title = row['title']
  156. if platform_id not in id_to_name:
  157. id_to_name[platform_id] = platform_name
  158. if platform_id not in all_titles:
  159. all_titles[platform_id] = {}
  160. ranks = rank_history_map.get(news_id, [row['rank']])
  161. all_titles[platform_id][title] = {
  162. "ranks": ranks,
  163. "url": row['url'] or "",
  164. "mobileUrl": row['mobile_url'] or "",
  165. "first_time": row['first_crawl_time'] or "",
  166. "last_time": row['last_crawl_time'] or "",
  167. "count": row['crawl_count'] or 1,
  168. }
  169. # 获取抓取时间作为 timestamps
  170. cursor.execute("""
  171. SELECT crawl_time, created_at FROM crawl_records
  172. ORDER BY crawl_time
  173. """)
  174. for row in cursor.fetchall():
  175. crawl_time = row['crawl_time']
  176. created_at = row['created_at']
  177. try:
  178. ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").timestamp()
  179. except (ValueError, TypeError):
  180. ts = datetime.now().timestamp()
  181. all_timestamps[f"{crawl_time}.db"] = ts
  182. if not all_titles:
  183. return None
  184. return (all_titles, id_to_name, all_timestamps)
  185. def _read_rss_from_sqlite(
  186. self,
  187. cursor,
  188. feed_ids: Optional[List[str]],
  189. all_items: Dict,
  190. id_to_name: Dict,
  191. all_timestamps: Dict
  192. ) -> Optional[Tuple[Dict, Dict, Dict]]:
  193. """从 RSS 数据库读取数据"""
  194. # 检查表是否存在
  195. cursor.execute("""
  196. SELECT name FROM sqlite_master
  197. WHERE type='table' AND name='rss_items'
  198. """)
  199. if not cursor.fetchone():
  200. return None
  201. # 构建查询
  202. if feed_ids:
  203. placeholders = ','.join(['?' for _ in feed_ids])
  204. query = f"""
  205. SELECT i.id, i.feed_id, f.name as feed_name, i.title,
  206. i.url, i.published_at, i.summary, i.author,
  207. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  208. FROM rss_items i
  209. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  210. WHERE i.feed_id IN ({placeholders})
  211. ORDER BY i.published_at DESC
  212. """
  213. cursor.execute(query, feed_ids)
  214. else:
  215. cursor.execute("""
  216. SELECT i.id, i.feed_id, f.name as feed_name, i.title,
  217. i.url, i.published_at, i.summary, i.author,
  218. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  219. FROM rss_items i
  220. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  221. ORDER BY i.published_at DESC
  222. """)
  223. rows = cursor.fetchall()
  224. for row in rows:
  225. feed_id = row['feed_id']
  226. feed_name = row['feed_name'] or feed_id
  227. title = row['title']
  228. if feed_id not in id_to_name:
  229. id_to_name[feed_id] = feed_name
  230. if feed_id not in all_items:
  231. all_items[feed_id] = {}
  232. all_items[feed_id][title] = {
  233. "url": row['url'] or "",
  234. "published_at": row['published_at'] or "",
  235. "summary": row['summary'] or "",
  236. "author": row['author'] or "",
  237. "first_time": row['first_crawl_time'] or "",
  238. "last_time": row['last_crawl_time'] or "",
  239. "count": row['crawl_count'] or 1,
  240. }
  241. # 获取抓取时间
  242. cursor.execute("""
  243. SELECT crawl_time, created_at FROM rss_crawl_records
  244. ORDER BY crawl_time
  245. """)
  246. for row in cursor.fetchall():
  247. crawl_time = row['crawl_time']
  248. created_at = row['created_at']
  249. try:
  250. ts = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").timestamp()
  251. except (ValueError, TypeError):
  252. ts = datetime.now().timestamp()
  253. all_timestamps[f"{crawl_time}.db"] = ts
  254. if not all_items:
  255. return None
  256. return (all_items, id_to_name, all_timestamps)
  257. def read_all_titles_for_date(
  258. self,
  259. date: datetime = None,
  260. platform_ids: Optional[List[str]] = None,
  261. db_type: str = "news"
  262. ) -> Tuple[Dict, Dict, Dict]:
  263. """
  264. 读取指定日期的所有数据(带缓存)
  265. Args:
  266. date: 日期对象,默认为今天
  267. platform_ids: 平台/Feed ID列表,None表示所有
  268. db_type: 数据库类型 ("news" 或 "rss")
  269. Returns:
  270. (all_titles, id_to_name, all_timestamps) 元组
  271. Raises:
  272. DataNotFoundError: 数据不存在
  273. """
  274. date_str = self.get_date_folder_name(date)
  275. platform_key = ','.join(sorted(platform_ids)) if platform_ids else 'all'
  276. cache_key = f"read_all:{db_type}:{date_str}:{platform_key}"
  277. is_today = (date is None) or (date.date() == datetime.now().date())
  278. ttl = 900 if is_today else 900
  279. cached = self.cache.get(cache_key, ttl=ttl)
  280. if cached:
  281. return cached
  282. result = self._read_from_sqlite(date, platform_ids, db_type)
  283. if result:
  284. self.cache.set(cache_key, result)
  285. return result
  286. raise DataNotFoundError(
  287. f"未找到 {date_str} 的 {db_type} 数据",
  288. suggestion="请先运行爬虫或检查日期是否正确"
  289. )
  290. def parse_yaml_config(self, config_path: str = None) -> dict:
  291. """
  292. 解析YAML配置文件
  293. Args:
  294. config_path: 配置文件路径,默认为 config/config.yaml
  295. Returns:
  296. 配置字典
  297. Raises:
  298. FileParseError: 配置文件解析错误
  299. """
  300. if config_path is None:
  301. config_path = self.project_root / "config" / "config.yaml"
  302. else:
  303. config_path = Path(config_path)
  304. if not config_path.exists():
  305. raise FileParseError(str(config_path), "配置文件不存在")
  306. try:
  307. with open(config_path, "r", encoding="utf-8") as f:
  308. config_data = yaml.safe_load(f)
  309. return config_data
  310. except Exception as e:
  311. raise FileParseError(str(config_path), str(e))
  312. def parse_frequency_words(self, words_file: str = None) -> List[Dict]:
  313. """
  314. 解析关键词配置文件(带 mtime 缓存)
  315. 仅当 frequency_words.txt 被修改时才重新解析,避免循环内重复 IO。
  316. 复用 trendradar.core.frequency 的解析逻辑,支持:
  317. - # 开头的注释行
  318. - 空行分隔词组
  319. - [组别名] 作为词组第一行,给整组指定别名
  320. - +前缀必须词、!前缀过滤词、@数量限制
  321. - /pattern/ 正则表达式语法
  322. - => 别名 显示名称语法
  323. - [GLOBAL_FILTER] 全局过滤区域
  324. 显示名称优先级:组别名 > 行别名拼接 > 关键词拼接
  325. Args:
  326. words_file: 关键词文件路径,默认为 config/frequency_words.txt
  327. Returns:
  328. 词组列表
  329. Raises:
  330. FileParseError: 文件解析错误
  331. """
  332. import os
  333. from trendradar.core.frequency import load_frequency_words
  334. if words_file is None:
  335. words_file = str(self.project_root / "config" / "frequency_words.txt")
  336. else:
  337. words_file = str(words_file)
  338. try:
  339. current_mtime = os.path.getmtime(words_file)
  340. if self._freq_words_cache is not None and current_mtime == self._freq_words_mtime:
  341. return self._freq_words_cache
  342. word_groups, filter_words, global_filters = load_frequency_words(words_file)
  343. self._freq_words_cache = word_groups
  344. self._freq_words_mtime = current_mtime
  345. return word_groups
  346. except FileNotFoundError:
  347. return []
  348. except Exception as e:
  349. raise FileParseError(words_file, str(e))
  350. def get_available_dates(self, db_type: str = "news") -> List[str]:
  351. """
  352. 获取可用的日期列表
  353. Args:
  354. db_type: 数据库类型 ("news" 或 "rss")
  355. Returns:
  356. 日期字符串列表(YYYY-MM-DD 格式,降序排列)
  357. """
  358. db_dir = self.project_root / "output" / db_type
  359. if not db_dir.exists():
  360. return []
  361. dates = []
  362. for db_file in db_dir.glob("*.db"):
  363. date_match = re.match(r'(\d{4}-\d{2}-\d{2})\.db$', db_file.name)
  364. if date_match:
  365. dates.append(date_match.group(1))
  366. return sorted(dates, reverse=True)
  367. def get_available_date_range(self, db_type: str = "news") -> Tuple[Optional[datetime], Optional[datetime]]:
  368. """
  369. 获取可用的日期范围
  370. Args:
  371. db_type: 数据库类型 ("news" 或 "rss")
  372. Returns:
  373. (最早日期, 最新日期) 元组,如果没有数据则返回 (None, None)
  374. """
  375. dates = self.get_available_dates(db_type)
  376. if not dates:
  377. return (None, None)
  378. earliest = datetime.strptime(dates[-1], "%Y-%m-%d")
  379. latest = datetime.strptime(dates[0], "%Y-%m-%d")
  380. return (earliest, latest)