| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719 |
- # coding=utf-8
- """
- SQLite 存储 Mixin
- 提供共用的 SQLite 数据库操作逻辑,供 LocalStorageBackend 和 RemoteStorageBackend 复用。
- """
- import sqlite3
- from abc import abstractmethod
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- from trendradar.storage.base import NewsItem, NewsData, RSSItem, RSSData
- from trendradar.utils.url import normalize_url
- class SQLiteStorageMixin:
- """
- SQLite 存储操作 Mixin
- 子类需要实现以下抽象方法:
- - _get_connection(date, db_type) -> sqlite3.Connection
- - _get_configured_time() -> datetime
- - _format_date_folder(date) -> str
- - _format_time_filename() -> str
- """
- # ========================================
- # 抽象方法 - 子类必须实现
- # ========================================
- @abstractmethod
- def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
- """获取数据库连接"""
- pass
- @abstractmethod
- def _get_configured_time(self) -> datetime:
- """获取配置时区的当前时间"""
- pass
- @abstractmethod
- def _format_date_folder(self, date: Optional[str] = None) -> str:
- """格式化日期文件夹名 (ISO 格式: YYYY-MM-DD)"""
- pass
- @abstractmethod
- def _format_time_filename(self) -> str:
- """格式化时间文件名 (格式: HH-MM)"""
- pass
- # ========================================
- # Schema 管理
- # ========================================
- def _get_schema_path(self, db_type: str = "news") -> Path:
- """
- 获取 schema.sql 文件路径
- Args:
- db_type: 数据库类型 ("news" 或 "rss")
- Returns:
- schema 文件路径
- """
- if db_type == "rss":
- return Path(__file__).parent / "rss_schema.sql"
- return Path(__file__).parent / "schema.sql"
- def _get_ai_filter_schema_path(self) -> Path:
- """获取 AI 筛选 schema 文件路径"""
- return Path(__file__).parent / "ai_filter_schema.sql"
- def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
- """
- 从 schema.sql 初始化数据库表结构
- Args:
- conn: 数据库连接
- db_type: 数据库类型 ("news" 或 "rss")
- """
- schema_path = self._get_schema_path(db_type)
- if schema_path.exists():
- with open(schema_path, "r", encoding="utf-8") as f:
- schema_sql = f.read()
- conn.executescript(schema_sql)
- else:
- raise FileNotFoundError(f"Schema file not found: {schema_path}")
- # news 库额外加载 AI 筛选表结构
- if db_type == "news":
- ai_filter_schema = self._get_ai_filter_schema_path()
- if ai_filter_schema.exists():
- with open(ai_filter_schema, "r", encoding="utf-8") as f:
- conn.executescript(f.read())
- conn.commit()
- # ========================================
- # 新闻数据存储
- # ========================================
def _save_news_data_impl(self, data: NewsData, log_prefix: str = "[存储]") -> tuple[bool, int, int, int, int]:
    """
    Save one crawl batch of news data into SQLite (core implementation).

    Each item's URL is normalised with ``normalize_url`` (dropping dynamic
    parameters), and ``(normalized_url, platform_id)`` is the de-duplication key:

    * known item -> a ``title_changes`` row is added when the title differs,
      a ``rank_history`` row is appended and the ``news_items`` row is
      updated (``crawl_count`` incremented);
    * unknown item, or item without a URL -> a new ``news_items`` row and its
      initial ``rank_history`` row are inserted (URL-less items are never
      de-duplicated).

    Off-list detection runs afterwards. For each crawled platform, an item
    whose ``last_crawl_time`` equals the previous crawl time, and whose URL
    is absent from this batch, gets a ``rank_history`` row with ``rank = 0``.

    Finally the crawl and the per-source success/failure status are recorded,
    and the transaction is committed.

    A ``sqlite3.Error`` on a single item is logged and that item is skipped.
    Any other failure rolls back the open transaction. Without the rollback,
    a failed save would leave half-written rows that a later ``commit()`` on
    the same connection (if the connection is reused) would persist.

    Args:
        data: The crawl batch to store.
        log_prefix: Prefix for log messages.

    Returns:
        ``(success, new_count, updated_count, title_changed_count, off_list_count)``;
        every count is 0 when ``success`` is False.
    """
    conn = None
    try:
        conn = self._get_connection(data.date)
        cursor = conn.cursor()
        # Timestamp (configured time zone) for created_at/updated_at columns.
        now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

        def record_rank(news_id: int, rank: int) -> None:
            # Append one rank_history row for this crawl (rank 0 = off-list).
            cursor.execute("""
                INSERT INTO rank_history
                (news_item_id, rank, crawl_time, created_at)
                VALUES (?, ?, ?, ?)
            """, (news_id, rank, data.crawl_time, now_str))

        def insert_item(source_id: str, item, url: str) -> None:
            # Insert a brand-new news row together with its initial rank entry.
            cursor.execute("""
                INSERT INTO news_items
                (title, platform_id, rank, url, mobile_url,
                 first_crawl_time, last_crawl_time, crawl_count,
                 created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
            """, (item.title, source_id, item.rank, url,
                  item.mobile_url, data.crawl_time, data.crawl_time,
                  now_str, now_str))
            record_rank(cursor.lastrowid, item.rank)

        # Sync platform names into the platforms table first.
        for source_id, source_name in data.id_to_name.items():
            cursor.execute("""
                INSERT INTO platforms (id, name, updated_at)
                VALUES (?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    name = excluded.name,
                    updated_at = excluded.updated_at
            """, (source_id, source_name, now_str))

        new_count = 0
        updated_count = 0
        title_changed_count = 0
        success_sources: List[str] = []
        # Normalised URLs present in this batch, per platform. The off-list
        # detection below reuses them instead of normalising every URL twice.
        current_urls_by_source: Dict[str, set] = {}

        for source_id, news_list in data.items.items():
            success_sources.append(source_id)
            current_urls = current_urls_by_source.setdefault(source_id, set())
            for item in news_list:
                normalized_url = normalize_url(item.url, source_id) if item.url else ""
                if normalized_url:
                    current_urls.add(normalized_url)
                try:
                    if not normalized_url:
                        # No URL: insert as-is, de-duplication is impossible.
                        insert_item(source_id, item, "")
                        new_count += 1
                        continue

                    cursor.execute("""
                        SELECT id, title FROM news_items
                        WHERE url = ? AND platform_id = ?
                    """, (normalized_url, source_id))
                    existing = cursor.fetchone()
                    if existing is None:
                        # Store the normalised URL so later crawls match it.
                        insert_item(source_id, item, normalized_url)
                        new_count += 1
                        continue

                    existing_id, existing_title = existing
                    if existing_title != item.title:
                        cursor.execute("""
                            INSERT INTO title_changes
                            (news_item_id, old_title, new_title, changed_at)
                            VALUES (?, ?, ?, ?)
                        """, (existing_id, existing_title, item.title, now_str))
                        title_changed_count += 1

                    record_rank(existing_id, item.rank)
                    cursor.execute("""
                        UPDATE news_items SET
                            title = ?,
                            rank = ?,
                            mobile_url = ?,
                            last_crawl_time = ?,
                            crawl_count = crawl_count + 1,
                            updated_at = ?
                        WHERE id = ?
                    """, (item.title, item.rank, item.mobile_url,
                          data.crawl_time, now_str, existing_id))
                    updated_count += 1
                except sqlite3.Error as e:
                    print(f"{log_prefix} 保存新闻条目失败 [{item.title[:30]}...]: {e}")

        total_items = new_count + updated_count

        # Off-list detection: items that were listed in the previous crawl but
        # are missing from this one.
        off_list_count = 0
        cursor.execute("""
            SELECT crawl_time FROM crawl_records
            WHERE crawl_time < ?
            ORDER BY crawl_time DESC
            LIMIT 1
        """, (data.crawl_time,))
        prev_record = cursor.fetchone()
        if prev_record:
            prev_crawl_time = prev_record[0]
            for source_id in success_sources:
                current_urls = current_urls_by_source.get(source_id, set())
                # Items still stamped with the previous crawl time were not
                # refreshed by this batch: this is their first crawl off the list.
                cursor.execute("""
                    SELECT id, url FROM news_items
                    WHERE platform_id = ?
                      AND last_crawl_time = ?
                      AND url != ''
                """, (source_id, prev_crawl_time))
                for row in cursor.fetchall():
                    news_id, url = row[0], row[1]
                    if url not in current_urls:
                        record_rank(news_id, 0)
                        off_list_count += 1

        # Record the crawl itself.
        cursor.execute("""
            INSERT OR REPLACE INTO crawl_records
            (crawl_time, total_items, created_at)
            VALUES (?, ?, ?)
        """, (data.crawl_time, total_items, now_str))

        cursor.execute("""
            SELECT id FROM crawl_records WHERE crawl_time = ?
        """, (data.crawl_time,))
        record_row = cursor.fetchone()
        if record_row:
            crawl_record_id = record_row[0]
            for source_id in success_sources:
                cursor.execute("""
                    INSERT OR REPLACE INTO crawl_source_status
                    (crawl_record_id, platform_id, status)
                    VALUES (?, ?, 'success')
                """, (crawl_record_id, source_id))
            for failed_id in data.failed_ids:
                # Failed platforms must also exist in the platforms table.
                cursor.execute("""
                    INSERT OR IGNORE INTO platforms (id, name, updated_at)
                    VALUES (?, ?, ?)
                """, (failed_id, failed_id, now_str))
                cursor.execute("""
                    INSERT OR REPLACE INTO crawl_source_status
                    (crawl_record_id, platform_id, status)
                    VALUES (?, ?, 'failed')
                """, (crawl_record_id, failed_id))

        conn.commit()
        return True, new_count, updated_count, title_changed_count, off_list_count

    except Exception as e:
        print(f"{log_prefix} 保存失败: {e}")
        if conn is not None:
            try:
                # Discard the partial batch so it cannot be committed later.
                conn.rollback()
            except sqlite3.Error:
                pass  # best effort: the original failure was already reported
        return False, 0, 0, 0, 0
