local.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. # coding=utf-8
  2. """
  3. 本地存储后端 - SQLite + TXT/HTML
  4. 使用 SQLite 作为主存储,支持可选的 TXT 快照和 HTML 报告
  5. """
  6. import sqlite3
  7. import shutil
  8. import pytz
  9. import re
  10. from datetime import datetime, timedelta
  11. from pathlib import Path
  12. from typing import Dict, List, Optional
  13. from trendradar.storage.base import StorageBackend, NewsData, RSSItem, RSSData
  14. from trendradar.storage.sqlite_mixin import SQLiteStorageMixin
  15. from trendradar.utils.time import (
  16. DEFAULT_TIMEZONE,
  17. get_configured_time,
  18. format_date_folder,
  19. format_time_filename,
  20. )
  21. class LocalStorageBackend(SQLiteStorageMixin, StorageBackend):
  22. """
  23. 本地存储后端
  24. 使用 SQLite 数据库存储新闻数据,支持:
  25. - 按日期组织的 SQLite 数据库文件
  26. - 可选的 TXT 快照(用于调试)
  27. - HTML 报告生成
  28. """
  29. def __init__(
  30. self,
  31. data_dir: str = "output",
  32. enable_txt: bool = True,
  33. enable_html: bool = True,
  34. timezone: str = DEFAULT_TIMEZONE,
  35. ):
  36. """
  37. 初始化本地存储后端
  38. Args:
  39. data_dir: 数据目录路径
  40. enable_txt: 是否启用 TXT 快照
  41. enable_html: 是否启用 HTML 报告
  42. timezone: 时区配置
  43. """
  44. self.data_dir = Path(data_dir)
  45. self.enable_txt = enable_txt
  46. self.enable_html = enable_html
  47. self.timezone = timezone
  48. self._db_connections: Dict[str, sqlite3.Connection] = {}
  49. @property
  50. def backend_name(self) -> str:
  51. return "local"
  52. @property
  53. def supports_txt(self) -> bool:
  54. return self.enable_txt
  55. # ========================================
  56. # SQLiteStorageMixin 抽象方法实现
  57. # ========================================
  58. def _get_configured_time(self) -> datetime:
  59. """获取配置时区的当前时间"""
  60. return get_configured_time(self.timezone)
  61. def _format_date_folder(self, date: Optional[str] = None) -> str:
  62. """格式化日期文件夹名 (ISO 格式: YYYY-MM-DD)"""
  63. return format_date_folder(date, self.timezone)
  64. def _format_time_filename(self) -> str:
  65. """格式化时间文件名 (格式: HH-MM)"""
  66. return format_time_filename(self.timezone)
  67. def _get_db_path(self, date: Optional[str] = None, db_type: str = "news") -> Path:
  68. """
  69. 获取 SQLite 数据库路径
  70. 新结构(扁平):output/{type}/{date}.db
  71. - output/news/2025-12-28.db
  72. - output/rss/2025-12-28.db
  73. Args:
  74. date: 日期字符串
  75. db_type: 数据库类型 ("news" 或 "rss")
  76. Returns:
  77. 数据库文件路径
  78. """
  79. date_str = self._format_date_folder(date)
  80. db_dir = self.data_dir / db_type
  81. db_dir.mkdir(parents=True, exist_ok=True)
  82. return db_dir / f"{date_str}.db"
  83. def _get_connection(self, date: Optional[str] = None, db_type: str = "news") -> sqlite3.Connection:
  84. """
  85. 获取数据库连接(带缓存)
  86. Args:
  87. date: 日期字符串
  88. db_type: 数据库类型 ("news" 或 "rss")
  89. Returns:
  90. 数据库连接
  91. """
  92. db_path = str(self._get_db_path(date, db_type))
  93. if db_path not in self._db_connections:
  94. conn = sqlite3.connect(db_path)
  95. conn.row_factory = sqlite3.Row
  96. self._init_tables(conn, db_type)
  97. self._db_connections[db_path] = conn
  98. return self._db_connections[db_path]
  99. # ========================================
  100. # StorageBackend 接口实现(委托给 mixin)
  101. # ========================================
  102. def save_news_data(self, data: NewsData) -> bool:
  103. """保存新闻数据到 SQLite"""
  104. db_path = self._get_db_path(data.date)
  105. if not db_path.exists():
  106. # 确保目录存在
  107. db_path.parent.mkdir(parents=True, exist_ok=True)
  108. success, new_count, updated_count, title_changed_count, off_list_count = \
  109. self._save_news_data_impl(data, "[本地存储]")
  110. if success:
  111. # 输出详细的存储统计日志
  112. log_parts = [f"[本地存储] 处理完成:新增 {new_count} 条"]
  113. if updated_count > 0:
  114. log_parts.append(f"更新 {updated_count} 条")
  115. if title_changed_count > 0:
  116. log_parts.append(f"标题变更 {title_changed_count} 条")
  117. if off_list_count > 0:
  118. log_parts.append(f"脱榜 {off_list_count} 条")
  119. print(",".join(log_parts))
  120. return success
  121. def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  122. """获取指定日期的所有新闻数据(合并后)"""
  123. db_path = self._get_db_path(date)
  124. if not db_path.exists():
  125. return None
  126. return self._get_today_all_data_impl(date)
  127. def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  128. """获取最新一次抓取的数据"""
  129. db_path = self._get_db_path(date)
  130. if not db_path.exists():
  131. return None
  132. return self._get_latest_crawl_data_impl(date)
  133. def detect_new_titles(self, current_data: NewsData) -> Dict[str, Dict]:
  134. """检测新增的标题"""
  135. return self._detect_new_titles_impl(current_data)
  136. def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
  137. """检查是否是当天第一次抓取"""
  138. db_path = self._get_db_path(date)
  139. if not db_path.exists():
  140. return True
  141. return self._is_first_crawl_today_impl(date)
  142. def get_crawl_times(self, date: Optional[str] = None) -> List[str]:
  143. """获取指定日期的所有抓取时间列表"""
  144. db_path = self._get_db_path(date)
  145. if not db_path.exists():
  146. return []
  147. return self._get_crawl_times_impl(date)
  148. # ========================================
  149. # 时间段执行记录(调度系统)
  150. # ========================================
  151. def has_period_executed(self, date_str: str, period_key: str, action: str) -> bool:
  152. """检查指定时间段的某个 action 是否已执行"""
  153. return self._has_period_executed_impl(date_str, period_key, action)
  154. def record_period_execution(self, date_str: str, period_key: str, action: str) -> bool:
  155. """记录时间段的 action 执行"""
  156. success = self._record_period_execution_impl(date_str, period_key, action)
  157. if success:
  158. now_str = self._get_configured_time().strftime("%Y-%m-%d %H:%M:%S")
  159. print(f"[本地存储] 时间段执行记录已保存: {period_key}/{action} at {now_str}")
  160. return success
  161. # ========================================
  162. # RSS 数据存储方法
  163. # ========================================
  164. def save_rss_data(self, data: RSSData) -> bool:
  165. """保存 RSS 数据到 SQLite"""
  166. success, new_count, updated_count = self._save_rss_data_impl(data, "[本地存储]")
  167. if success:
  168. # 输出统计日志
  169. log_parts = [f"[本地存储] RSS 处理完成:新增 {new_count} 条"]
  170. if updated_count > 0:
  171. log_parts.append(f"更新 {updated_count} 条")
  172. print(",".join(log_parts))
  173. return success
  174. def get_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
  175. """获取指定日期的所有 RSS 数据"""
  176. return self._get_rss_data_impl(date)
  177. def detect_new_rss_items(self, current_data: RSSData) -> Dict[str, List[RSSItem]]:
  178. """检测新增的 RSS 条目"""
  179. return self._detect_new_rss_items_impl(current_data)
  180. def get_latest_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
  181. """获取最新一次抓取的 RSS 数据"""
  182. db_path = self._get_db_path(date, db_type="rss")
  183. if not db_path.exists():
  184. return None
  185. return self._get_latest_rss_data_impl(date)
  186. # ========================================
  187. # AI 智能筛选
  188. # ========================================
  189. def get_active_ai_filter_tags(self, date=None, interests_file="ai_interests.txt"):
  190. return self._get_active_tags_impl(date, interests_file)
  191. def get_latest_prompt_hash(self, date=None, interests_file="ai_interests.txt"):
  192. return self._get_latest_prompt_hash_impl(date, interests_file)
  193. def get_latest_ai_filter_tag_version(self, date=None):
  194. return self._get_latest_tag_version_impl(date)
  195. def deprecate_all_ai_filter_tags(self, date=None, interests_file="ai_interests.txt"):
  196. return self._deprecate_all_tags_impl(date, interests_file)
  197. def save_ai_filter_tags(self, tags, version, prompt_hash, date=None, interests_file="ai_interests.txt"):
  198. return self._save_tags_impl(date, tags, version, prompt_hash, interests_file)
  199. def save_ai_filter_results(self, results, date=None):
  200. return self._save_filter_results_impl(date, results)
  201. def get_active_ai_filter_results(self, date=None, interests_file="ai_interests.txt"):
  202. return self._get_active_filter_results_impl(date, interests_file)
  203. def deprecate_specific_ai_filter_tags(self, tag_ids, date=None):
  204. return self._deprecate_specific_tags_impl(date, tag_ids)
  205. def update_ai_filter_tags_hash(self, interests_file, new_hash, date=None):
  206. return self._update_tags_hash_impl(date, interests_file, new_hash)
  207. def update_ai_filter_tag_descriptions(self, tag_updates, date=None, interests_file="ai_interests.txt"):
  208. return self._update_tag_descriptions_impl(date, tag_updates, interests_file)
  209. def update_ai_filter_tag_priorities(self, tag_priorities, date=None, interests_file="ai_interests.txt"):
  210. return self._update_tag_priorities_impl(date, tag_priorities, interests_file)
  211. def save_analyzed_news(self, news_ids, source_type, interests_file, prompt_hash, matched_ids, date=None):
  212. return self._save_analyzed_news_impl(date, news_ids, source_type, interests_file, prompt_hash, matched_ids)
  213. def get_analyzed_news_ids(self, source_type="hotlist", date=None, interests_file="ai_interests.txt"):
  214. return self._get_analyzed_news_ids_impl(date, source_type, interests_file)
  215. def clear_analyzed_news(self, date=None, interests_file="ai_interests.txt"):
  216. return self._clear_analyzed_news_impl(date, interests_file)
  217. def clear_unmatched_analyzed_news(self, date=None, interests_file="ai_interests.txt"):
  218. return self._clear_unmatched_analyzed_news_impl(date, interests_file)
  219. def get_all_news_ids(self, date=None):
  220. return self._get_all_news_ids_impl(date)
  221. def get_all_rss_ids(self, date=None):
  222. return self._get_all_rss_ids_impl(date)
  223. # ========================================
  224. # 本地特有功能:TXT/HTML 快照
  225. # ========================================
  226. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  227. """
  228. 保存 TXT 快照
  229. 新结构:output/txt/{date}/{time}.txt
  230. Args:
  231. data: 新闻数据
  232. Returns:
  233. 保存的文件路径
  234. """
  235. if not self.enable_txt:
  236. return None
  237. try:
  238. date_folder = self._format_date_folder(data.date)
  239. txt_dir = self.data_dir / "txt" / date_folder
  240. txt_dir.mkdir(parents=True, exist_ok=True)
  241. file_path = txt_dir / f"{data.crawl_time}.txt"
  242. with open(file_path, "w", encoding="utf-8") as f:
  243. for source_id, news_list in data.items.items():
  244. source_name = data.id_to_name.get(source_id, source_id)
  245. # 写入来源标题
  246. if source_name and source_name != source_id:
  247. f.write(f"{source_id} | {source_name}\n")
  248. else:
  249. f.write(f"{source_id}\n")
  250. # 按排名排序
  251. sorted_news = sorted(news_list, key=lambda x: x.rank)
  252. for item in sorted_news:
  253. line = f"{item.rank}. {item.title}"
  254. if item.url:
  255. line += f" [URL:{item.url}]"
  256. if item.mobile_url:
  257. line += f" [MOBILE:{item.mobile_url}]"
  258. f.write(line + "\n")
  259. f.write("\n")
  260. # 写入失败的来源
  261. if data.failed_ids:
  262. f.write("==== 以下ID请求失败 ====\n")
  263. for failed_id in data.failed_ids:
  264. f.write(f"{failed_id}\n")
  265. print(f"[本地存储] TXT 快照已保存: {file_path}")
  266. return str(file_path)
  267. except Exception as e:
  268. print(f"[本地存储] 保存 TXT 快照失败: {e}")
  269. return None
  270. def save_html_report(self, html_content: str, filename: str) -> Optional[str]:
  271. """
  272. 保存 HTML 报告
  273. 新结构:output/html/{date}/{filename}
  274. Args:
  275. html_content: HTML 内容
  276. filename: 文件名
  277. Returns:
  278. 保存的文件路径
  279. """
  280. if not self.enable_html:
  281. return None
  282. try:
  283. date_folder = self._format_date_folder()
  284. html_dir = self.data_dir / "html" / date_folder
  285. html_dir.mkdir(parents=True, exist_ok=True)
  286. file_path = html_dir / filename
  287. with open(file_path, "w", encoding="utf-8") as f:
  288. f.write(html_content)
  289. print(f"[本地存储] HTML 报告已保存: {file_path}")
  290. return str(file_path)
  291. except Exception as e:
  292. print(f"[本地存储] 保存 HTML 报告失败: {e}")
  293. return None
  294. # ========================================
  295. # 本地特有功能:资源清理
  296. # ========================================
  297. def cleanup(self) -> None:
  298. """清理资源(关闭数据库连接)"""
  299. for db_path, conn in self._db_connections.items():
  300. try:
  301. conn.close()
  302. print(f"[本地存储] 关闭数据库连接: {db_path}")
  303. except Exception as e:
  304. print(f"[本地存储] 关闭连接失败 {db_path}: {e}")
  305. self._db_connections.clear()
  306. def cleanup_old_data(self, retention_days: int) -> int:
  307. """
  308. 清理过期数据
  309. 新结构清理逻辑:
  310. - output/news/{date}.db -> 删除过期的 .db 文件
  311. - output/rss/{date}.db -> 删除过期的 .db 文件
  312. - output/txt/{date}/ -> 删除过期的日期目录
  313. - output/html/{date}/ -> 删除过期的日期目录
  314. Args:
  315. retention_days: 保留天数(0 表示不清理)
  316. Returns:
  317. 删除的文件/目录数量
  318. """
  319. if retention_days <= 0:
  320. return 0
  321. deleted_count = 0
  322. cutoff_date = self._get_configured_time() - timedelta(days=retention_days)
  323. def parse_date_from_name(name: str) -> Optional[datetime]:
  324. """从文件名或目录名解析日期 (ISO 格式: YYYY-MM-DD)"""
  325. # 移除 .db 后缀
  326. name = name.replace('.db', '')
  327. try:
  328. date_match = re.match(r'(\d{4})-(\d{2})-(\d{2})', name)
  329. if date_match:
  330. return datetime(
  331. int(date_match.group(1)),
  332. int(date_match.group(2)),
  333. int(date_match.group(3)),
  334. tzinfo=pytz.timezone(self.timezone)
  335. )
  336. except Exception:
  337. pass
  338. return None
  339. try:
  340. if not self.data_dir.exists():
  341. return 0
  342. # 清理数据库文件 (news/, rss/)
  343. for db_type in ["news", "rss"]:
  344. db_dir = self.data_dir / db_type
  345. if not db_dir.exists():
  346. continue
  347. for db_file in db_dir.glob("*.db"):
  348. file_date = parse_date_from_name(db_file.name)
  349. if file_date and file_date < cutoff_date:
  350. # 先关闭数据库连接
  351. db_path = str(db_file)
  352. if db_path in self._db_connections:
  353. try:
  354. self._db_connections[db_path].close()
  355. del self._db_connections[db_path]
  356. except Exception:
  357. pass
  358. # 删除文件
  359. try:
  360. db_file.unlink()
  361. deleted_count += 1
  362. print(f"[本地存储] 清理过期数据: {db_type}/{db_file.name}")
  363. except Exception as e:
  364. print(f"[本地存储] 删除文件失败 {db_file}: {e}")
  365. # 清理快照目录 (txt/, html/)
  366. for snapshot_type in ["txt", "html"]:
  367. snapshot_dir = self.data_dir / snapshot_type
  368. if not snapshot_dir.exists():
  369. continue
  370. for date_folder in snapshot_dir.iterdir():
  371. if not date_folder.is_dir() or date_folder.name.startswith('.'):
  372. continue
  373. folder_date = parse_date_from_name(date_folder.name)
  374. if folder_date and folder_date < cutoff_date:
  375. try:
  376. shutil.rmtree(date_folder)
  377. deleted_count += 1
  378. print(f"[本地存储] 清理过期数据: {snapshot_type}/{date_folder.name}")
  379. except Exception as e:
  380. print(f"[本地存储] 删除目录失败 {date_folder}: {e}")
  381. if deleted_count > 0:
  382. print(f"[本地存储] 共清理 {deleted_count} 个过期文件/目录")
  383. return deleted_count
  384. except Exception as e:
  385. print(f"[本地存储] 清理过期数据失败: {e}")
  386. return deleted_count
  387. def __del__(self):
  388. """析构函数,确保关闭连接"""
  389. self.cleanup()