sqlite_mixin.py 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735
  1. # coding=utf-8
  2. """
  3. SQLite 存储 Mixin
  4. 提供共用的 SQLite 数据库操作逻辑,供 LocalStorageBackend 和 RemoteStorageBackend 复用。
  5. """
  6. import sqlite3
  7. from abc import abstractmethod
  8. from datetime import datetime
  9. from pathlib import Path
  10. from typing import Any, Dict, List, Optional
  11. from trendradar.storage.base import NewsItem, NewsData, RSSItem, RSSData
  12. from trendradar.utils.url import normalize_url
  13. class SQLiteStorageMixin:
  14. """
  15. SQLite 存储操作 Mixin
  16. 子类需要实现以下抽象方法:
  17. - _get_connection(date, db_type) -> sqlite3.Connection
  18. - _get_configured_time() -> datetime
  19. - _format_date_folder(date) -> str
  20. - _format_time_filename() -> str
  21. """
  22. # ========================================
  23. # 抽象方法 - 子类必须实现
  24. # ========================================
  25. @abstractmethod
  26. def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
  27. """获取数据库连接"""
  28. pass
  29. @abstractmethod
  30. def _get_configured_time(self) -> datetime:
  31. """获取配置时区的当前时间"""
  32. pass
  33. @abstractmethod
  34. def _format_date_folder(self, date: Optional[str] = None) -> str:
  35. """格式化日期文件夹名 (ISO 格式: YYYY-MM-DD)"""
  36. pass
  37. @abstractmethod
  38. def _format_time_filename(self) -> str:
  39. """格式化时间文件名 (格式: HH-MM)"""
  40. pass
  41. # ========================================
  42. # Schema 管理
  43. # ========================================
  44. def _get_schema_path(self, db_type: str = "news") -> Path:
  45. """
  46. 获取 schema.sql 文件路径
  47. Args:
  48. db_type: 数据库类型 ("news" 或 "rss")
  49. Returns:
  50. schema 文件路径
  51. """
  52. if db_type == "rss":
  53. return Path(__file__).parent / "rss_schema.sql"
  54. return Path(__file__).parent / "schema.sql"
  55. def _get_ai_filter_schema_path(self) -> Path:
  56. """获取 AI 筛选 schema 文件路径"""
  57. return Path(__file__).parent / "ai_filter_schema.sql"
  58. def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
  59. """
  60. 从 schema.sql 初始化数据库表结构
  61. Args:
  62. conn: 数据库连接
  63. db_type: 数据库类型 ("news" 或 "rss")
  64. """
  65. schema_path = self._get_schema_path(db_type)
  66. if schema_path.exists():
  67. with open(schema_path, "r", encoding="utf-8") as f:
  68. schema_sql = f.read()
  69. conn.executescript(schema_sql)
  70. else:
  71. raise FileNotFoundError(f"Schema file not found: {schema_path}")
  72. # news 库额外加载 AI 筛选表结构
  73. if db_type == "news":
  74. ai_filter_schema = self._get_ai_filter_schema_path()
  75. if ai_filter_schema.exists():
  76. with open(ai_filter_schema, "r", encoding="utf-8") as f:
  77. conn.executescript(f.read())
  78. if db_type == "rss":
  79. self._migrate_rss_schema(conn)
  80. conn.commit()
  81. def _migrate_rss_schema(self, conn: sqlite3.Connection) -> None:
  82. """迁移 rss_items 表结构(为已有数据库添加 guid 列)"""
  83. cursor = conn.execute("PRAGMA table_info(rss_items)")
  84. columns = {row[1] for row in cursor.fetchall()}
  85. if "guid" not in columns:
  86. conn.execute("ALTER TABLE rss_items ADD COLUMN guid TEXT DEFAULT ''")
  87. conn.execute("""
  88. CREATE UNIQUE INDEX IF NOT EXISTS idx_rss_guid_feed
  89. ON rss_items(guid, feed_id) WHERE guid != ''
  90. """)
  91. # ========================================
  92. # 新闻数据存储
  93. # ========================================
  94. def _save_news_data_impl(self, data: NewsData, log_prefix: str = "[存储]") -> tuple[bool, int, int, int, int]:
  95. """
  96. 保存新闻数据到 SQLite(核心实现)
  97. Args:
  98. data: 新闻数据
  99. log_prefix: 日志前缀
  100. Returns:
  101. (success, new_count, updated_count, title_changed_count, off_list_count)
  102. """
  103. try:
  104. conn = self._get_connection(data.date)
  105. cursor = conn.cursor()
  106. # 获取配置时区的当前时间
  107. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  108. # 首先同步平台信息到 platforms 表
  109. for source_id, source_name in data.id_to_name.items():
  110. cursor.execute("""
  111. INSERT INTO platforms (id, name, updated_at)
  112. VALUES (?, ?, ?)
  113. ON CONFLICT(id) DO UPDATE SET
  114. name = excluded.name,
  115. updated_at = excluded.updated_at
  116. """, (source_id, source_name, now_str))
  117. # 统计计数器
  118. new_count = 0
  119. updated_count = 0
  120. title_changed_count = 0
  121. success_sources = []
  122. for source_id, news_list in data.items.items():
  123. success_sources.append(source_id)
  124. for item in news_list:
  125. try:
  126. # 标准化 URL(去除动态参数,如微博的 band_rank)
  127. normalized_url = normalize_url(item.url, source_id) if item.url else ""
  128. # 检查是否已存在(通过标准化 URL + platform_id)
  129. if normalized_url:
  130. cursor.execute("""
  131. SELECT id, title FROM news_items
  132. WHERE url = ? AND platform_id = ?
  133. """, (normalized_url, source_id))
  134. existing = cursor.fetchone()
  135. if existing:
  136. # 已存在,更新记录
  137. existing_id, existing_title = existing
  138. update_title = item.title
  139. if (update_title and update_title.strip().startswith(("http://", "https://", "//"))
  140. and existing_title and not existing_title.strip().startswith(("http://", "https://", "//"))):
  141. update_title = existing_title
  142. # 检查标题是否变化
  143. if existing_title != update_title:
  144. # 记录标题变更
  145. cursor.execute("""
  146. INSERT INTO title_changes
  147. (news_item_id, old_title, new_title, changed_at)
  148. VALUES (?, ?, ?, ?)
  149. """, (existing_id, existing_title, update_title, now_str))
  150. title_changed_count += 1
  151. # 记录排名历史
  152. cursor.execute("""
  153. INSERT INTO rank_history
  154. (news_item_id, rank, crawl_time, created_at)
  155. VALUES (?, ?, ?, ?)
  156. """, (existing_id, item.rank, data.crawl_time, now_str))
  157. # 更新现有记录
  158. cursor.execute("""
  159. UPDATE news_items SET
  160. title = ?,
  161. rank = ?,
  162. mobile_url = ?,
  163. last_crawl_time = ?,
  164. crawl_count = crawl_count + 1,
  165. updated_at = ?
  166. WHERE id = ?
  167. """, (update_title, item.rank, item.mobile_url,
  168. data.crawl_time, now_str, existing_id))
  169. updated_count += 1
  170. else:
  171. # 不存在,插入新记录(存储标准化后的 URL)
  172. cursor.execute("""
  173. INSERT INTO news_items
  174. (title, platform_id, rank, url, mobile_url,
  175. first_crawl_time, last_crawl_time, crawl_count,
  176. created_at, updated_at)
  177. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  178. """, (item.title, source_id, item.rank, normalized_url,
  179. item.mobile_url, data.crawl_time, data.crawl_time,
  180. now_str, now_str))
  181. new_id = cursor.lastrowid
  182. # 记录初始排名
  183. cursor.execute("""
  184. INSERT INTO rank_history
  185. (news_item_id, rank, crawl_time, created_at)
  186. VALUES (?, ?, ?, ?)
  187. """, (new_id, item.rank, data.crawl_time, now_str))
  188. new_count += 1
  189. else:
  190. # URL 为空的情况,直接插入(不做去重)
  191. cursor.execute("""
  192. INSERT INTO news_items
  193. (title, platform_id, rank, url, mobile_url,
  194. first_crawl_time, last_crawl_time, crawl_count,
  195. created_at, updated_at)
  196. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  197. """, (item.title, source_id, item.rank, "",
  198. item.mobile_url, data.crawl_time, data.crawl_time,
  199. now_str, now_str))
  200. new_id = cursor.lastrowid
  201. # 记录初始排名
  202. cursor.execute("""
  203. INSERT INTO rank_history
  204. (news_item_id, rank, crawl_time, created_at)
  205. VALUES (?, ?, ?, ?)
  206. """, (new_id, item.rank, data.crawl_time, now_str))
  207. new_count += 1
  208. except sqlite3.Error as e:
  209. print(f"{log_prefix} 保存新闻条目失败 [{item.title[:30]}...]: {e}")
  210. total_items = new_count + updated_count
  211. # ========================================
  212. # 脱榜检测:检测上次在榜但这次不在榜的新闻
  213. # ========================================
  214. off_list_count = 0
  215. # 获取上一次抓取时间
  216. cursor.execute("""
  217. SELECT crawl_time FROM crawl_records
  218. WHERE crawl_time < ?
  219. ORDER BY crawl_time DESC
  220. LIMIT 1
  221. """, (data.crawl_time,))
  222. prev_record = cursor.fetchone()
  223. if prev_record:
  224. prev_crawl_time = prev_record[0]
  225. # 对于每个成功抓取的平台,检测脱榜
  226. for source_id in success_sources:
  227. # 获取当前抓取中该平台的所有标准化 URL
  228. current_urls = set()
  229. for item in data.items.get(source_id, []):
  230. normalized_url = normalize_url(item.url, source_id) if item.url else ""
  231. if normalized_url:
  232. current_urls.add(normalized_url)
  233. # 查询上次在榜(last_crawl_time = prev_crawl_time)但这次不在榜的新闻
  234. # 这些新闻是"第一次脱榜",需要记录
  235. cursor.execute("""
  236. SELECT id, url FROM news_items
  237. WHERE platform_id = ?
  238. AND last_crawl_time = ?
  239. AND url != ''
  240. """, (source_id, prev_crawl_time))
  241. for row in cursor.fetchall():
  242. news_id, url = row[0], row[1]
  243. if url not in current_urls:
  244. # 插入脱榜记录(rank=0 表示脱榜)
  245. cursor.execute("""
  246. INSERT INTO rank_history
  247. (news_item_id, rank, crawl_time, created_at)
  248. VALUES (?, 0, ?, ?)
  249. """, (news_id, data.crawl_time, now_str))
  250. off_list_count += 1
  251. # 记录抓取信息
  252. cursor.execute("""
  253. INSERT OR REPLACE INTO crawl_records
  254. (crawl_time, total_items, created_at)
  255. VALUES (?, ?, ?)
  256. """, (data.crawl_time, total_items, now_str))
  257. # 获取刚插入的 crawl_record 的 ID
  258. cursor.execute("""
  259. SELECT id FROM crawl_records WHERE crawl_time = ?
  260. """, (data.crawl_time,))
  261. record_row = cursor.fetchone()
  262. if record_row:
  263. crawl_record_id = record_row[0]
  264. # 记录成功的来源
  265. for source_id in success_sources:
  266. cursor.execute("""
  267. INSERT OR REPLACE INTO crawl_source_status
  268. (crawl_record_id, platform_id, status)
  269. VALUES (?, ?, 'success')
  270. """, (crawl_record_id, source_id))
  271. # 记录失败的来源
  272. for failed_id in data.failed_ids:
  273. # 确保失败的平台也在 platforms 表中
  274. cursor.execute("""
  275. INSERT OR IGNORE INTO platforms (id, name, updated_at)
  276. VALUES (?, ?, ?)
  277. """, (failed_id, failed_id, now_str))
  278. cursor.execute("""
  279. INSERT OR REPLACE INTO crawl_source_status
  280. (crawl_record_id, platform_id, status)
  281. VALUES (?, ?, 'failed')
  282. """, (crawl_record_id, failed_id))
  283. conn.commit()
  284. return True, new_count, updated_count, title_changed_count, off_list_count
  285. except Exception as e:
  286. print(f"{log_prefix} 保存失败: {e}")
  287. return False, 0, 0, 0, 0
  288. def _get_today_all_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
  289. """
  290. 获取指定日期的所有新闻数据(合并后)
  291. Args:
  292. date: 日期字符串,默认为今天
  293. Returns:
  294. 合并后的新闻数据
  295. """
  296. try:
  297. conn = self._get_connection(date)
  298. cursor = conn.cursor()
  299. # 获取所有新闻数据(包含 id 用于查询排名历史)
  300. cursor.execute("""
  301. SELECT n.id, n.title, n.platform_id, p.name as platform_name,
  302. n.rank, n.url, n.mobile_url,
  303. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  304. FROM news_items n
  305. LEFT JOIN platforms p ON n.platform_id = p.id
  306. ORDER BY n.platform_id, n.last_crawl_time
  307. """)
  308. rows = cursor.fetchall()
  309. if not rows:
  310. return None
  311. # 收集所有 news_item_id
  312. news_ids = [row[0] for row in rows]
  313. # 批量查询排名历史(同时获取时间和排名)
  314. # 过滤逻辑:只保留 last_crawl_time 之前的脱榜记录(rank=0)
  315. # 这样可以避免显示新闻永久脱榜后的无意义记录
  316. rank_history_map: Dict[int, List[int]] = {}
  317. rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
  318. if news_ids:
  319. placeholders = ",".join("?" * len(news_ids))
  320. cursor.execute(f"""
  321. SELECT rh.news_item_id, rh.rank, rh.crawl_time
  322. FROM rank_history rh
  323. JOIN news_items ni ON rh.news_item_id = ni.id
  324. WHERE rh.news_item_id IN ({placeholders})
  325. AND NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
  326. ORDER BY rh.news_item_id, rh.crawl_time
  327. """, news_ids)
  328. for rh_row in cursor.fetchall():
  329. news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
  330. # 构建 ranks 列表(去重,排除脱榜记录 rank=0)
  331. if news_id not in rank_history_map:
  332. rank_history_map[news_id] = []
  333. if rank != 0 and rank not in rank_history_map[news_id]:
  334. rank_history_map[news_id].append(rank)
  335. # 构建 rank_timeline 列表(完整时间线,包含脱榜)
  336. if news_id not in rank_timeline_map:
  337. rank_timeline_map[news_id] = []
  338. # 提取时间部分(HH:MM)
  339. time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
  340. rank_timeline_map[news_id].append({
  341. "time": time_part,
  342. "rank": rank if rank != 0 else None # 0 转为 None 表示脱榜
  343. })
  344. # 按 platform_id 分组
  345. items: Dict[str, List[NewsItem]] = {}
  346. id_to_name: Dict[str, str] = {}
  347. crawl_date = self._format_date_folder(date)
  348. for row in rows:
  349. news_id = row[0]
  350. platform_id = row[2]
  351. title = row[1]
  352. platform_name = row[3] or platform_id
  353. id_to_name[platform_id] = platform_name
  354. if platform_id not in items:
  355. items[platform_id] = []
  356. # 获取排名历史,如果没有则使用当前排名
  357. ranks = rank_history_map.get(news_id, [row[4]])
  358. rank_timeline = rank_timeline_map.get(news_id, [])
  359. items[platform_id].append(NewsItem(
  360. title=title,
  361. source_id=platform_id,
  362. source_name=platform_name,
  363. rank=row[4],
  364. url=row[5] or "",
  365. mobile_url=row[6] or "",
  366. crawl_time=row[8], # last_crawl_time
  367. ranks=ranks,
  368. first_time=row[7], # first_crawl_time
  369. last_time=row[8], # last_crawl_time
  370. count=row[9], # crawl_count
  371. rank_timeline=rank_timeline,
  372. ))
  373. final_items = items
  374. # 获取失败的来源
  375. cursor.execute("""
  376. SELECT DISTINCT css.platform_id
  377. FROM crawl_source_status css
  378. JOIN crawl_records cr ON css.crawl_record_id = cr.id
  379. WHERE css.status = 'failed'
  380. """)
  381. failed_ids = [row[0] for row in cursor.fetchall()]
  382. # 获取最新的抓取时间
  383. cursor.execute("""
  384. SELECT crawl_time FROM crawl_records
  385. ORDER BY crawl_time DESC
  386. LIMIT 1
  387. """)
  388. time_row = cursor.fetchone()
  389. crawl_time = time_row[0] if time_row else self._format_time_filename()
  390. return NewsData(
  391. date=crawl_date,
  392. crawl_time=crawl_time,
  393. items=final_items,
  394. id_to_name=id_to_name,
  395. failed_ids=failed_ids,
  396. )
  397. except Exception as e:
  398. print(f"[存储] 读取数据失败: {e}")
  399. return None
  400. def _get_latest_crawl_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
  401. """
  402. 获取最新一次抓取的数据
  403. Args:
  404. date: 日期字符串,默认为今天
  405. Returns:
  406. 最新抓取的新闻数据
  407. """
  408. try:
  409. conn = self._get_connection(date)
  410. cursor = conn.cursor()
  411. # 获取最新的抓取时间
  412. cursor.execute("""
  413. SELECT crawl_time FROM crawl_records
  414. ORDER BY crawl_time DESC
  415. LIMIT 1
  416. """)
  417. time_row = cursor.fetchone()
  418. if not time_row:
  419. return None
  420. latest_time = time_row[0]
  421. # 获取该时间的新闻数据(包含 id 用于查询排名历史)
  422. cursor.execute("""
  423. SELECT n.id, n.title, n.platform_id, p.name as platform_name,
  424. n.rank, n.url, n.mobile_url,
  425. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  426. FROM news_items n
  427. LEFT JOIN platforms p ON n.platform_id = p.id
  428. WHERE n.last_crawl_time = ?
  429. """, (latest_time,))
  430. rows = cursor.fetchall()
  431. if not rows:
  432. return None
  433. # 收集所有 news_item_id
  434. news_ids = [row[0] for row in rows]
  435. # 批量查询排名历史(同时获取时间和排名)
  436. # 过滤逻辑:只保留 last_crawl_time 之前的脱榜记录(rank=0)
  437. # 这样可以避免显示新闻永久脱榜后的无意义记录
  438. rank_history_map: Dict[int, List[int]] = {}
  439. rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
  440. if news_ids:
  441. placeholders = ",".join("?" * len(news_ids))
  442. cursor.execute(f"""
  443. SELECT rh.news_item_id, rh.rank, rh.crawl_time
  444. FROM rank_history rh
  445. JOIN news_items ni ON rh.news_item_id = ni.id
  446. WHERE rh.news_item_id IN ({placeholders})
  447. AND NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
  448. ORDER BY rh.news_item_id, rh.crawl_time
  449. """, news_ids)
  450. for rh_row in cursor.fetchall():
  451. news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
  452. # 构建 ranks 列表(去重,排除脱榜记录 rank=0)
  453. if news_id not in rank_history_map:
  454. rank_history_map[news_id] = []
  455. if rank != 0 and rank not in rank_history_map[news_id]:
  456. rank_history_map[news_id].append(rank)
  457. # 构建 rank_timeline 列表(完整时间线,包含脱榜)
  458. if news_id not in rank_timeline_map:
  459. rank_timeline_map[news_id] = []
  460. # 提取时间部分(HH:MM)
  461. time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
  462. rank_timeline_map[news_id].append({
  463. "time": time_part,
  464. "rank": rank if rank != 0 else None # 0 转为 None 表示脱榜
  465. })
  466. items: Dict[str, List[NewsItem]] = {}
  467. id_to_name: Dict[str, str] = {}
  468. crawl_date = self._format_date_folder(date)
  469. for row in rows:
  470. news_id = row[0]
  471. platform_id = row[2]
  472. platform_name = row[3] or platform_id
  473. id_to_name[platform_id] = platform_name
  474. if platform_id not in items:
  475. items[platform_id] = []
  476. # 获取排名历史,如果没有则使用当前排名
  477. ranks = rank_history_map.get(news_id, [row[4]])
  478. rank_timeline = rank_timeline_map.get(news_id, [])
  479. items[platform_id].append(NewsItem(
  480. title=row[1],
  481. source_id=platform_id,
  482. source_name=platform_name,
  483. rank=row[4],
  484. url=row[5] or "",
  485. mobile_url=row[6] or "",
  486. crawl_time=row[8], # last_crawl_time
  487. ranks=ranks,
  488. first_time=row[7], # first_crawl_time
  489. last_time=row[8], # last_crawl_time
  490. count=row[9], # crawl_count
  491. rank_timeline=rank_timeline,
  492. ))
  493. # 获取失败的来源(针对最新一次抓取)
  494. cursor.execute("""
  495. SELECT css.platform_id
  496. FROM crawl_source_status css
  497. JOIN crawl_records cr ON css.crawl_record_id = cr.id
  498. WHERE cr.crawl_time = ? AND css.status = 'failed'
  499. """, (latest_time,))
  500. failed_ids = [row[0] for row in cursor.fetchall()]
  501. return NewsData(
  502. date=crawl_date,
  503. crawl_time=latest_time,
  504. items=items,
  505. id_to_name=id_to_name,
  506. failed_ids=failed_ids,
  507. )
  508. except Exception as e:
  509. print(f"[存储] 获取最新数据失败: {e}")
  510. return None
  511. def _detect_new_titles_impl(self, current_data: NewsData) -> Dict[str, Dict]:
  512. """
  513. 检测新增的标题
  514. 该方法比较当前抓取数据与历史数据,找出新增的标题。
  515. 关键逻辑:只有在历史批次中从未出现过的标题才算新增。
  516. Args:
  517. current_data: 当前抓取的数据
  518. Returns:
  519. 新增的标题数据 {source_id: {title: NewsItem}}
  520. """
  521. try:
  522. # 获取历史数据
  523. historical_data = self._get_today_all_data_impl(current_data.date)
  524. if not historical_data:
  525. # 没有历史数据,所有都是新的
  526. new_titles = {}
  527. for source_id, news_list in current_data.items.items():
  528. new_titles[source_id] = {item.title: item for item in news_list}
  529. return new_titles
  530. # 获取当前批次时间
  531. current_time = current_data.crawl_time
  532. # 收集历史标题(first_time < current_time 的标题)
  533. # 这样可以正确处理同一标题因 URL 变化而产生多条记录的情况
  534. historical_titles: Dict[str, set] = {}
  535. for source_id, news_list in historical_data.items.items():
  536. historical_titles[source_id] = set()
  537. for item in news_list:
  538. first_time = item.first_time or item.crawl_time
  539. if first_time < current_time:
  540. historical_titles[source_id].add(item.title)
  541. # 检查是否有历史数据
  542. has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
  543. if not has_historical_data:
  544. # 第一次抓取,没有"新增"概念
  545. return {}
  546. # 检测新增
  547. new_titles = {}
  548. for source_id, news_list in current_data.items.items():
  549. hist_set = historical_titles.get(source_id, set())
  550. for item in news_list:
  551. if item.title not in hist_set:
  552. if source_id not in new_titles:
  553. new_titles[source_id] = {}
  554. new_titles[source_id][item.title] = item
  555. return new_titles
  556. except Exception as e:
  557. print(f"[存储] 检测新标题失败: {e}")
  558. return {}
  559. def _is_first_crawl_today_impl(self, date: Optional[str] = None) -> bool:
  560. """
  561. 检查是否是当天第一次抓取
  562. Args:
  563. date: 日期字符串,默认为今天
  564. Returns:
  565. 是否是第一次抓取
  566. """
  567. try:
  568. conn = self._get_connection(date)
  569. cursor = conn.cursor()
  570. cursor.execute("""
  571. SELECT COUNT(*) as count FROM crawl_records
  572. """)
  573. row = cursor.fetchone()
  574. count = row[0] if row else 0
  575. # 如果只有一条或没有记录,视为第一次抓取
  576. return count <= 1
  577. except Exception as e:
  578. print(f"[存储] 检查首次抓取失败: {e}")
  579. return True
  580. def _get_crawl_times_impl(self, date: Optional[str] = None) -> List[str]:
  581. """
  582. 获取指定日期的所有抓取时间列表
  583. Args:
  584. date: 日期字符串,默认为今天
  585. Returns:
  586. 抓取时间列表(按时间排序)
  587. """
  588. try:
  589. conn = self._get_connection(date)
  590. cursor = conn.cursor()
  591. cursor.execute("""
  592. SELECT crawl_time FROM crawl_records
  593. ORDER BY crawl_time
  594. """)
  595. rows = cursor.fetchall()
  596. return [row[0] for row in rows]
  597. except Exception as e:
  598. print(f"[存储] 获取抓取时间列表失败: {e}")
  599. return []
  600. # ========================================
  601. # 时间段执行记录(调度系统)
  602. # ========================================
  603. def _has_period_executed_impl(self, date_str: str, period_key: str, action: str) -> bool:
  604. """
  605. 检查指定时间段的某个 action 今天是否已执行
  606. Args:
  607. date_str: 日期字符串 YYYY-MM-DD
  608. period_key: 时间段 key
  609. action: 动作类型 (analyze / push)
  610. Returns:
  611. 是否已执行
  612. """
  613. try:
  614. conn = self._get_connection(date_str)
  615. cursor = conn.cursor()
  616. # 先检查表是否存在
  617. cursor.execute("""
  618. SELECT name FROM sqlite_master
  619. WHERE type='table' AND name='period_executions'
  620. """)
  621. if not cursor.fetchone():
  622. return False
  623. cursor.execute("""
  624. SELECT 1 FROM period_executions
  625. WHERE execution_date = ? AND period_key = ? AND action = ?
  626. """, (date_str, period_key, action))
  627. return cursor.fetchone() is not None
  628. except Exception as e:
  629. print(f"[存储] 检查时间段执行记录失败: {e}")
  630. return False
  631. def _record_period_execution_impl(self, date_str: str, period_key: str, action: str) -> bool:
  632. """
  633. 记录时间段的 action 执行
  634. Args:
  635. date_str: 日期字符串 YYYY-MM-DD
  636. period_key: 时间段 key
  637. action: 动作类型 (analyze / push)
  638. Returns:
  639. 是否记录成功
  640. """
  641. try:
  642. conn = self._get_connection(date_str)
  643. cursor = conn.cursor()
  644. # 确保表存在
  645. cursor.execute("""
  646. CREATE TABLE IF NOT EXISTS period_executions (
  647. id INTEGER PRIMARY KEY AUTOINCREMENT,
  648. execution_date TEXT NOT NULL,
  649. period_key TEXT NOT NULL,
  650. action TEXT NOT NULL,
  651. executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  652. UNIQUE(execution_date, period_key, action)
  653. )
  654. """)
  655. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  656. cursor.execute("""
  657. INSERT OR IGNORE INTO period_executions (execution_date, period_key, action, executed_at)
  658. VALUES (?, ?, ?, ?)
  659. """, (date_str, period_key, action, now_str))
  660. conn.commit()
  661. return True
  662. except Exception as e:
  663. print(f"[存储] 记录时间段执行失败: {e}")
  664. return False
  665. # ========================================
  666. # RSS 数据存储
  667. # ========================================
  668. def _save_rss_data_impl(self, data: RSSData, log_prefix: str = "[存储]") -> tuple[bool, int, int]:
  669. """
  670. 保存 RSS 数据到 SQLite(以 URL 为唯一标识)
  671. Args:
  672. data: RSS 数据
  673. log_prefix: 日志前缀
  674. Returns:
  675. (success, new_count, updated_count)
  676. """
  677. try:
  678. conn = self._get_connection(data.date, db_type="rss")
  679. cursor = conn.cursor()
  680. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  681. # 同步 RSS 源信息到 rss_feeds 表
  682. for feed_id, feed_name in data.id_to_name.items():
  683. cursor.execute("""
  684. INSERT INTO rss_feeds (id, name, updated_at)
  685. VALUES (?, ?, ?)
  686. ON CONFLICT(id) DO UPDATE SET
  687. name = excluded.name,
  688. updated_at = excluded.updated_at
  689. """, (feed_id, feed_name, now_str))
  690. # 统计计数器
  691. new_count = 0
  692. updated_count = 0
  693. for feed_id, rss_list in data.items.items():
  694. for item in rss_list:
  695. try:
  696. item_guid = getattr(item, "guid", "") or ""
  697. existing = None
  698. # 去重优先级:guid > url
  699. if item_guid:
  700. cursor.execute("""
  701. SELECT id, title FROM rss_items
  702. WHERE guid = ? AND feed_id = ?
  703. """, (item_guid, feed_id))
  704. existing = cursor.fetchone()
  705. if not existing and item.url:
  706. cursor.execute("""
  707. SELECT id, title FROM rss_items
  708. WHERE url = ? AND feed_id = ?
  709. """, (item.url, feed_id))
  710. existing = cursor.fetchone()
  711. if existing:
  712. existing_id = existing[0]
  713. existing_title = existing[1]
  714. update_title = item.title
  715. if (update_title and update_title.strip().startswith(("http://", "https://", "//"))
  716. and existing_title and not existing_title.strip().startswith(("http://", "https://", "//"))):
  717. update_title = existing_title
  718. cursor.execute("""
  719. UPDATE rss_items SET
  720. title = ?,
  721. url = CASE WHEN ? != '' THEN ? ELSE url END,
  722. guid = CASE WHEN ? != '' THEN ? ELSE guid END,
  723. published_at = ?,
  724. summary = ?,
  725. author = ?,
  726. last_crawl_time = ?,
  727. crawl_count = crawl_count + 1,
  728. updated_at = ?
  729. WHERE id = ?
  730. """, (update_title,
  731. item.url, item.url,
  732. item_guid, item_guid,
  733. item.published_at, item.summary,
  734. item.author, data.crawl_time, now_str, existing_id))
  735. updated_count += 1
  736. elif item.url or item_guid:
  737. try:
  738. cursor.execute("""
  739. INSERT INTO rss_items
  740. (title, feed_id, url, guid, published_at, summary, author,
  741. first_crawl_time, last_crawl_time, crawl_count,
  742. created_at, updated_at)
  743. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  744. """, (item.title, feed_id, item.url, item_guid,
  745. item.published_at, item.summary, item.author,
  746. data.crawl_time, data.crawl_time, now_str, now_str))
  747. new_count += 1
  748. except sqlite3.IntegrityError:
  749. pass
  750. except sqlite3.Error as e:
  751. print(f"{log_prefix} 保存 RSS 条目失败 [{item.title[:30]}...]: {e}")
  752. total_items = new_count + updated_count
  753. # 记录抓取信息
  754. cursor.execute("""
  755. INSERT OR REPLACE INTO rss_crawl_records
  756. (crawl_time, total_items, created_at)
  757. VALUES (?, ?, ?)
  758. """, (data.crawl_time, total_items, now_str))
  759. # 记录抓取状态
  760. cursor.execute("""
  761. SELECT id FROM rss_crawl_records WHERE crawl_time = ?
  762. """, (data.crawl_time,))
  763. record_row = cursor.fetchone()
  764. if record_row:
  765. crawl_record_id = record_row[0]
  766. # 记录成功的源
  767. for feed_id in data.items.keys():
  768. cursor.execute("""
  769. INSERT OR REPLACE INTO rss_crawl_status
  770. (crawl_record_id, feed_id, status)
  771. VALUES (?, ?, 'success')
  772. """, (crawl_record_id, feed_id))
  773. # 记录失败的源
  774. for failed_id in data.failed_ids:
  775. cursor.execute("""
  776. INSERT OR IGNORE INTO rss_feeds (id, name, updated_at)
  777. VALUES (?, ?, ?)
  778. """, (failed_id, failed_id, now_str))
  779. cursor.execute("""
  780. INSERT OR REPLACE INTO rss_crawl_status
  781. (crawl_record_id, feed_id, status)
  782. VALUES (?, ?, 'failed')
  783. """, (crawl_record_id, failed_id))
  784. conn.commit()
  785. return True, new_count, updated_count
  786. except Exception as e:
  787. print(f"{log_prefix} 保存 RSS 数据失败: {e}")
  788. return False, 0, 0
  789. def _get_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
  790. """
  791. 获取指定日期的所有 RSS 数据
  792. Args:
  793. date: 日期字符串(YYYY-MM-DD),默认为今天
  794. Returns:
  795. RSSData 对象,如果没有数据返回 None
  796. """
  797. try:
  798. conn = self._get_connection(date, db_type="rss")
  799. cursor = conn.cursor()
  800. # 获取所有 RSS 数据
  801. cursor.execute("""
  802. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  803. i.url, i.published_at, i.summary, i.author,
  804. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  805. FROM rss_items i
  806. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  807. ORDER BY i.published_at DESC
  808. """)
  809. rows = cursor.fetchall()
  810. if not rows:
  811. return None
  812. items: Dict[str, List[RSSItem]] = {}
  813. id_to_name: Dict[str, str] = {}
  814. crawl_date = self._format_date_folder(date)
  815. for row in rows:
  816. feed_id = row[2]
  817. feed_name = row[3] or feed_id
  818. id_to_name[feed_id] = feed_name
  819. if feed_id not in items:
  820. items[feed_id] = []
  821. items[feed_id].append(RSSItem(
  822. title=row[1],
  823. feed_id=feed_id,
  824. feed_name=feed_name,
  825. url=row[4] or "",
  826. published_at=row[5] or "",
  827. summary=row[6] or "",
  828. author=row[7] or "",
  829. crawl_time=row[9],
  830. first_time=row[8],
  831. last_time=row[9],
  832. count=row[10],
  833. ))
  834. # 获取最新的抓取时间
  835. cursor.execute("""
  836. SELECT crawl_time FROM rss_crawl_records
  837. ORDER BY crawl_time DESC
  838. LIMIT 1
  839. """)
  840. time_row = cursor.fetchone()
  841. crawl_time = time_row[0] if time_row else self._format_time_filename()
  842. # 获取失败的源
  843. cursor.execute("""
  844. SELECT DISTINCT cs.feed_id
  845. FROM rss_crawl_status cs
  846. JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
  847. WHERE cs.status = 'failed'
  848. """)
  849. failed_ids = [row[0] for row in cursor.fetchall()]
  850. return RSSData(
  851. date=crawl_date,
  852. crawl_time=crawl_time,
  853. items=items,
  854. id_to_name=id_to_name,
  855. failed_ids=failed_ids,
  856. )
  857. except Exception as e:
  858. print(f"[存储] 读取 RSS 数据失败: {e}")
  859. return None
  860. def _detect_new_rss_items_impl(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
  861. """
  862. 检测新增的 RSS 条目(增量模式)
  863. 该方法比较当前抓取数据与历史数据,找出新增的 RSS 条目。
  864. 关键逻辑:只有在历史批次中从未出现过的 URL 才算新增。
  865. Args:
  866. current_data: 当前抓取的 RSS 数据
  867. Returns:
  868. 新增的 RSS 条目 {feed_id: [RSSItem, ...]}
  869. """
  870. try:
  871. # 获取历史数据
  872. historical_data = self._get_rss_data_impl(current_data.date)
  873. if not historical_data:
  874. # 没有历史数据,所有都是新的
  875. return current_data.items.copy()
  876. # 获取当前批次时间
  877. current_time = current_data.crawl_time
  878. # 收集历史 URL(first_time < current_time 的条目)
  879. historical_urls: Dict[str, set] = {}
  880. for feed_id, rss_list in historical_data.items.items():
  881. historical_urls[feed_id] = set()
  882. for item in rss_list:
  883. first_time = item.first_time or item.crawl_time
  884. if first_time < current_time:
  885. if item.url:
  886. historical_urls[feed_id].add(item.url)
  887. # 检查是否有历史数据
  888. has_historical_data = any(len(urls) > 0 for urls in historical_urls.values())
  889. if not has_historical_data:
  890. # 第一次抓取,没有"新增"概念
  891. return {}
  892. # 检测新增
  893. new_items: Dict[str, List[RSSItem]] = {}
  894. for feed_id, rss_list in current_data.items.items():
  895. hist_set = historical_urls.get(feed_id, set())
  896. for item in rss_list:
  897. # 通过 URL 判断是否新增
  898. if item.url and item.url not in hist_set:
  899. if feed_id not in new_items:
  900. new_items[feed_id] = []
  901. new_items[feed_id].append(item)
  902. return new_items
  903. except Exception as e:
  904. print(f"[存储] 检测新 RSS 条目失败: {e}")
  905. return {}
  906. def _get_latest_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
  907. """
  908. 获取最新一次抓取的 RSS 数据(当前榜单模式)
  909. Args:
  910. date: 日期字符串(YYYY-MM-DD),默认为今天
  911. Returns:
  912. 最新抓取的 RSS 数据,如果没有数据返回 None
  913. """
  914. try:
  915. conn = self._get_connection(date, db_type="rss")
  916. cursor = conn.cursor()
  917. # 获取最新的抓取时间
  918. cursor.execute("""
  919. SELECT crawl_time FROM rss_crawl_records
  920. ORDER BY crawl_time DESC
  921. LIMIT 1
  922. """)
  923. time_row = cursor.fetchone()
  924. if not time_row:
  925. return None
  926. latest_time = time_row[0]
  927. # 获取该时间的 RSS 数据
  928. cursor.execute("""
  929. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  930. i.url, i.published_at, i.summary, i.author,
  931. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  932. FROM rss_items i
  933. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  934. WHERE i.last_crawl_time = ?
  935. ORDER BY i.published_at DESC
  936. """, (latest_time,))
  937. rows = cursor.fetchall()
  938. if not rows:
  939. return None
  940. items: Dict[str, List[RSSItem]] = {}
  941. id_to_name: Dict[str, str] = {}
  942. crawl_date = self._format_date_folder(date)
  943. for row in rows:
  944. feed_id = row[2]
  945. feed_name = row[3] or feed_id
  946. id_to_name[feed_id] = feed_name
  947. if feed_id not in items:
  948. items[feed_id] = []
  949. items[feed_id].append(RSSItem(
  950. title=row[1],
  951. feed_id=feed_id,
  952. feed_name=feed_name,
  953. url=row[4] or "",
  954. published_at=row[5] or "",
  955. summary=row[6] or "",
  956. author=row[7] or "",
  957. crawl_time=row[9],
  958. first_time=row[8],
  959. last_time=row[9],
  960. count=row[10],
  961. ))
  962. # 获取失败的源(针对最新一次抓取)
  963. cursor.execute("""
  964. SELECT cs.feed_id
  965. FROM rss_crawl_status cs
  966. JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
  967. WHERE cr.crawl_time = ? AND cs.status = 'failed'
  968. """, (latest_time,))
  969. failed_ids = [row[0] for row in cursor.fetchall()]
  970. return RSSData(
  971. date=crawl_date,
  972. crawl_time=latest_time,
  973. items=items,
  974. id_to_name=id_to_name,
  975. failed_ids=failed_ids,
  976. )
  977. except Exception as e:
  978. print(f"[存储] 获取最新 RSS 数据失败: {e}")
  979. return None
  980. # ========================================
  981. # AI 智能筛选 - 标签管理
  982. # ========================================
  983. def _get_active_tags_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict[str, Any]]:
  984. """获取指定兴趣文件的 active 标签列表"""
  985. try:
  986. conn = self._get_connection(date)
  987. cursor = conn.cursor()
  988. cursor.execute("""
  989. SELECT id, tag, description, version, prompt_hash, priority
  990. FROM ai_filter_tags
  991. WHERE status = 'active' AND interests_file = ?
  992. ORDER BY priority ASC, id ASC
  993. """, (interests_file,))
  994. return [
  995. {
  996. "id": row[0], "tag": row[1], "description": row[2],
  997. "version": row[3], "prompt_hash": row[4], "priority": row[5],
  998. }
  999. for row in cursor.fetchall()
  1000. ]
  1001. except Exception as e:
  1002. print(f"[AI筛选] 获取标签失败: {e}")
  1003. return []
  1004. def _get_latest_prompt_hash_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> Optional[str]:
  1005. """获取指定兴趣文件最新版本标签的 prompt_hash"""
  1006. try:
  1007. conn = self._get_connection(date)
  1008. cursor = conn.cursor()
  1009. cursor.execute("""
  1010. SELECT prompt_hash FROM ai_filter_tags
  1011. WHERE status = 'active' AND interests_file = ?
  1012. ORDER BY version DESC
  1013. LIMIT 1
  1014. """, (interests_file,))
  1015. row = cursor.fetchone()
  1016. return row[0] if row else None
  1017. except Exception as e:
  1018. print(f"[AI筛选] 获取 prompt_hash 失败: {e}")
  1019. return None
  1020. def _get_latest_tag_version_impl(self, date: Optional[str] = None) -> int:
  1021. """获取最新版本号"""
  1022. try:
  1023. conn = self._get_connection(date)
  1024. cursor = conn.cursor()
  1025. cursor.execute("""
  1026. SELECT MAX(version) FROM ai_filter_tags
  1027. """)
  1028. row = cursor.fetchone()
  1029. return row[0] if row and row[0] is not None else 0
  1030. except Exception as e:
  1031. print(f"[AI筛选] 获取版本号失败: {e}")
  1032. return 0
  1033. def _deprecate_all_tags_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  1034. """将指定兴趣文件的 active 标签和关联的分类结果标记为 deprecated"""
  1035. try:
  1036. conn = self._get_connection(date)
  1037. cursor = conn.cursor()
  1038. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1039. # 获取该兴趣文件的 active 标签 id
  1040. cursor.execute(
  1041. "SELECT id FROM ai_filter_tags WHERE status = 'active' AND interests_file = ?",
  1042. (interests_file,)
  1043. )
  1044. tag_ids = [row[0] for row in cursor.fetchall()]
  1045. if not tag_ids:
  1046. return 0
  1047. # 废弃标签
  1048. placeholders = ",".join("?" * len(tag_ids))
  1049. cursor.execute(f"""
  1050. UPDATE ai_filter_tags
  1051. SET status = 'deprecated', deprecated_at = ?
  1052. WHERE id IN ({placeholders})
  1053. """, [now_str] + tag_ids)
  1054. tag_count = cursor.rowcount
  1055. # 废弃关联的分类结果
  1056. placeholders = ",".join("?" * len(tag_ids))
  1057. cursor.execute(f"""
  1058. UPDATE ai_filter_results
  1059. SET status = 'deprecated', deprecated_at = ?
  1060. WHERE tag_id IN ({placeholders}) AND status = 'active'
  1061. """, [now_str] + tag_ids)
  1062. conn.commit()
  1063. print(f"[AI筛选] 已废弃 {tag_count} 个标签及关联分类结果")
  1064. return tag_count
  1065. except Exception as e:
  1066. print(f"[AI筛选] 废弃标签失败: {e}")
  1067. return 0
  1068. def _save_tags_impl(
  1069. self, date: Optional[str], tags: List[Dict], version: int, prompt_hash: str,
  1070. interests_file: str = "ai_interests.txt"
  1071. ) -> int:
  1072. """保存新提取的标签"""
  1073. try:
  1074. conn = self._get_connection(date)
  1075. cursor = conn.cursor()
  1076. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1077. count = 0
  1078. for idx, tag_data in enumerate(tags, start=1):
  1079. priority = tag_data.get("priority", idx)
  1080. try:
  1081. priority = int(priority)
  1082. except (TypeError, ValueError):
  1083. priority = idx
  1084. cursor.execute("""
  1085. INSERT INTO ai_filter_tags
  1086. (tag, description, priority, version, prompt_hash, interests_file, created_at)
  1087. VALUES (?, ?, ?, ?, ?, ?, ?)
  1088. """, (
  1089. tag_data["tag"],
  1090. tag_data.get("description", ""),
  1091. priority,
  1092. version,
  1093. prompt_hash,
  1094. interests_file,
  1095. now_str,
  1096. ))
  1097. count += 1
  1098. conn.commit()
  1099. return count
  1100. except Exception as e:
  1101. print(f"[AI筛选] 保存标签失败: {e}")
  1102. return 0
  1103. def _deprecate_specific_tags_impl(
  1104. self, date: Optional[str], tag_ids: List[int]
  1105. ) -> int:
  1106. """废弃指定 ID 的标签及其关联分类结果(增量更新时使用)"""
  1107. if not tag_ids:
  1108. return 0
  1109. try:
  1110. conn = self._get_connection(date)
  1111. cursor = conn.cursor()
  1112. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1113. placeholders = ",".join("?" * len(tag_ids))
  1114. cursor.execute(f"""
  1115. UPDATE ai_filter_tags
  1116. SET status = 'deprecated', deprecated_at = ?
  1117. WHERE id IN ({placeholders})
  1118. """, [now_str] + tag_ids)
  1119. tag_count = cursor.rowcount
  1120. cursor.execute(f"""
  1121. UPDATE ai_filter_results
  1122. SET status = 'deprecated', deprecated_at = ?
  1123. WHERE tag_id IN ({placeholders}) AND status = 'active'
  1124. """, [now_str] + tag_ids)
  1125. conn.commit()
  1126. return tag_count
  1127. except Exception as e:
  1128. print(f"[AI筛选] 废弃指定标签失败: {e}")
  1129. return 0
  1130. def _update_tags_hash_impl(
  1131. self, date: Optional[str], interests_file: str, new_hash: str
  1132. ) -> int:
  1133. """更新指定兴趣文件所有 active 标签的 prompt_hash(增量更新时使用)"""
  1134. try:
  1135. conn = self._get_connection(date)
  1136. cursor = conn.cursor()
  1137. cursor.execute("""
  1138. UPDATE ai_filter_tags
  1139. SET prompt_hash = ?
  1140. WHERE interests_file = ? AND status = 'active'
  1141. """, (new_hash, interests_file))
  1142. count = cursor.rowcount
  1143. conn.commit()
  1144. return count
  1145. except Exception as e:
  1146. print(f"[AI筛选] 更新标签 hash 失败: {e}")
  1147. return 0
  1148. # ========================================
    # AI 智能筛选 - 标签描述与优先级更新
  1150. # ========================================
  1151. def _update_tag_descriptions_impl(
  1152. self, date: Optional[str], tag_updates: List[Dict],
  1153. interests_file: str = "ai_interests.txt"
  1154. ) -> int:
  1155. """按 tag 名匹配,更新 active 标签的 description 字段"""
  1156. try:
  1157. conn = self._get_connection(date)
  1158. cursor = conn.cursor()
  1159. count = 0
  1160. for t in tag_updates:
  1161. tag_name = t.get("tag", "")
  1162. description = t.get("description", "")
  1163. if not tag_name:
  1164. continue
  1165. cursor.execute("""
  1166. UPDATE ai_filter_tags
  1167. SET description = ?
  1168. WHERE tag = ? AND interests_file = ? AND status = 'active'
  1169. """, (description, tag_name, interests_file))
  1170. count += cursor.rowcount
  1171. conn.commit()
  1172. return count
  1173. except Exception as e:
  1174. print(f"[AI筛选] 更新标签描述失败: {e}")
  1175. return 0
  1176. def _update_tag_priorities_impl(
  1177. self, date: Optional[str], tag_priorities: List[Dict],
  1178. interests_file: str = "ai_interests.txt"
  1179. ) -> int:
  1180. """按 tag 名匹配,更新 active 标签的 priority 字段"""
  1181. try:
  1182. conn = self._get_connection(date)
  1183. cursor = conn.cursor()
  1184. count = 0
  1185. for t in tag_priorities:
  1186. tag_name = t.get("tag", "")
  1187. priority = t.get("priority")
  1188. if not tag_name:
  1189. continue
  1190. try:
  1191. priority = int(priority)
  1192. except (TypeError, ValueError):
  1193. continue
  1194. cursor.execute("""
  1195. UPDATE ai_filter_tags
  1196. SET priority = ?
  1197. WHERE tag = ? AND interests_file = ? AND status = 'active'
  1198. """, (priority, tag_name, interests_file))
  1199. count += cursor.rowcount
  1200. conn.commit()
  1201. return count
  1202. except Exception as e:
  1203. print(f"[AI筛选] 更新标签优先级失败: {e}")
  1204. return 0
  1205. # ========================================
  1206. # AI 智能筛选 - 已分析新闻追踪
  1207. # ========================================
  1208. def _save_analyzed_news_impl(
  1209. self, date: Optional[str], news_ids: List[int], source_type: str,
  1210. interests_file: str, prompt_hash: str, matched_ids: set
  1211. ) -> int:
  1212. """批量记录已分析的新闻(匹配与不匹配都记录)"""
  1213. try:
  1214. conn = self._get_connection(date)
  1215. cursor = conn.cursor()
  1216. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1217. count = 0
  1218. for nid in news_ids:
  1219. try:
  1220. cursor.execute("""
  1221. INSERT OR REPLACE INTO ai_filter_analyzed_news
  1222. (news_item_id, source_type, interests_file, prompt_hash, matched, created_at)
  1223. VALUES (?, ?, ?, ?, ?, ?)
  1224. """, (
  1225. nid, source_type, interests_file, prompt_hash,
  1226. 1 if nid in matched_ids else 0,
  1227. now_str,
  1228. ))
  1229. count += 1
  1230. except Exception:
  1231. pass
  1232. conn.commit()
  1233. return count
  1234. except Exception as e:
  1235. print(f"[AI筛选] 保存已分析记录失败: {e}")
  1236. return 0
  1237. def _get_analyzed_news_ids_impl(
  1238. self, date: Optional[str] = None, source_type: str = "hotlist",
  1239. interests_file: str = "ai_interests.txt"
  1240. ) -> set:
  1241. """获取已分析过的新闻 ID 集合(用于去重)"""
  1242. try:
  1243. conn = self._get_connection(date)
  1244. cursor = conn.cursor()
  1245. cursor.execute("""
  1246. SELECT news_item_id FROM ai_filter_analyzed_news
  1247. WHERE source_type = ? AND interests_file = ?
  1248. """, (source_type, interests_file))
  1249. return {row[0] for row in cursor.fetchall()}
  1250. except Exception as e:
  1251. print(f"[AI筛选] 获取已分析ID失败: {e}")
  1252. return set()
  1253. def _clear_analyzed_news_impl(
  1254. self, date: Optional[str] = None, interests_file: str = "ai_interests.txt"
  1255. ) -> int:
  1256. """清除指定兴趣文件的所有已分析记录(全量重分类时使用)"""
  1257. try:
  1258. conn = self._get_connection(date)
  1259. cursor = conn.cursor()
  1260. cursor.execute("""
  1261. DELETE FROM ai_filter_analyzed_news
  1262. WHERE interests_file = ?
  1263. """, (interests_file,))
  1264. count = cursor.rowcount
  1265. conn.commit()
  1266. return count
  1267. except Exception as e:
  1268. print(f"[AI筛选] 清除已分析记录失败: {e}")
  1269. return 0
  1270. def _clear_unmatched_analyzed_news_impl(
  1271. self, date: Optional[str] = None, interests_file: str = "ai_interests.txt"
  1272. ) -> int:
  1273. """清除不匹配的已分析记录,让这些新闻有机会被新标签重新分析"""
  1274. try:
  1275. conn = self._get_connection(date)
  1276. cursor = conn.cursor()
  1277. cursor.execute("""
  1278. DELETE FROM ai_filter_analyzed_news
  1279. WHERE interests_file = ? AND matched = 0
  1280. """, (interests_file,))
  1281. count = cursor.rowcount
  1282. conn.commit()
  1283. return count
  1284. except Exception as e:
  1285. print(f"[AI筛选] 清除不匹配记录失败: {e}")
  1286. return 0
  1287. # ========================================
  1288. # AI 智能筛选 - 分类结果管理(原有)
  1289. # ========================================
  1290. def _save_filter_results_impl(
  1291. self, date: Optional[str], results: List[Dict]
  1292. ) -> int:
  1293. """批量保存分类结果"""
  1294. try:
  1295. conn = self._get_connection(date)
  1296. cursor = conn.cursor()
  1297. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1298. count = 0
  1299. for r in results:
  1300. try:
  1301. cursor.execute("""
  1302. INSERT INTO ai_filter_results
  1303. (news_item_id, source_type, tag_id, relevance_score, created_at)
  1304. VALUES (?, ?, ?, ?, ?)
  1305. """, (
  1306. r["news_item_id"],
  1307. r.get("source_type", "hotlist"),
  1308. r["tag_id"],
  1309. r.get("relevance_score", 0.0),
  1310. now_str,
  1311. ))
  1312. count += 1
  1313. except sqlite3.IntegrityError:
  1314. pass # 重复记录,跳过
  1315. conn.commit()
  1316. return count
  1317. except Exception as e:
  1318. print(f"[AI筛选] 保存分类结果失败: {e}")
  1319. return 0
  1320. def _get_active_filter_results_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict[str, Any]]:
  1321. """获取指定兴趣文件的 active 分类结果,JOIN news_items 获取新闻详情"""
  1322. try:
  1323. conn = self._get_connection(date)
  1324. cursor = conn.cursor()
  1325. # 热榜结果
  1326. cursor.execute("""
  1327. SELECT
  1328. r.news_item_id, r.source_type, r.tag_id, r.relevance_score,
  1329. t.tag, t.description as tag_description, t.priority,
  1330. n.title, n.platform_id as source_id, p.name as source_name,
  1331. n.url, n.mobile_url, n.rank,
  1332. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  1333. FROM ai_filter_results r
  1334. JOIN ai_filter_tags t ON r.tag_id = t.id
  1335. JOIN news_items n ON r.news_item_id = n.id
  1336. LEFT JOIN platforms p ON n.platform_id = p.id
  1337. WHERE r.status = 'active' AND r.source_type = 'hotlist'
  1338. AND t.status = 'active' AND t.interests_file = ?
  1339. ORDER BY t.priority ASC, t.id ASC, r.relevance_score DESC
  1340. """, (interests_file,))
  1341. results = []
  1342. hotlist_news_ids = []
  1343. for row in cursor.fetchall():
  1344. results.append({
  1345. "news_item_id": row[0], "source_type": row[1],
  1346. "tag_id": row[2], "relevance_score": row[3],
  1347. "tag": row[4], "tag_description": row[5], "tag_priority": row[6],
  1348. "title": row[7], "source_id": row[8],
  1349. "source_name": row[9] or row[8],
  1350. "url": row[10] or "", "mobile_url": row[11] or "",
  1351. "rank": row[12],
  1352. "first_time": row[13], "last_time": row[14],
  1353. "count": row[15],
  1354. })
  1355. hotlist_news_ids.append(row[0])
  1356. # 批量查排名历史(热榜)
  1357. ranks_map: Dict[int, List[int]] = {}
  1358. if hotlist_news_ids:
  1359. unique_ids = list(set(hotlist_news_ids))
  1360. placeholders = ",".join("?" * len(unique_ids))
  1361. cursor.execute(f"""
  1362. SELECT news_item_id, rank FROM rank_history
  1363. WHERE news_item_id IN ({placeholders}) AND rank != 0
  1364. """, unique_ids)
  1365. for rh_row in cursor.fetchall():
  1366. nid, rank = rh_row[0], rh_row[1]
  1367. if nid not in ranks_map:
  1368. ranks_map[nid] = []
  1369. if rank not in ranks_map[nid]:
  1370. ranks_map[nid].append(rank)
  1371. for item in results:
  1372. item["ranks"] = ranks_map.get(item["news_item_id"], [item["rank"]])
  1373. # RSS 结果(如果有 rss 库)
  1374. try:
  1375. rss_conn = self._get_connection(date, db_type="rss")
  1376. rss_cursor = rss_conn.cursor()
  1377. # 从 news 库获取 rss 类型的分类结果 ID
  1378. cursor.execute("""
  1379. SELECT r.news_item_id, r.tag_id, r.relevance_score,
  1380. t.tag, t.description, t.priority
  1381. FROM ai_filter_results r
  1382. JOIN ai_filter_tags t ON r.tag_id = t.id
  1383. WHERE r.status = 'active' AND r.source_type = 'rss'
  1384. AND t.status = 'active' AND t.interests_file = ?
  1385. ORDER BY t.priority ASC, t.id ASC, r.relevance_score DESC
  1386. """, (interests_file,))
  1387. rss_filter_rows = cursor.fetchall()
  1388. if rss_filter_rows:
  1389. rss_ids = [row[0] for row in rss_filter_rows]
  1390. placeholders = ",".join("?" * len(rss_ids))
  1391. rss_cursor.execute(f"""
  1392. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  1393. i.url, i.published_at
  1394. FROM rss_items i
  1395. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  1396. WHERE i.id IN ({placeholders})
  1397. """, rss_ids)
  1398. rss_info = {row[0]: row for row in rss_cursor.fetchall()}
  1399. for fr_row in rss_filter_rows:
  1400. rss_id = fr_row[0]
  1401. info = rss_info.get(rss_id)
  1402. if info:
  1403. results.append({
  1404. "news_item_id": rss_id,
  1405. "source_type": "rss",
  1406. "tag_id": fr_row[1],
  1407. "relevance_score": fr_row[2],
  1408. "tag": fr_row[3],
  1409. "tag_description": fr_row[4],
  1410. "tag_priority": fr_row[5],
  1411. "title": info[1],
  1412. "source_id": info[2],
  1413. "source_name": info[3] or info[2],
  1414. "url": info[4] or "",
  1415. "mobile_url": "",
  1416. "rank": 0,
  1417. "ranks": [],
  1418. "first_time": info[5] or "",
  1419. "last_time": info[5] or "",
  1420. "count": 1,
  1421. })
  1422. except Exception:
  1423. pass # RSS 库不存在时静默跳过
  1424. return results
  1425. except Exception as e:
  1426. print(f"[AI筛选] 获取分类结果失败: {e}")
  1427. return []
  1428. def _get_all_news_ids_impl(self, date: Optional[str] = None) -> List[Dict]:
  1429. """获取当日所有新闻的 id 和标题(用于 AI 筛选分类)"""
  1430. try:
  1431. conn = self._get_connection(date)
  1432. cursor = conn.cursor()
  1433. cursor.execute("""
  1434. SELECT n.id, n.title, n.platform_id, p.name as platform_name
  1435. FROM news_items n
  1436. LEFT JOIN platforms p ON n.platform_id = p.id
  1437. ORDER BY n.id
  1438. """)
  1439. return [
  1440. {
  1441. "id": row[0], "title": row[1],
  1442. "source_id": row[2], "source_name": row[3] or row[2],
  1443. }
  1444. for row in cursor.fetchall()
  1445. ]
  1446. except Exception as e:
  1447. print(f"[AI筛选] 获取新闻列表失败: {e}")
  1448. return []
  1449. def _get_all_rss_ids_impl(self, date: Optional[str] = None) -> List[Dict]:
  1450. """获取当日所有 RSS 条目的 id 和标题(用于 AI 筛选分类)"""
  1451. try:
  1452. conn = self._get_connection(date, db_type="rss")
  1453. cursor = conn.cursor()
  1454. cursor.execute("""
  1455. SELECT i.id, i.title, i.feed_id, f.name as feed_name, i.published_at
  1456. FROM rss_items i
  1457. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  1458. ORDER BY i.id
  1459. """)
  1460. return [
  1461. {
  1462. "id": row[0], "title": row[1],
  1463. "source_id": row[2], "source_name": row[3] or row[2],
  1464. "published_at": row[4] or "",
  1465. }
  1466. for row in cursor.fetchall()
  1467. ]
  1468. except Exception as e:
  1469. print(f"[AI筛选] 获取 RSS 列表失败: {e}")
  1470. return []