def _get_today_all_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
    """
    Load every news item stored for a date, merged into one NewsData.

    Each item carries its de-duplicated rank list (``ranks``, off-list rows
    excluded) and a ``rank_timeline`` built from ``rank_history``. Off-list
    rows (``rank = 0``) appear in the timeline as ``None``.

    Args:
        date: Date string; defaults to today.

    Returns:
        The merged news data, or ``None`` when no items exist or reading fails.
    """
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()

        # All stored news items (id is needed to attach the rank history).
        cursor.execute("""
            SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                   n.rank, n.url, n.mobile_url,
                   n.first_crawl_time, n.last_crawl_time, n.crawl_count
            FROM news_items n
            LEFT JOIN platforms p ON n.platform_id = p.id
            ORDER BY n.platform_id, n.last_crawl_time
        """)
        rows = cursor.fetchall()
        if not rows:
            return None

        # Rank history for all of those items. Every news_items row was
        # loaded above, so the JOIN alone selects the right rows. The former
        # IN (...) list bound one "?" per id, which could exceed SQLite's
        # host-parameter limit on busy days and make the whole read fail.
        # Off-list rows (rank = 0) recorded after an item's last sighting are
        # skipped, so permanently dropped items get no meaningless tail.
        cursor.execute("""
            SELECT rh.news_item_id, rh.rank, rh.crawl_time
            FROM rank_history rh
            JOIN news_items ni ON rh.news_item_id = ni.id
            WHERE NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
            ORDER BY rh.news_item_id, rh.crawl_time
        """)
        rank_history_map: Dict[int, List[int]] = {}
        rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
        for rh_row in cursor.fetchall():
            news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
            # Distinct non-zero ranks, in chronological order of first appearance.
            ranks = rank_history_map.setdefault(news_id, [])
            if rank != 0 and rank not in ranks:
                ranks.append(rank)
            # Full timeline including off-list points; keep only HH:MM.
            time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
            rank_timeline_map.setdefault(news_id, []).append({
                "time": time_part,
                "rank": rank if rank != 0 else None,  # None marks off-list
            })

        # Group items by platform.
        items: Dict[str, List[NewsItem]] = {}
        id_to_name: Dict[str, str] = {}
        crawl_date = self._format_date_folder(date)
        for row in rows:
            news_id = row[0]
            platform_id = row[2]
            platform_name = row[3] or platform_id
            id_to_name[platform_id] = platform_name
            items.setdefault(platform_id, []).append(NewsItem(
                title=row[1],
                source_id=platform_id,
                source_name=platform_name,
                rank=row[4],
                url=row[5] or "",
                mobile_url=row[6] or "",
                crawl_time=row[8],  # last_crawl_time
                # Fall back to the current rank when no history exists.
                ranks=rank_history_map.get(news_id, [row[4]]),
                first_time=row[7],  # first_crawl_time
                last_time=row[8],  # last_crawl_time
                count=row[9],  # crawl_count
                rank_timeline=rank_timeline_map.get(news_id, []),
            ))

        # Sources that failed in any crawl of the day.
        cursor.execute("""
            SELECT DISTINCT css.platform_id
            FROM crawl_source_status css
            JOIN crawl_records cr ON css.crawl_record_id = cr.id
            WHERE css.status = 'failed'
        """)
        failed_ids = [row[0] for row in cursor.fetchall()]

        # Most recent crawl time of the day.
        cursor.execute("""
            SELECT crawl_time FROM crawl_records
            ORDER BY crawl_time DESC
            LIMIT 1
        """)
        time_row = cursor.fetchone()
        crawl_time = time_row[0] if time_row else self._format_time_filename()

        return NewsData(
            date=crawl_date,
            crawl_time=crawl_time,
            items=items,
            id_to_name=id_to_name,
            failed_ids=failed_ids,
        )
    except Exception as e:
        print(f"[存储] 读取数据失败: {e}")
        return None
