| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308 |
- # coding=utf-8
- """
- SQLite 存储 Mixin
- 提供共用的 SQLite 数据库操作逻辑,供 LocalStorageBackend 和 RemoteStorageBackend 复用。
- """
- import sqlite3
- from abc import abstractmethod
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- from trendradar.storage.base import NewsItem, NewsData, RSSItem, RSSData
- from trendradar.utils.url import normalize_url
class SQLiteStorageMixin:
    """
    SQLite storage mixin.

    Holds the SQLite read/write logic shared by LocalStorageBackend and
    RemoteStorageBackend. Subclasses must implement:
        - _get_connection(date, db_type) -> sqlite3.Connection
        - _get_configured_time() -> datetime
        - _format_date_folder(date) -> str
        - _format_time_filename() -> str

    Note: this class does not use ``ABCMeta`` itself, so the
    ``@abstractmethod`` markers below document the contract but do not, by
    themselves, block instantiation of an incomplete subclass.

    Private helpers defined here are prefixed with ``_sqlite_`` to reduce the
    chance of clashing with attributes of the concrete backends.
    """

    # ========================================
    # Abstract methods - subclasses must implement
    # ========================================

    @abstractmethod
    def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
        """Return a database connection for *date* and *db_type* ("news" or "rss")."""

    @abstractmethod
    def _get_configured_time(self) -> datetime:
        """Return the current time in the configured time zone."""

    @abstractmethod
    def _format_date_folder(self, date: Optional[str] = None) -> str:
        """Format the date folder name (ISO format: YYYY-MM-DD)."""

    @abstractmethod
    def _format_time_filename(self) -> str:
        """Format the time-based file name (format: HH-MM)."""

    # ========================================
    # Internal helpers
    # ========================================

    def _sqlite_now_str(self) -> str:
        """Return "now" in the configured time zone as ``YYYY-MM-DD HH:MM:SS``."""
        return self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _sqlite_rollback_quietly(conn: Optional[sqlite3.Connection]) -> None:
        """
        Roll back any pending transaction on *conn*, ignoring rollback errors.

        Used on the failure path of write methods so that a half-written batch
        is not committed later by some other ``commit()`` on the same
        connection (relevant if the subclass reuses connections).
        """
        if conn is None:
            return
        try:
            conn.rollback()
        except sqlite3.Error:
            pass

    @staticmethod
    def _sqlite_first_seen(item: Any) -> Any:
        """
        Return ``item.first_time``, falling back to ``item.crawl_time``.

        The fallback also applies when ``first_time`` is ``None``. Without it,
        a single such item makes the ``<`` comparison raise ``TypeError``,
        which would abort new-item detection for the whole batch.
        """
        first_time = getattr(item, "first_time", None)
        return item.crawl_time if first_time is None else first_time

    @staticmethod
    def _sqlite_upsert_names(cursor: sqlite3.Cursor, table: str,
                             id_to_name: Dict[str, str], now_str: str) -> None:
        """
        Upsert ``(id, name, updated_at)`` rows into *table*.

        *table* is interpolated into the SQL. Callers pass only the fixed
        internal names "platforms" / "rss_feeds", never external input.
        """
        for entity_id, name in id_to_name.items():
            cursor.execute(f"""
                INSERT INTO {table} (id, name, updated_at)
                VALUES (?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    name = excluded.name,
                    updated_at = excluded.updated_at
            """, (entity_id, name, now_str))

    @staticmethod
    def _sqlite_record_crawl(
        cursor: sqlite3.Cursor,
        *,
        records_table: str,
        status_table: str,
        names_table: str,
        id_column: str,
        crawl_time: str,
        total_items: int,
        success_ids: List[str],
        failed_ids: List[str],
        now_str: str,
    ) -> None:
        """
        Write the crawl record plus per-source success/failure status rows.

        Table and column names are interpolated into the SQL. Callers pass
        only fixed internal identifiers.

        Args:
            cursor: Cursor on the target database.
            records_table: "crawl_records" or "rss_crawl_records".
            status_table: "crawl_source_status" or "rss_crawl_status".
            names_table: "platforms" or "rss_feeds". Failed sources are added
                here (``INSERT OR IGNORE``) so they always have a name row.
            id_column: Source id column in *status_table* ("platform_id" / "feed_id").
            crawl_time: Crawl time of this batch.
            total_items: Number of items saved in this batch.
            success_ids: Source ids crawled successfully.
            failed_ids: Source ids that failed.
            now_str: Timestamp used for created_at / updated_at.
        """
        cursor.execute(f"""
            INSERT OR REPLACE INTO {records_table}
            (crawl_time, total_items, created_at)
            VALUES (?, ?, ?)
        """, (crawl_time, total_items, now_str))

        # Re-read the id: INSERT OR REPLACE may have produced a new row.
        cursor.execute(f"""
            SELECT id FROM {records_table} WHERE crawl_time = ?
        """, (crawl_time,))
        record_row = cursor.fetchone()
        if not record_row:
            return
        crawl_record_id = record_row[0]

        for source_id in success_ids:
            cursor.execute(f"""
                INSERT OR REPLACE INTO {status_table}
                (crawl_record_id, {id_column}, status)
                VALUES (?, ?, 'success')
            """, (crawl_record_id, source_id))

        for failed_id in failed_ids:
            # Make sure the failed source also exists in the names table.
            cursor.execute(f"""
                INSERT OR IGNORE INTO {names_table} (id, name, updated_at)
                VALUES (?, ?, ?)
            """, (failed_id, failed_id, now_str))
            cursor.execute(f"""
                INSERT OR REPLACE INTO {status_table}
                (crawl_record_id, {id_column}, status)
                VALUES (?, ?, 'failed')
            """, (crawl_record_id, failed_id))

    @staticmethod
    def _sqlite_load_rank_history(
        cursor: sqlite3.Cursor, last_crawl_time: Optional[str] = None
    ) -> tuple[Dict[int, List[int]], Dict[int, List[Dict[str, Any]]]]:
        """
        Load rank history for news items, optionally only those whose
        ``last_crawl_time`` equals *last_crawl_time*.

        Off-list markers (rank 0) recorded after an item's last_crawl_time are
        dropped, so an item that left the list for good does not accumulate
        meaningless trailing entries.

        The item filter is a join condition rather than an
        ``IN (?, ?, ...)`` list. This avoids SQLite's bound-parameter limit
        (SQLITE_MAX_VARIABLE_NUMBER, 999 by default before SQLite 3.32.0),
        which a busy day could exceed.

        Returns:
            (ranks_map, timeline_map):
            - ranks_map: news_item_id -> distinct non-zero ranks in crawl order
            - timeline_map: news_item_id -> [{"time": "HH:MM", "rank": int | None}]
              where ``None`` marks an off-list entry
        """
        sql = """
            SELECT rh.news_item_id, rh.rank, rh.crawl_time
            FROM rank_history rh
            JOIN news_items ni ON rh.news_item_id = ni.id
            WHERE NOT (rh.rank = 0 AND rh.crawl_time > ni.last_crawl_time)
        """
        params: tuple = ()
        if last_crawl_time is not None:
            sql += " AND ni.last_crawl_time = ?"
            params = (last_crawl_time,)
        sql += " ORDER BY rh.news_item_id, rh.crawl_time"
        cursor.execute(sql, params)

        ranks_map: Dict[int, List[int]] = {}
        timeline_map: Dict[int, List[Dict[str, Any]]] = {}
        for news_id, rank, crawl_time in cursor.fetchall():
            # ranks: de-duplicated, excluding off-list markers (rank 0)
            ranks = ranks_map.setdefault(news_id, [])
            if rank != 0 and rank not in ranks:
                ranks.append(rank)
            # timeline: full history including off-list entries.
            # Keep only the HH:MM part of a "date time" value.
            time_part = crawl_time.split()[1][:5] if " " in crawl_time else crawl_time[:5]
            timeline_map.setdefault(news_id, []).append({
                "time": time_part,
                "rank": rank if rank != 0 else None,  # 0 -> None means off-list
            })
        return ranks_map, timeline_map

    @staticmethod
    def _sqlite_build_news_items(
        rows: List[Any],
        ranks_map: Dict[int, List[int]],
        timeline_map: Dict[int, List[Dict[str, Any]]],
    ) -> tuple[Dict[str, List[NewsItem]], Dict[str, str]]:
        """
        Convert news_items rows into NewsItem objects grouped by platform.

        Each row must have the columns (id, title, platform_id, platform_name,
        rank, url, mobile_url, first_crawl_time, last_crawl_time, crawl_count).

        Returns:
            (items_by_platform, platform_id -> display name)
        """
        items: Dict[str, List[NewsItem]] = {}
        id_to_name: Dict[str, str] = {}
        for (news_id, title, platform_id, platform_name, rank, url, mobile_url,
             first_crawl_time, last_crawl_time, crawl_count) in rows:
            display_name = platform_name or platform_id
            id_to_name[platform_id] = display_name
            items.setdefault(platform_id, []).append(NewsItem(
                title=title,
                source_id=platform_id,
                source_name=display_name,
                rank=rank,
                url=url or "",
                mobile_url=mobile_url or "",
                crawl_time=last_crawl_time,
                # Fall back to the current rank if there is no rank history.
                ranks=ranks_map.get(news_id, [rank]),
                first_time=first_crawl_time,
                last_time=last_crawl_time,
                count=crawl_count,
                rank_timeline=timeline_map.get(news_id, []),
            ))
        return items, id_to_name

    @staticmethod
    def _sqlite_rows_to_rss_items(rows: List[Any]) -> tuple[Dict[str, List[RSSItem]], Dict[str, str]]:
        """
        Convert rss_items rows into RSSItem objects grouped by feed.

        Each row must have the columns (id, title, feed_id, feed_name, url,
        published_at, summary, author, first_crawl_time, last_crawl_time,
        crawl_count).

        Returns:
            (items_by_feed, feed_id -> display name)
        """
        items: Dict[str, List[RSSItem]] = {}
        id_to_name: Dict[str, str] = {}
        for (_item_id, title, feed_id, feed_name, url, published_at, summary,
             author, first_crawl_time, last_crawl_time, crawl_count) in rows:
            display_name = feed_name or feed_id
            id_to_name[feed_id] = display_name
            items.setdefault(feed_id, []).append(RSSItem(
                title=title,
                feed_id=feed_id,
                feed_name=display_name,
                url=url or "",
                published_at=published_at or "",
                summary=summary or "",
                author=author or "",
                crawl_time=last_crawl_time,
                first_time=first_crawl_time,
                last_time=last_crawl_time,
                count=crawl_count,
            ))
        return items, id_to_name

    # ========================================
    # Schema management
    # ========================================

    def _get_schema_path(self, db_type: str = "news") -> Path:
        """
        Return the path of the schema SQL file.

        Args:
            db_type: Database type ("news" or "rss")

        Returns:
            Path to rss_schema.sql for "rss", otherwise schema.sql, both
            located next to this module.
        """
        if db_type == "rss":
            return Path(__file__).parent / "rss_schema.sql"
        return Path(__file__).parent / "schema.sql"

    def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
        """
        Create the database tables by executing the schema SQL file.

        Args:
            conn: Database connection
            db_type: Database type ("news" or "rss")

        Raises:
            FileNotFoundError: if the schema file does not exist.
        """
        schema_path = self._get_schema_path(db_type)
        if not schema_path.exists():
            raise FileNotFoundError(f"Schema file not found: {schema_path}")
        conn.executescript(schema_path.read_text(encoding="utf-8"))
        conn.commit()

    # ========================================
    # News data storage
    # ========================================

    @staticmethod
    def _sqlite_save_news_item(
        cursor: sqlite3.Cursor,
        item: NewsItem,
        source_id: str,
        normalized_url: str,
        crawl_time: str,
        now_str: str,
    ) -> tuple[bool, bool]:
        """
        Insert or update a single news item and append its rank history.

        Items with a non-empty normalized URL are de-duplicated on
        (url, platform_id). Items without a URL are always inserted.

        Returns:
            (is_new, title_changed)

        Raises:
            sqlite3.Error: propagated to the caller, which logs and skips the item.
        """
        existing = None
        if normalized_url:
            cursor.execute("""
                SELECT id, title FROM news_items
                WHERE url = ? AND platform_id = ?
            """, (normalized_url, source_id))
            existing = cursor.fetchone()

        if existing:
            existing_id, existing_title = existing
            title_changed = existing_title != item.title
            if title_changed:
                cursor.execute("""
                    INSERT INTO title_changes
                    (news_item_id, old_title, new_title, changed_at)
                    VALUES (?, ?, ?, ?)
                """, (existing_id, existing_title, item.title, now_str))

            cursor.execute("""
                INSERT INTO rank_history
                (news_item_id, rank, crawl_time, created_at)
                VALUES (?, ?, ?, ?)
            """, (existing_id, item.rank, crawl_time, now_str))

            cursor.execute("""
                UPDATE news_items SET
                    title = ?,
                    rank = ?,
                    mobile_url = ?,
                    last_crawl_time = ?,
                    crawl_count = crawl_count + 1,
                    updated_at = ?
                WHERE id = ?
            """, (item.title, item.rank, item.mobile_url,
                  crawl_time, now_str, existing_id))
            return False, title_changed

        # New item (or no URL): insert, storing the normalized URL ("" if none).
        cursor.execute("""
            INSERT INTO news_items
            (title, platform_id, rank, url, mobile_url,
             first_crawl_time, last_crawl_time, crawl_count,
             created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
        """, (item.title, source_id, item.rank, normalized_url,
              item.mobile_url, crawl_time, crawl_time,
              now_str, now_str))
        new_id = cursor.lastrowid

        # Record the initial rank.
        cursor.execute("""
            INSERT INTO rank_history
            (news_item_id, rank, crawl_time, created_at)
            VALUES (?, ?, ?, ?)
        """, (new_id, item.rank, crawl_time, now_str))
        return True, False

    @staticmethod
    def _sqlite_detect_off_list(
        cursor: sqlite3.Cursor,
        crawl_time: str,
        current_urls_by_source: Dict[str, set],
        now_str: str,
    ) -> int:
        """
        Insert rank-0 ("off list") history rows for items that were on the
        list in the previous crawl but are missing from this one.

        Must run after this crawl's items were saved and before this crawl's
        crawl_records row is written. The "previous crawl" is the latest
        crawl_records entry strictly earlier than *crawl_time*.

        Args:
            cursor: Cursor on the news database.
            crawl_time: Crawl time of the current batch.
            current_urls_by_source: platform_id -> normalized URLs seen in
                this crawl, for every successfully crawled platform.
            now_str: Timestamp used for created_at.

        Returns:
            Number of off-list rows inserted.
        """
        cursor.execute("""
            SELECT crawl_time FROM crawl_records
            WHERE crawl_time < ?
            ORDER BY crawl_time DESC
            LIMIT 1
        """, (crawl_time,))
        prev_record = cursor.fetchone()
        if not prev_record:
            return 0
        prev_crawl_time = prev_record[0]

        off_list_count = 0
        for source_id, current_urls in current_urls_by_source.items():
            # Items still stamped with the previous crawl time were not
            # refreshed by this crawl: they dropped off for the first time.
            cursor.execute("""
                SELECT id, url FROM news_items
                WHERE platform_id = ?
                AND last_crawl_time = ?
                AND url != ''
            """, (source_id, prev_crawl_time))
            for news_id, url in cursor.fetchall():
                if url in current_urls:
                    continue
                cursor.execute("""
                    INSERT INTO rank_history
                    (news_item_id, rank, crawl_time, created_at)
                    VALUES (?, 0, ?, ?)
                """, (news_id, crawl_time, now_str))
                off_list_count += 1
        return off_list_count

    def _save_news_data_impl(self, data: NewsData, log_prefix: str = "[存储]") -> tuple[bool, int, int, int, int]:
        """
        Save a news crawl batch to SQLite (core implementation).

        Steps: sync platform names, insert or update each item (with rank and
        title-change history), detect items that dropped off the list, then
        write the crawl record and per-source status. Individual item failures
        (sqlite3.Error) are logged and skipped. Any other failure rolls back
        the whole batch.

        Args:
            data: News data
            log_prefix: Log prefix

        Returns:
            (success, new_count, updated_count, title_changed_count, off_list_count).
            Counters only include items that were saved completely.
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(data.date)
            cursor = conn.cursor()
            now_str = self._sqlite_now_str()

            # Sync platform names into the platforms table first.
            self._sqlite_upsert_names(cursor, "platforms", data.id_to_name, now_str)

            new_count = 0
            updated_count = 0
            title_changed_count = 0
            # Normalized URLs seen in this crawl, per successfully crawled
            # platform (insertion order == data.items order). Reused by the
            # off-list detection so URLs are normalized only once.
            current_urls_by_source: Dict[str, set] = {}

            for source_id, news_list in data.items.items():
                current_urls: set = set()
                current_urls_by_source[source_id] = current_urls
                for item in news_list:
                    # Normalize the URL (strips dynamic params such as Weibo's band_rank).
                    normalized_url = normalize_url(item.url, source_id) if item.url else ""
                    if normalized_url:
                        current_urls.add(normalized_url)
                    try:
                        is_new, title_changed = self._sqlite_save_news_item(
                            cursor, item, source_id, normalized_url, data.crawl_time, now_str
                        )
                    except sqlite3.Error as e:
                        print(f"{log_prefix} 保存新闻条目失败 [{item.title[:30]}...]: {e}")
                        continue
                    if is_new:
                        new_count += 1
                    else:
                        updated_count += 1
                    if title_changed:
                        title_changed_count += 1

            total_items = new_count + updated_count

            # Off-list detection must run before this crawl's record is written.
            off_list_count = self._sqlite_detect_off_list(
                cursor, data.crawl_time, current_urls_by_source, now_str
            )

            self._sqlite_record_crawl(
                cursor,
                records_table="crawl_records",
                status_table="crawl_source_status",
                names_table="platforms",
                id_column="platform_id",
                crawl_time=data.crawl_time,
                total_items=total_items,
                success_ids=list(current_urls_by_source),
                failed_ids=data.failed_ids,
                now_str=now_str,
            )

            conn.commit()
            return True, new_count, updated_count, title_changed_count, off_list_count

        except Exception as e:
            print(f"{log_prefix} 保存失败: {e}")
            # Do not leave a half-written batch pending on the connection.
            self._sqlite_rollback_quietly(conn)
            return False, 0, 0, 0, 0

    def _get_today_all_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
        """
        Return all news stored for a date, merged across crawls.

        Args:
            date: Date string, defaults to today

        Returns:
            Merged news data, or None when there is no data or reading fails.
            ``failed_ids`` lists every platform that failed in any crawl of
            the day. ``crawl_time`` is the latest crawl time, falling back to
            ``_format_time_filename()`` when no crawl record exists.
        """
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()

            cursor.execute("""
                SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                       n.rank, n.url, n.mobile_url,
                       n.first_crawl_time, n.last_crawl_time, n.crawl_count
                FROM news_items n
                LEFT JOIN platforms p ON n.platform_id = p.id
                ORDER BY n.platform_id, n.last_crawl_time
            """)
            rows = cursor.fetchall()
            if not rows:
                return None

            ranks_map, timeline_map = self._sqlite_load_rank_history(cursor)

            crawl_date = self._format_date_folder(date)
            items, id_to_name = self._sqlite_build_news_items(rows, ranks_map, timeline_map)

            # Platforms that failed in any crawl of the day.
            cursor.execute("""
                SELECT DISTINCT css.platform_id
                FROM crawl_source_status css
                JOIN crawl_records cr ON css.crawl_record_id = cr.id
                WHERE css.status = 'failed'
            """)
            failed_ids = [row[0] for row in cursor.fetchall()]

            cursor.execute("""
                SELECT crawl_time FROM crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            crawl_time = time_row[0] if time_row else self._format_time_filename()

            return NewsData(
                date=crawl_date,
                crawl_time=crawl_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )

        except Exception as e:
            print(f"[存储] 读取数据失败: {e}")
            return None

    def _get_latest_crawl_data_impl(self, date: Optional[str] = None) -> Optional[NewsData]:
        """
        Return the data of the most recent crawl of a date.

        Args:
            date: Date string, defaults to today

        Returns:
            News items whose last_crawl_time equals the latest crawl time,
            with failed sources of that crawl only. None when there is no
            data or reading fails.
        """
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()

            cursor.execute("""
                SELECT crawl_time FROM crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            if not time_row:
                return None
            latest_time = time_row[0]

            cursor.execute("""
                SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                       n.rank, n.url, n.mobile_url,
                       n.first_crawl_time, n.last_crawl_time, n.crawl_count
                FROM news_items n
                LEFT JOIN platforms p ON n.platform_id = p.id
                WHERE n.last_crawl_time = ?
            """, (latest_time,))
            rows = cursor.fetchall()
            if not rows:
                return None

            ranks_map, timeline_map = self._sqlite_load_rank_history(cursor, latest_time)

            crawl_date = self._format_date_folder(date)
            items, id_to_name = self._sqlite_build_news_items(rows, ranks_map, timeline_map)

            # Failed sources of the latest crawl only.
            cursor.execute("""
                SELECT css.platform_id
                FROM crawl_source_status css
                JOIN crawl_records cr ON css.crawl_record_id = cr.id
                WHERE cr.crawl_time = ? AND css.status = 'failed'
            """, (latest_time,))
            failed_ids = [row[0] for row in cursor.fetchall()]

            return NewsData(
                date=crawl_date,
                crawl_time=latest_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )

        except Exception as e:
            print(f"[存储] 获取最新数据失败: {e}")
            return None

    def _detect_new_titles_impl(self, current_data: NewsData) -> Dict[str, Dict]:
        """
        Detect titles that are new in the current crawl.

        A title counts as new only if no item with that title was first seen
        in an earlier crawl of the same day (first_time < current crawl time).
        Comparing titles rather than rows handles one title producing several
        records because its URL changed.

        Args:
            current_data: Data of the current crawl

        Returns:
            New titles as {source_id: {title: NewsItem}}.
            - With no stored data at all, every current item is returned.
            - When nothing was seen before this crawl (first crawl), returns {}.
            - On error, returns {}.
        """
        try:
            historical_data = self._get_today_all_data_impl(current_data.date)

            if not historical_data:
                # No history: everything is new.
                return {
                    source_id: {item.title: item for item in news_list}
                    for source_id, news_list in current_data.items.items()
                }

            current_time = current_data.crawl_time

            # Titles first seen strictly before the current crawl.
            historical_titles: Dict[str, set] = {
                source_id: {
                    item.title
                    for item in news_list
                    if self._sqlite_first_seen(item) < current_time
                }
                for source_id, news_list in historical_data.items.items()
            }

            if not any(historical_titles.values()):
                # First crawl of the day: there is no notion of "new" yet.
                return {}

            new_titles: Dict[str, Dict] = {}
            for source_id, news_list in current_data.items.items():
                seen = historical_titles.get(source_id, set())
                for item in news_list:
                    if item.title not in seen:
                        new_titles.setdefault(source_id, {})[item.title] = item
            return new_titles

        except Exception as e:
            print(f"[存储] 检测新标题失败: {e}")
            return {}

    def _is_first_crawl_today_impl(self, date: Optional[str] = None) -> bool:
        """
        Check whether this is the first crawl of the day.

        Args:
            date: Date string, defaults to today

        Returns:
            True when at most one crawl record exists (or on error).
        """
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            cursor.execute("""
                SELECT COUNT(*) as count FROM crawl_records
            """)
            row = cursor.fetchone()
            count = row[0] if row else 0
            # Zero or one recorded crawl is treated as the first crawl.
            return count <= 1
        except Exception as e:
            print(f"[存储] 检查首次抓取失败: {e}")
            return True

    def _get_crawl_times_impl(self, date: Optional[str] = None) -> List[str]:
        """
        Return all crawl times of a date.

        Args:
            date: Date string, defaults to today

        Returns:
            Crawl times in ascending order (empty list on error).
        """
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            cursor.execute("""
                SELECT crawl_time FROM crawl_records
                ORDER BY crawl_time
            """)
            return [row[0] for row in cursor.fetchall()]
        except Exception as e:
            print(f"[存储] 获取抓取时间列表失败: {e}")
            return []

    # ========================================
    # Push records
    # ========================================

    def _has_pushed_today_impl(self, date: Optional[str] = None) -> bool:
        """
        Check whether a push was already recorded for a date.

        Args:
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            True if push_records.pushed is set for that date (False on error).
        """
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            cursor.execute("""
                SELECT pushed FROM push_records WHERE date = ?
            """, (target_date,))
            row = cursor.fetchone()
            return bool(row[0]) if row else False
        except Exception as e:
            print(f"[存储] 检查推送记录失败: {e}")
            return False

    def _record_push_impl(self, report_type: str, date: Optional[str] = None) -> bool:
        """
        Record that a push happened (upsert on the date).

        Args:
            report_type: Report type
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            True on success, False on error (the pending change is rolled back).
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            now_str = self._sqlite_now_str()
            cursor.execute("""
                INSERT INTO push_records (date, pushed, push_time, report_type, created_at)
                VALUES (?, 1, ?, ?, ?)
                ON CONFLICT(date) DO UPDATE SET
                    pushed = 1,
                    push_time = excluded.push_time,
                    report_type = excluded.report_type
            """, (target_date, now_str, report_type, now_str))
            conn.commit()
            return True
        except Exception as e:
            print(f"[存储] 记录推送失败: {e}")
            self._sqlite_rollback_quietly(conn)
            return False

    def _has_ai_analyzed_today_impl(self, date: Optional[str] = None) -> bool:
        """
        Check whether an AI analysis was already recorded for a date.

        Args:
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            True if push_records.ai_analyzed is set for that date (False on error).
        """
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            cursor.execute("""
                SELECT ai_analyzed FROM push_records WHERE date = ?
            """, (target_date,))
            row = cursor.fetchone()
            return bool(row[0]) if row else False
        except Exception as e:
            print(f"[存储] 检查 AI 分析记录失败: {e}")
            return False

    def _record_ai_analysis_impl(self, analysis_mode: str, date: Optional[str] = None) -> bool:
        """
        Record that an AI analysis happened (upsert on the date).

        Args:
            analysis_mode: Analysis mode (daily/current/incremental)
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            True on success, False on error (the pending change is rolled back).
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            now_str = self._sqlite_now_str()
            cursor.execute("""
                INSERT INTO push_records (date, ai_analyzed, ai_analysis_time, ai_analysis_mode, created_at)
                VALUES (?, 1, ?, ?, ?)
                ON CONFLICT(date) DO UPDATE SET
                    ai_analyzed = 1,
                    ai_analysis_time = excluded.ai_analysis_time,
                    ai_analysis_mode = excluded.ai_analysis_mode
            """, (target_date, now_str, analysis_mode, now_str))
            conn.commit()
            return True
        except Exception as e:
            print(f"[存储] 记录 AI 分析失败: {e}")
            self._sqlite_rollback_quietly(conn)
            return False

    def _reset_push_state_impl(self, date: Optional[str] = None) -> bool:
        """
        Clear the push flag and push time for a date.

        Args:
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            True on success, False on error (the pending change is rolled back).
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            cursor.execute("""
                UPDATE push_records
                SET pushed = 0, push_time = NULL
                WHERE date = ?
            """, (target_date,))
            conn.commit()
            print(f"[存储] 已重置 {target_date} 的推送状态")
            return True
        except Exception as e:
            print(f"[存储] 重置推送状态失败: {e}")
            self._sqlite_rollback_quietly(conn)
            return False

    def _reset_ai_analysis_state_impl(self, date: Optional[str] = None) -> bool:
        """
        Clear the AI analysis flag, time and mode for a date.

        Args:
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            True on success, False on error (the pending change is rolled back).
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            cursor.execute("""
                UPDATE push_records
                SET ai_analyzed = 0, ai_analysis_time = NULL, ai_analysis_mode = NULL
                WHERE date = ?
            """, (target_date,))
            conn.commit()
            print(f"[存储] 已重置 {target_date} 的 AI 分析状态")
            return True
        except Exception as e:
            print(f"[存储] 重置 AI 分析状态失败: {e}")
            self._sqlite_rollback_quietly(conn)
            return False

    def _get_push_status_impl(self, date: Optional[str] = None) -> dict:
        """
        Return the push / AI-analysis status of a date.

        Args:
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            Status dict. All flags are False/None when no record exists.
            Returns an empty dict on error.
        """
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            cursor.execute("""
                SELECT date, pushed, push_time, report_type,
                       ai_analyzed, ai_analysis_time, ai_analysis_mode
                FROM push_records
                WHERE date = ?
            """, (target_date,))
            row = cursor.fetchone()
            if row:
                return {
                    "date": row[0],
                    "pushed": bool(row[1]),
                    "push_time": row[2],
                    "report_type": row[3],
                    "ai_analyzed": bool(row[4]),
                    "ai_analysis_time": row[5],
                    "ai_analysis_mode": row[6],
                }
            return {
                "date": target_date,
                "pushed": False,
                "push_time": None,
                "report_type": None,
                "ai_analyzed": False,
                "ai_analysis_time": None,
                "ai_analysis_mode": None,
            }
        except Exception as e:
            print(f"[存储] 获取推送状态失败: {e}")
            return {}

    # ========================================
    # RSS data storage
    # ========================================

    @staticmethod
    def _sqlite_save_rss_item(
        cursor: sqlite3.Cursor,
        item: RSSItem,
        feed_id: str,
        crawl_time: str,
        now_str: str,
    ) -> str:
        """
        Insert or update a single RSS item, keyed by (url, feed_id).

        Returns:
            "new", "updated", or "skipped". "skipped" means an empty-URL item
            that hit an IntegrityError (treated as a duplicate).

        Raises:
            sqlite3.Error: any other database error, for the caller to log.
        """
        if not item.url:
            # Empty URL: insert and treat a constraint violation as a duplicate.
            try:
                cursor.execute("""
                    INSERT INTO rss_items
                    (title, feed_id, url, published_at, summary, author,
                     first_crawl_time, last_crawl_time, crawl_count,
                     created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
                """, (item.title, feed_id, "", item.published_at,
                      item.summary, item.author, crawl_time,
                      crawl_time, now_str, now_str))
            except sqlite3.IntegrityError:
                return "skipped"
            return "new"

        cursor.execute("""
            SELECT id FROM rss_items
            WHERE url = ? AND feed_id = ?
        """, (item.url, feed_id))
        existing = cursor.fetchone()

        if existing:
            cursor.execute("""
                UPDATE rss_items SET
                    title = ?,
                    published_at = ?,
                    summary = ?,
                    author = ?,
                    last_crawl_time = ?,
                    crawl_count = crawl_count + 1,
                    updated_at = ?
                WHERE id = ?
            """, (item.title, item.published_at, item.summary,
                  item.author, crawl_time, now_str, existing[0]))
            return "updated"

        # Not found: insert. ON CONFLICT is a fallback in case another
        # writer inserted the same (url, feed_id) in between.
        cursor.execute("""
            INSERT INTO rss_items
            (title, feed_id, url, published_at, summary, author,
             first_crawl_time, last_crawl_time, crawl_count,
             created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
            ON CONFLICT(url, feed_id) DO UPDATE SET
                title = excluded.title,
                published_at = excluded.published_at,
                summary = excluded.summary,
                author = excluded.author,
                last_crawl_time = excluded.last_crawl_time,
                crawl_count = crawl_count + 1,
                updated_at = excluded.updated_at
        """, (item.title, feed_id, item.url, item.published_at,
              item.summary, item.author, crawl_time,
              crawl_time, now_str, now_str))
        return "new"

    def _save_rss_data_impl(self, data: RSSData, log_prefix: str = "[存储]") -> tuple[bool, int, int]:
        """
        Save an RSS crawl batch to SQLite (URL is the unique key per feed).

        Individual item failures (sqlite3.Error) are logged and skipped. Any
        other failure rolls back the whole batch.

        Args:
            data: RSS data
            log_prefix: Log prefix

        Returns:
            (success, new_count, updated_count)
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(data.date, db_type="rss")
            cursor = conn.cursor()
            now_str = self._sqlite_now_str()

            # Sync feed names into rss_feeds.
            self._sqlite_upsert_names(cursor, "rss_feeds", data.id_to_name, now_str)

            new_count = 0
            updated_count = 0
            for feed_id, rss_list in data.items.items():
                for item in rss_list:
                    try:
                        outcome = self._sqlite_save_rss_item(
                            cursor, item, feed_id, data.crawl_time, now_str
                        )
                    except sqlite3.Error as e:
                        print(f"{log_prefix} 保存 RSS 条目失败 [{item.title[:30]}...]: {e}")
                        continue
                    if outcome == "new":
                        new_count += 1
                    elif outcome == "updated":
                        updated_count += 1

            self._sqlite_record_crawl(
                cursor,
                records_table="rss_crawl_records",
                status_table="rss_crawl_status",
                names_table="rss_feeds",
                id_column="feed_id",
                crawl_time=data.crawl_time,
                total_items=new_count + updated_count,
                success_ids=list(data.items.keys()),
                failed_ids=data.failed_ids,
                now_str=now_str,
            )

            conn.commit()
            return True, new_count, updated_count

        except Exception as e:
            print(f"{log_prefix} 保存 RSS 数据失败: {e}")
            # Do not leave a half-written batch pending on the connection.
            self._sqlite_rollback_quietly(conn)
            return False, 0, 0

    def _get_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
        """
        Return all RSS data stored for a date.

        Args:
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            RSSData, or None when there is no data or reading fails.
            ``failed_ids`` lists feeds that failed in any crawl of the day.
        """
        try:
            conn = self._get_connection(date, db_type="rss")
            cursor = conn.cursor()

            cursor.execute("""
                SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                       i.url, i.published_at, i.summary, i.author,
                       i.first_crawl_time, i.last_crawl_time, i.crawl_count
                FROM rss_items i
                LEFT JOIN rss_feeds f ON i.feed_id = f.id
                ORDER BY i.published_at DESC
            """)
            rows = cursor.fetchall()
            if not rows:
                return None

            crawl_date = self._format_date_folder(date)
            items, id_to_name = self._sqlite_rows_to_rss_items(rows)

            cursor.execute("""
                SELECT crawl_time FROM rss_crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            crawl_time = time_row[0] if time_row else self._format_time_filename()

            cursor.execute("""
                SELECT DISTINCT cs.feed_id
                FROM rss_crawl_status cs
                JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
                WHERE cs.status = 'failed'
            """)
            failed_ids = [row[0] for row in cursor.fetchall()]

            return RSSData(
                date=crawl_date,
                crawl_time=crawl_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )

        except Exception as e:
            print(f"[存储] 读取 RSS 数据失败: {e}")
            return None

    def _detect_new_rss_items_impl(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
        """
        Detect RSS items that are new in the current crawl (incremental mode).

        An item counts as new only if its URL was not first seen in an
        earlier crawl of the same day. Items without a URL are never reported
        as new once history exists.

        Args:
            current_data: RSS data of the current crawl

        Returns:
            New items as {feed_id: [RSSItem, ...]}.
            - With no stored data at all, a shallow copy of current_data.items.
            - When nothing was seen before this crawl (first crawl), {}.
            - On error, {}.
        """
        try:
            historical_data = self._get_rss_data_impl(current_data.date)

            if not historical_data:
                # No history: everything is new.
                return current_data.items.copy()

            current_time = current_data.crawl_time

            # URLs first seen strictly before the current crawl.
            historical_urls: Dict[str, set] = {
                feed_id: {
                    item.url
                    for item in rss_list
                    if item.url and self._sqlite_first_seen(item) < current_time
                }
                for feed_id, rss_list in historical_data.items.items()
            }

            if not any(historical_urls.values()):
                # First crawl of the day: there is no notion of "new" yet.
                return {}

            new_items: Dict[str, List[RSSItem]] = {}
            for feed_id, rss_list in current_data.items.items():
                seen = historical_urls.get(feed_id, set())
                for item in rss_list:
                    if item.url and item.url not in seen:
                        new_items.setdefault(feed_id, []).append(item)
            return new_items

        except Exception as e:
            print(f"[存储] 检测新 RSS 条目失败: {e}")
            return {}

    def _get_latest_rss_data_impl(self, date: Optional[str] = None) -> Optional[RSSData]:
        """
        Return the RSS data of the most recent crawl (current-list mode).

        Args:
            date: Date string (YYYY-MM-DD), defaults to today

        Returns:
            Items whose last_crawl_time equals the latest crawl time, with
            failed feeds of that crawl only. None when there is no data or
            reading fails.
        """
        try:
            conn = self._get_connection(date, db_type="rss")
            cursor = conn.cursor()

            cursor.execute("""
                SELECT crawl_time FROM rss_crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            if not time_row:
                return None
            latest_time = time_row[0]

            cursor.execute("""
                SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                       i.url, i.published_at, i.summary, i.author,
                       i.first_crawl_time, i.last_crawl_time, i.crawl_count
                FROM rss_items i
                LEFT JOIN rss_feeds f ON i.feed_id = f.id
                WHERE i.last_crawl_time = ?
                ORDER BY i.published_at DESC
            """, (latest_time,))
            rows = cursor.fetchall()
            if not rows:
                return None

            crawl_date = self._format_date_folder(date)
            items, id_to_name = self._sqlite_rows_to_rss_items(rows)

            # Failed feeds of the latest crawl only.
            cursor.execute("""
                SELECT cs.feed_id
                FROM rss_crawl_status cs
                JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
                WHERE cr.crawl_time = ? AND cs.status = 'failed'
            """, (latest_time,))
            failed_ids = [row[0] for row in cursor.fetchall()]

            return RSSData(
                date=crawl_date,
                crawl_time=latest_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )

        except Exception as e:
            print(f"[存储] 获取最新 RSS 数据失败: {e}")
            return None
|