sqlite_mixin.py 67 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719
  1. # coding=utf-8
  2. """
  3. SQLite 存储 Mixin
  4. 提供共用的 SQLite 数据库操作逻辑,供 LocalStorageBackend 和 RemoteStorageBackend 复用。
  5. """
  6. import sqlite3
  7. from abc import abstractmethod
  8. from datetime import datetime
  9. from pathlib import Path
  10. from typing import Any, Dict, List, Optional
  11. from trendradar.storage.base import NewsItem, NewsData, RSSItem, RSSData
  12. from trendradar.utils.url import normalize_url
  13. class SQLiteStorageMixin:
  14. """
  15. SQLite 存储操作 Mixin
  16. 子类需要实现以下抽象方法:
  17. - _get_connection(date, db_type) -> sqlite3.Connection
  18. - _get_configured_time() -> datetime
  19. - _format_date_folder(date) -> str
  20. - _format_time_filename() -> str
  21. """
  22. # ========================================
  23. # 抽象方法 - 子类必须实现
  24. # ========================================
  25. @abstractmethod
  26. def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
  27. """获取数据库连接"""
  28. pass
  29. @abstractmethod
  30. def _get_configured_time(self) -> datetime:
  31. """获取配置时区的当前时间"""
  32. pass
  33. @abstractmethod
  34. def _format_date_folder(self, date: Optional[str] = None) -> str:
  35. """格式化日期文件夹名 (ISO 格式: YYYY-MM-DD)"""
  36. pass
  37. @abstractmethod
  38. def _format_time_filename(self) -> str:
  39. """格式化时间文件名 (格式: HH-MM)"""
  40. pass
  41. # ========================================
  42. # Schema 管理
  43. # ========================================
  44. def _get_schema_path(self, db_type: str = "news") -> Path:
  45. """
  46. 获取 schema.sql 文件路径
  47. Args:
  48. db_type: 数据库类型 ("news" 或 "rss")
  49. Returns:
  50. schema 文件路径
  51. """
  52. if db_type == "rss":
  53. return Path(__file__).parent / "rss_schema.sql"
  54. return Path(__file__).parent / "schema.sql"
  55. def _get_ai_filter_schema_path(self) -> Path:
  56. """获取 AI 筛选 schema 文件路径"""
  57. return Path(__file__).parent / "ai_filter_schema.sql"
  58. def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
  59. """
  60. 从 schema.sql 初始化数据库表结构
  61. Args:
  62. conn: 数据库连接
  63. db_type: 数据库类型 ("news" 或 "rss")
  64. """
  65. schema_path = self._get_schema_path(db_type)
  66. if schema_path.exists():
  67. with open(schema_path, "r", encoding="utf-8") as f:
  68. schema_sql = f.read()
  69. conn.executescript(schema_sql)
  70. else:
  71. raise FileNotFoundError(f"Schema file not found: {schema_path}")
  72. # news 库额外加载 AI 筛选表结构
  73. if db_type == "news":
  74. ai_filter_schema = self._get_ai_filter_schema_path()
  75. if ai_filter_schema.exists():
  76. with open(ai_filter_schema, "r", encoding="utf-8") as f:
  77. conn.executescript(f.read())
  78. conn.commit()
  79. # ========================================
  80. # 新闻数据存储
  81. # ========================================
  82. def _save_news_data_impl(self, data: NewsData, log_prefix: str = "[存储]") -> tuple[bool, int, int, int, int]:
  83. """
  84. 保存新闻数据到 SQLite(核心实现)
  85. Args:
  86. data: 新闻数据
  87. log_prefix: 日志前缀
  88. Returns:
  89. (success, new_count, updated_count, title_changed_count, off_list_count)
  90. """
  91. try:
  92. conn = self._get_connection(data.date)
  93. cursor = conn.cursor()
  94. # 获取配置时区的当前时间
  95. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  96. # 首先同步平台信息到 platforms 表
  97. for source_id, source_name in data.id_to_name.items():
  98. cursor.execute("""
  99. INSERT INTO platforms (id, name, updated_at)
  100. VALUES (?, ?, ?)
  101. ON CONFLICT(id) DO UPDATE SET
  102. name = excluded.name,
  103. updated_at = excluded.updated_at
  104. """, (source_id, source_name, now_str))
  105. # 统计计数器
  106. new_count = 0
  107. updated_count = 0
  108. title_changed_count = 0
  109. success_sources = []
  110. for source_id, news_list in data.items.items():
  111. success_sources.append(source_id)
  112. for item in news_list:
  113. try:
  114. # 标准化 URL(去除动态参数,如微博的 band_rank)
  115. normalized_url = normalize_url(item.url, source_id) if item.url else ""
  116. # 检查是否已存在(通过标准化 URL + platform_id)
  117. if normalized_url:
  118. cursor.execute("""
  119. SELECT id, title FROM news_items
  120. WHERE url = ? AND platform_id = ?
  121. """, (normalized_url, source_id))
  122. existing = cursor.fetchone()
  123. if existing:
  124. # 已存在,更新记录
  125. existing_id, existing_title = existing
  126. # 检查标题是否变化
  127. if existing_title != item.title:
  128. # 记录标题变更
  129. cursor.execute("""
  130. INSERT INTO title_changes
  131. (news_item_id, old_title, new_title, changed_at)
  132. VALUES (?, ?, ?, ?)
  133. """, (existing_id, existing_title, item.title, now_str))
  134. title_changed_count += 1
  135. # 记录排名历史
  136. cursor.execute("""
  137. INSERT INTO rank_history
  138. (news_item_id, rank, crawl_time, created_at)
  139. VALUES (?, ?, ?, ?)
  140. """, (existing_id, item.rank, data.crawl_time, now_str))
  141. # 更新现有记录
  142. cursor.execute("""
  143. UPDATE news_items SET
  144. title = ?,
  145. rank = ?,
  146. mobile_url = ?,
  147. last_crawl_time = ?,
  148. crawl_count = crawl_count + 1,
  149. updated_at = ?
  150. WHERE id = ?
  151. """, (item.title, item.rank, item.mobile_url,
  152. data.crawl_time, now_str, existing_id))
  153. updated_count += 1
  154. else:
  155. # 不存在,插入新记录(存储标准化后的 URL)
  156. cursor.execute("""
  157. INSERT INTO news_items
  158. (title, platform_id, rank, url, mobile_url,
  159. first_crawl_time, last_crawl_time, crawl_count,
  160. created_at, updated_at)
  161. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  162. """, (item.title, source_id, item.rank, normalized_url,
  163. item.mobile_url, data.crawl_time, data.crawl_time,
  164. now_str, now_str))
  165. new_id = cursor.lastrowid
  166. # 记录初始排名
  167. cursor.execute("""
  168. INSERT INTO rank_history
  169. (news_item_id, rank, crawl_time, created_at)
  170. VALUES (?, ?, ?, ?)
  171. """, (new_id, item.rank, data.crawl_time, now_str))
  172. new_count += 1
  173. else:
  174. # URL 为空的情况,直接插入(不做去重)
  175. cursor.execute("""
  176. INSERT INTO news_items
  177. (title, platform_id, rank, url, mobile_url,
  178. first_crawl_time, last_crawl_time, crawl_count,
  179. created_at, updated_at)
  180. VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  181. """, (item.title, source_id, item.rank, "",
  182. item.mobile_url, data.crawl_time, data.crawl_time,
  183. now_str, now_str))
  184. new_id = cursor.lastrowid
  185. # 记录初始排名
  186. cursor.execute("""
  187. INSERT INTO rank_history
  188. (news_item_id, rank, crawl_time, created_at)
  189. VALUES (?, ?, ?, ?)
  190. """, (new_id, item.rank, data.crawl_time, now_str))
  191. new_count += 1
  192. except sqlite3.Error as e:
  193. print(f"{log_prefix} 保存新闻条目失败 [{item.title[:30]}...]: {e}")
  194. total_items = new_count + updated_count
  195. # ========================================
  196. # 脱榜检测:检测上次在榜但这次不在榜的新闻
  197. # ========================================
  198. off_list_count = 0
  199. # 获取上一次抓取时间
  200. cursor.execute("""
  201. SELECT crawl_time FROM crawl_records
  202. WHERE crawl_time < ?
  203. ORDER BY crawl_time DESC
  204. LIMIT 1
  205. """, (data.crawl_time,))
  206. prev_record = cursor.fetchone()
  207. if prev_record:
  208. prev_crawl_time = prev_record[0]
  209. # 对于每个成功抓取的平台,检测脱榜
  210. for source_id in success_sources:
  211. # 获取当前抓取中该平台的所有标准化 URL
  212. current_urls = set()
  213. for item in data.items.get(source_id, []):
  214. normalized_url = normalize_url(item.url, source_id) if item.url else ""
  215. if normalized_url:
  216. current_urls.add(normalized_url)
  217. # 查询上次在榜(last_crawl_time = prev_crawl_time)但这次不在榜的新闻
  218. # 这些新闻是"第一次脱榜",需要记录
  219. cursor.execute("""
  220. SELECT id, url FROM news_items
  221. WHERE platform_id = ?
  222. AND last_crawl_time = ?
  223. AND url != ''
  224. """, (source_id, prev_crawl_time))
  225. for row in cursor.fetchall():
  226. news_id, url = row[0], row[1]
  227. if url not in current_urls:
  228. # 插入脱榜记录(rank=0 表示脱榜)
  229. cursor.execute("""
  230. INSERT INTO rank_history
  231. (news_item_id, rank, crawl_time, created_at)
  232. VALUES (?, 0, ?, ?)
  233. """, (news_id, data.crawl_time, now_str))
  234. off_list_count += 1
  235. # 记录抓取信息
  236. cursor.execute("""
  237. INSERT OR REPLACE INTO crawl_records
  238. (crawl_time, total_items, created_at)
  239. VALUES (?, ?, ?)
  240. """, (data.crawl_time, total_items, now_str))
  241. # 获取刚插入的 crawl_record 的 ID
  242. cursor.execute("""
  243. SELECT id FROM crawl_records WHERE crawl_time = ?
  244. """, (data.crawl_time,))
  245. record_row = cursor.fetchone()
  246. if record_row:
  247. crawl_record_id = record_row[0]
  248. # 记录成功的来源
  249. for source_id in success_sources:
  250. cursor.execute("""
  251. INSERT OR REPLACE INTO crawl_source_status
  252. (crawl_record_id, platform_id, status)
  253. VALUES (?, ?, 'success')
  254. """, (crawl_record_id, source_id))
  255. # 记录失败的来源
  256. for failed_id in data.failed_ids:
  257. # 确保失败的平台也在 platforms 表中
  258. cursor.execute("""
  259. INSERT OR IGNORE INTO platforms (id, name, updated_at)
  260. VALUES (?, ?, ?)
  261. """, (failed_id, failed_id, now_str))
  262. cursor.execute("""
  263. INSERT OR REPLACE INTO crawl_source_status
  264. (crawl_record_id, platform_id, status)
  265. VALUES (?, ?, 'failed')
  266. """, (crawl_record_id, failed_id))
  267. conn.commit()
  268. return True, new_count, updated_count, title_changed_count, off_list_count
  269. except Exception as e:
  270. print(f"{log_prefix} 保存失败: {e}")
  271. return False, 0, 0, 0, 0
  272. def _get_today_all_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
  273. """
  274. 获取指定日期的所有新闻数据(合并后)
  275. Args:
  276. date: 日期字符串,默认为今天
  277. Returns:
  278. 合并后的新闻数据
  279. """
  280. try:
  281. conn = self._get_connection(date)
  282. cursor = conn.cursor()
  283. # 获取所有新闻数据(包含 id 用于查询排名历史)
  284. cursor.execute("""
  285. SELECT n.id, n.title, n.platform_id, p.name as platform_name,
  286. n.rank, n.url, n.mobile_url,
  287. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  288. FROM news_items n
  289. LEFT JOIN platforms p ON n.platform_id = p.id
  290. ORDER BY n.platform_id, n.last_crawl_time
  291. """)
  292. rows = cursor.fetchall()
  293. if not rows:
  294. return None
  295. # 收集所有 news_item_id
  296. news_ids = [row[0] for row in rows]
  297. # 批量查询排名历史(同时获取时间和排名)
  298. # 过滤逻辑:只保留 last_crawl_time 之前的脱榜记录(rank=0)
  299. # 这样可以避免显示新闻永久脱榜后的无意义记录
  300. rank_history_map: Dict[int, List[int]] = {}
  301. rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
  302. if news_ids:
  303. placeholders = ",".join("?" * len(news_ids))
  304. cursor.execute(f"""
  305. SELECT rh.news_item_id, rh.rank, rh.crawl_time
  306. FROM rank_history rh
  307. JOIN news_items ni ON rh.news_item_id = ni.id
  308. WHERE rh.news_item_id IN ({placeholders})
  309. AND NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
  310. ORDER BY rh.news_item_id, rh.crawl_time
  311. """, news_ids)
  312. for rh_row in cursor.fetchall():
  313. news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
  314. # 构建 ranks 列表(去重,排除脱榜记录 rank=0)
  315. if news_id not in rank_history_map:
  316. rank_history_map[news_id] = []
  317. if rank != 0 and rank not in rank_history_map[news_id]:
  318. rank_history_map[news_id].append(rank)
  319. # 构建 rank_timeline 列表(完整时间线,包含脱榜)
  320. if news_id not in rank_timeline_map:
  321. rank_timeline_map[news_id] = []
  322. # 提取时间部分(HH:MM)
  323. time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
  324. rank_timeline_map[news_id].append({
  325. "time": time_part,
  326. "rank": rank if rank != 0 else None # 0 转为 None 表示脱榜
  327. })
  328. # 按 platform_id 分组
  329. items: Dict[str, List[NewsItem]] = {}
  330. id_to_name: Dict[str, str] = {}
  331. crawl_date = self._format_date_folder(date)
  332. for row in rows:
  333. news_id = row[0]
  334. platform_id = row[2]
  335. title = row[1]
  336. platform_name = row[3] or platform_id
  337. id_to_name[platform_id] = platform_name
  338. if platform_id not in items:
  339. items[platform_id] = []
  340. # 获取排名历史,如果没有则使用当前排名
  341. ranks = rank_history_map.get(news_id, [row[4]])
  342. rank_timeline = rank_timeline_map.get(news_id, [])
  343. items[platform_id].append(NewsItem(
  344. title=title,
  345. source_id=platform_id,
  346. source_name=platform_name,
  347. rank=row[4],
  348. url=row[5] or "",
  349. mobile_url=row[6] or "",
  350. crawl_time=row[8], # last_crawl_time
  351. ranks=ranks,
  352. first_time=row[7], # first_crawl_time
  353. last_time=row[8], # last_crawl_time
  354. count=row[9], # crawl_count
  355. rank_timeline=rank_timeline,
  356. ))
  357. final_items = items
  358. # 获取失败的来源
  359. cursor.execute("""
  360. SELECT DISTINCT css.platform_id
  361. FROM crawl_source_status css
  362. JOIN crawl_records cr ON css.crawl_record_id = cr.id
  363. WHERE css.status = 'failed'
  364. """)
  365. failed_ids = [row[0] for row in cursor.fetchall()]
  366. # 获取最新的抓取时间
  367. cursor.execute("""
  368. SELECT crawl_time FROM crawl_records
  369. ORDER BY crawl_time DESC
  370. LIMIT 1
  371. """)
  372. time_row = cursor.fetchone()
  373. crawl_time = time_row[0] if time_row else self._format_time_filename()
  374. return NewsData(
  375. date=crawl_date,
  376. crawl_time=crawl_time,
  377. items=final_items,
  378. id_to_name=id_to_name,
  379. failed_ids=failed_ids,
  380. )
  381. except Exception as e:
  382. print(f"[存储] 读取数据失败: {e}")
  383. return None
  384. def _get_latest_crawl_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
  385. """
  386. 获取最新一次抓取的数据
  387. Args:
  388. date: 日期字符串,默认为今天
  389. Returns:
  390. 最新抓取的新闻数据
  391. """
  392. try:
  393. conn = self._get_connection(date)
  394. cursor = conn.cursor()
  395. # 获取最新的抓取时间
  396. cursor.execute("""
  397. SELECT crawl_time FROM crawl_records
  398. ORDER BY crawl_time DESC
  399. LIMIT 1
  400. """)
  401. time_row = cursor.fetchone()
  402. if not time_row:
  403. return None
  404. latest_time = time_row[0]
  405. # 获取该时间的新闻数据(包含 id 用于查询排名历史)
  406. cursor.execute("""
  407. SELECT n.id, n.title, n.platform_id, p.name as platform_name,
  408. n.rank, n.url, n.mobile_url,
  409. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  410. FROM news_items n
  411. LEFT JOIN platforms p ON n.platform_id = p.id
  412. WHERE n.last_crawl_time = ?
  413. """, (latest_time,))
  414. rows = cursor.fetchall()
  415. if not rows:
  416. return None
  417. # 收集所有 news_item_id
  418. news_ids = [row[0] for row in rows]
  419. # 批量查询排名历史(同时获取时间和排名)
  420. # 过滤逻辑:只保留 last_crawl_time 之前的脱榜记录(rank=0)
  421. # 这样可以避免显示新闻永久脱榜后的无意义记录
  422. rank_history_map: Dict[int, List[int]] = {}
  423. rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
  424. if news_ids:
  425. placeholders = ",".join("?" * len(news_ids))
  426. cursor.execute(f"""
  427. SELECT rh.news_item_id, rh.rank, rh.crawl_time
  428. FROM rank_history rh
  429. JOIN news_items ni ON rh.news_item_id = ni.id
  430. WHERE rh.news_item_id IN ({placeholders})
  431. AND NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
  432. ORDER BY rh.news_item_id, rh.crawl_time
  433. """, news_ids)
  434. for rh_row in cursor.fetchall():
  435. news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
  436. # 构建 ranks 列表(去重,排除脱榜记录 rank=0)
  437. if news_id not in rank_history_map:
  438. rank_history_map[news_id] = []
  439. if rank != 0 and rank not in rank_history_map[news_id]:
  440. rank_history_map[news_id].append(rank)
  441. # 构建 rank_timeline 列表(完整时间线,包含脱榜)
  442. if news_id not in rank_timeline_map:
  443. rank_timeline_map[news_id] = []
  444. # 提取时间部分(HH:MM)
  445. time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
  446. rank_timeline_map[news_id].append({
  447. "time": time_part,
  448. "rank": rank if rank != 0 else None # 0 转为 None 表示脱榜
  449. })
  450. items: Dict[str, List[NewsItem]] = {}
  451. id_to_name: Dict[str, str] = {}
  452. crawl_date = self._format_date_folder(date)
  453. for row in rows:
  454. news_id = row[0]
  455. platform_id = row[2]
  456. platform_name = row[3] or platform_id
  457. id_to_name[platform_id] = platform_name
  458. if platform_id not in items:
  459. items[platform_id] = []
  460. # 获取排名历史,如果没有则使用当前排名
  461. ranks = rank_history_map.get(news_id, [row[4]])
  462. rank_timeline = rank_timeline_map.get(news_id, [])
  463. items[platform_id].append(NewsItem(
  464. title=row[1],
  465. source_id=platform_id,
  466. source_name=platform_name,
  467. rank=row[4],
  468. url=row[5] or "",
  469. mobile_url=row[6] or "",
  470. crawl_time=row[8], # last_crawl_time
  471. ranks=ranks,
  472. first_time=row[7], # first_crawl_time
  473. last_time=row[8], # last_crawl_time
  474. count=row[9], # crawl_count
  475. rank_timeline=rank_timeline,
  476. ))
  477. # 获取失败的来源(针对最新一次抓取)
  478. cursor.execute("""
  479. SELECT css.platform_id
  480. FROM crawl_source_status css
  481. JOIN crawl_records cr ON css.crawl_record_id = cr.id
  482. WHERE cr.crawl_time = ? AND css.status = 'failed'
  483. """, (latest_time,))
  484. failed_ids = [row[0] for row in cursor.fetchall()]
  485. return NewsData(
  486. date=crawl_date,
  487. crawl_time=latest_time,
  488. items=items,
  489. id_to_name=id_to_name,
  490. failed_ids=failed_ids,
  491. )
  492. except Exception as e:
  493. print(f"[存储] 获取最新数据失败: {e}")
  494. return None
  495. def _detect_new_titles_impl(self, current_data: NewsData) -> Dict[str, Dict]:
  496. """
  497. 检测新增的标题
  498. 该方法比较当前抓取数据与历史数据,找出新增的标题。
  499. 关键逻辑:只有在历史批次中从未出现过的标题才算新增。
  500. Args:
  501. current_data: 当前抓取的数据
  502. Returns:
  503. 新增的标题数据 {source_id: {title: NewsItem}}
  504. """
  505. try:
  506. # 获取历史数据
  507. historical_data = self._get_today_all_data_impl(current_data.date)
  508. if not historical_data:
  509. # 没有历史数据,所有都是新的
  510. new_titles = {}
  511. for source_id, news_list in current_data.items.items():
  512. new_titles[source_id] = {item.title: item for item in news_list}
  513. return new_titles
  514. # 获取当前批次时间
  515. current_time = current_data.crawl_time
  516. # 收集历史标题(first_time < current_time 的标题)
  517. # 这样可以正确处理同一标题因 URL 变化而产生多条记录的情况
  518. historical_titles: Dict[str, set] = {}
  519. for source_id, news_list in historical_data.items.items():
  520. historical_titles[source_id] = set()
  521. for item in news_list:
  522. first_time = item.first_time or item.crawl_time
  523. if first_time < current_time:
  524. historical_titles[source_id].add(item.title)
  525. # 检查是否有历史数据
  526. has_historical_data = any(len(titles) > 0 for titles in historical_titles.values())
  527. if not has_historical_data:
  528. # 第一次抓取,没有"新增"概念
  529. return {}
  530. # 检测新增
  531. new_titles = {}
  532. for source_id, news_list in current_data.items.items():
  533. hist_set = historical_titles.get(source_id, set())
  534. for item in news_list:
  535. if item.title not in hist_set:
  536. if source_id not in new_titles:
  537. new_titles[source_id] = {}
  538. new_titles[source_id][item.title] = item
  539. return new_titles
  540. except Exception as e:
  541. print(f"[存储] 检测新标题失败: {e}")
  542. return {}
  543. def _is_first_crawl_today_impl(self, date: Optional[str] = None) -> bool:
  544. """
  545. 检查是否是当天第一次抓取
  546. Args:
  547. date: 日期字符串,默认为今天
  548. Returns:
  549. 是否是第一次抓取
  550. """
  551. try:
  552. conn = self._get_connection(date)
  553. cursor = conn.cursor()
  554. cursor.execute("""
  555. SELECT COUNT(*) as count FROM crawl_records
  556. """)
  557. row = cursor.fetchone()
  558. count = row[0] if row else 0
  559. # 如果只有一条或没有记录,视为第一次抓取
  560. return count <= 1
  561. except Exception as e:
  562. print(f"[存储] 检查首次抓取失败: {e}")
  563. return True
  564. def _get_crawl_times_impl(self, date: Optional[str] = None) -> List[str]:
  565. """
  566. 获取指定日期的所有抓取时间列表
  567. Args:
  568. date: 日期字符串,默认为今天
  569. Returns:
  570. 抓取时间列表(按时间排序)
  571. """
  572. try:
  573. conn = self._get_connection(date)
  574. cursor = conn.cursor()
  575. cursor.execute("""
  576. SELECT crawl_time FROM crawl_records
  577. ORDER BY crawl_time
  578. """)
  579. rows = cursor.fetchall()
  580. return [row[0] for row in rows]
  581. except Exception as e:
  582. print(f"[存储] 获取抓取时间列表失败: {e}")
  583. return []
  584. # ========================================
  585. # 时间段执行记录(调度系统)
  586. # ========================================
  587. def _has_period_executed_impl(self, date_str: str, period_key: str, action: str) -> bool:
  588. """
  589. 检查指定时间段的某个 action 今天是否已执行
  590. Args:
  591. date_str: 日期字符串 YYYY-MM-DD
  592. period_key: 时间段 key
  593. action: 动作类型 (analyze / push)
  594. Returns:
  595. 是否已执行
  596. """
  597. try:
  598. conn = self._get_connection(date_str)
  599. cursor = conn.cursor()
  600. # 先检查表是否存在
  601. cursor.execute("""
  602. SELECT name FROM sqlite_master
  603. WHERE type='table' AND name='period_executions'
  604. """)
  605. if not cursor.fetchone():
  606. return False
  607. cursor.execute("""
  608. SELECT 1 FROM period_executions
  609. WHERE execution_date = ? AND period_key = ? AND action = ?
  610. """, (date_str, period_key, action))
  611. return cursor.fetchone() is not None
  612. except Exception as e:
  613. print(f"[存储] 检查时间段执行记录失败: {e}")
  614. return False
  615. def _record_period_execution_impl(self, date_str: str, period_key: str, action: str) -> bool:
  616. """
  617. 记录时间段的 action 执行
  618. Args:
  619. date_str: 日期字符串 YYYY-MM-DD
  620. period_key: 时间段 key
  621. action: 动作类型 (analyze / push)
  622. Returns:
  623. 是否记录成功
  624. """
  625. try:
  626. conn = self._get_connection(date_str)
  627. cursor = conn.cursor()
  628. # 确保表存在
  629. cursor.execute("""
  630. CREATE TABLE IF NOT EXISTS period_executions (
  631. id INTEGER PRIMARY KEY AUTOINCREMENT,
  632. execution_date TEXT NOT NULL,
  633. period_key TEXT NOT NULL,
  634. action TEXT NOT NULL,
  635. executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  636. UNIQUE(execution_date, period_key, action)
  637. )
  638. """)
  639. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  640. cursor.execute("""
  641. INSERT OR IGNORE INTO period_executions (execution_date, period_key, action, executed_at)
  642. VALUES (?, ?, ?, ?)
  643. """, (date_str, period_key, action, now_str))
  644. conn.commit()
  645. return True
  646. except Exception as e:
  647. print(f"[存储] 记录时间段执行失败: {e}")
  648. return False
  649. # ========================================
  650. # RSS 数据存储
  651. # ========================================
  652. def _save_rss_data_impl(self, data: RSSData, log_prefix: str = "[存储]") -> tuple[bool, int, int]:
  653. """
  654. 保存 RSS 数据到 SQLite(以 URL 为唯一标识)
  655. Args:
  656. data: RSS 数据
  657. log_prefix: 日志前缀
  658. Returns:
  659. (success, new_count, updated_count)
  660. """
  661. try:
  662. conn = self._get_connection(data.date, db_type="rss")
  663. cursor = conn.cursor()
  664. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  665. # 同步 RSS 源信息到 rss_feeds 表
  666. for feed_id, feed_name in data.id_to_name.items():
  667. cursor.execute("""
  668. INSERT INTO rss_feeds (id, name, updated_at)
  669. VALUES (?, ?, ?)
  670. ON CONFLICT(id) DO UPDATE SET
  671. name = excluded.name,
  672. updated_at = excluded.updated_at
  673. """, (feed_id, feed_name, now_str))
  674. # 统计计数器
  675. new_count = 0
  676. updated_count = 0
  677. for feed_id, rss_list in data.items.items():
  678. for item in rss_list:
  679. try:
  680. # 检查是否已存在(通过 URL + feed_id)
  681. if item.url:
  682. cursor.execute("""
  683. SELECT id, title FROM rss_items
  684. WHERE url = ? AND feed_id = ?
  685. """, (item.url, feed_id))
  686. existing = cursor.fetchone()
  687. if existing:
  688. # 已存在,更新记录
  689. existing_id = existing[0]
  690. cursor.execute("""
  691. UPDATE rss_items SET
  692. title = ?,
  693. published_at = ?,
  694. summary = ?,
  695. author = ?,
  696. last_crawl_time = ?,
  697. crawl_count = crawl_count + 1,
  698. updated_at = ?
  699. WHERE id = ?
  700. """, (item.title, item.published_at, item.summary,
  701. item.author, data.crawl_time, now_str, existing_id))
  702. updated_count += 1
  703. else:
  704. # 不存在,插入新记录(使用 ON CONFLICT 兜底处理并发/竞争场景)
  705. cursor.execute("""
  706. INSERT INTO rss_items
  707. (title, feed_id, url, published_at, summary, author,
  708. first_crawl_time, last_crawl_time, crawl_count,
  709. created_at, updated_at)
  710. VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  711. ON CONFLICT(url, feed_id) DO UPDATE SET
  712. title = excluded.title,
  713. published_at = excluded.published_at,
  714. summary = excluded.summary,
  715. author = excluded.author,
  716. last_crawl_time = excluded.last_crawl_time,
  717. crawl_count = crawl_count + 1,
  718. updated_at = excluded.updated_at
  719. """, (item.title, feed_id, item.url, item.published_at,
  720. item.summary, item.author, data.crawl_time,
  721. data.crawl_time, now_str, now_str))
  722. new_count += 1
  723. else:
  724. # URL 为空,用 try-except 处理重复
  725. try:
  726. cursor.execute("""
  727. INSERT INTO rss_items
  728. (title, feed_id, url, published_at, summary, author,
  729. first_crawl_time, last_crawl_time, crawl_count,
  730. created_at, updated_at)
  731. VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
  732. """, (item.title, feed_id, "", item.published_at,
  733. item.summary, item.author, data.crawl_time,
  734. data.crawl_time, now_str, now_str))
  735. new_count += 1
  736. except sqlite3.IntegrityError:
  737. # 重复的空 URL 条目,忽略
  738. pass
  739. except sqlite3.Error as e:
  740. print(f"{log_prefix} 保存 RSS 条目失败 [{item.title[:30]}...]: {e}")
  741. total_items = new_count + updated_count
  742. # 记录抓取信息
  743. cursor.execute("""
  744. INSERT OR REPLACE INTO rss_crawl_records
  745. (crawl_time, total_items, created_at)
  746. VALUES (?, ?, ?)
  747. """, (data.crawl_time, total_items, now_str))
  748. # 记录抓取状态
  749. cursor.execute("""
  750. SELECT id FROM rss_crawl_records WHERE crawl_time = ?
  751. """, (data.crawl_time,))
  752. record_row = cursor.fetchone()
  753. if record_row:
  754. crawl_record_id = record_row[0]
  755. # 记录成功的源
  756. for feed_id in data.items.keys():
  757. cursor.execute("""
  758. INSERT OR REPLACE INTO rss_crawl_status
  759. (crawl_record_id, feed_id, status)
  760. VALUES (?, ?, 'success')
  761. """, (crawl_record_id, feed_id))
  762. # 记录失败的源
  763. for failed_id in data.failed_ids:
  764. cursor.execute("""
  765. INSERT OR IGNORE INTO rss_feeds (id, name, updated_at)
  766. VALUES (?, ?, ?)
  767. """, (failed_id, failed_id, now_str))
  768. cursor.execute("""
  769. INSERT OR REPLACE INTO rss_crawl_status
  770. (crawl_record_id, feed_id, status)
  771. VALUES (?, ?, 'failed')
  772. """, (crawl_record_id, failed_id))
  773. conn.commit()
  774. return True, new_count, updated_count
  775. except Exception as e:
  776. print(f"{log_prefix} 保存 RSS 数据失败: {e}")
  777. return False, 0, 0
  778. def _get_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
  779. """
  780. 获取指定日期的所有 RSS 数据
  781. Args:
  782. date: 日期字符串(YYYY-MM-DD),默认为今天
  783. Returns:
  784. RSSData 对象,如果没有数据返回 None
  785. """
  786. try:
  787. conn = self._get_connection(date, db_type="rss")
  788. cursor = conn.cursor()
  789. # 获取所有 RSS 数据
  790. cursor.execute("""
  791. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  792. i.url, i.published_at, i.summary, i.author,
  793. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  794. FROM rss_items i
  795. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  796. ORDER BY i.published_at DESC
  797. """)
  798. rows = cursor.fetchall()
  799. if not rows:
  800. return None
  801. items: Dict[str, List[RSSItem]] = {}
  802. id_to_name: Dict[str, str] = {}
  803. crawl_date = self._format_date_folder(date)
  804. for row in rows:
  805. feed_id = row[2]
  806. feed_name = row[3] or feed_id
  807. id_to_name[feed_id] = feed_name
  808. if feed_id not in items:
  809. items[feed_id] = []
  810. items[feed_id].append(RSSItem(
  811. title=row[1],
  812. feed_id=feed_id,
  813. feed_name=feed_name,
  814. url=row[4] or "",
  815. published_at=row[5] or "",
  816. summary=row[6] or "",
  817. author=row[7] or "",
  818. crawl_time=row[9],
  819. first_time=row[8],
  820. last_time=row[9],
  821. count=row[10],
  822. ))
  823. # 获取最新的抓取时间
  824. cursor.execute("""
  825. SELECT crawl_time FROM rss_crawl_records
  826. ORDER BY crawl_time DESC
  827. LIMIT 1
  828. """)
  829. time_row = cursor.fetchone()
  830. crawl_time = time_row[0] if time_row else self._format_time_filename()
  831. # 获取失败的源
  832. cursor.execute("""
  833. SELECT DISTINCT cs.feed_id
  834. FROM rss_crawl_status cs
  835. JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
  836. WHERE cs.status = 'failed'
  837. """)
  838. failed_ids = [row[0] for row in cursor.fetchall()]
  839. return RSSData(
  840. date=crawl_date,
  841. crawl_time=crawl_time,
  842. items=items,
  843. id_to_name=id_to_name,
  844. failed_ids=failed_ids,
  845. )
  846. except Exception as e:
  847. print(f"[存储] 读取 RSS 数据失败: {e}")
  848. return None
  849. def _detect_new_rss_items_impl(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
  850. """
  851. 检测新增的 RSS 条目(增量模式)
  852. 该方法比较当前抓取数据与历史数据,找出新增的 RSS 条目。
  853. 关键逻辑:只有在历史批次中从未出现过的 URL 才算新增。
  854. Args:
  855. current_data: 当前抓取的 RSS 数据
  856. Returns:
  857. 新增的 RSS 条目 {feed_id: [RSSItem, ...]}
  858. """
  859. try:
  860. # 获取历史数据
  861. historical_data = self._get_rss_data_impl(current_data.date)
  862. if not historical_data:
  863. # 没有历史数据,所有都是新的
  864. return current_data.items.copy()
  865. # 获取当前批次时间
  866. current_time = current_data.crawl_time
  867. # 收集历史 URL(first_time < current_time 的条目)
  868. historical_urls: Dict[str, set] = {}
  869. for feed_id, rss_list in historical_data.items.items():
  870. historical_urls[feed_id] = set()
  871. for item in rss_list:
  872. first_time = item.first_time or item.crawl_time
  873. if first_time < current_time:
  874. if item.url:
  875. historical_urls[feed_id].add(item.url)
  876. # 检查是否有历史数据
  877. has_historical_data = any(len(urls) > 0 for urls in historical_urls.values())
  878. if not has_historical_data:
  879. # 第一次抓取,没有"新增"概念
  880. return {}
  881. # 检测新增
  882. new_items: Dict[str, List[RSSItem]] = {}
  883. for feed_id, rss_list in current_data.items.items():
  884. hist_set = historical_urls.get(feed_id, set())
  885. for item in rss_list:
  886. # 通过 URL 判断是否新增
  887. if item.url and item.url not in hist_set:
  888. if feed_id not in new_items:
  889. new_items[feed_id] = []
  890. new_items[feed_id].append(item)
  891. return new_items
  892. except Exception as e:
  893. print(f"[存储] 检测新 RSS 条目失败: {e}")
  894. return {}
  895. def _get_latest_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
  896. """
  897. 获取最新一次抓取的 RSS 数据(当前榜单模式)
  898. Args:
  899. date: 日期字符串(YYYY-MM-DD),默认为今天
  900. Returns:
  901. 最新抓取的 RSS 数据,如果没有数据返回 None
  902. """
  903. try:
  904. conn = self._get_connection(date, db_type="rss")
  905. cursor = conn.cursor()
  906. # 获取最新的抓取时间
  907. cursor.execute("""
  908. SELECT crawl_time FROM rss_crawl_records
  909. ORDER BY crawl_time DESC
  910. LIMIT 1
  911. """)
  912. time_row = cursor.fetchone()
  913. if not time_row:
  914. return None
  915. latest_time = time_row[0]
  916. # 获取该时间的 RSS 数据
  917. cursor.execute("""
  918. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  919. i.url, i.published_at, i.summary, i.author,
  920. i.first_crawl_time, i.last_crawl_time, i.crawl_count
  921. FROM rss_items i
  922. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  923. WHERE i.last_crawl_time = ?
  924. ORDER BY i.published_at DESC
  925. """, (latest_time,))
  926. rows = cursor.fetchall()
  927. if not rows:
  928. return None
  929. items: Dict[str, List[RSSItem]] = {}
  930. id_to_name: Dict[str, str] = {}
  931. crawl_date = self._format_date_folder(date)
  932. for row in rows:
  933. feed_id = row[2]
  934. feed_name = row[3] or feed_id
  935. id_to_name[feed_id] = feed_name
  936. if feed_id not in items:
  937. items[feed_id] = []
  938. items[feed_id].append(RSSItem(
  939. title=row[1],
  940. feed_id=feed_id,
  941. feed_name=feed_name,
  942. url=row[4] or "",
  943. published_at=row[5] or "",
  944. summary=row[6] or "",
  945. author=row[7] or "",
  946. crawl_time=row[9],
  947. first_time=row[8],
  948. last_time=row[9],
  949. count=row[10],
  950. ))
  951. # 获取失败的源(针对最新一次抓取)
  952. cursor.execute("""
  953. SELECT cs.feed_id
  954. FROM rss_crawl_status cs
  955. JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
  956. WHERE cr.crawl_time = ? AND cs.status = 'failed'
  957. """, (latest_time,))
  958. failed_ids = [row[0] for row in cursor.fetchall()]
  959. return RSSData(
  960. date=crawl_date,
  961. crawl_time=latest_time,
  962. items=items,
  963. id_to_name=id_to_name,
  964. failed_ids=failed_ids,
  965. )
  966. except Exception as e:
  967. print(f"[存储] 获取最新 RSS 数据失败: {e}")
  968. return None
  969. # ========================================
  970. # AI 智能筛选 - 标签管理
  971. # ========================================
  972. def _get_active_tags_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict[str, Any]]:
  973. """获取指定兴趣文件的 active 标签列表"""
  974. try:
  975. conn = self._get_connection(date)
  976. cursor = conn.cursor()
  977. cursor.execute("""
  978. SELECT id, tag, description, version, prompt_hash, priority
  979. FROM ai_filter_tags
  980. WHERE status = 'active' AND interests_file = ?
  981. ORDER BY priority ASC, id ASC
  982. """, (interests_file,))
  983. return [
  984. {
  985. "id": row[0], "tag": row[1], "description": row[2],
  986. "version": row[3], "prompt_hash": row[4], "priority": row[5],
  987. }
  988. for row in cursor.fetchall()
  989. ]
  990. except Exception as e:
  991. print(f"[AI筛选] 获取标签失败: {e}")
  992. return []
  993. def _get_latest_prompt_hash_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> Optional[str]:
  994. """获取指定兴趣文件最新版本标签的 prompt_hash"""
  995. try:
  996. conn = self._get_connection(date)
  997. cursor = conn.cursor()
  998. cursor.execute("""
  999. SELECT prompt_hash FROM ai_filter_tags
  1000. WHERE status = 'active' AND interests_file = ?
  1001. ORDER BY version DESC
  1002. LIMIT 1
  1003. """, (interests_file,))
  1004. row = cursor.fetchone()
  1005. return row[0] if row else None
  1006. except Exception as e:
  1007. print(f"[AI筛选] 获取 prompt_hash 失败: {e}")
  1008. return None
  1009. def _get_latest_tag_version_impl(self, date: Optional[str] = None) -> int:
  1010. """获取最新版本号"""
  1011. try:
  1012. conn = self._get_connection(date)
  1013. cursor = conn.cursor()
  1014. cursor.execute("""
  1015. SELECT MAX(version) FROM ai_filter_tags
  1016. """)
  1017. row = cursor.fetchone()
  1018. return row[0] if row and row[0] is not None else 0
  1019. except Exception as e:
  1020. print(f"[AI筛选] 获取版本号失败: {e}")
  1021. return 0
  1022. def _deprecate_all_tags_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
  1023. """将指定兴趣文件的 active 标签和关联的分类结果标记为 deprecated"""
  1024. try:
  1025. conn = self._get_connection(date)
  1026. cursor = conn.cursor()
  1027. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1028. # 获取该兴趣文件的 active 标签 id
  1029. cursor.execute(
  1030. "SELECT id FROM ai_filter_tags WHERE status = 'active' AND interests_file = ?",
  1031. (interests_file,)
  1032. )
  1033. tag_ids = [row[0] for row in cursor.fetchall()]
  1034. if not tag_ids:
  1035. return 0
  1036. # 废弃标签
  1037. placeholders = ",".join("?" * len(tag_ids))
  1038. cursor.execute(f"""
  1039. UPDATE ai_filter_tags
  1040. SET status = 'deprecated', deprecated_at = ?
  1041. WHERE id IN ({placeholders})
  1042. """, [now_str] + tag_ids)
  1043. tag_count = cursor.rowcount
  1044. # 废弃关联的分类结果
  1045. placeholders = ",".join("?" * len(tag_ids))
  1046. cursor.execute(f"""
  1047. UPDATE ai_filter_results
  1048. SET status = 'deprecated', deprecated_at = ?
  1049. WHERE tag_id IN ({placeholders}) AND status = 'active'
  1050. """, [now_str] + tag_ids)
  1051. conn.commit()
  1052. print(f"[AI筛选] 已废弃 {tag_count} 个标签及关联分类结果")
  1053. return tag_count
  1054. except Exception as e:
  1055. print(f"[AI筛选] 废弃标签失败: {e}")
  1056. return 0
  1057. def _save_tags_impl(
  1058. self, date: Optional[str], tags: List[Dict], version: int, prompt_hash: str,
  1059. interests_file: str = "ai_interests.txt"
  1060. ) -> int:
  1061. """保存新提取的标签"""
  1062. try:
  1063. conn = self._get_connection(date)
  1064. cursor = conn.cursor()
  1065. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1066. count = 0
  1067. for idx, tag_data in enumerate(tags, start=1):
  1068. priority = tag_data.get("priority", idx)
  1069. try:
  1070. priority = int(priority)
  1071. except (TypeError, ValueError):
  1072. priority = idx
  1073. cursor.execute("""
  1074. INSERT INTO ai_filter_tags
  1075. (tag, description, priority, version, prompt_hash, interests_file, created_at)
  1076. VALUES (?, ?, ?, ?, ?, ?, ?)
  1077. """, (
  1078. tag_data["tag"],
  1079. tag_data.get("description", ""),
  1080. priority,
  1081. version,
  1082. prompt_hash,
  1083. interests_file,
  1084. now_str,
  1085. ))
  1086. count += 1
  1087. conn.commit()
  1088. return count
  1089. except Exception as e:
  1090. print(f"[AI筛选] 保存标签失败: {e}")
  1091. return 0
  1092. def _deprecate_specific_tags_impl(
  1093. self, date: Optional[str], tag_ids: List[int]
  1094. ) -> int:
  1095. """废弃指定 ID 的标签及其关联分类结果(增量更新时使用)"""
  1096. if not tag_ids:
  1097. return 0
  1098. try:
  1099. conn = self._get_connection(date)
  1100. cursor = conn.cursor()
  1101. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1102. placeholders = ",".join("?" * len(tag_ids))
  1103. cursor.execute(f"""
  1104. UPDATE ai_filter_tags
  1105. SET status = 'deprecated', deprecated_at = ?
  1106. WHERE id IN ({placeholders})
  1107. """, [now_str] + tag_ids)
  1108. tag_count = cursor.rowcount
  1109. cursor.execute(f"""
  1110. UPDATE ai_filter_results
  1111. SET status = 'deprecated', deprecated_at = ?
  1112. WHERE tag_id IN ({placeholders}) AND status = 'active'
  1113. """, [now_str] + tag_ids)
  1114. conn.commit()
  1115. return tag_count
  1116. except Exception as e:
  1117. print(f"[AI筛选] 废弃指定标签失败: {e}")
  1118. return 0
  1119. def _update_tags_hash_impl(
  1120. self, date: Optional[str], interests_file: str, new_hash: str
  1121. ) -> int:
  1122. """更新指定兴趣文件所有 active 标签的 prompt_hash(增量更新时使用)"""
  1123. try:
  1124. conn = self._get_connection(date)
  1125. cursor = conn.cursor()
  1126. cursor.execute("""
  1127. UPDATE ai_filter_tags
  1128. SET prompt_hash = ?
  1129. WHERE interests_file = ? AND status = 'active'
  1130. """, (new_hash, interests_file))
  1131. count = cursor.rowcount
  1132. conn.commit()
  1133. return count
  1134. except Exception as e:
  1135. print(f"[AI筛选] 更新标签 hash 失败: {e}")
  1136. return 0
  1137. # ========================================
  1138. # AI 智能筛选 - 分类结果管理
  1139. # ========================================
  1140. def _update_tag_descriptions_impl(
  1141. self, date: Optional[str], tag_updates: List[Dict],
  1142. interests_file: str = "ai_interests.txt"
  1143. ) -> int:
  1144. """按 tag 名匹配,更新 active 标签的 description 字段"""
  1145. try:
  1146. conn = self._get_connection(date)
  1147. cursor = conn.cursor()
  1148. count = 0
  1149. for t in tag_updates:
  1150. tag_name = t.get("tag", "")
  1151. description = t.get("description", "")
  1152. if not tag_name:
  1153. continue
  1154. cursor.execute("""
  1155. UPDATE ai_filter_tags
  1156. SET description = ?
  1157. WHERE tag = ? AND interests_file = ? AND status = 'active'
  1158. """, (description, tag_name, interests_file))
  1159. count += cursor.rowcount
  1160. conn.commit()
  1161. return count
  1162. except Exception as e:
  1163. print(f"[AI筛选] 更新标签描述失败: {e}")
  1164. return 0
  1165. def _update_tag_priorities_impl(
  1166. self, date: Optional[str], tag_priorities: List[Dict],
  1167. interests_file: str = "ai_interests.txt"
  1168. ) -> int:
  1169. """按 tag 名匹配,更新 active 标签的 priority 字段"""
  1170. try:
  1171. conn = self._get_connection(date)
  1172. cursor = conn.cursor()
  1173. count = 0
  1174. for t in tag_priorities:
  1175. tag_name = t.get("tag", "")
  1176. priority = t.get("priority")
  1177. if not tag_name:
  1178. continue
  1179. try:
  1180. priority = int(priority)
  1181. except (TypeError, ValueError):
  1182. continue
  1183. cursor.execute("""
  1184. UPDATE ai_filter_tags
  1185. SET priority = ?
  1186. WHERE tag = ? AND interests_file = ? AND status = 'active'
  1187. """, (priority, tag_name, interests_file))
  1188. count += cursor.rowcount
  1189. conn.commit()
  1190. return count
  1191. except Exception as e:
  1192. print(f"[AI筛选] 更新标签优先级失败: {e}")
  1193. return 0
  1194. # ========================================
  1195. # AI 智能筛选 - 已分析新闻追踪
  1196. # ========================================
  1197. def _save_analyzed_news_impl(
  1198. self, date: Optional[str], news_ids: List[int], source_type: str,
  1199. interests_file: str, prompt_hash: str, matched_ids: set
  1200. ) -> int:
  1201. """批量记录已分析的新闻(匹配与不匹配都记录)"""
  1202. try:
  1203. conn = self._get_connection(date)
  1204. cursor = conn.cursor()
  1205. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1206. count = 0
  1207. for nid in news_ids:
  1208. try:
  1209. cursor.execute("""
  1210. INSERT OR REPLACE INTO ai_filter_analyzed_news
  1211. (news_item_id, source_type, interests_file, prompt_hash, matched, created_at)
  1212. VALUES (?, ?, ?, ?, ?, ?)
  1213. """, (
  1214. nid, source_type, interests_file, prompt_hash,
  1215. 1 if nid in matched_ids else 0,
  1216. now_str,
  1217. ))
  1218. count += 1
  1219. except Exception:
  1220. pass
  1221. conn.commit()
  1222. return count
  1223. except Exception as e:
  1224. print(f"[AI筛选] 保存已分析记录失败: {e}")
  1225. return 0
  1226. def _get_analyzed_news_ids_impl(
  1227. self, date: Optional[str] = None, source_type: str = "hotlist",
  1228. interests_file: str = "ai_interests.txt"
  1229. ) -> set:
  1230. """获取已分析过的新闻 ID 集合(用于去重)"""
  1231. try:
  1232. conn = self._get_connection(date)
  1233. cursor = conn.cursor()
  1234. cursor.execute("""
  1235. SELECT news_item_id FROM ai_filter_analyzed_news
  1236. WHERE source_type = ? AND interests_file = ?
  1237. """, (source_type, interests_file))
  1238. return {row[0] for row in cursor.fetchall()}
  1239. except Exception as e:
  1240. print(f"[AI筛选] 获取已分析ID失败: {e}")
  1241. return set()
  1242. def _clear_analyzed_news_impl(
  1243. self, date: Optional[str] = None, interests_file: str = "ai_interests.txt"
  1244. ) -> int:
  1245. """清除指定兴趣文件的所有已分析记录(全量重分类时使用)"""
  1246. try:
  1247. conn = self._get_connection(date)
  1248. cursor = conn.cursor()
  1249. cursor.execute("""
  1250. DELETE FROM ai_filter_analyzed_news
  1251. WHERE interests_file = ?
  1252. """, (interests_file,))
  1253. count = cursor.rowcount
  1254. conn.commit()
  1255. return count
  1256. except Exception as e:
  1257. print(f"[AI筛选] 清除已分析记录失败: {e}")
  1258. return 0
  1259. def _clear_unmatched_analyzed_news_impl(
  1260. self, date: Optional[str] = None, interests_file: str = "ai_interests.txt"
  1261. ) -> int:
  1262. """清除不匹配的已分析记录,让这些新闻有机会被新标签重新分析"""
  1263. try:
  1264. conn = self._get_connection(date)
  1265. cursor = conn.cursor()
  1266. cursor.execute("""
  1267. DELETE FROM ai_filter_analyzed_news
  1268. WHERE interests_file = ? AND matched = 0
  1269. """, (interests_file,))
  1270. count = cursor.rowcount
  1271. conn.commit()
  1272. return count
  1273. except Exception as e:
  1274. print(f"[AI筛选] 清除不匹配记录失败: {e}")
  1275. return 0
  1276. # ========================================
  1277. # AI 智能筛选 - 分类结果管理(原有)
  1278. # ========================================
  1279. def _save_filter_results_impl(
  1280. self, date: Optional[str], results: List[Dict]
  1281. ) -> int:
  1282. """批量保存分类结果"""
  1283. try:
  1284. conn = self._get_connection(date)
  1285. cursor = conn.cursor()
  1286. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  1287. count = 0
  1288. for r in results:
  1289. try:
  1290. cursor.execute("""
  1291. INSERT INTO ai_filter_results
  1292. (news_item_id, source_type, tag_id, relevance_score, created_at)
  1293. VALUES (?, ?, ?, ?, ?)
  1294. """, (
  1295. r["news_item_id"],
  1296. r.get("source_type", "hotlist"),
  1297. r["tag_id"],
  1298. r.get("relevance_score", 0.0),
  1299. now_str,
  1300. ))
  1301. count += 1
  1302. except sqlite3.IntegrityError:
  1303. pass # 重复记录,跳过
  1304. conn.commit()
  1305. return count
  1306. except Exception as e:
  1307. print(f"[AI筛选] 保存分类结果失败: {e}")
  1308. return 0
  1309. def _get_active_filter_results_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict[str, Any]]:
  1310. """获取指定兴趣文件的 active 分类结果,JOIN news_items 获取新闻详情"""
  1311. try:
  1312. conn = self._get_connection(date)
  1313. cursor = conn.cursor()
  1314. # 热榜结果
  1315. cursor.execute("""
  1316. SELECT
  1317. r.news_item_id, r.source_type, r.tag_id, r.relevance_score,
  1318. t.tag, t.description as tag_description, t.priority,
  1319. n.title, n.platform_id as source_id, p.name as source_name,
  1320. n.url, n.mobile_url, n.rank,
  1321. n.first_crawl_time, n.last_crawl_time, n.crawl_count
  1322. FROM ai_filter_results r
  1323. JOIN ai_filter_tags t ON r.tag_id = t.id
  1324. JOIN news_items n ON r.news_item_id = n.id
  1325. LEFT JOIN platforms p ON n.platform_id = p.id
  1326. WHERE r.status = 'active' AND r.source_type = 'hotlist'
  1327. AND t.status = 'active' AND t.interests_file = ?
  1328. ORDER BY t.priority ASC, t.id ASC, r.relevance_score DESC
  1329. """, (interests_file,))
  1330. results = []
  1331. hotlist_news_ids = []
  1332. for row in cursor.fetchall():
  1333. results.append({
  1334. "news_item_id": row[0], "source_type": row[1],
  1335. "tag_id": row[2], "relevance_score": row[3],
  1336. "tag": row[4], "tag_description": row[5], "tag_priority": row[6],
  1337. "title": row[7], "source_id": row[8],
  1338. "source_name": row[9] or row[8],
  1339. "url": row[10] or "", "mobile_url": row[11] or "",
  1340. "rank": row[12],
  1341. "first_time": row[13], "last_time": row[14],
  1342. "count": row[15],
  1343. })
  1344. hotlist_news_ids.append(row[0])
  1345. # 批量查排名历史(热榜)
  1346. ranks_map: Dict[int, List[int]] = {}
  1347. if hotlist_news_ids:
  1348. unique_ids = list(set(hotlist_news_ids))
  1349. placeholders = ",".join("?" * len(unique_ids))
  1350. cursor.execute(f"""
  1351. SELECT news_item_id, rank FROM rank_history
  1352. WHERE news_item_id IN ({placeholders}) AND rank != 0
  1353. """, unique_ids)
  1354. for rh_row in cursor.fetchall():
  1355. nid, rank = rh_row[0], rh_row[1]
  1356. if nid not in ranks_map:
  1357. ranks_map[nid] = []
  1358. if rank not in ranks_map[nid]:
  1359. ranks_map[nid].append(rank)
  1360. for item in results:
  1361. item["ranks"] = ranks_map.get(item["news_item_id"], [item["rank"]])
  1362. # RSS 结果(如果有 rss 库)
  1363. try:
  1364. rss_conn = self._get_connection(date, db_type="rss")
  1365. rss_cursor = rss_conn.cursor()
  1366. # 从 news 库获取 rss 类型的分类结果 ID
  1367. cursor.execute("""
  1368. SELECT r.news_item_id, r.tag_id, r.relevance_score,
  1369. t.tag, t.description, t.priority
  1370. FROM ai_filter_results r
  1371. JOIN ai_filter_tags t ON r.tag_id = t.id
  1372. WHERE r.status = 'active' AND r.source_type = 'rss'
  1373. AND t.status = 'active' AND t.interests_file = ?
  1374. ORDER BY t.priority ASC, t.id ASC, r.relevance_score DESC
  1375. """, (interests_file,))
  1376. rss_filter_rows = cursor.fetchall()
  1377. if rss_filter_rows:
  1378. rss_ids = [row[0] for row in rss_filter_rows]
  1379. placeholders = ",".join("?" * len(rss_ids))
  1380. rss_cursor.execute(f"""
  1381. SELECT i.id, i.title, i.feed_id, f.name as feed_name,
  1382. i.url, i.published_at
  1383. FROM rss_items i
  1384. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  1385. WHERE i.id IN ({placeholders})
  1386. """, rss_ids)
  1387. rss_info = {row[0]: row for row in rss_cursor.fetchall()}
  1388. for fr_row in rss_filter_rows:
  1389. rss_id = fr_row[0]
  1390. info = rss_info.get(rss_id)
  1391. if info:
  1392. results.append({
  1393. "news_item_id": rss_id,
  1394. "source_type": "rss",
  1395. "tag_id": fr_row[1],
  1396. "relevance_score": fr_row[2],
  1397. "tag": fr_row[3],
  1398. "tag_description": fr_row[4],
  1399. "tag_priority": fr_row[5],
  1400. "title": info[1],
  1401. "source_id": info[2],
  1402. "source_name": info[3] or info[2],
  1403. "url": info[4] or "",
  1404. "mobile_url": "",
  1405. "rank": 0,
  1406. "ranks": [],
  1407. "first_time": info[5] or "",
  1408. "last_time": info[5] or "",
  1409. "count": 1,
  1410. })
  1411. except Exception:
  1412. pass # RSS 库不存在时静默跳过
  1413. return results
  1414. except Exception as e:
  1415. print(f"[AI筛选] 获取分类结果失败: {e}")
  1416. return []
  1417. def _get_all_news_ids_impl(self, date: Optional[str] = None) -> List[Dict]:
  1418. """获取当日所有新闻的 id 和标题(用于 AI 筛选分类)"""
  1419. try:
  1420. conn = self._get_connection(date)
  1421. cursor = conn.cursor()
  1422. cursor.execute("""
  1423. SELECT n.id, n.title, n.platform_id, p.name as platform_name
  1424. FROM news_items n
  1425. LEFT JOIN platforms p ON n.platform_id = p.id
  1426. ORDER BY n.id
  1427. """)
  1428. return [
  1429. {
  1430. "id": row[0], "title": row[1],
  1431. "source_id": row[2], "source_name": row[3] or row[2],
  1432. }
  1433. for row in cursor.fetchall()
  1434. ]
  1435. except Exception as e:
  1436. print(f"[AI筛选] 获取新闻列表失败: {e}")
  1437. return []
  1438. def _get_all_rss_ids_impl(self, date: Optional[str] = None) -> List[Dict]:
  1439. """获取当日所有 RSS 条目的 id 和标题(用于 AI 筛选分类)"""
  1440. try:
  1441. conn = self._get_connection(date, db_type="rss")
  1442. cursor = conn.cursor()
  1443. cursor.execute("""
  1444. SELECT i.id, i.title, i.feed_id, f.name as feed_name, i.published_at
  1445. FROM rss_items i
  1446. LEFT JOIN rss_feeds f ON i.feed_id = f.id
  1447. ORDER BY i.id
  1448. """)
  1449. return [
  1450. {
  1451. "id": row[0], "title": row[1],
  1452. "source_id": row[2], "source_name": row[3] or row[2],
  1453. "published_at": row[4] or "",
  1454. }
  1455. for row in cursor.fetchall()
  1456. ]
  1457. except Exception as e:
  1458. print(f"[AI筛选] 获取 RSS 列表失败: {e}")
  1459. return []