| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340 |
- # coding=utf-8
- """
- 本地存储后端 - SQLite + TXT/HTML
- 使用 SQLite 作为主存储,支持可选的 TXT 快照和 HTML 报告
- """
- import sqlite3
- import shutil
- import pytz
- import re
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, List, Optional
- from trendradar.storage.base import StorageBackend, NewsItem, NewsData, RSSItem, RSSData
- from trendradar.utils.time import (
- get_configured_time,
- format_date_folder,
- format_time_filename,
- )
- from trendradar.utils.url import normalize_url
class LocalStorageBackend(StorageBackend):
    """
    Local storage backend backed by SQLite, with optional TXT/HTML output.

    Layout under ``data_dir``:
    - ``news/{date}.db`` and ``rss/{date}.db``: one SQLite file per date
    - ``txt/{date}/{crawl_time}.txt``: optional TXT snapshots (debugging aid)
    - ``html/{date}/{filename}``: HTML reports

    Open SQLite connections are cached per database path and released by
    ``cleanup()``.
    """

    # Date formats recognised in file/folder names by cleanup_old_data():
    # ISO "YYYY-MM-DD" and the legacy Chinese "YYYY年MM月DD日".
    _DATE_NAME_PATTERNS = (
        re.compile(r"(\d{4})-(\d{2})-(\d{2})"),
        re.compile(r"(\d{4})年(\d{2})月(\d{2})日"),
    )

    def __init__(
        self,
        data_dir: str = "output",
        enable_txt: bool = True,
        enable_html: bool = True,
        timezone: str = "Asia/Shanghai",
    ):
        """
        Initialise the local storage backend.

        Args:
            data_dir: Root directory for all stored data.
            enable_txt: Whether TXT snapshots are written.
            enable_html: Whether HTML reports are written.
            timezone: Timezone name used for dates/timestamps
                (default ``Asia/Shanghai``).
        """
        # Set first so that __del__/cleanup() are safe even if a later
        # assignment in this constructor raises.
        self._db_connections: Dict[str, sqlite3.Connection] = {}
        self.data_dir = Path(data_dir)
        self.enable_txt = enable_txt
        self.enable_html = enable_html
        self.timezone = timezone

    @property
    def backend_name(self) -> str:
        """Identifier of this backend (always ``"local"``)."""
        return "local"

    @property
    def supports_txt(self) -> bool:
        """Whether TXT snapshots are enabled for this instance."""
        return self.enable_txt

    # ========================================
    # Time / path / connection helpers
    # ========================================

    def _get_configured_time(self) -> datetime:
        """Return the current time in the configured timezone."""
        return get_configured_time(self.timezone)

    def _format_date_folder(self, date: Optional[str] = None) -> str:
        """Return the date folder name (ISO format: YYYY-MM-DD)."""
        return format_date_folder(date, self.timezone)

    def _format_time_filename(self) -> str:
        """Return the time-based file name (format: HH-MM)."""
        return format_time_filename(self.timezone)

    def _get_db_path(self, date: Optional[str] = None, db_type: str = "news") -> Path:
        """
        Return the SQLite database path, creating its directory if needed.

        Flat layout: ``{data_dir}/{db_type}/{date}.db``, e.g.
        ``output/news/2025-12-28.db`` or ``output/rss/2025-12-28.db``.

        Args:
            date: Date string; ``None`` is resolved by ``_format_date_folder``.
            db_type: Database type (``"news"`` or ``"rss"``).

        Returns:
            Path of the database file (the file itself may not exist yet).
        """
        db_dir = self.data_dir / db_type
        db_dir.mkdir(parents=True, exist_ok=True)
        return db_dir / f"{self._format_date_folder(date)}.db"

    def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
        """
        Return a cached database connection, opening it on first use.

        A newly opened connection gets ``sqlite3.Row`` as row factory and has
        its tables initialised from the schema file.

        Args:
            date: Date string.
            db_type: Database type (``"news"`` or ``"rss"``).

        Returns:
            The SQLite connection for that date and type.

        Raises:
            FileNotFoundError: If the schema file is missing.
        """
        db_path = str(self._get_db_path(date, db_type))
        conn = self._db_connections.get(db_path)
        if conn is None:
            conn = sqlite3.connect(db_path)
            try:
                conn.row_factory = sqlite3.Row
                self._init_tables(conn, db_type)
            except Exception:
                # Do not leak the handle when schema initialisation fails.
                conn.close()
                raise
            self._db_connections[db_path] = conn
        return conn

    def _get_schema_path(self, db_type: str = "news") -> Path:
        """
        Return the path of the SQL schema file for a database type.

        Args:
            db_type: Database type (``"news"`` or ``"rss"``).

        Returns:
            ``rss_schema.sql`` for ``"rss"``, otherwise ``schema.sql``; both
            are looked up next to this module.
        """
        schema_name = "rss_schema.sql" if db_type == "rss" else "schema.sql"
        return Path(__file__).parent / schema_name

    def _init_tables(self, conn: sqlite3.Connection, db_type: str = "news") -> None:
        """
        Initialise the database tables from the schema file.

        Args:
            conn: Database connection.
            db_type: Database type (``"news"`` or ``"rss"``).

        Raises:
            FileNotFoundError: If the schema file does not exist.
        """
        schema_path = self._get_schema_path(db_type)
        if not schema_path.exists():
            raise FileNotFoundError(f"Schema file not found: {schema_path}")
        with open(schema_path, "r", encoding="utf-8") as f:
            conn.executescript(f.read())
        conn.commit()

    @staticmethod
    def _rollback_quietly(conn: Optional[sqlite3.Connection]) -> None:
        """
        Roll back the pending transaction on a best-effort basis.

        Called from error handlers so that a half-written batch is not
        committed later by an unrelated ``commit()`` on the cached connection.
        """
        if conn is None:
            return
        try:
            conn.rollback()
        except sqlite3.Error as e:
            print(f"[本地存储] 回滚事务失败: {e}")

    # ========================================
    # News storage
    # ========================================

    def _upsert_news_item(self, cursor, item, source_id: str, crawl_time: str, now_str: str) -> tuple:
        """
        Insert or update one news item and record its rank.

        Items are identified by (normalised URL, platform id). Items without
        a URL are always inserted as new rows (no de-duplication).

        Returns:
            ``(is_new, title_changed)`` booleans.
        """
        # Normalise the URL (strips dynamic parameters such as Weibo's band_rank).
        normalized_url = normalize_url(item.url, source_id) if item.url else ""

        existing = None
        if normalized_url:
            cursor.execute("""
                SELECT id, title FROM news_items
                WHERE url = ? AND platform_id = ?
            """, (normalized_url, source_id))
            existing = cursor.fetchone()

        if existing is None:
            # New item: store the normalised URL ("" when the item has none).
            cursor.execute("""
                INSERT INTO news_items
                (title, platform_id, rank, url, mobile_url,
                 first_crawl_time, last_crawl_time, crawl_count,
                 created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
            """, (item.title, source_id, item.rank, normalized_url,
                  item.mobile_url, crawl_time, crawl_time,
                  now_str, now_str))
            new_id = cursor.lastrowid
            # Record the initial rank.
            cursor.execute("""
                INSERT INTO rank_history
                (news_item_id, rank, crawl_time, created_at)
                VALUES (?, ?, ?, ?)
            """, (new_id, item.rank, crawl_time, now_str))
            return True, False

        existing_id, existing_title = existing[0], existing[1]
        title_changed = existing_title != item.title
        if title_changed:
            # Keep an audit trail of title changes.
            cursor.execute("""
                INSERT INTO title_changes
                (news_item_id, old_title, new_title, changed_at)
                VALUES (?, ?, ?, ?)
            """, (existing_id, existing_title, item.title, now_str))

        # Record the rank observed in this crawl.
        cursor.execute("""
            INSERT INTO rank_history
            (news_item_id, rank, crawl_time, created_at)
            VALUES (?, ?, ?, ?)
        """, (existing_id, item.rank, crawl_time, now_str))

        cursor.execute("""
            UPDATE news_items SET
                title = ?,
                rank = ?,
                mobile_url = ?,
                last_crawl_time = ?,
                crawl_count = crawl_count + 1,
                updated_at = ?
            WHERE id = ?
        """, (item.title, item.rank, item.mobile_url,
              crawl_time, now_str, existing_id))
        return False, title_changed

    def save_news_data(self, data: NewsData) -> bool:
        """
        Save news data to SQLite (URL is the unique key; title changes are tracked).

        A failure on a single item is logged and skipped. Any other failure
        rolls back the whole batch.

        Args:
            data: News data of one crawl.

        Returns:
            ``True`` if the batch was committed, ``False`` otherwise.
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(data.date)
            cursor = conn.cursor()
            # Current time in the configured timezone.
            now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

            # Sync platform names into the platforms table first.
            for source_id, source_name in data.id_to_name.items():
                cursor.execute("""
                    INSERT INTO platforms (id, name, updated_at)
                    VALUES (?, ?, ?)
                    ON CONFLICT(id) DO UPDATE SET
                        name = excluded.name,
                        updated_at = excluded.updated_at
                """, (source_id, source_name, now_str))

            new_count = 0
            updated_count = 0
            title_changed_count = 0
            for source_id, news_list in data.items.items():
                for item in news_list:
                    try:
                        is_new, title_changed = self._upsert_news_item(
                            cursor, item, source_id, data.crawl_time, now_str
                        )
                    except sqlite3.Error as e:
                        print(f"保存新闻条目失败 [{item.title[:30]}...]: {e}")
                        continue
                    if is_new:
                        new_count += 1
                    else:
                        updated_count += 1
                    if title_changed:
                        title_changed_count += 1

            total_items = new_count + updated_count

            # Record this crawl.
            cursor.execute("""
                INSERT OR REPLACE INTO crawl_records
                (crawl_time, total_items, created_at)
                VALUES (?, ?, ?)
            """, (data.crawl_time, total_items, now_str))

            # Look up the id of the crawl record just written.
            cursor.execute("""
                SELECT id FROM crawl_records WHERE crawl_time = ?
            """, (data.crawl_time,))
            record_row = cursor.fetchone()
            if record_row:
                crawl_record_id = record_row[0]
                # Every source present in data.items counts as successful.
                for source_id in data.items.keys():
                    cursor.execute("""
                        INSERT OR REPLACE INTO crawl_source_status
                        (crawl_record_id, platform_id, status)
                        VALUES (?, ?, 'success')
                    """, (crawl_record_id, source_id))
                for failed_id in data.failed_ids:
                    # Make sure failed platforms exist in the platforms table too.
                    cursor.execute("""
                        INSERT OR IGNORE INTO platforms (id, name, updated_at)
                        VALUES (?, ?, ?)
                    """, (failed_id, failed_id, now_str))
                    cursor.execute("""
                        INSERT OR REPLACE INTO crawl_source_status
                        (crawl_record_id, platform_id, status)
                        VALUES (?, ?, 'failed')
                    """, (crawl_record_id, failed_id))

            conn.commit()

            # Storage statistics log line.
            log_parts = [f"[本地存储] 处理完成:新增 {new_count} 条"]
            if updated_count > 0:
                log_parts.append(f"更新 {updated_count} 条")
            if title_changed_count > 0:
                log_parts.append(f"标题变更 {title_changed_count} 条")
            print(",".join(log_parts))
            return True
        except Exception as e:
            self._rollback_quietly(conn)
            print(f"[本地存储] 保存失败: {e}")
            return False

    @staticmethod
    def _build_rank_history(history_rows) -> Dict[int, List[int]]:
        """
        Build ``{news_item_id: [rank, ...]}`` from (news_item_id, rank) rows.

        Ranks keep the order of the input rows; repeated ranks of the same
        item are stored once.
        """
        rank_history_map: Dict[int, List[int]] = {}
        for history_row in history_rows:
            news_id, rank = history_row[0], history_row[1]
            ranks = rank_history_map.setdefault(news_id, [])
            if rank not in ranks:
                ranks.append(rank)
        return rank_history_map

    @staticmethod
    def _group_news_rows(rows, rank_history_map: Dict[int, List[int]]) -> tuple:
        """
        Convert news rows into ``NewsItem`` objects grouped by platform.

        Row column order: id, title, platform_id, platform_name, rank, url,
        mobile_url, first_crawl_time, last_crawl_time, crawl_count.

        Returns:
            ``(items, id_to_name)`` where ``items`` maps platform id to its
            list of ``NewsItem`` and ``id_to_name`` maps platform id to name.
        """
        items: Dict[str, List[NewsItem]] = {}
        id_to_name: Dict[str, str] = {}
        for row in rows:
            news_id = row[0]
            platform_id = row[2]
            # Fall back to the id when the platform has no stored name.
            platform_name = row[3] or platform_id
            id_to_name[platform_id] = platform_name
            # Use the rank history; fall back to the current rank if there is none.
            ranks = rank_history_map.get(news_id, [row[4]])
            items.setdefault(platform_id, []).append(NewsItem(
                title=row[1],
                source_id=platform_id,
                source_name=platform_name,
                rank=row[4],
                url=row[5] or "",
                mobile_url=row[6] or "",
                crawl_time=row[8],  # last_crawl_time
                ranks=ranks,
                first_time=row[7],  # first_crawl_time
                last_time=row[8],  # last_crawl_time
                count=row[9],  # crawl_count
            ))
        return items, id_to_name

    def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
        """
        Return all news data stored for a date (merged across crawls).

        Args:
            date: Date string; defaults to today.

        Returns:
            The merged news data, or ``None`` if there is no database, no
            stored item, or reading failed.
        """
        try:
            if not self._get_db_path(date).exists():
                return None
            cursor = self._get_connection(date).cursor()

            cursor.execute("""
                SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                       n.rank, n.url, n.mobile_url,
                       n.first_crawl_time, n.last_crawl_time, n.crawl_count
                FROM news_items n
                LEFT JOIN platforms p ON n.platform_id = p.id
                ORDER BY n.platform_id, n.last_crawl_time
            """)
            rows = cursor.fetchall()
            if not rows:
                return None

            # A subquery instead of one bound parameter per id: a busy day can
            # exceed SQLite's limit on bound variables.
            cursor.execute("""
                SELECT news_item_id, rank FROM rank_history
                WHERE news_item_id IN (SELECT id FROM news_items)
                ORDER BY news_item_id, crawl_time
            """)
            rank_history_map = self._build_rank_history(cursor.fetchall())

            items, id_to_name = self._group_news_rows(rows, rank_history_map)

            # Sources that failed in any crawl of the day.
            cursor.execute("""
                SELECT DISTINCT css.platform_id
                FROM crawl_source_status css
                JOIN crawl_records cr ON css.crawl_record_id = cr.id
                WHERE css.status = 'failed'
            """)
            failed_ids = [row[0] for row in cursor.fetchall()]

            # Most recent crawl time; fall back to the current time string.
            cursor.execute("""
                SELECT crawl_time FROM crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            crawl_time = time_row[0] if time_row else self._format_time_filename()

            return NewsData(
                date=self._format_date_folder(date),
                crawl_time=crawl_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )
        except Exception as e:
            print(f"[本地存储] 读取数据失败: {e}")
            return None

    def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
        """
        Return the data of the most recent crawl of a date.

        Args:
            date: Date string; defaults to today.

        Returns:
            News data whose items were last seen in the latest crawl, or
            ``None`` if nothing is available or reading failed.
        """
        try:
            if not self._get_db_path(date).exists():
                return None
            cursor = self._get_connection(date).cursor()

            cursor.execute("""
                SELECT crawl_time FROM crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            if not time_row:
                return None
            latest_time = time_row[0]

            cursor.execute("""
                SELECT n.id, n.title, n.platform_id, p.name as platform_name,
                       n.rank, n.url, n.mobile_url,
                       n.first_crawl_time, n.last_crawl_time, n.crawl_count
                FROM news_items n
                LEFT JOIN platforms p ON n.platform_id = p.id
                WHERE n.last_crawl_time = ?
            """, (latest_time,))
            rows = cursor.fetchall()
            if not rows:
                return None

            # Same item set as above, selected by subquery so the number of
            # bound variables does not grow with the number of items.
            cursor.execute("""
                SELECT news_item_id, rank FROM rank_history
                WHERE news_item_id IN (
                    SELECT id FROM news_items WHERE last_crawl_time = ?
                )
                ORDER BY news_item_id, crawl_time
            """, (latest_time,))
            rank_history_map = self._build_rank_history(cursor.fetchall())

            items, id_to_name = self._group_news_rows(rows, rank_history_map)

            # Sources that failed in the latest crawl only.
            cursor.execute("""
                SELECT css.platform_id
                FROM crawl_source_status css
                JOIN crawl_records cr ON css.crawl_record_id = cr.id
                WHERE cr.crawl_time = ? AND css.status = 'failed'
            """, (latest_time,))
            failed_ids = [row[0] for row in cursor.fetchall()]

            return NewsData(
                date=self._format_date_folder(date),
                crawl_time=latest_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )
        except Exception as e:
            print(f"[本地存储] 获取最新数据失败: {e}")
            return None

    def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
        """
        Detect titles that are new in the current crawl.

        A title counts as new only if no stored item of the same source with
        that title was first seen before the current crawl time.

        Args:
            current_data: Data of the current crawl.

        Returns:
            ``{source_id: {title: NewsItem}}`` of new titles. Empty when every
            stored item belongs to the current batch (first crawl) or when
            detection fails.
        """
        try:
            historical_data = self.get_today_all_data(current_data.date)
            if not historical_data:
                # Nothing stored at all: everything is new.
                return {
                    source_id: {item.title: item for item in news_list}
                    for source_id, news_list in current_data.items.items()
                }

            current_time = current_data.crawl_time

            # Titles first seen before the current batch. Comparing by title
            # copes with one title stored several times under different URLs.
            historical_titles: Dict[str, set] = {}
            for source_id, news_list in historical_data.items.items():
                historical_titles[source_id] = {
                    item.title
                    for item in news_list
                    if getattr(item, 'first_time', item.crawl_time) < current_time
                }

            if not any(historical_titles.values()):
                # First crawl of the day: there is no notion of "new" yet.
                return {}

            new_titles: Dict[str, Dict] = {}
            for source_id, news_list in current_data.items.items():
                hist_set = historical_titles.get(source_id, set())
                fresh = {
                    item.title: item
                    for item in news_list
                    if item.title not in hist_set
                }
                if fresh:
                    new_titles[source_id] = fresh
            return new_titles
        except Exception as e:
            print(f"[本地存储] 检测新标题失败: {e}")
            return {}

    # ========================================
    # TXT / HTML output
    # ========================================

    def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
        """
        Save a TXT snapshot to ``{data_dir}/txt/{date}/{crawl_time}.txt``.

        Args:
            data: News data to dump.

        Returns:
            Path of the written file, or ``None`` if TXT output is disabled or
            writing failed.
        """
        if not self.enable_txt:
            return None
        try:
            txt_dir = self.data_dir / "txt" / self._format_date_folder(data.date)
            txt_dir.mkdir(parents=True, exist_ok=True)
            file_path = txt_dir / f"{data.crawl_time}.txt"

            with open(file_path, "w", encoding="utf-8") as f:
                for source_id, news_list in data.items.items():
                    source_name = data.id_to_name.get(source_id, source_id)
                    # Section header: "id | name" when a distinct name is known.
                    if source_name and source_name != source_id:
                        f.write(f"{source_id} | {source_name}\n")
                    else:
                        f.write(f"{source_id}\n")

                    # One line per item, ordered by rank.
                    for item in sorted(news_list, key=lambda x: x.rank):
                        line = f"{item.rank}. {item.title}"
                        if item.url:
                            line += f" [URL:{item.url}]"
                        if item.mobile_url:
                            line += f" [MOBILE:{item.mobile_url}]"
                        f.write(line + "\n")
                    f.write("\n")

                # Trailing section listing the failed sources.
                if data.failed_ids:
                    f.write("==== 以下ID请求失败 ====\n")
                    for failed_id in data.failed_ids:
                        f.write(f"{failed_id}\n")

            print(f"[本地存储] TXT 快照已保存: {file_path}")
            return str(file_path)
        except Exception as e:
            print(f"[本地存储] 保存 TXT 快照失败: {e}")
            return None

    def save_html_report(self, html_content: str, filename: str, is_summary: bool = False) -> Optional[str]:
        """
        Save an HTML report to ``{data_dir}/html/{today}/{filename}``.

        Args:
            html_content: HTML document text.
            filename: Target file name.
            is_summary: Whether this is a summary report (accepted for
                interface compatibility; not used by this backend).

        Returns:
            Path of the written file, or ``None`` if HTML output is disabled
            or writing failed.
        """
        if not self.enable_html:
            return None
        try:
            html_dir = self.data_dir / "html" / self._format_date_folder()
            html_dir.mkdir(parents=True, exist_ok=True)
            file_path = html_dir / filename
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(html_content)
            print(f"[本地存储] HTML 报告已保存: {file_path}")
            return str(file_path)
        except Exception as e:
            print(f"[本地存储] 保存 HTML 报告失败: {e}")
            return None

    # ========================================
    # Crawl bookkeeping
    # ========================================

    def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
        """
        Check whether this is the first crawl of the day.

        Args:
            date: Date string; defaults to today.

        Returns:
            ``True`` when at most one crawl record exists (or the check
            fails), ``False`` otherwise.
        """
        try:
            if not self._get_db_path(date).exists():
                return True
            cursor = self._get_connection(date).cursor()
            cursor.execute("""
                SELECT COUNT(*) as count FROM crawl_records
            """)
            row = cursor.fetchone()
            count = row[0] if row else 0
            # Zero or one record is treated as "first crawl".
            return count <= 1
        except Exception as e:
            print(f"[本地存储] 检查首次抓取失败: {e}")
            return True

    def get_crawl_times(self, date: Optional[str] = None) -> List[str]:
        """
        Return all crawl times recorded for a date.

        Args:
            date: Date string; defaults to today.

        Returns:
            Crawl times in ascending order; empty if there is no database or
            reading failed.
        """
        try:
            if not self._get_db_path(date).exists():
                return []
            cursor = self._get_connection(date).cursor()
            cursor.execute("""
                SELECT crawl_time FROM crawl_records
                ORDER BY crawl_time
            """)
            return [row[0] for row in cursor.fetchall()]
        except Exception as e:
            print(f"[本地存储] 获取抓取时间列表失败: {e}")
            return []

    def cleanup(self) -> None:
        """Release resources: close and forget every cached database connection."""
        # getattr: the attribute is missing if __init__ never ran to completion.
        connections = getattr(self, "_db_connections", None)
        if not connections:
            return
        for db_path, conn in connections.items():
            try:
                conn.close()
                print(f"[本地存储] 关闭数据库连接: {db_path}")
            except Exception as e:
                print(f"[本地存储] 关闭连接失败 {db_path}: {e}")
        connections.clear()

    @classmethod
    def _parse_date_from_name(cls, name: str) -> Optional[datetime]:
        """
        Parse the date at the start of a file or folder name.

        Supports ``YYYY-MM-DD`` (a trailing ``.db`` or other suffix is
        ignored) and the legacy ``YYYY年MM月DD日`` format.

        Returns:
            A naive ``datetime`` at midnight of that date, or ``None`` if the
            name carries no valid date.
        """
        for pattern in cls._DATE_NAME_PATTERNS:
            date_match = pattern.match(name)
            if not date_match:
                continue
            try:
                return datetime(
                    int(date_match.group(1)),
                    int(date_match.group(2)),
                    int(date_match.group(3)),
                )
            except ValueError:
                # Digits matched but do not form a real date (e.g. month 13).
                return None
        return None

    def cleanup_old_data(self, retention_days: int) -> int:
        """
        Delete data older than the retention period.

        Cleans:
        - ``{data_dir}/news/{date}.db`` and ``{data_dir}/rss/{date}.db`` files
        - ``{data_dir}/txt/{date}/`` and ``{data_dir}/html/{date}/`` folders

        Args:
            retention_days: Days to keep (``0`` or less disables cleanup).

        Returns:
            Number of files/directories deleted.
        """
        if retention_days <= 0:
            return 0

        deleted_count = 0
        cutoff_date = self._get_configured_time() - timedelta(days=retention_days)
        # Compare wall-clock times in the configured timezone. Parsed names are
        # naive local midnights, so no tzinfo is attached to them (attaching a
        # pytz zone via tzinfo= would silently use its LMT offset).
        cutoff_local = cutoff_date.replace(tzinfo=None)

        try:
            if not self.data_dir.exists():
                return 0

            # Database files (news/, rss/).
            for db_type in ["news", "rss"]:
                db_dir = self.data_dir / db_type
                if not db_dir.exists():
                    continue
                # Materialise the listing before deleting entries from it.
                for db_file in sorted(db_dir.glob("*.db")):
                    file_date = self._parse_date_from_name(db_file.name)
                    if file_date is None or file_date >= cutoff_local:
                        continue
                    # Close and forget the cached connection before deleting.
                    cached_conn = self._db_connections.pop(str(db_file), None)
                    if cached_conn is not None:
                        try:
                            cached_conn.close()
                        except Exception as e:
                            print(f"[本地存储] 关闭连接失败 {db_file}: {e}")
                    try:
                        db_file.unlink()
                        deleted_count += 1
                        print(f"[本地存储] 清理过期数据: {db_type}/{db_file.name}")
                    except Exception as e:
                        print(f"[本地存储] 删除文件失败 {db_file}: {e}")

            # Snapshot folders (txt/, html/).
            for snapshot_type in ["txt", "html"]:
                snapshot_dir = self.data_dir / snapshot_type
                if not snapshot_dir.exists():
                    continue
                for date_folder in sorted(snapshot_dir.iterdir()):
                    if not date_folder.is_dir() or date_folder.name.startswith('.'):
                        continue
                    folder_date = self._parse_date_from_name(date_folder.name)
                    if folder_date is None or folder_date >= cutoff_local:
                        continue
                    try:
                        shutil.rmtree(date_folder)
                        deleted_count += 1
                        print(f"[本地存储] 清理过期数据: {snapshot_type}/{date_folder.name}")
                    except Exception as e:
                        print(f"[本地存储] 删除目录失败 {date_folder}: {e}")

            if deleted_count > 0:
                print(f"[本地存储] 共清理 {deleted_count} 个过期文件/目录")
            return deleted_count
        except Exception as e:
            print(f"[本地存储] 清理过期数据失败: {e}")
            return deleted_count

    # ========================================
    # Push records
    # ========================================

    def has_pushed_today(self, date: Optional[str] = None) -> bool:
        """
        Check whether a push was already recorded for a date.

        Args:
            date: Date string (YYYY-MM-DD); defaults to today.

        Returns:
            ``True`` if a push is recorded, ``False`` otherwise (including
            when there is no database yet or the check fails).
        """
        try:
            # A read-only check must not create an empty database file.
            if not self._get_db_path(date).exists():
                return False
            cursor = self._get_connection(date).cursor()
            target_date = self._format_date_folder(date)
            cursor.execute("""
                SELECT pushed FROM push_records WHERE date = ?
            """, (target_date,))
            row = cursor.fetchone()
            return bool(row[0]) if row else False
        except Exception as e:
            print(f"[本地存储] 检查推送记录失败: {e}")
            return False

    def record_push(self, report_type: str, date: Optional[str] = None) -> bool:
        """
        Record that a push happened.

        Args:
            report_type: Type of the pushed report.
            date: Date string (YYYY-MM-DD); defaults to today.

        Returns:
            ``True`` if the record was committed, ``False`` otherwise.
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(date)
            cursor = conn.cursor()
            target_date = self._format_date_folder(date)
            now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
            cursor.execute("""
                INSERT INTO push_records (date, pushed, push_time, report_type, created_at)
                VALUES (?, 1, ?, ?, ?)
                ON CONFLICT(date) DO UPDATE SET
                    pushed = 1,
                    push_time = excluded.push_time,
                    report_type = excluded.report_type
            """, (target_date, now_str, report_type, now_str))
            conn.commit()
            print(f"[本地存储] 推送记录已保存: {report_type} at {now_str}")
            return True
        except Exception as e:
            self._rollback_quietly(conn)
            print(f"[本地存储] 记录推送失败: {e}")
            return False

    # ========================================
    # RSS storage
    # ========================================

    @staticmethod
    def _upsert_rss_item(cursor, item, feed_id: str, crawl_time: str, now_str: str) -> bool:
        """
        Insert or update one RSS item, identified by (URL, feed id).

        Items without a URL are always inserted as new rows.

        Returns:
            ``True`` if a new row was inserted, ``False`` if an existing row
            was updated.
        """
        existing = None
        if item.url:
            cursor.execute("""
                SELECT id, title FROM rss_items
                WHERE url = ? AND feed_id = ?
            """, (item.url, feed_id))
            existing = cursor.fetchone()

        if existing is None:
            cursor.execute("""
                INSERT INTO rss_items
                (title, feed_id, url, published_at, summary, author,
                 first_crawl_time, last_crawl_time, crawl_count,
                 created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)
            """, (item.title, feed_id, item.url or "", item.published_at,
                  item.summary, item.author, crawl_time,
                  crawl_time, now_str, now_str))
            return True

        cursor.execute("""
            UPDATE rss_items SET
                title = ?,
                published_at = ?,
                summary = ?,
                author = ?,
                last_crawl_time = ?,
                crawl_count = crawl_count + 1,
                updated_at = ?
            WHERE id = ?
        """, (item.title, item.published_at, item.summary,
              item.author, crawl_time, now_str, existing[0]))
        return False

    def save_rss_data(self, data: RSSData) -> bool:
        """
        Save RSS data to SQLite (URL is the unique key).

        A failure on a single item is logged and skipped. Any other failure
        rolls back the whole batch.

        Args:
            data: RSS data of one crawl.

        Returns:
            ``True`` if the batch was committed, ``False`` otherwise.
        """
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = self._get_connection(data.date, db_type="rss")
            cursor = conn.cursor()
            now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")

            # Sync feed names into the rss_feeds table.
            for feed_id, feed_name in data.id_to_name.items():
                cursor.execute("""
                    INSERT INTO rss_feeds (id, name, updated_at)
                    VALUES (?, ?, ?)
                    ON CONFLICT(id) DO UPDATE SET
                        name = excluded.name,
                        updated_at = excluded.updated_at
                """, (feed_id, feed_name, now_str))

            new_count = 0
            updated_count = 0
            for feed_id, rss_list in data.items.items():
                for item in rss_list:
                    try:
                        is_new = self._upsert_rss_item(
                            cursor, item, feed_id, data.crawl_time, now_str
                        )
                    except sqlite3.Error as e:
                        print(f"[本地存储] 保存 RSS 条目失败 [{item.title[:30]}...]: {e}")
                        continue
                    if is_new:
                        new_count += 1
                    else:
                        updated_count += 1

            total_items = new_count + updated_count

            # Record this crawl.
            cursor.execute("""
                INSERT OR REPLACE INTO rss_crawl_records
                (crawl_time, total_items, created_at)
                VALUES (?, ?, ?)
            """, (data.crawl_time, total_items, now_str))

            # Record per-feed status against the crawl record just written.
            cursor.execute("""
                SELECT id FROM rss_crawl_records WHERE crawl_time = ?
            """, (data.crawl_time,))
            record_row = cursor.fetchone()
            if record_row:
                crawl_record_id = record_row[0]
                # Every feed present in data.items counts as successful.
                for feed_id in data.items.keys():
                    cursor.execute("""
                        INSERT OR REPLACE INTO rss_crawl_status
                        (crawl_record_id, feed_id, status)
                        VALUES (?, ?, 'success')
                    """, (crawl_record_id, feed_id))
                for failed_id in data.failed_ids:
                    # Make sure failed feeds exist in the rss_feeds table too.
                    cursor.execute("""
                        INSERT OR IGNORE INTO rss_feeds (id, name, updated_at)
                        VALUES (?, ?, ?)
                    """, (failed_id, failed_id, now_str))
                    cursor.execute("""
                        INSERT OR REPLACE INTO rss_crawl_status
                        (crawl_record_id, feed_id, status)
                        VALUES (?, ?, 'failed')
                    """, (crawl_record_id, failed_id))

            conn.commit()

            # Storage statistics log line.
            log_parts = [f"[本地存储] RSS 处理完成:新增 {new_count} 条"]
            if updated_count > 0:
                log_parts.append(f"更新 {updated_count} 条")
            print(",".join(log_parts))
            return True
        except Exception as e:
            self._rollback_quietly(conn)
            print(f"[本地存储] 保存 RSS 数据失败: {e}")
            return False

    @staticmethod
    def _group_rss_rows(rows) -> tuple:
        """
        Convert RSS rows into ``RSSItem`` objects grouped by feed.

        Row column order: id, title, feed_id, feed_name, url, published_at,
        summary, author, first_crawl_time, last_crawl_time, crawl_count.

        Returns:
            ``(items, id_to_name)`` where ``items`` maps feed id to its list
            of ``RSSItem`` and ``id_to_name`` maps feed id to feed name.
        """
        items: Dict[str, List[RSSItem]] = {}
        id_to_name: Dict[str, str] = {}
        for row in rows:
            feed_id = row[2]
            # Fall back to the id when the feed has no stored name.
            feed_name = row[3] or feed_id
            id_to_name[feed_id] = feed_name
            items.setdefault(feed_id, []).append(RSSItem(
                title=row[1],
                feed_id=feed_id,
                feed_name=feed_name,
                url=row[4] or "",
                published_at=row[5] or "",
                summary=row[6] or "",
                author=row[7] or "",
                crawl_time=row[9],  # last_crawl_time
                first_time=row[8],  # first_crawl_time
                last_time=row[9],  # last_crawl_time
                count=row[10],  # crawl_count
            ))
        return items, id_to_name

    def get_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
        """
        Return all RSS data stored for a date.

        Args:
            date: Date string (YYYY-MM-DD); defaults to today.

        Returns:
            An ``RSSData`` object, or ``None`` if there is no database, no
            stored item, or reading failed.
        """
        try:
            # A read must not create an empty database file.
            if not self._get_db_path(date, db_type="rss").exists():
                return None
            cursor = self._get_connection(date, db_type="rss").cursor()

            cursor.execute("""
                SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                       i.url, i.published_at, i.summary, i.author,
                       i.first_crawl_time, i.last_crawl_time, i.crawl_count
                FROM rss_items i
                LEFT JOIN rss_feeds f ON i.feed_id = f.id
                ORDER BY i.published_at DESC
            """)
            rows = cursor.fetchall()
            if not rows:
                return None

            items, id_to_name = self._group_rss_rows(rows)

            # Most recent crawl time; fall back to the current time string.
            cursor.execute("""
                SELECT crawl_time FROM rss_crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            crawl_time = time_row[0] if time_row else self._format_time_filename()

            # Feeds that failed in any crawl of the day.
            cursor.execute("""
                SELECT DISTINCT cs.feed_id
                FROM rss_crawl_status cs
                JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
                WHERE cs.status = 'failed'
            """)
            failed_ids = [row[0] for row in cursor.fetchall()]

            return RSSData(
                date=self._format_date_folder(date),
                crawl_time=crawl_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )
        except Exception as e:
            print(f"[本地存储] 读取 RSS 数据失败: {e}")
            return None

    def detect_new_rss_items(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
        """
        Detect RSS items that are new in the current crawl (incremental mode).

        An item counts as new only if its URL was not stored for the same
        feed by an item first seen before the current crawl time. Items
        without a URL are never reported once history exists.

        Args:
            current_data: RSS data of the current crawl.

        Returns:
            ``{feed_id: [RSSItem, ...]}`` of new items. Empty when every
            stored item belongs to the current batch (first crawl) or when
            detection fails.
        """
        try:
            historical_data = self.get_rss_data(current_data.date)
            if not historical_data:
                # Nothing stored at all: everything is new.
                return current_data.items.copy()

            current_time = current_data.crawl_time

            # URLs first seen before the current batch, per feed.
            historical_urls: Dict[str, set] = {}
            for feed_id, rss_list in historical_data.items.items():
                historical_urls[feed_id] = {
                    item.url
                    for item in rss_list
                    if getattr(item, 'first_time', item.crawl_time) < current_time
                    and item.url
                }

            if not any(historical_urls.values()):
                # First crawl of the day: there is no notion of "new" yet.
                return {}

            new_items: Dict[str, List[RSSItem]] = {}
            for feed_id, rss_list in current_data.items.items():
                hist_set = historical_urls.get(feed_id, set())
                fresh = [
                    item for item in rss_list
                    if item.url and item.url not in hist_set
                ]
                if fresh:
                    new_items[feed_id] = fresh
            return new_items
        except Exception as e:
            print(f"[本地存储] 检测新 RSS 条目失败: {e}")
            return {}

    def get_latest_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
        """
        Return the RSS data of the most recent crawl (current-list mode).

        Args:
            date: Date string (YYYY-MM-DD); defaults to today.

        Returns:
            RSS data whose items were last seen in the latest crawl, or
            ``None`` if nothing is available or reading failed.
        """
        try:
            if not self._get_db_path(date, db_type="rss").exists():
                return None
            cursor = self._get_connection(date, db_type="rss").cursor()

            cursor.execute("""
                SELECT crawl_time FROM rss_crawl_records
                ORDER BY crawl_time DESC
                LIMIT 1
            """)
            time_row = cursor.fetchone()
            if not time_row:
                return None
            latest_time = time_row[0]

            cursor.execute("""
                SELECT i.id, i.title, i.feed_id, f.name as feed_name,
                       i.url, i.published_at, i.summary, i.author,
                       i.first_crawl_time, i.last_crawl_time, i.crawl_count
                FROM rss_items i
                LEFT JOIN rss_feeds f ON i.feed_id = f.id
                WHERE i.last_crawl_time = ?
                ORDER BY i.published_at DESC
            """, (latest_time,))
            rows = cursor.fetchall()
            if not rows:
                return None

            items, id_to_name = self._group_rss_rows(rows)

            # Feeds that failed in the latest crawl only.
            cursor.execute("""
                SELECT cs.feed_id
                FROM rss_crawl_status cs
                JOIN rss_crawl_records cr ON cs.crawl_record_id = cr.id
                WHERE cr.crawl_time = ? AND cs.status = 'failed'
            """, (latest_time,))
            failed_ids = [row[0] for row in cursor.fetchall()]

            return RSSData(
                date=self._format_date_folder(date),
                crawl_time=latest_time,
                items=items,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
            )
        except Exception as e:
            print(f"[本地存储] 获取最新 RSS 数据失败: {e}")
            return None

    def __del__(self):
        """Destructor: make sure cached connections are closed."""
        try:
            self.cleanup()
        except Exception:
            # A destructor must never raise; it may run during interpreter
            # shutdown when module globals are already torn down.
            pass
|