def _get_latest_crawl_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
    """
    Load the news items seen in the most recent crawl of a date.

    Only items whose ``last_crawl_time`` equals the latest ``crawl_records``
    time are returned, each with its rank list and rank timeline.

    Args:
        date: Date string; defaults to today.

    Returns:
        News data of the latest crawl (``failed_ids`` limited to that crawl),
        or ``None`` when there is no crawl, no matching item, or reading fails.
    """
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()

        # Latest crawl time.
        cursor.execute("""
            SELECT crawl_time FROM crawl_records
            ORDER BY crawl_time DESC
            LIMIT 1
        """)
        time_row = cursor.fetchone()
        if not time_row:
            return None
        latest_time = time_row[0]

        # Items seen in that crawl (id is needed to attach the rank history).
        cursor.execute("""
            SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                   n.rank, n.url, n.mobile_url,
                   n.first_crawl_time, n.last_crawl_time, n.crawl_count
            FROM news_items n
            LEFT JOIN platforms p ON n.platform_id = p.id
            WHERE n.last_crawl_time = ?
        """, (latest_time,))
        rows = cursor.fetchall()
        if not rows:
            return None

        # Rank history of exactly those items. They are selected with the same
        # last_crawl_time predicate rather than an IN (...) list with one "?"
        # per id, which could exceed SQLite's host-parameter limit.
        # Off-list rows (rank = 0) after an item's last sighting are skipped.
        cursor.execute("""
            SELECT rh.news_item_id, rh.rank, rh.crawl_time
            FROM rank_history rh
            JOIN news_items ni ON rh.news_item_id = ni.id
            WHERE ni.last_crawl_time = ?
              AND NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
            ORDER BY rh.news_item_id, rh.crawl_time
        """, (latest_time,))
        rank_history_map: Dict[int, List[int]] = {}
        rank_timeline_map: Dict[int, List[Dict[str, Any]]] = {}
        for rh_row in cursor.fetchall():
            news_id, rank, crawl_time = rh_row[0], rh_row[1], rh_row[2]
            # Distinct non-zero ranks, in chronological order of first appearance.
            ranks = rank_history_map.setdefault(news_id, [])
            if rank != 0 and rank not in ranks:
                ranks.append(rank)
            # Full timeline including off-list points; keep only HH:MM.
            time_part = crawl_time.split()[1][:5] if ' ' in crawl_time else crawl_time[:5]
            rank_timeline_map.setdefault(news_id, []).append({
                "time": time_part,
                "rank": rank if rank != 0 else None,  # None marks off-list
            })

        items: Dict[str, List[NewsItem]] = {}
        id_to_name: Dict[str, str] = {}
        crawl_date = self._format_date_folder(date)
        for row in rows:
            news_id = row[0]
            platform_id = row[2]
            platform_name = row[3] or platform_id
            id_to_name[platform_id] = platform_name
            items.setdefault(platform_id, []).append(NewsItem(
                title=row[1],
                source_id=platform_id,
                source_name=platform_name,
                rank=row[4],
                url=row[5] or "",
                mobile_url=row[6] or "",
                crawl_time=row[8],  # last_crawl_time
                # Fall back to the current rank when no history exists.
                ranks=rank_history_map.get(news_id, [row[4]]),
                first_time=row[7],  # first_crawl_time
                last_time=row[8],  # last_crawl_time
                count=row[9],  # crawl_count
                rank_timeline=rank_timeline_map.get(news_id, []),
            ))

        # Sources that failed in the latest crawl only.
        cursor.execute("""
            SELECT css.platform_id
            FROM crawl_source_status css
            JOIN crawl_records cr ON css.crawl_record_id = cr.id
            WHERE cr.crawl_time = ? AND css.status = 'failed'
        """, (latest_time,))
        failed_ids = [row[0] for row in cursor.fetchall()]

        return NewsData(
            date=crawl_date,
            crawl_time=latest_time,
            items=items,
            id_to_name=id_to_name,
            failed_ids=failed_ids,
        )
    except Exception as e:
        print(f"[存储] 获取最新数据失败: {e}")
        return None
def _detect_new_titles_impl(self, current_data: NewsData) -> Dict[str, Dict]:
    """
    Find titles in the current batch that no earlier batch of the day contained.

    A stored title counts as historical when its first sighting (``first_time``,
    or ``crawl_time`` if that is empty) is earlier than the current batch
    time. Matching by title handles one title being stored under several
    URLs.

    Args:
        current_data: The batch just crawled.

    Returns:
        ``{source_id: {title: NewsItem}}``. Every current title is returned
        when nothing is stored yet. ``{}`` is returned when no historical
        title exists (first crawl of the day) or on error.
    """
    try:
        historical_data = self._get_today_all_data_impl(current_data.date)
        if not historical_data:
            # Nothing stored yet: every current title is new.
            return {
                source_id: {item.title: item for item in news_list}
                for source_id, news_list in current_data.items.items()
            }

        current_time = current_data.crawl_time
        seen_before: Dict[str, set] = {
            source_id: {
                stored.title
                for stored in news_list
                if (stored.first_time or stored.crawl_time) < current_time
            }
            for source_id, news_list in historical_data.items.items()
        }

        # No earlier batch at all: "new" has no meaning on the first crawl.
        if not any(seen_before.values()):
            return {}

        fresh_by_source: Dict[str, Dict] = {}
        for source_id, news_list in current_data.items.items():
            known = seen_before.get(source_id, set())
            fresh = {item.title: item for item in news_list if item.title not in known}
            if fresh:
                fresh_by_source[source_id] = fresh
        return fresh_by_source
    except Exception as e:
        print(f"[存储] 检测新标题失败: {e}")
        return {}
