sqlite_mixin.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151
  1. # coding=utf-8
  2. """
  3. SQLite 存储 Mixin
  4. 提供共用的 SQLite 数据库操作逻辑,供 LocalStorageBackend 和 RemoteStorageBackend 复用。
  5. """
  6. import sqlite3
  7. from abc import abstractmethod
  8. from datetime import datetime
  9. from pathlib import Path
  10. from typing import Any, Dict, List, Optional
  11. from trendradar.storage.base import NewsItem, NewsData, RSSItem, RSSData
  12. from trendradar.utils.url import normalize_url
  13. class SQLiteStorageMixin:
  14. """
  15. SQLite 存储操作 Mixin
  16. 子类需要实现以下抽象方法:
  17. - _get_connection(date, db_type) -> sqlite3.Connection
  18. - _get_configured_time() -> datetime
  19. - _format_date_folder(date) -> str
  20. - _format_time_filename() -> str
  21. """
  22. # ========================================
  23. # 抽象方法 - 子类必须实现
  24. # ========================================
  25. @abstractmethod
  26. def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
  27. """获取数据库连接"""
  28. pass
  29. @abstractmethod
  30. def _get_configured_time(self) -> datetime:
  31. """获取配置时区的当前时间"""
  32. pass
  33. @abstractmethod
  34. def _format_date_folder(self, date: Optional[str] = None) -> str:
  35. """格式化日期文件夹名 (ISO 格式: YYYY-MM-DD)"""
  36. pass
  37. @abstractmethod
  38. def _format_time_filename(self) -> str:
  39. """格式化时间文件名 (格式: HH-MM)"""
  40. pass
  41. # ========================================
  42. # Schema 管理
  43. # ========================================
  44. def _get_schema_path(self, db_type: str = "news") -> Path:
  45. """
  46. 获取 schema.sql 文件路径
  47. Args:
  48. db_type: 数据库类型 ("news" 或 "rss")
  49. Returns:
  50. schema 文件路径
  51. """
  52. if db_type == "rss":
  53. return Path(__file__).parent / "rss_schema.sql"
  54. return Path(__file__).parent / "schema.sql"
  55. def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
  56. """
  57. 从 schema.sql 初始化数据库表结构
  58. Args:
  59. conn: 数据库连接
  60. db_type: 数据库类型 ("news" 或 "rss")
  61. """
  62. schema_path = self._get_schema_path(db_type)
  63. if schema_path.exists():
  64. with open(schema_path, "r", encoding="utf-8") as f:
  65. schema_sql = f.read()
  66. conn.executescript(schema_sql)
  67. else:
  68. raise FileNotFoundError(f"Schema file not found: {schema_path}")
  69. conn.commit()
  70. # ========================================
  71. # 新闻数据存储
  72. # ========================================
  73. def _save_news_data_impl(self, data: NewsData, log_prefix: str = "[存储]") -> tuple[bool, int, int, int, int]:
  74. """
  75. 保存新闻数据到 SQLite(核心实现)
  76. Args:
  77. data: 新闻数据
  78. log_prefix: 日志前缀
  79. Returns:
  80. (success, new_count, updated_count, title_changed_count, off_list_count)
  81. """
  82. try:
  83. conn = self._get_connection(data.date)
  84. cursor = conn.cursor()
  85. # 获取配置时区的当前时间
  86. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  87. # 首先同步平台信息到 platforms 表
  88. for source_id, source_name in data.id_to_name.items():
  89. cursor.execute("""
  90. INSERT INTO platforms (id, name, updated_at)
  91. VALUES (?, ?, ?)
  92. ON CONFLICT(id) DO UPDATE SET
  93. name = excluded.name,
  94. updated_at = excluded.updated_at
  95. """, (source_id, source_name, now_str))
  96. # 统计计数器
  97. new_count = 0
  98. updated_count = 0
  99. title_changed_count = 0
  100. success_sources = []
  101. for source_id, news_list in data.items.items():
  102. success_sources.append(source_id)
  103. for item in news_list:
  104. try:
  105. # 标准化 URL(去除动态参数,如微博的 band_rank)
  106. normalized_url = normalize_url(item.url, source_id) if item.url else ""
  107. # 检查是否已存在(通过标准化 URL + platform_id)
  108. if normalized_url:
  109. cursor.execute("""
  110. SELECT id, title FROM news_items
  111. WHERE url = ? AND platform_id = ?
  112. """, (normalized_url, source_id))
  113. existing = cursor.fetchone()
  114. if existing:
  115. # 已存在,更新记录
  116. existing_id, existing_title = existing
  117. # 检查标题是否变化
  118. if existing_title != item.title:
  119. # 记录标题变更
  120. cursor.execute("""
  121. INSERT INTO title_changes
  122. (news_item_id, old_title, new_title, changed_at)
  123. VALUES (?, ?, ?, ?)
  124. """, (existing_id, existing_title, item.title, now_str))
  125. title_changed_count += 1
  126. # 记录排名历史
  127. cursor.execute("""
  128. INSERT INTO rank_history
  129. (news_item_id, rank, crawl_time, created_at)
  130. VALUES (?, ?, ?, ?)
  131. """, (existing_id, item.rank, data.crawl_time, now_str))
  132. # 更新现有记录
  133. cursor.execute("""
  134. UPDATE news_items SET
  135. title = ?,
  136. rank = ?,
  137. mobile_url = ?,
  138. last_crawl_time = ?,
  139. crawl_count = crawl_count + 1,
  140. updated_at = ?
  141. WHERE id = ?
  142. """, (item.title, item.rank, item.mobile_url,
  143. data.crawl_time, now_str, existing_id))
  144. updated_count += 1
  145. else:
  146. # 不存在,插入新记录(存储标准化后的 URL)
  147. cursor.execute("""
  148. INSERT INTO news_items
  149. (title, platform_id, rank, url, mobile_url,
  150. first_crawl_time, last_crawl_time, crawl_count,
  151. created_at, updated_at)
  152. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  153. """, (item.title, source_id, item.rank, normalized_url,
  154. item.mobile_url, data.crawl_time, data.crawl_time,
  155. now_str, now_str))
  156. new_id = cursor.lastrowid
  157. # 记录初始排名
  158. cursor.execute("""
  159. INSERT INTO rank_history
  160. (news_item_id, rank, crawl_time, created_at)
  161. VALUES (?, ?, ?, ?)
  162. """, (new_id, item.rank, data.crawl_time, now_str))
  163. new_count += 1
  164. else:
  165. # URL 为空的情况,直接插入(不做去重)
  166. cursor.execute("""
  167. INSERT INTO news_items
  168. (title, platform_id, rank, url, mobile_url,
  169. first_crawl_time, last_crawl_time, crawl_count,
  170. created_at, updated_at)
  171. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  172. """, (item.title, source_id, item.rank, "",
  173. item.mobile_url, data.crawl_time, data.crawl_time,
  174. now_str, now_str))
  175. new_id = cursor.lastrowid
  176. # 记录初始排名
  177. cursor.execute("""
  178. INSERT INTO rank_history
  179. (news_item_id, rank, crawl_time, created_at)
  180. VALUES (?, ?, ?, ?)
  181. """, (new_id, item.rank, data.crawl_time, now_str))
  182. new_count += 1
  183. except sqlite3.Error as e:
  184. print(f"{log_prefix} 保存新闻条目失败 [{item.title[:30]}...]: {e}")
  185. total_items = new_count + updated_count
  186. # ========================================
  187. # 脱榜检测:检测上次在榜但这次不在榜的新闻
  188. # ========================================
  189. off_list_count = 0
  190. # 获取上一次抓取时间
  191. cursor.execute("""
  192. SELECT crawl_time FROM crawl_records
  193. WHERE crawl_time < ?
  194. ORDER BY crawl_time DESC
  195. LIMIT 1
  196. """, (data.crawl_time,))
  197. prev_record = cursor.fetchone()
  198. if prev_record:
  199. prev_crawl_time = prev_record[0]
  200. # 对于每个成功抓取的平台,检测脱榜
  201. for source_id in success_sources:
  202. # 获取当前抓取中该平台的所有标准化 URL
  203. current_urls = set()
  204. for item in data.items.get(source_id, []):
  205. normalized_url = normalize_url(item.url, source_id) if item.url else ""
  206. if normalized_url:
  207. current_urls.add(normalized_url)
  208. # 查询上次在榜(last_crawl_time = prev_crawl_time)但这次不在榜的新闻
  209. # 这些新闻是"第一次脱榜",需要记录
  210. cursor.execute("""
  211. SELECT id, url FROM news_items
  212. WHERE platform_id = ?
  213. AND last_crawl_time = ?
  214. AND url != ''
  215. """, (source_id, prev_crawl_time))
  216. for row in cursor.fetchall():
  217. news_id, url = row[0], row[1]
  218. if url not in current_urls:
  219. # 插入脱榜记录(rank=0 表示脱榜)
  220. cursor.execute("""
  221. INSERT INTO rank_history
  222. (news_item_id, rank, crawl_time, created_at)
  223. VALUES (?, 0, ?, ?)
  224. """, (news_id, data.crawl_time, now_str))
  225. off_list_count += 1
  226. # 记录抓取信息
  227. cursor.execute("""
  228. INSERT OR REPLACE INTO crawl_records
  229. (crawl_time, total_items, created_at)
  230. VALUES (?, ?, ?)
  231. """, (data.crawl_time, total_items, now_str))
  232. # 获取刚插入的 crawl_record 的 ID
  233. cursor.execute("""
  234. SELECT id FROM crawl_records WHERE crawl_time = ?
  235. """, (data.crawl_time,))
  236. record_row = cursor.fetchone()
  237. if record_row:
  238. crawl_record_id = record_row[0]
  239. # 记录成功的来源
  240. for source_id in success_sources:
  241. cursor.execute("""
  242. INSERT OR REPLACE INTO crawl_source_status
  243. (crawl_record_id, platform_id, status)
  244. VALUES (?, ?, 'success')
  245. """, (crawl_record_id, source_id))
  246. # 记录失败的来源
  247. for failed_id in data.failed_ids:
  248. # 确保失败的平台也在 platforms 表中
  249. cursor.execute("""
  250. INSERT OR IGNORE INTO platforms (id, name, updated_at)
  251. VALUES (?, ?, ?)
  252. """, (failed_id, failed_id, now_str))
  253. cursor.execute("""
  254. INSERT OR REPLACE INTO crawl_source_status
  255. (crawl_record_id, platform_id, status)
  256. VALUES (?, ?, 'failed')
  257. """, (crawl_record_id, failed_id))
  258. conn.commit()
  259. return True, new_count, updated_count, title_changed_count, off_list_count
  260. except Exception as e:
  261. print(f"{log_prefix} 保存失败: {e}")
  262. return False, 0, 0, 0, 0
  263. def _get_today_all_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
  264. """
  265. 获取指定日期的所有新闻数据(合并后)
  266. Args:
  267. date: 日期字符串,默认为今天
  268. Returns:
  269. 合并后的新闻数据
  270. """
  271. try:
  272. conn = self._get_connection(date)
  273. cursor = conn.cursor()
  274. # 获取所有新闻数据(包含 id 用于查询排名历史)
  275. cursor.execute("""
  276. SELECT n.id, n.title, n.platform_id, p.name as platform_name,
  277. n.rank, n.url, n.mobile_url,
  278. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  279. FROM news_items n
  280. LEFT JOIN platforms p ON n.platform_id = p.id
  281. ORDER BY n.platform_id, n.last_crawl_time
  282. """)
  283. rows = cursor.fetchall()
  284. if not rows:
  285. return None
  286. # 收集所有 news_item_id
  287. news_ids = [row[0] for row in rows]
  288. # 批量查询排名历史(同时获取时间和排名)
  289. # 过滤逻辑:只保留 last_crawl_time 之前的脱榜记录(rank=0)
  290. # 这样可以避免显示新闻永久脱榜后的无意义记录
  291. rank_history_map: Dict[int, List[int]] = {}
  292. rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
  293. if news_ids:
  294. placeholders = ",".join("?" * len(news_ids))
  295. cursor.execute(f"""
  296. SELECT rh.news_item_id, rh.rank, rh.crawl_time
  297. FROM rank_history rh
  298. JOIN news_items ni ON rh.news_item_id = ni.id
  299. WHERE rh.news_item_id IN ({placeholders})
  300. AND NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
  301. ORDER BY rh.news_item_id, rh.crawl_time
  302. """, news_ids)
  303. for rh_row in cursor.fetchall():
  304. news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
  305. # 构建 ranks 列表(去重,排除脱榜记录 rank=0)
  306. if news_id not in rank_history_map:
  307. rank_history_map[news_id] = []
  308. if rank != 0 and rank not in rank_history_map[news_id]:
  309. rank_history_map[news_id].append(rank)
  310. # 构建 rank_timeline 列表(完整时间线,包含脱榜)
  311. if news_id not in rank_timeline_map:
  312. rank_timeline_map[news_id] = []
  313. # 提取时间部分(HH:MM)
  314. time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
  315. rank_timeline_map[news_id].append({
  316. "time": time_part,
  317. "rank": rank if rank != 0 else None # 0 转为 None 表示脱榜
  318. })
  319. # 按 platform_id 分组
  320. items: Dict[str, List[NewsItem]] = {}
  321. id_to_name: Dict[str, str] = {}
  322. crawl_date = self._format_date_folder(date)
  323. for row in rows:
  324. news_id = row[0]
  325. platform_id = row[2]
  326. title = row[1]
  327. platform_name = row[3] or platform_id
  328. id_to_name[platform_id] = platform_name
  329. if platform_id not in items:
  330. items[platform_id] = []
  331. # 获取排名历史,如果没有则使用当前排名
  332. ranks = rank_history_map.get(news_id, [row[4]])
  333. rank_timeline = rank_timeline_map.get(news_id, [])
  334. items[platform_id].append(NewsItem(
  335. title=title,
  336. source_id=platform_id,
  337. source_name=platform_name,
  338. rank=row[4],
  339. url=row[5] or "",
  340. mobile_url=row[6] or "",
  341. crawl_time=row[8], # last_crawl_time
  342. ranks=ranks,
  343. first_time=row[7], # first_crawl_time
  344. last_time=row[8], # last_crawl_time
  345. count=row[9], # crawl_count
  346. rank_timeline=rank_timeline,
  347. ))
  348. final_items = items
  349. # 获取失败的来源
  350. cursor.execute("""
  351. SELECT DISTINCT css.platform_id
  352. FROM crawl_source_status css
  353. JOIN crawl_records cr ON css.crawl_record_id = cr.id
  354. WHERE css.status = 'failed'
  355. """)
  356. failed_ids = [row[0] for row in cursor.fetchall()]
  357. # 获取最新的抓取时间
  358. cursor.execute("""
  359. SELECT crawl_time FROM crawl_records
  360. ORDER BY crawl_time DESC
  361. LIMIT 1
  362. """)
  363. time_row = cursor.fetchone()
  364. crawl_time = time_row[0] if time_row else self._format_time_filename()
  365. return NewsData(
  366. date=crawl_date,
  367. crawl_time=crawl_time,
  368. items=final_items,
  369. id_to_name=id_to_name,
  370. failed_ids=failed_ids,
  371. )
  372. except Exception as e:
  373. print(f"[存储] 读取数据失败: {e}")
  374. return None
  375. def _get_latest_crawl_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
  376. """
  377. 获取最新一次抓取的数据
  378. Args:
  379. date: 日期字符串,默认为今天
  380. Returns:
  381. 最新抓取的新闻数据
  382. """
  383. try:
  384. conn = self._get_connection(date)
  385. cursor = conn.cursor()
  386. # 获取最新的抓取时间
  387. cursor.execute("""
  388. SELECT crawl_time FROM crawl_records
  389. ORDER BY crawl_time DESC
  390. LIMIT 1
  391. """)
  392. time_row = cursor.fetchone()
  393. if not time_row:
  394. return None
  395. latest_time = time_row[0]
  396. # 获取该时间的新闻数据(包含 id 用于查询排名历史)
  397. cursor.execute("""
  398. SELECT n.id, n.title, n.platform_id, p.name as platform_name,
  399. n.rank, n.url, n.mobile_url,
  400. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  401. FROM news_items n
  402. LEFT JOIN platforms p ON n.platform_id = p.id
  403. WHERE n.last_crawl_time = ?
  404. """, (latest_time,))
  405. rows = cursor.fetchall()
  406. if not rows:
  407. return None
  408. # 收集所有 news_item_id
  409. news_ids = [row[0] for row in rows]
  410. # 批量查询排名历史(同时获取时间和排名)
  411. # 过滤逻辑:只保留 last_crawl_time 之前的脱榜记录(rank=0)
  412. # 这样可以避免显示新闻永久脱榜后的无意义记录
  413. rank_history_map: Dict[int, List[int]] = {}
  414. rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
  415. if news_ids:
  416. placeholders = ",".join("?" * len(news_ids))
  417. cursor.execute(f"""
  418. SELECT rh.news_item_id, rh.rank, rh.crawl_time
  419. FROM rank_history rh
  420. JOIN news_items ni ON rh.news_item_id = ni.id
  421. WHERE rh.news_item_id IN ({placeholders})
  422. AND NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
  423. ORDER BY rh.news_item_id, rh.crawl_time
  424. """, news_ids)
  425. for rh_row in cursor.fetchall():
  426. news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
  427. # 构建 ranks 列表(去重,排除脱榜记录 rank=0)
  428. if news_id not in rank_history_map:
  429. rank_history_map[news_id] = []
  430. if rank != 0 and rank not in rank_history_map[news_id]:
  431. rank_history_map[news_id].append(rank)
  432. # 构建 rank_timeline 列表(完整时间线,包含脱榜)
  433. if news_id not in rank_timeline_map:
  434. rank_timeline_map[news_id] = []
  435. # 提取时间部分(HH:MM)
  436. time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
  437. rank_timeline_map[news_id].append({
  438. "time": time_part,
  439. "rank": rank if rank != 0 else None # 0 转为 None 表示脱榜
  440. })
  441. items: Dict[str, List[NewsItem]] = {}
  442. id_to_name: Dict[str, str] = {}
  443. crawl_date = self._format_date_folder(date)
  444. for row in rows:
  445. news_id = row[0]
  446. platform_id = row[2]
  447. platform_name = row[3] or platform_id
  448. id_to_name[platform_id] = platform_name
  449. if platform_id not in items:
  450. items[platform_id] = []
  451. # 获取排名历史,如果没有则使用当前排名
  452. ranks = rank_history_map.get(news_id, [row[4]])
  453. rank_timeline = rank_timeline_map.get(news_id, [])
  454. items[platform_id].append(NewsItem(
  455. title=row[1],
  456. source_id=platform_id,
  457. source_name=platform_name,
  458. rank=row[4],
  459. url=row[5] or "",
  460. mobile_url=row[6] or "",
  461. crawl_time=row[8], # last_crawl_time
  462. ranks=ranks,
  463. first_time=row[7], # first_crawl_time
  464. last_time=row[8], # last_crawl_time
  465. count=row[9], # crawl_count
  466. rank_timeline=rank_timeline,
  467. ))
  468. # 获取失败的来源(针对最新一次抓取)
  469. cursor.execute("""
  470. SELECT css.platform_id
  471. FROM crawl_source_status css
  472. JOIN crawl_records cr ON css.crawl_record_id = cr.id
  473. WHERE cr.crawl_time = ? AND css.status = 'failed'
  474. """, (latest_time,))
  475. failed_ids = [row[0] for row in cursor.fetchall()]
  476. return NewsData(
  477. date=crawl_date,
  478. crawl_time=latest_time,
  479. items=items,
  480. id_to_name=id_to_name,
  481. failed_ids=failed_ids,
  482. )
  483. except Exception as e:
  484. print(f"[存储] 获取最新数据失败: {e}")
  485. return None
  486. def _detect_new_titles_impl(self, current_data: NewsData) -> Dict[str, Dict]:
  487. """
  488. 检测新增的标题
  489. 该方法比较当前抓取数据与历史数据,找出新增的标题。
  490. 关键逻辑:只有在历史批次中从未出现过的标题才算新增。
  491. Args:
  492. current_data: 当前抓取的数据
  493. Returns:
  494. 新增的标题数据 {source_id: {title: NewsItem}}
  495. """
  496. try:
  497. # 获取历史数据
  498. historical_data = self._get_today_all_data_impl(current_data.date)
  499. if not historical_data:
  500. # 没有历史数据,所有都是新的
  501. new_titles = {}
  502. for source_id, news_list in current_data.items.items():
  503. new_titles[source_id] = {item.title: item for item in news_list}
  504. return new_titles
  505. # 获取当前批次时间
  506. current_time = current_data.crawl_time
  507. # 收集历史标题(first_time < current_time 的标题)
  508. # 这样可以正确处理同一标题因 URL 变化而产生多条记录的情况
  509. historical_titles: Dict[str, set] = {}
  510. for source_id, news_list in historical_data.items.items():
  511. historical_titles[source_id] = set()
  512. for item in news_list:
  513. first_time = item.first_time or item.crawl_time
  514. if first_time < current_time:
  515. historical_titles[source_id].add(item.title)
  516. # 检查是否有历史数据
  517. has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
  518. if not has_historical_data:
  519. # 第一次抓取,没有"新增"概念
  520. return {}
  521. # 检测新增
  522. new_titles = {}
  523. for source_id, news_list in current_data.items.items():
  524. hist_set = historical_titles.get(source_id, set())
  525. for item in news_list:
  526. if item.title not in hist_set:
  527. if source_id not in new_titles:
  528. new_titles[source_id] = {}
  529. new_titles[source_id][item.title] = item
  530. return new_titles
  531. except Exception as e:
  532. print(f"[存储] 检测新标题失败: {e}")
  533. return {}
  534. def _is_first_crawl_today_impl(self, date: Optional[str] = None) -> bool:
  535. """
  536. 检查是否是当天第一次抓取
  537. Args:
  538. date: 日期字符串,默认为今天
  539. Returns:
  540. 是否是第一次抓取
  541. """
  542. try:
  543. conn = self._get_connection(date)
  544. cursor = conn.cursor()
  545. cursor.execute("""
  546. SELECT COUNT(*) as count FROM crawl_records
  547. """)
  548. row = cursor.fetchone()
  549. count = row[0] if row else 0
  550. # 如果只有一条或没有记录,视为第一次抓取
  551. return count <= 1
  552. except Exception as e:
  553. print(f"[存储] 检查首次抓取失败: {e}")
  554. return True
  555. def _get_crawl_times_impl(self, date: Optional[str] = None) -> List[str]:
  556. """
  557. 获取指定日期的所有抓取时间列表
  558. Args:
  559. date: 日期字符串,默认为今天
  560. Returns:
  561. 抓取时间列表(按时间排序)
  562. """
  563. try:
  564. conn = self._get_connection(date)
  565. cursor = conn.cursor()
  566. cursor.execute("""
  567. SELECT crawl_time FROM crawl_records
  568. ORDER BY crawl_time
  569. """)
  570. rows = cursor.fetchall()
  571. return [row[0] for row in rows]
  572. except Exception as e:
  573. print(f"[存储] 获取抓取时间列表失败: {e}")
  574. return []
  575. # ========================================
  576. # 时间段执行记录(调度系统)
  577. # ========================================
  578. def _has_period_executed_impl(self, date_str: str, period_key: str, action: str) -> bool:
  579. """
  580. 检查指定时间段的某个 action 今天是否已执行
  581. Args:
  582. date_str: 日期字符串 YYYY-MM-DD
  583. period_key: 时间段 key
  584. action: 动作类型 (analyze / push)
  585. Returns:
  586. 是否已执行
  587. """
  588. try:
  589. conn = self._get_connection(date_str)
  590. cursor = conn.cursor()
  591. # 先检查表是否存在
  592. cursor.execute("""
  593. SELECT name FROM sqlite_master
  594. WHERE type='table' AND name='period_executions'
  595. """)
  596. if not cursor.fetchone():
  597. return False
  598. cursor.execute("""
  599. SELECT 1 FROM period_executions
  600. WHERE execution_date = ? AND period_key = ? AND action = ?
  601. """, (date_str, period_key, action))
  602. return cursor.fetchone() is not None
  603. except Exception as e:
  604. print(f"[存储] 检查时间段执行记录失败: {e}")
  605. return False
  606. def _record_period_execution_impl(self, date_str: str, period_key: str, action: str) -> bool:
  607. """
  608. 记录时间段的 action 执行
  609. Args:
  610. date_str: 日期字符串 YYYY-MM-DD
  611. period_key: 时间段 key
  612. action: 动作类型 (analyze / push)
  613. Returns:
  614. 是否记录成功
  615. """
  616. try:
  617. conn = self._get_connection(date_str)
  618. cursor = conn.cursor()
  619. # 确保表存在
  620. cursor.execute("""
  621. CREATE TABLE IF NOT EXISTS period_executions (
  622. id INTEGER PRIMARY KEY AUTOINCREMENT,
  623. execution_date TEXT NOT NULL,
  624. period_key TEXT NOT NULL,
  625. action TEXT NOT NULL,
  626. executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  627. UNIQUE(execution_date, period_key, action)
  628. )
  629. """)
  630. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  631. cursor.execute("""
  632. INSERT OR IGNORE INTO period_executions (execution_date, period_key, action, executed_at)
  633. VALUES (?, ?, ?, ?)
  634. """, (date_str, period_key, action, now_str))
  635. conn.commit()
  636. return True
  637. except Exception as e:
  638. print(f"[存储] 记录时间段执行失败: {e}")
  639. return False
  640. # ========================================
  641. # RSS 数据存储
  642. # ========================================
  643. def _save_rss_data_impl(self, data: RSSData, log_prefix: str = "[存储]") -> tuple[bool, int, int]:
  644. """
  645. 保存 RSS 数据到 SQLite(以 URL 为唯一标识)
  646. Args:
  647. data: RSS 数据
  648. log_prefix: 日志前缀
  649. Returns:
  650. (success, new_count, updated_count)
  651. """
  652. try:
  653. conn = self._get_connection(data.date, db_type="rss")
  654. cursor = conn.cursor()
  655. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  656. # 同步 RSS 源信息到 rss_feeds 表
  657. for feed_id, feed_name in data.id_to_name.items():
  658. cursor.execute("""
  659. INSERT INTO rss_feeds (id, name, updated_at)
  660. VALUES (?, ?, ?)
  661. ON CONFLICT(id) DO UPDATE SET
  662. name = excluded.name,
  663. updated_at = excluded.updated_at
  664. """, (feed_id, feed_name, now_str))
  665. # 统计计数器
  666. new_count = 0
  667. updated_count = 0
  668. for feed_id, rss_list in data.items.items():
  669. for item in rss_list:
  670. try:
  671. # 检查是否已存在(通过 URL + feed_id)
  672. if item.url:
  673. cursor.execute("""
  674. SELECT id, title FROM rss_items
  675. WHERE url = ? AND feed_id = ?
  676. """, (item.url, feed_id))
  677. existing = cursor.fetchone()
  678. if existing:
  679. # 已存在,更新记录
  680. existing_id = existing[0]
  681. cursor.execute("""
  682. UPDATE rss_items SET
  683. title = ?,
  684. published_at = ?,
  685. summary = ?,
  686. author = ?,
  687. last_crawl_time = ?,
  688. crawl_count = crawl_count + 1,
  689. updated_at = ?
  690. WHERE id = ?
  691. """, (item.title, item.published_at, item.summary,
  692. item.author, data.crawl_time, now_str, existing_id))
  693. updated_count += 1
  694. else:
  695. # 不存在,插入新记录(使用 ON CONFLICT 兜底处理并发/竞争场景)
  696. cursor.execute("""
  697. INSERT INTO rss_items
  698. (title, feed_id, url, published_at, summary, author,
  699. first_crawl_time, last_crawl_time, crawl_count,
  700. created_at, updated_at)
  701. VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  702. ON CONFLICT(url, feed_id) DO UPDATE SET
  703. title = excluded.title,
  704. published_at = excluded.published_at,
  705. summary = excluded.summary,
  706. author = excluded.author,
  707. last_crawl_time = excluded.last_crawl_time,
  708. crawl_count = crawl_count + 1,
  709. updated_at = excluded.updated_at
  710. """, (item.title, feed_id, item.url, item.published_at,
  711. item.summary, item.author, data.crawl_time,
  712. data.crawl_time, now_str, now_str))
  713. new_count += 1
  714. else:
  715. # URL 为空,用 try-except 处理重复
  716. try:
  717. cursor.execute("""
  718. INSERT INTO rss_items
  719. (title, feed_id, url, published_at, summary, author,
  720. first_crawl_time, last_crawl_time, crawl_count,
  721. created_at, updated_at)
  722. VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  723. """, (item.title, feed_id, "", item.published_at,
  724. item.summary, item.author, data.crawl_time,
  725. data.crawl_time, now_str, now_str))
  726. new_count += 1
  727. except sqlite3.IntegrityError:
  728. # 重复的空 URL 条目,忽略
  729. pass
  730. except sqlite3.Error as e:
  731. print(f"{log_prefix} 保存 RSS 条目失败 [{item.title[:30]}...]: {e}")
  732. total_items = new_count + updated_count
  733. # 记录抓取信息
  734. cursor.execute("""
  735. INSERT OR REPLACE INTO rss_crawl_records
  736. (crawl_time, total_items, created_at)
  737. VALUES (?, ?, ?)
  738. """, (data.crawl_time, total_items, now_str))
  739. # 记录抓取状态
  740. cursor.execute("""
  741. SELECT id FROM rss_crawl_records WHERE crawl_time = ?
  742. """, (data.crawl_time,))
  743. record_row = cursor.fetchone()
  744. if record_row:
  745. crawl_record_id = record_row[0]
  746. # 记录成功的源
  747. for feed_id in data.items.keys():
  748. cursor.execute("""
  749. INSERT OR REPLACE INTO rss_crawl_status
  750. (crawl_record_id, feed_id, status)
  751. VALUES (?, ?, 'success')
  752. """, (crawl_record_id, feed_id))
  753. # 记录失败的源
  754. for failed_id in data.failed_ids:
  755. cursor.execute("""
  756. INSERT OR IGNORE INTO rss_feeds (id, name, updated_at)
  757. VALUES (?, ?, ?)
  758. """, (failed_id, failed_id, now_str))
  759. cursor.execute("""
  760. INSERT OR REPLACE INTO rss_crawl_status
  761. (crawl_record_id, feed_id, status)
  762. VALUES (?, ?, 'failed')
  763. """, (crawl_record_id, failed_id))
  764. conn.commit()
  765. return True, new_count, updated_count
  766. except Exception as e:
  767. print(f"{log_prefix} 保存 RSS 数据失败: {e}")
  768. return False, 0, 0
  769. def _get_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
  770. """
  771. 获取指定日期的所有 RSS 数据
  772. Args:
  773. date: 日期字符串(YYYY-MM-DD),默认为今天
  774. Returns:
  775. RSSData 对象,如果没有数据返回 None
  776. """
  777. try:
  778. conn = self._get_connection(date, db_type="rss")
  779. cursor = conn.cursor()
  780. # 获取所有 RSS 数据
  781. cursor.execute("""
  782. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  783. i.url, i.published_at, i.summary, i.author,
  784. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  785. FROM rss_items i
  786. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  787. ORDER BY i.published_at DESC
  788. """)
  789. rows = cursor.fetchall()
  790. if not rows:
  791. return None
  792. items: Dict[str, List[RSSItem]] = {}
  793. id_to_name: Dict[str, str] = {}
  794. crawl_date = self._format_date_folder(date)
  795. for row in rows:
  796. feed_id = row[2]
  797. feed_name = row[3] or feed_id
  798. id_to_name[feed_id] = feed_name
  799. if feed_id not in items:
  800. items[feed_id] = []
  801. items[feed_id].append(RSSItem(
  802. title=row[1],
  803. feed_id=feed_id,
  804. feed_name=feed_name,
  805. url=row[4] or "",
  806. published_at=row[5] or "",
  807. summary=row[6] or "",
  808. author=row[7] or "",
  809. crawl_time=row[9],
  810. first_time=row[8],
  811. last_time=row[9],
  812. count=row[10],
  813. ))
  814. # 获取最新的抓取时间
  815. cursor.execute("""
  816. SELECT crawl_time FROM rss_crawl_records
  817. ORDER BY crawl_time DESC
  818. LIMIT 1
  819. """)
  820. time_row = cursor.fetchone()
  821. crawl_time = time_row[0] if time_row else self._format_time_filename()
  822. # 获取失败的源
  823. cursor.execute("""
  824. SELECT DISTINCT cs.feed_id
  825. FROM rss_crawl_status cs
  826. JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
  827. WHERE cs.status = 'failed'
  828. """)
  829. failed_ids = [row[0] for row in cursor.fetchall()]
  830. return RSSData(
  831. date=crawl_date,
  832. crawl_time=crawl_time,
  833. items=items,
  834. id_to_name=id_to_name,
  835. failed_ids=failed_ids,
  836. )
  837. except Exception as e:
  838. print(f"[存储] 读取 RSS 数据失败: {e}")
  839. return None
  840. def _detect_new_rss_items_impl(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
  841. """
  842. 检测新增的 RSS 条目(增量模式)
  843. 该方法比较当前抓取数据与历史数据,找出新增的 RSS 条目。
  844. 关键逻辑:只有在历史批次中从未出现过的 URL 才算新增。
  845. Args:
  846. current_data: 当前抓取的 RSS 数据
  847. Returns:
  848. 新增的 RSS 条目 {feed_id: [RSSItem, ...]}
  849. """
  850. try:
  851. # 获取历史数据
  852. historical_data = self._get_rss_data_impl(current_data.date)
  853. if not historical_data:
  854. # 没有历史数据,所有都是新的
  855. return current_data.items.copy()
  856. # 获取当前批次时间
  857. current_time = current_data.crawl_time
  858. # 收集历史 URL(first_time < current_time 的条目)
  859. historical_urls: Dict[str, set] = {}
  860. for feed_id, rss_list in historical_data.items.items():
  861. historical_urls[feed_id] = set()
  862. for item in rss_list:
  863. first_time = item.first_time or item.crawl_time
  864. if first_time < current_time:
  865. if item.url:
  866. historical_urls[feed_id].add(item.url)
  867. # 检查是否有历史数据
  868. has_historical_data = any(len(urls) > 0 for urls in historical_urls.values())
  869. if not has_historical_data:
  870. # 第一次抓取,没有"新增"概念
  871. return {}
  872. # 检测新增
  873. new_items: Dict[str, List[RSSItem]] = {}
  874. for feed_id, rss_list in current_data.items.items():
  875. hist_set = historical_urls.get(feed_id, set())
  876. for item in rss_list:
  877. # 通过 URL 判断是否新增
  878. if item.url and item.url not in hist_set:
  879. if feed_id not in new_items:
  880. new_items[feed_id] = []
  881. new_items[feed_id].append(item)
  882. return new_items
  883. except Exception as e:
  884. print(f"[存储] 检测新 RSS 条目失败: {e}")
  885. return {}
  886. def _get_latest_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
  887. """
  888. 获取最新一次抓取的 RSS 数据(当前榜单模式)
  889. Args:
  890. date: 日期字符串(YYYY-MM-DD),默认为今天
  891. Returns:
  892. 最新抓取的 RSS 数据,如果没有数据返回 None
  893. """
  894. try:
  895. conn = self._get_connection(date, db_type="rss")
  896. cursor = conn.cursor()
  897. # 获取最新的抓取时间
  898. cursor.execute("""
  899. SELECT crawl_time FROM rss_crawl_records
  900. ORDER BY crawl_time DESC
  901. LIMIT 1
  902. """)
  903. time_row = cursor.fetchone()
  904. if not time_row:
  905. return None
  906. latest_time = time_row[0]
  907. # 获取该时间的 RSS 数据
  908. cursor.execute("""
  909. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  910. i.url, i.published_at, i.summary, i.author,
  911. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  912. FROM rss_items i
  913. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  914. WHERE i.last_crawl_time = ?
  915. ORDER BY i.published_at DESC
  916. """, (latest_time,))
  917. rows = cursor.fetchall()
  918. if not rows:
  919. return None
  920. items: Dict[str, List[RSSItem]] = {}
  921. id_to_name: Dict[str, str] = {}
  922. crawl_date = self._format_date_folder(date)
  923. for row in rows:
  924. feed_id = row[2]
  925. feed_name = row[3] or feed_id
  926. id_to_name[feed_id] = feed_name
  927. if feed_id not in items:
  928. items[feed_id] = []
  929. items[feed_id].append(RSSItem(
  930. title=row[1],
  931. feed_id=feed_id,
  932. feed_name=feed_name,
  933. url=row[4] or "",
  934. published_at=row[5] or "",
  935. summary=row[6] or "",
  936. author=row[7] or "",
  937. crawl_time=row[9],
  938. first_time=row[8],
  939. last_time=row[9],
  940. count=row[10],
  941. ))
  942. # 获取失败的源(针对最新一次抓取)
  943. cursor.execute("""
  944. SELECT cs.feed_id
  945. FROM rss_crawl_status cs
  946. JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
  947. WHERE cr.crawl_time = ? AND cs.status = 'failed'
  948. """, (latest_time,))
  949. failed_ids = [row[0] for row in cursor.fetchall()]
  950. return RSSData(
  951. date=crawl_date,
  952. crawl_time=latest_time,
  953. items=items,
  954. id_to_name=id_to_name,
  955. failed_ids=failed_ids,
  956. )
  957. except Exception as e:
  958. print(f"[存储] 获取最新 RSS 数据失败: {e}")
  959. return None