- def _is_first_crawl_today_impl(self, date: Optional[str] = None) -> bool:
- """
- 检查是否是当天第一次抓取
- Args:
- date: 日期字符串,默认为今天
- Returns:
- 是否是第一次抓取
- """
- try:
- conn = self._get_connection(date)
- cursor = conn.cursor()
- cursor.execute("""
- SELECT COUNT(*) as count FROM crawl_records
- """)
- row = cursor.fetchone()
- count = row[0] if row else 0
- # 如果只有一条或没有记录,视为第一次抓取
- return count <= 1
- except Exception as e:
- print(f"[存储] 检查首次抓取失败: {e}")
- return True
- def _get_crawl_times_impl(self, date: Optional[str] = None) -> List[str]:
- """
- 获取指定日期的所有抓取时间列表
- Args:
- date: 日期字符串,默认为今天
- Returns:
- 抓取时间列表(按时间排序)
- """
- try:
- conn = self._get_connection(date)
- cursor = conn.cursor()
- cursor.execute("""
- SELECT crawl_time FROM crawl_records
- ORDER BY crawl_time
- """)
- rows = cursor.fetchall()
- return [row[0] for row in rows]
- except Exception as e:
- print(f"[存储] 获取抓取时间列表失败: {e}")
- return []
- # ========================================
- # 时间段执行记录(调度系统)
- # ========================================
- def _has_period_executed_impl(self, date_str: str, period_key: str, action: str) -> bool:
- """
- 检查指定时间段的某个 action 今天是否已执行
- Args:
- date_str: 日期字符串 YYYY-MM-DD
- period_key: 时间段 key
- action: 动作类型 (analyze / push)
- Returns:
- 是否已执行
- """
- try:
- conn = self._get_connection(date_str)
- cursor = conn.cursor()
- # 先检查表是否存在
- cursor.execute("""
- SELECT name FROM sqlite_master
- WHERE type='table' AND name='period_executions'
- """)
- if not cursor.fetchone():
- return False
- cursor.execute("""
- SELECT 1 FROM period_executions
- WHERE execution_date = ? AND period_key = ? AND action = ?
- """, (date_str, period_key, action))
- return cursor.fetchone() is not None
- except Exception as e:
- print(f"[存储] 检查时间段执行记录失败: {e}")
- return False
- def _record_period_execution_impl(self, date_str: str, period_key: str, action: str) -> bool:
- """
- 记录时间段的 action 执行
- Args:
- date_str: 日期字符串 YYYY-MM-DD
- period_key: 时间段 key
- action: 动作类型 (analyze / push)
- Returns:
- 是否记录成功
- """
- try:
- conn = self._get_connection(date_str)
- cursor = conn.cursor()
- # 确保表存在
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS period_executions (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- execution_date TEXT NOT NULL,
- period_key TEXT NOT NULL,
- action TEXT NOT NULL,
- executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- UNIQUE(execution_date, period_key, action)
- )
- """)
- now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
- cursor.execute("""
- INSERT OR IGNORE INTO period_executions (execution_date, period_key, action, executed_at)
- VALUES (?, ?, ?, ?)
- """, (date_str, period_key, action, now_str))
- conn.commit()
- return True
- except Exception as e:
- print(f"[存储] 记录时间段执行失败: {e}")
- return False
- # ========================================
- # RSS 数据存储
- # ========================================
def _save_rss_data_impl(self, data: RSSData, log_prefix: str = "[存储]") -> tuple[bool, int, int]:
    """
    Save one crawl batch of RSS items into SQLite; ``(url, feed_id)`` identifies an item.

    * item with a URL that is already stored -> its row is updated
      (``crawl_count`` incremented);
    * item with a new URL -> inserted. An ``ON CONFLICT`` clause turns the
      insert into an update if the row appeared concurrently; such an item
      is still counted as new.
    * item without a URL -> inserted; an ``IntegrityError`` (duplicate) is ignored.

    The crawl and per-feed success/failure status are then recorded and the
    transaction committed. A ``sqlite3.Error`` on one item is logged and the
    item skipped. Any other failure rolls back the open transaction, so a
    failed save leaves no half-written rows for a later ``commit()`` on a
    reused connection to persist.

    Args:
        data: The RSS batch to store.
        log_prefix: Prefix for log messages.

    Returns:
        ``(success, new_count, updated_count)``; counts are 0 when ``success`` is False.
    """
    conn = None
    try:
        conn = self._get_connection(data.date, db_type="rss")
        cursor = conn.cursor()
        now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

        # Sync feed names into rss_feeds.
        for feed_id, feed_name in data.id_to_name.items():
            cursor.execute("""
                INSERT INTO rss_feeds (id, name, updated_at)
                VALUES (?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    name = excluded.name,
                    updated_at = excluded.updated_at
            """, (feed_id, feed_name, now_str))

        new_count = 0
        updated_count = 0

        for feed_id, rss_list in data.items.items():
            for item in rss_list:
                try:
                    if item.url:
                        cursor.execute("""
                            SELECT id, title FROM rss_items
                            WHERE url = ? AND feed_id = ?
                        """, (item.url, feed_id))
                        existing = cursor.fetchone()

                        if existing:
                            cursor.execute("""
                                UPDATE rss_items SET
                                    title = ?,
                                    published_at = ?,
                                    summary = ?,
                                    author = ?,
                                    last_crawl_time = ?,
                                    crawl_count = crawl_count + 1,
                                    updated_at = ?
                                WHERE id = ?
                            """, (item.title, item.published_at, item.summary,
                                  item.author, data.crawl_time, now_str, existing[0]))
                            updated_count += 1
                        else:
                            # ON CONFLICT guards against a concurrent insert of the same item.
                            cursor.execute("""
                                INSERT INTO rss_items
                                (title, feed_id, url, published_at, summary, author,
                                 first_crawl_time, last_crawl_time, crawl_count,
                                 created_at, updated_at)
                                VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
                                ON CONFLICT(url, feed_id) DO UPDATE SET
                                    title = excluded.title,
                                    published_at = excluded.published_at,
                                    summary = excluded.summary,
                                    author = excluded.author,
                                    last_crawl_time = excluded.last_crawl_time,
                                    crawl_count = crawl_count + 1,
                                    updated_at = excluded.updated_at
                            """, (item.title, feed_id, item.url, item.published_at,
                                  item.summary, item.author, data.crawl_time,
                                  data.crawl_time, now_str, now_str))
                            new_count += 1
                    else:
                        # Empty URL: a duplicate is rejected by the schema and ignored here.
                        try:
                            cursor.execute("""
                                INSERT INTO rss_items
                                (title, feed_id, url, published_at, summary, author,
                                 first_crawl_time, last_crawl_time, crawl_count,
                                 created_at, updated_at)
                                VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
                            """, (item.title, feed_id, "", item.published_at,
                                  item.summary, item.author, data.crawl_time,
                                  data.crawl_time, now_str, now_str))
                            new_count += 1
                        except sqlite3.IntegrityError:
                            pass
                except sqlite3.Error as e:
                    print(f"{log_prefix} 保存 RSS 条目失败 [{item.title[:30]}...]: {e}")

        total_items = new_count + updated_count

        # Record the crawl itself.
        cursor.execute("""
            INSERT OR REPLACE INTO rss_crawl_records
            (crawl_time, total_items, created_at)
            VALUES (?, ?, ?)
        """, (data.crawl_time, total_items, now_str))

        cursor.execute("""
            SELECT id FROM rss_crawl_records WHERE crawl_time = ?
        """, (data.crawl_time,))
        record_row = cursor.fetchone()
        if record_row:
            crawl_record_id = record_row[0]
            for feed_id in data.items.keys():
                cursor.execute("""
                    INSERT OR REPLACE INTO rss_crawl_status
                    (crawl_record_id, feed_id, status)
                    VALUES (?, ?, 'success')
                """, (crawl_record_id, feed_id))
            for failed_id in data.failed_ids:
                # Failed feeds must also exist in rss_feeds.
                cursor.execute("""
                    INSERT OR IGNORE INTO rss_feeds (id, name, updated_at)
                    VALUES (?, ?, ?)
                """, (failed_id, failed_id, now_str))
                cursor.execute("""
                    INSERT OR REPLACE INTO rss_crawl_status
                    (crawl_record_id, feed_id, status)
                    VALUES (?, ?, 'failed')
                """, (crawl_record_id, failed_id))

        conn.commit()
        return True, new_count, updated_count

    except Exception as e:
        print(f"{log_prefix} 保存 RSS 数据失败: {e}")
        if conn is not None:
            try:
                # Discard the partial batch so it cannot be committed later.
                conn.rollback()
            except sqlite3.Error:
                pass  # best effort: the original failure was already reported
        return False, 0, 0
def _get_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
    """
    Load every RSS item stored for a date, grouped by feed.

    Args:
        date: Date string (``YYYY-MM-DD``); defaults to today.

    Returns:
        RSSData with items ordered by ``published_at`` (newest first) and
        feeds that failed in any crawl of the day. Returns ``None`` when no
        items exist or reading fails.
    """
    try:
        cursor = self._get_connection(date, db_type="rss").cursor()

        cursor.execute("""
            SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                   i.url, i.published_at, i.summary, i.author,
                   i.first_crawl_time, i.last_crawl_time, i.crawl_count
            FROM rss_items i
            LEFT JOIN rss_feeds f ON i.feed_id = f.id
            ORDER BY i.published_at DESC
        """)
        records = cursor.fetchall()
        if not records:
            return None

        grouped: Dict[str, List[RSSItem]] = {}
        feed_names: Dict[str, str] = {}
        crawl_date = self._format_date_folder(date)
        for record in records:
            feed_id = record[2]
            display_name = record[3] or feed_id
            feed_names[feed_id] = display_name
            grouped.setdefault(feed_id, []).append(RSSItem(
                title=record[1],
                feed_id=feed_id,
                feed_name=display_name,
                url=record[4] or "",
                published_at=record[5] or "",
                summary=record[6] or "",
                author=record[7] or "",
                crawl_time=record[9],
                first_time=record[8],
                last_time=record[9],
                count=record[10],
            ))

        # Latest crawl time, or the current HH-MM file name when none is recorded.
        cursor.execute("""
            SELECT crawl_time FROM rss_crawl_records
            ORDER BY crawl_time DESC
            LIMIT 1
        """)
        latest_row = cursor.fetchone()
        if latest_row:
            crawl_time = latest_row[0]
        else:
            crawl_time = self._format_time_filename()

        # Feeds that failed in any crawl of the day.
        cursor.execute("""
            SELECT DISTINCT cs.feed_id
            FROM rss_crawl_status cs
            JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
            WHERE cs.status = 'failed'
        """)
        failed_feeds = [record[0] for record in cursor.fetchall()]

        return RSSData(
            date=crawl_date,
            crawl_time=crawl_time,
            items=grouped,
            id_to_name=feed_names,
            failed_ids=failed_feeds,
        )
    except Exception as e:
        print(f"[存储] 读取 RSS 数据失败: {e}")
        return None
def _detect_new_rss_items_impl(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
    """
    Find RSS items in the current batch whose URL no earlier batch of the day contained.

    A stored URL counts as historical when its first sighting (``first_time``,
    or ``crawl_time`` if that is empty) is earlier than the current batch time.

    Args:
        current_data: The RSS batch just crawled.

    Returns:
        ``{feed_id: [RSSItem, ...]}``. This is a shallow copy of the current
        items when nothing is stored yet. Otherwise only items with a URL
        that is not historical are included. ``{}`` is returned when no
        historical URL exists (first crawl of the day) or on error.
    """
    try:
        historical_data = self._get_rss_data_impl(current_data.date)
        if not historical_data:
            # Nothing stored yet: everything is new.
            return current_data.items.copy()

        current_time = current_data.crawl_time
        known_urls: Dict[str, set] = {
            feed_id: {
                stored.url
                for stored in rss_list
                if (stored.first_time or stored.crawl_time) < current_time and stored.url
            }
            for feed_id, rss_list in historical_data.items.items()
        }

        # No earlier URL at all: "new" has no meaning on the first crawl.
        if not any(known_urls.values()):
            return {}

        fresh_by_feed: Dict[str, List[RSSItem]] = {}
        for feed_id, rss_list in current_data.items.items():
            seen = known_urls.get(feed_id, set())
            # Items without a URL can never be identified as new.
            fresh = [entry for entry in rss_list if entry.url and entry.url not in seen]
            if fresh:
                fresh_by_feed[feed_id] = fresh
        return fresh_by_feed
    except Exception as e:
        print(f"[存储] 检测新 RSS 条目失败: {e}")
        return {}
def _get_latest_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
    """
    Load the RSS items seen in the most recent crawl (current-list mode).

    The most recent crawl is the newest crawl_time in rss_crawl_records. Items
    are those whose last_crawl_time equals it, newest published first. Failed
    feeds are limited to that same crawl.

    Args:
        date: Date string (YYYY-MM-DD) passed to _get_connection and
            _format_date_folder; None means today per the original docs.

    Returns:
        The latest RSSData. Returns None when there is no crawl record, no item
        belongs to that crawl, or any error occurs.
    """
    try:
        cursor = self._get_connection(date, db_type="rss").cursor()

        cursor.execute("""
            SELECT crawl_time FROM rss_crawl_records
            ORDER BY crawl_time DESC
            LIMIT 1
        """)
        newest = cursor.fetchone()
        if not newest:
            return None
        latest_time = newest[0]

        cursor.execute("""
            SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                   i.url, i.published_at, i.summary, i.author,
                   i.first_crawl_time, i.last_crawl_time, i.crawl_count
            FROM rss_items i
            LEFT JOIN rss_feeds f ON i.feed_id = f.id
            WHERE i.last_crawl_time = ?
            ORDER BY i.published_at DESC
        """, (latest_time,))
        rows = cursor.fetchall()
        if not rows:
            return None

        grouped: Dict[str, List[RSSItem]] = {}
        names: Dict[str, str] = {}
        crawl_date = self._format_date_folder(date)
        for row in rows:
            (_, title, feed_id, feed_name, url, published_at,
             summary, author, first_seen, last_seen, seen_count) = row
            # Fall back to the feed id when the feed has no name row.
            label = feed_name or feed_id
            names[feed_id] = label
            grouped.setdefault(feed_id, []).append(RSSItem(
                title=title,
                feed_id=feed_id,
                feed_name=label,
                url=url or "",
                published_at=published_at or "",
                summary=summary or "",
                author=author or "",
                crawl_time=last_seen,
                first_time=first_seen,
                last_time=last_seen,
                count=seen_count,
            ))

        # Feeds that failed during that same crawl.
        cursor.execute("""
            SELECT cs.feed_id
            FROM rss_crawl_status cs
            JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
            WHERE cr.crawl_time = ? AND cs.status = 'failed'
        """, (latest_time,))
        failed_ids = [feed for (feed,) in cursor.fetchall()]

        return RSSData(
            date=crawl_date,
            crawl_time=latest_time,
            items=grouped,
            id_to_name=names,
            failed_ids=failed_ids,
        )
    except Exception as e:
        print(f"[存储] 获取最新 RSS 数据失败: {e}")
        return None
- # ========================================
- # AI 智能筛选 - 标签管理
- # ========================================
def _get_active_tags_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict[str, Any]]:
    """
    List the active tags of one interests file.

    Args:
        date: Date string passed through to _get_connection.
        interests_file: Interests file the tags belong to.

    Returns:
        Dicts with keys id, tag, description, version, prompt_hash and
        priority, ordered by priority then id. Returns [] if the query fails.
    """
    columns = ("id", "tag", "description", "version", "prompt_hash", "priority")
    try:
        cur = self._get_connection(date).cursor()
        cur.execute("""
            SELECT id, tag, description, version, prompt_hash, priority
            FROM ai_filter_tags
            WHERE status = 'active' AND interests_file = ?
            ORDER BY priority ASC, id ASC
        """, (interests_file,))
        return [dict(zip(columns, record)) for record in cur.fetchall()]
    except Exception as e:
        print(f"[AI筛选] 获取标签失败: {e}")
        return []
def _get_latest_prompt_hash_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> Optional[str]:
    """
    Return the prompt_hash of the highest-version active tag of an interests file.

    Args:
        date: Date string passed through to _get_connection.
        interests_file: Interests file whose tags are inspected.

    Returns:
        The hash. Returns None if the file has no active tag or the query fails.
    """
    try:
        cur = self._get_connection(date).cursor()
        cur.execute("""
            SELECT prompt_hash FROM ai_filter_tags
            WHERE status = 'active' AND interests_file = ?
            ORDER BY version DESC
            LIMIT 1
        """, (interests_file,))
        latest = cur.fetchone()
    except Exception as e:
        print(f"[AI筛选] 获取 prompt_hash 失败: {e}")
        return None
    if not latest:
        return None
    return latest[0]
def _get_latest_tag_version_impl(self, date: Optional[str] = None) -> int:
    """
    Return the highest tag version stored, across all interests files and statuses.

    Args:
        date: Date string passed through to _get_connection.

    Returns:
        The maximum version. Returns 0 when the table is empty or the query fails.
    """
    try:
        cur = self._get_connection(date).cursor()
        cur.execute("""
            SELECT MAX(version) FROM ai_filter_tags
        """)
        top = cur.fetchone()
    except Exception as e:
        print(f"[AI筛选] 获取版本号失败: {e}")
        return 0
    # MAX() over an empty table yields a single NULL row.
    if not top or top[0] is None:
        return 0
    return top[0]
def _deprecate_all_tags_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> int:
    """
    Deprecate every active tag of one interests file, plus their active results.

    Both UPDATEs run in a single transaction. If either fails, the whole change
    is rolled back, so tags and classification results never end up
    half-deprecated.

    Args:
        date: Date string passed through to _get_connection.
        interests_file: Interests file whose active tags are deprecated.

    Returns:
        Number of tags deprecated. Returns 0 when there were none or on error.
    """
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()
        now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

        cursor.execute(
            "SELECT id FROM ai_filter_tags WHERE status = 'active' AND interests_file = ?",
            (interests_file,)
        )
        tag_ids = [row[0] for row in cursor.fetchall()]
        if not tag_ids:
            return 0

        # Only "?" markers are interpolated; all values stay bound parameters.
        placeholders = ",".join("?" * len(tag_ids))
        params = [now_str] + tag_ids

        # The sqlite3 connection context manager commits on success and rolls
        # back on any exception; it does not close the connection. Without the
        # rollback, a failing second UPDATE would leave the first one pending.
        # That pending change could then be committed by an unrelated later
        # commit() if this connection is reused.
        with conn:
            cursor.execute(f"""
                UPDATE ai_filter_tags
                SET status = 'deprecated', deprecated_at = ?
                WHERE id IN ({placeholders})
            """, params)
            tag_count = cursor.rowcount
            cursor.execute(f"""
                UPDATE ai_filter_results
                SET status = 'deprecated', deprecated_at = ?
                WHERE tag_id IN ({placeholders}) AND status = 'active'
            """, params)

        print(f"[AI筛选] 已废弃 {tag_count} 个标签及关联分类结果")
        return tag_count
    except Exception as e:
        print(f"[AI筛选] 废弃标签失败: {e}")
        return 0
def _save_tags_impl(
    self, date: Optional[str], tags: List[Dict], version: int, prompt_hash: str,
    interests_file: str = "ai_interests.txt"
) -> int:
    """
    Insert a freshly extracted tag set.

    Rows are built first and inserted in one transaction. If any tag is invalid
    (e.g. a dict without a "tag" key) or an INSERT fails, none of the tags are
    kept. Previously, earlier inserts were left pending on the connection.

    Args:
        date: Date string passed through to _get_connection.
        tags: Dicts with a required "tag" key and optional "description" and
            "priority". A missing or non-integer priority falls back to the
            tag's 1-based position in the list.
        version: Tag-set version stored on every row.
        prompt_hash: Hash stored on every row.
        interests_file: Interests file the tags belong to.

    Returns:
        Number of tags inserted. Returns 0 on error.
    """
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()
        now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

        rows = []
        for idx, tag_data in enumerate(tags, start=1):
            priority = tag_data.get("priority", idx)
            try:
                priority = int(priority)
            except (TypeError, ValueError):
                priority = idx
            rows.append((
                tag_data["tag"],
                tag_data.get("description", ""),
                priority,
                version,
                prompt_hash,
                interests_file,
                now_str,
            ))

        # Commit on success, roll back everything on any exception.
        with conn:
            cursor.executemany("""
                INSERT INTO ai_filter_tags
                (tag, description, priority, version, prompt_hash, interests_file, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, rows)
        return len(rows)
    except Exception as e:
        print(f"[AI筛选] 保存标签失败: {e}")
        return 0
def _deprecate_specific_tags_impl(
    self, date: Optional[str], tag_ids: List[int]
) -> int:
    """
    Deprecate the given tags and their active classification results (incremental updates).

    Both UPDATEs run in one transaction that is rolled back on any failure.
    tag_ids may be any iterable of ids (list, tuple, set). Previously only a
    list worked, because it was concatenated with a list.

    Args:
        date: Date string passed through to _get_connection.
        tag_ids: Ids of the tags to deprecate.

    Returns:
        Number of tag rows updated. Returns 0 for an empty input or on error.
    """
    if not tag_ids:
        return 0
    try:
        ids = list(tag_ids)
        conn = self._get_connection(date)
        cursor = conn.cursor()
        now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
        # Only "?" markers are interpolated; values stay bound parameters.
        placeholders = ",".join("?" * len(ids))
        params = [now_str] + ids

        # Commit on success; roll back both UPDATEs if either fails.
        with conn:
            cursor.execute(f"""
                UPDATE ai_filter_tags
                SET status = 'deprecated', deprecated_at = ?
                WHERE id IN ({placeholders})
            """, params)
            tag_count = cursor.rowcount
            cursor.execute(f"""
                UPDATE ai_filter_results
                SET status = 'deprecated', deprecated_at = ?
                WHERE tag_id IN ({placeholders}) AND status = 'active'
            """, params)
        return tag_count
    except Exception as e:
        print(f"[AI筛选] 废弃指定标签失败: {e}")
        return 0
def _update_tags_hash_impl(
    self, date: Optional[str], interests_file: str, new_hash: str
) -> int:
    """
    Set prompt_hash on every active tag of one interests file (incremental updates).

    Args:
        date: Date string passed through to _get_connection.
        interests_file: Interests file whose active tags are updated.
        new_hash: Hash value to store.

    Returns:
        Number of rows updated. Returns 0 on error.
    """
    try:
        conn = self._get_connection(date)
        cur = conn.cursor()
        cur.execute("""
            UPDATE ai_filter_tags
            SET prompt_hash = ?
            WHERE interests_file = ? AND status = 'active'
        """, (new_hash, interests_file))
        updated = cur.rowcount
        conn.commit()
    except Exception as e:
        print(f"[AI筛选] 更新标签 hash 失败: {e}")
        return 0
    return updated
- # ========================================
- # AI 智能筛选 - 标签管理(描述/优先级更新)
- # ========================================
def _update_tag_descriptions_impl(
    self, date: Optional[str], tag_updates: List[Dict],
    interests_file: str = "ai_interests.txt"
) -> int:
    """
    Update the description of active tags, matched by tag name.

    All updates run in one transaction. If any entry fails (e.g. a non-dict
    entry), every update from this call is rolled back instead of being left
    pending on the connection.

    Args:
        date: Date string passed through to _get_connection.
        tag_updates: Dicts with "tag" and optional "description" (default "").
            Entries with an empty or missing tag name are skipped.
        interests_file: Interests file whose active tags are updated.

    Returns:
        Total number of rows updated. Returns 0 on error.
    """
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()
        count = 0
        # Commit on success, roll back everything on any exception.
        with conn:
            for t in tag_updates:
                tag_name = t.get("tag", "")
                if not tag_name:
                    continue
                cursor.execute("""
                    UPDATE ai_filter_tags
                    SET description = ?
                    WHERE tag = ? AND interests_file = ? AND status = 'active'
                """, (t.get("description", ""), tag_name, interests_file))
                count += cursor.rowcount
        return count
    except Exception as e:
        print(f"[AI筛选] 更新标签描述失败: {e}")
        return 0
def _update_tag_priorities_impl(
    self, date: Optional[str], tag_priorities: List[Dict],
    interests_file: str = "ai_interests.txt"
) -> int:
    """
    Update the priority of active tags, matched by tag name.

    All updates run in one transaction. If any entry fails (e.g. a non-dict
    entry), every update from this call is rolled back instead of being left
    pending on the connection.

    Args:
        date: Date string passed through to _get_connection.
        tag_priorities: Dicts with "tag" and "priority". Entries with an empty
            tag name, or a priority that int() rejects, are skipped.
        interests_file: Interests file whose active tags are updated.

    Returns:
        Total number of rows updated. Returns 0 on error.
    """
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()
        count = 0
        # Commit on success, roll back everything on any exception.
        with conn:
            for t in tag_priorities:
                tag_name = t.get("tag", "")
                priority = t.get("priority")
                if not tag_name:
                    continue
                try:
                    priority = int(priority)
                except (TypeError, ValueError):
                    continue
                cursor.execute("""
                    UPDATE ai_filter_tags
                    SET priority = ?
                    WHERE tag = ? AND interests_file = ? AND status = 'active'
                """, (priority, tag_name, interests_file))
                count += cursor.rowcount
        return count
    except Exception as e:
        print(f"[AI筛选] 更新标签优先级失败: {e}")
        return 0
- # ========================================
- # AI 智能筛选 - 已分析新闻追踪
- # ========================================
def _save_analyzed_news_impl(
    self, date: Optional[str], news_ids: List[int], source_type: str,
    interests_file: str, prompt_hash: str, matched_ids: set
) -> int:
    """
    Record which news items were analyzed, whether or not they matched a tag.

    Rows are written with INSERT OR REPLACE, so a row conflicting with an
    existing key replaces it. The key is whatever uniqueness constraint the
    table defines, which is not visible here.

    Error handling:
        - A row rejected by a constraint (sqlite3.IntegrityError) is skipped
          and counted in a log line. This mirrors _save_filter_results_impl.
        - Any other error aborts the call and rolls back the whole batch.
          Previously every per-row exception was swallowed silently, so e.g. a
          missing table produced 0 saved rows with no log at all.

    Args:
        date: Date string passed through to _get_connection.
        news_ids: Ids of the analyzed items.
        source_type: Source type stored on every row.
        interests_file: Interests file stored on every row.
        prompt_hash: Prompt hash stored on every row.
        matched_ids: Ids that matched at least one tag; None is treated as empty.

    Returns:
        Number of rows written. Returns 0 on error.
    """
    matched = matched_ids if matched_ids is not None else set()
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()
        now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
        count = 0
        skipped = 0
        # Commit on success, roll back everything on an unexpected exception.
        with conn:
            for nid in news_ids:
                try:
                    cursor.execute("""
                        INSERT OR REPLACE INTO ai_filter_analyzed_news
                        (news_item_id, source_type, interests_file, prompt_hash, matched, created_at)
                        VALUES (?, ?, ?, ?, ?, ?)
                    """, (
                        nid, source_type, interests_file, prompt_hash,
                        1 if nid in matched else 0,
                        now_str,
                    ))
                except sqlite3.IntegrityError:
                    skipped += 1
                    continue
                count += 1
        if skipped:
            print(f"[AI筛选] 跳过 {skipped} 条因约束冲突无法保存的已分析记录")
        return count
    except Exception as e:
        print(f"[AI筛选] 保存已分析记录失败: {e}")
        return 0
def _get_analyzed_news_ids_impl(
    self, date: Optional[str] = None, source_type: str = "hotlist",
    interests_file: str = "ai_interests.txt"
) -> set:
    """
    Return the ids already analyzed for one source type and interests file (for de-duplication).

    Args:
        date: Date string passed through to _get_connection.
        source_type: Source type to filter on.
        interests_file: Interests file to filter on.

    Returns:
        Set of news_item_id values. Returns an empty set on error.
    """
    try:
        cur = self._get_connection(date).cursor()
        cur.execute("""
            SELECT news_item_id FROM ai_filter_analyzed_news
            WHERE source_type = ? AND interests_file = ?
        """, (source_type, interests_file))
        records = cur.fetchall()
    except Exception as e:
        print(f"[AI筛选] 获取已分析ID失败: {e}")
        return set()
    return {nid for (nid,) in records}
def _clear_analyzed_news_impl(
    self, date: Optional[str] = None, interests_file: str = "ai_interests.txt"
) -> int:
    """
    Delete every analyzed-news record of one interests file (used for full reclassification).

    Args:
        date: Date string passed through to _get_connection.
        interests_file: Interests file whose records are deleted.

    Returns:
        Number of records deleted. Returns 0 on error.
    """
    try:
        conn = self._get_connection(date)
        cur = conn.cursor()
        cur.execute("""
            DELETE FROM ai_filter_analyzed_news
            WHERE interests_file = ?
        """, (interests_file,))
        removed = cur.rowcount
        conn.commit()
    except Exception as e:
        print(f"[AI筛选] 清除已分析记录失败: {e}")
        return 0
    return removed
def _clear_unmatched_analyzed_news_impl(
    self, date: Optional[str] = None, interests_file: str = "ai_interests.txt"
) -> int:
    """
    Delete the non-matching analyzed-news records of one interests file.

    Deleting them lets those items be analyzed again, e.g. against new tags.

    Args:
        date: Date string passed through to _get_connection.
        interests_file: Interests file whose unmatched records are deleted.

    Returns:
        Number of records deleted. Returns 0 on error.
    """
    try:
        conn = self._get_connection(date)
        cur = conn.cursor()
        cur.execute("""
            DELETE FROM ai_filter_analyzed_news
            WHERE interests_file = ? AND matched = 0
        """, (interests_file,))
        removed = cur.rowcount
        conn.commit()
    except Exception as e:
        print(f"[AI筛选] 清除不匹配记录失败: {e}")
        return 0
    return removed
- # ========================================
- # AI 智能筛选 - 分类结果管理(原有)
- # ========================================
def _save_filter_results_impl(
    self, date: Optional[str], results: List[Dict]
) -> int:
    """
    Insert a batch of classification results.

    Rows rejected by a constraint (sqlite3.IntegrityError, e.g. a duplicate)
    are skipped. Any other failure, such as a result dict missing
    "news_item_id" or "tag_id", rolls back the whole batch. Previously the
    earlier inserts were left pending on the connection.

    Args:
        date: Date string passed through to _get_connection.
        results: Dicts with required "news_item_id" and "tag_id", and
            optional "source_type" (default "hotlist") and "relevance_score"
            (default 0.0).

    Returns:
        Number of rows inserted. Returns 0 on error.
    """
    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()
        now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
        count = 0
        # Commit on success, roll back everything on an unexpected exception.
        with conn:
            for r in results:
                try:
                    cursor.execute("""
                        INSERT INTO ai_filter_results
                        (news_item_id, source_type, tag_id, relevance_score, created_at)
                        VALUES (?, ?, ?, ?, ?)
                    """, (
                        r["news_item_id"],
                        r.get("source_type", "hotlist"),
                        r["tag_id"],
                        r.get("relevance_score", 0.0),
                        now_str,
                    ))
                except sqlite3.IntegrityError:
                    continue  # Duplicate or otherwise rejected row: skip it.
                count += 1
        return count
    except Exception as e:
        print(f"[AI筛选] 保存分类结果失败: {e}")
        return 0
def _get_active_filter_results_impl(self, date: Optional[str] = None, interests_file: str = "ai_interests.txt") -> List[Dict[str, Any]]:
    """
    Return active classification results for one interests file, with item details.

    Hotlist results are joined with news_items / platforms in the news database
    and enriched with their rank history (rank 0 excluded). RSS results are
    stored in the news database with source_type = 'rss' and are enriched from
    the separate RSS database. If any part of the RSS lookup fails, RSS results
    are skipped silently and only hotlist results are returned.

    Id lists are bound into IN (...) clauses in chunks, and duplicate ids are
    removed. This keeps a large day's results from exceeding SQLite's
    bound-parameter limit (999 on SQLite versions before 3.32.0). Exceeding it
    previously failed the whole call, or silently dropped the RSS results.

    Args:
        date: Date string passed through to _get_connection.
        interests_file: Interests file whose active tags are considered.

    Returns:
        Result dicts: hotlist results first, then RSS results. Each group is
        ordered by tag priority, tag id, then relevance score descending.
        Returns [] on error.
    """
    # Conservative per-statement parameter budget, below the old 999 limit.
    max_params = 500

    def fetch_by_ids(cur, sql_template: str, ids: List[Any]) -> List[Any]:
        # Run sql_template over ids in chunks and return all rows. The
        # "{placeholders}" slot is filled with "?" markers only, so the ids
        # themselves always stay bound parameters.
        rows: List[Any] = []
        for start in range(0, len(ids), max_params):
            chunk = ids[start:start + max_params]
            cur.execute(sql_template.format(placeholders=",".join("?" * len(chunk))), chunk)
            rows.extend(cur.fetchall())
        return rows

    try:
        conn = self._get_connection(date)
        cursor = conn.cursor()

        # Hotlist results
        cursor.execute("""
            SELECT
                r.news_item_id, r.source_type, r.tag_id, r.relevance_score,
                t.tag, t.description as tag_description, t.priority,
                n.title, n.platform_id as source_id, p.name as source_name,
                n.url, n.mobile_url, n.rank,
                n.first_crawl_time, n.last_crawl_time, n.crawl_count
            FROM ai_filter_results r
            JOIN ai_filter_tags t ON r.tag_id = t.id
            JOIN news_items n ON r.news_item_id = n.id
            LEFT JOIN platforms p ON n.platform_id = p.id
            WHERE r.status = 'active' AND r.source_type = 'hotlist'
              AND t.status = 'active' AND t.interests_file = ?
            ORDER BY t.priority ASC, t.id ASC, r.relevance_score DESC
        """, (interests_file,))
        results: List[Dict[str, Any]] = [
            {
                "news_item_id": row[0], "source_type": row[1],
                "tag_id": row[2], "relevance_score": row[3],
                "tag": row[4], "tag_description": row[5], "tag_priority": row[6],
                "title": row[7], "source_id": row[8],
                "source_name": row[9] or row[8],
                "url": row[10] or "", "mobile_url": row[11] or "",
                "rank": row[12],
                "first_time": row[13], "last_time": row[14],
                "count": row[15],
            }
            for row in cursor.fetchall()
        ]

        # Batch-load rank history for the hotlist items. An item may match
        # several tags, so ids are de-duplicated in first-seen order.
        hotlist_ids = list(dict.fromkeys(item["news_item_id"] for item in results))
        ranks_map: Dict[int, List[int]] = {}
        for nid, rank in fetch_by_ids(cursor, """
            SELECT news_item_id, rank FROM rank_history
            WHERE news_item_id IN ({placeholders}) AND rank != 0
        """, hotlist_ids):
            seen = ranks_map.setdefault(nid, [])
            if rank not in seen:
                seen.append(rank)
        for item in results:
            # Without history, fall back to the item's current rank.
            item["ranks"] = ranks_map.get(item["news_item_id"], [item["rank"]])

        # RSS results, when an RSS database is available.
        try:
            rss_cursor = self._get_connection(date, db_type="rss").cursor()
            # The RSS classification rows themselves live in the news database.
            cursor.execute("""
                SELECT r.news_item_id, r.tag_id, r.relevance_score,
                       t.tag, t.description, t.priority
                FROM ai_filter_results r
                JOIN ai_filter_tags t ON r.tag_id = t.id
                WHERE r.status = 'active' AND r.source_type = 'rss'
                  AND t.status = 'active' AND t.interests_file = ?
                ORDER BY t.priority ASC, t.id ASC, r.relevance_score DESC
            """, (interests_file,))
            rss_filter_rows = cursor.fetchall()
            if rss_filter_rows:
                rss_ids = list(dict.fromkeys(row[0] for row in rss_filter_rows))
                rss_info = {
                    row[0]: row
                    for row in fetch_by_ids(rss_cursor, """
                        SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                               i.url, i.published_at
                        FROM rss_items i
                        LEFT JOIN rss_feeds f ON i.feed_id = f.id
                        WHERE i.id IN ({placeholders})
                    """, rss_ids)
                }
                for fr_row in rss_filter_rows:
                    rss_id = fr_row[0]
                    info = rss_info.get(rss_id)
                    if not info:
                        continue
                    results.append({
                        "news_item_id": rss_id,
                        "source_type": "rss",
                        "tag_id": fr_row[1],
                        "relevance_score": fr_row[2],
                        "tag": fr_row[3],
                        "tag_description": fr_row[4],
                        "tag_priority": fr_row[5],
                        "title": info[1],
                        "source_id": info[2],
                        "source_name": info[3] or info[2],
                        "url": info[4] or "",
                        "mobile_url": "",
                        "rank": 0,
                        "ranks": [],
                        "first_time": info[5] or "",
                        "last_time": info[5] or "",
                        "count": 1,
                    })
        except Exception:
            pass  # Deliberate best-effort: no (usable) RSS database means no RSS results.

        return results
    except Exception as e:
        print(f"[AI筛选] 获取分类结果失败: {e}")
        return []
def _get_all_news_ids_impl(self, date: Optional[str] = None) -> List[Dict]:
    """
    List every news item's id and title for the day (input to AI classification).

    Args:
        date: Date string passed through to _get_connection.

    Returns:
        Dicts with keys id, title, source_id and source_name, ordered by id.
        source_name falls back to the platform id when the platform has no
        name. Returns [] on error.
    """
    try:
        cur = self._get_connection(date).cursor()
        cur.execute("""
            SELECT n.id, n.title, n.platform_id, p.name as platform_name
            FROM news_items n
            LEFT JOIN platforms p ON n.platform_id = p.id
            ORDER BY n.id
        """)
        records = cur.fetchall()
    except Exception as e:
        print(f"[AI筛选] 获取新闻列表失败: {e}")
        return []
    return [
        {
            "id": news_id, "title": title,
            "source_id": platform_id, "source_name": platform_name or platform_id,
        }
        for news_id, title, platform_id, platform_name in records
    ]
def _get_all_rss_ids_impl(self, date: Optional[str] = None) -> List[Dict]:
    """
    List every RSS item's id and title for the day (input to AI classification).

    Args:
        date: Date string passed through to _get_connection (RSS database).

    Returns:
        Dicts with keys id, title, source_id, source_name and published_at,
        ordered by id. source_name falls back to the feed id, and published_at
        to "". Returns [] on error.
    """
    try:
        cur = self._get_connection(date, db_type="rss").cursor()
        cur.execute("""
            SELECT i.id, i.title, i.feed_id, f.name as feed_name, i.published_at
            FROM rss_items i
            LEFT JOIN rss_feeds f ON i.feed_id = f.id
            ORDER BY i.id
        """)
        records = cur.fetchall()
    except Exception as e:
        print(f"[AI筛选] 获取 RSS 列表失败: {e}")
        return []
    return [
        {
            "id": item_id, "title": title,
            "source_id": feed_id, "source_name": feed_name or feed_id,
            "published_at": published_at or "",
        }
        for item_id, title, feed_id, feed_name, published_at in records
    ]
|