manager.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. # coding=utf-8
  2. """
  3. 存储管理器 - 统一管理存储后端
  4. 根据环境和配置自动选择合适的存储后端
  5. """
  6. import os
  7. from typing import Optional
  8. from trendradar.storage.base import StorageBackend, NewsData, RSSData
  9. from trendradar.utils.time import DEFAULT_TIMEZONE
# Module-level singleton slot for the shared StorageManager instance
# (None until an instance is stored here).
_storage_manager: Optional["StorageManager"] = None
  12. class StorageManager:
  13. """
  14. 存储管理器
  15. 功能:
  16. - 自动检测运行环境(GitHub Actions / Docker / 本地)
  17. - 根据配置选择存储后端(local / remote / auto)
  18. - 提供统一的存储接口
  19. - 支持从远程拉取数据到本地
  20. """
  21. def __init__(
  22. self,
  23. backend_type: str = "auto",
  24. data_dir: str = "output",
  25. enable_txt: bool = True,
  26. enable_html: bool = True,
  27. remote_config: Optional[dict] = None,
  28. local_retention_days: int = 0,
  29. remote_retention_days: int = 0,
  30. pull_enabled: bool = False,
  31. pull_days: int = 0,
  32. timezone: str = DEFAULT_TIMEZONE,
  33. ):
  34. """
  35. 初始化存储管理器
  36. Args:
  37. backend_type: 存储后端类型 (local / remote / auto)
  38. data_dir: 本地数据目录
  39. enable_txt: 是否启用 TXT 快照
  40. enable_html: 是否启用 HTML 报告
  41. remote_config: 远程存储配置(endpoint_url, bucket_name, access_key_id 等)
  42. local_retention_days: 本地数据保留天数(0 = 无限制)
  43. remote_retention_days: 远程数据保留天数(0 = 无限制)
  44. pull_enabled: 是否启用启动时自动拉取
  45. pull_days: 拉取最近 N 天的数据
  46. timezone: 时区配置
  47. """
  48. self.backend_type = backend_type
  49. self.data_dir = data_dir
  50. self.enable_txt = enable_txt
  51. self.enable_html = enable_html
  52. self.remote_config = remote_config or {}
  53. self.local_retention_days = local_retention_days
  54. self.remote_retention_days = remote_retention_days
  55. self.pull_enabled = pull_enabled
  56. self.pull_days = pull_days
  57. self.timezone = timezone
  58. self._backend: Optional[StorageBackend] = None
  59. self._remote_backend: Optional[StorageBackend] = None
  60. @staticmethod
  61. def is_github_actions() -> bool:
  62. """检测是否在 GitHub Actions 环境中运行"""
  63. return os.environ.get("GITHUB_ACTIONS") == "true"
  64. @staticmethod
  65. def is_docker() -> bool:
  66. """检测是否在 Docker 容器中运行"""
  67. # 方法1: 检查 /.dockerenv 文件
  68. if os.path.exists("/.dockerenv"):
  69. return True
  70. # 方法2: 检查 cgroup(Linux)
  71. try:
  72. with open("/proc/1/cgroup", "r") as f:
  73. return "docker" in f.read()
  74. except (FileNotFoundError, PermissionError):
  75. pass
  76. # 方法3: 检查环境变量
  77. return os.environ.get("DOCKER_CONTAINER") == "true"
  78. def _resolve_backend_type(self) -> str:
  79. """解析实际使用的后端类型"""
  80. if self.backend_type == "auto":
  81. if self.is_github_actions():
  82. # GitHub Actions 环境,检查是否配置了远程存储
  83. if self._has_remote_config():
  84. return "remote"
  85. else:
  86. print("[存储管理器] GitHub Actions 环境但未配置远程存储,使用本地存储")
  87. return "local"
  88. else:
  89. return "local"
  90. return self.backend_type
  91. def _has_remote_config(self) -> bool:
  92. """检查是否有有效的远程存储配置"""
  93. # 检查配置或环境变量
  94. bucket_name = self.remote_config.get("bucket_name") or os.environ.get("S3_BUCKET_NAME")
  95. access_key = self.remote_config.get("access_key_id") or os.environ.get("S3_ACCESS_KEY_ID")
  96. secret_key = self.remote_config.get("secret_access_key") or os.environ.get("S3_SECRET_ACCESS_KEY")
  97. endpoint = self.remote_config.get("endpoint_url") or os.environ.get("S3_ENDPOINT_URL")
  98. # 调试日志
  99. has_config = bool(bucket_name and access_key and secret_key and endpoint)
  100. if not has_config:
  101. print(f"[存储管理器] 远程存储配置检查失败:")
  102. print(f" - bucket_name: {'已配置' if bucket_name else '未配置'}")
  103. print(f" - access_key_id: {'已配置' if access_key else '未配置'}")
  104. print(f" - secret_access_key: {'已配置' if secret_key else '未配置'}")
  105. print(f" - endpoint_url: {'已配置' if endpoint else '未配置'}")
  106. return has_config
  107. def _create_remote_backend(self) -> Optional[StorageBackend]:
  108. """创建远程存储后端"""
  109. try:
  110. from trendradar.storage.remote import RemoteStorageBackend
  111. return RemoteStorageBackend(
  112. bucket_name=self.remote_config.get("bucket_name") or os.environ.get("S3_BUCKET_NAME", ""),
  113. access_key_id=self.remote_config.get("access_key_id") or os.environ.get("S3_ACCESS_KEY_ID", ""),
  114. secret_access_key=self.remote_config.get("secret_access_key") or os.environ.get("S3_SECRET_ACCESS_KEY", ""),
  115. endpoint_url=self.remote_config.get("endpoint_url") or os.environ.get("S3_ENDPOINT_URL", ""),
  116. region=self.remote_config.get("region") or os.environ.get("S3_REGION", ""),
  117. enable_txt=self.enable_txt,
  118. enable_html=self.enable_html,
  119. timezone=self.timezone,
  120. )
  121. except ImportError as e:
  122. print(f"[存储管理器] 远程后端导入失败: {e}")
  123. print("[存储管理器] 请确保已安装 boto3: pip install boto3")
  124. return None
  125. except Exception as e:
  126. print(f"[存储管理器] 远程后端初始化失败: {e}")
  127. return None
  128. def get_backend(self) -> StorageBackend:
  129. """获取存储后端实例"""
  130. if self._backend is None:
  131. resolved_type = self._resolve_backend_type()
  132. if resolved_type == "remote":
  133. self._backend = self._create_remote_backend()
  134. if self._backend:
  135. print(f"[存储管理器] 使用远程存储后端")
  136. else:
  137. print("[存储管理器] 回退到本地存储")
  138. resolved_type = "local"
  139. if resolved_type == "local" or self._backend is None:
  140. from trendradar.storage.local import LocalStorageBackend
  141. self._backend = LocalStorageBackend(
  142. data_dir=self.data_dir,
  143. enable_txt=self.enable_txt,
  144. enable_html=self.enable_html,
  145. timezone=self.timezone,
  146. )
  147. print(f"[存储管理器] 使用本地存储后端 (数据目录: {self.data_dir})")
  148. return self._backend
  149. def pull_from_remote(self) -> int:
  150. """
  151. 从远程拉取数据到本地
  152. Returns:
  153. 成功拉取的文件数量
  154. """
  155. if not self.pull_enabled or self.pull_days <= 0:
  156. return 0
  157. if not self._has_remote_config():
  158. print("[存储管理器] 未配置远程存储,无法拉取")
  159. return 0
  160. # 创建远程后端(如果还没有)
  161. if self._remote_backend is None:
  162. self._remote_backend = self._create_remote_backend()
  163. if self._remote_backend is None:
  164. print("[存储管理器] 无法创建远程后端,拉取失败")
  165. return 0
  166. # 调用拉取方法
  167. return self._remote_backend.pull_recent_days(self.pull_days, self.data_dir)
  168. def save_news_data(self, data: NewsData) -> bool:
  169. """保存新闻数据"""
  170. return self.get_backend().save_news_data(data)
  171. def save_rss_data(self, data: RSSData) -> bool:
  172. """保存 RSS 数据"""
  173. return self.get_backend().save_rss_data(data)
  174. def get_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
  175. """获取指定日期的所有 RSS 数据(当日汇总模式)"""
  176. return self.get_backend().get_rss_data(date)
  177. def get_latest_rss_data(self, date: Optional[str] = None) -> Optional[RSSData]:
  178. """获取最新一次抓取的 RSS 数据(当前榜单模式)"""
  179. return self.get_backend().get_latest_rss_data(date)
  180. def detect_new_rss_items(self, current_data: RSSData) -> dict:
  181. """检测新增的 RSS 条目(增量模式)"""
  182. return self.get_backend().detect_new_rss_items(current_data)
  183. def get_today_all_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  184. """获取当天所有数据"""
  185. return self.get_backend().get_today_all_data(date)
  186. def get_latest_crawl_data(self, date: Optional[str] = None) -> Optional[NewsData]:
  187. """获取最新抓取数据"""
  188. return self.get_backend().get_latest_crawl_data(date)
  189. def detect_new_titles(self, current_data: NewsData) -> dict:
  190. """检测新增标题"""
  191. return self.get_backend().detect_new_titles(current_data)
  192. def save_txt_snapshot(self, data: NewsData) -> Optional[str]:
  193. """保存 TXT 快照"""
  194. return self.get_backend().save_txt_snapshot(data)
  195. def save_html_report(self, html_content: str, filename: str) -> Optional[str]:
  196. """保存 HTML 报告"""
  197. return self.get_backend().save_html_report(html_content, filename)
  198. def is_first_crawl_today(self, date: Optional[str] = None) -> bool:
  199. """检查是否是当天第一次抓取"""
  200. return self.get_backend().is_first_crawl_today(date)
  201. def cleanup(self) -> None:
  202. """清理资源"""
  203. if self._backend:
  204. self._backend.cleanup()
  205. if self._remote_backend:
  206. self._remote_backend.cleanup()
  207. def cleanup_old_data(self) -> int:
  208. """
  209. 清理过期数据
  210. Returns:
  211. 删除的日期目录数量
  212. """
  213. total_deleted = 0
  214. # 清理本地数据
  215. if self.local_retention_days > 0:
  216. total_deleted += self.get_backend().cleanup_old_data(self.local_retention_days)
  217. # 清理远程数据(如果配置了)
  218. if self.remote_retention_days > 0 and self._has_remote_config():
  219. if self._remote_backend is None:
  220. self._remote_backend = self._create_remote_backend()
  221. if self._remote_backend:
  222. total_deleted += self._remote_backend.cleanup_old_data(self.remote_retention_days)
  223. return total_deleted
  224. @property
  225. def backend_name(self) -> str:
  226. """获取当前后端名称"""
  227. return self.get_backend().backend_name
  228. @property
  229. def supports_txt(self) -> bool:
  230. """是否支持 TXT 快照"""
  231. return self.get_backend().supports_txt
  232. def has_period_executed(self, date_str: str, period_key: str, action: str) -> bool:
  233. """检查指定时间段的某个 action 是否已执行"""
  234. return self.get_backend().has_period_executed(date_str, period_key, action)
  235. def record_period_execution(self, date_str: str, period_key: str, action: str) -> bool:
  236. """记录时间段的 action 执行"""
  237. return self.get_backend().record_period_execution(date_str, period_key, action)
  238. # === AI 智能筛选存储操作 ===
  239. def begin_batch(self):
  240. """开启批量模式(远程后端延迟上传)"""
  241. self.get_backend().begin_batch()
  242. def end_batch(self):
  243. """结束批量模式(统一上传脏数据库)"""
  244. self.get_backend().end_batch()
  245. def get_active_ai_filter_tags(self, date=None, interests_file="ai_interests.txt"):
  246. """获取指定兴趣文件的 active 标签"""
  247. return self.get_backend().get_active_ai_filter_tags(date, interests_file)
  248. def get_latest_prompt_hash(self, date=None, interests_file="ai_interests.txt"):
  249. """获取指定兴趣文件的最新 prompt_hash"""
  250. return self.get_backend().get_latest_prompt_hash(date, interests_file)
  251. def get_latest_ai_filter_tag_version(self, date=None):
  252. """获取最新标签版本号"""
  253. return self.get_backend().get_latest_ai_filter_tag_version(date)
  254. def deprecate_all_ai_filter_tags(self, date=None, interests_file="ai_interests.txt"):
  255. """废弃指定兴趣文件的 active 标签和分类结果"""
  256. return self.get_backend().deprecate_all_ai_filter_tags(date, interests_file)
  257. def save_ai_filter_tags(self, tags, version, prompt_hash, date=None, interests_file="ai_interests.txt"):
  258. """保存新提取的标签"""
  259. return self.get_backend().save_ai_filter_tags(tags, version, prompt_hash, date, interests_file)
  260. def save_ai_filter_results(self, results, date=None):
  261. """保存分类结果"""
  262. return self.get_backend().save_ai_filter_results(results, date)
  263. def get_active_ai_filter_results(self, date=None, interests_file="ai_interests.txt"):
  264. """获取指定兴趣文件的 active 分类结果"""
  265. return self.get_backend().get_active_ai_filter_results(date, interests_file)
  266. def deprecate_specific_ai_filter_tags(self, tag_ids, date=None):
  267. """废弃指定 ID 的标签及其关联分类结果"""
  268. return self.get_backend().deprecate_specific_ai_filter_tags(tag_ids, date)
  269. def update_ai_filter_tags_hash(self, interests_file, new_hash, date=None):
  270. """更新指定兴趣文件所有 active 标签的 prompt_hash"""
  271. return self.get_backend().update_ai_filter_tags_hash(interests_file, new_hash, date)
  272. def update_ai_filter_tag_descriptions(self, tag_updates, date=None, interests_file="ai_interests.txt"):
  273. """按 tag 名匹配,更新 active 标签的 description"""
  274. return self.get_backend().update_ai_filter_tag_descriptions(tag_updates, date, interests_file)
  275. def update_ai_filter_tag_priorities(self, tag_priorities, date=None, interests_file="ai_interests.txt"):
  276. """按 tag 名匹配,更新 active 标签的 priority"""
  277. return self.get_backend().update_ai_filter_tag_priorities(tag_priorities, date, interests_file)
  278. def save_analyzed_news(self, news_ids, source_type, interests_file, prompt_hash, matched_ids, date=None):
  279. """批量记录已分析的新闻(匹配与不匹配都记录)"""
  280. return self.get_backend().save_analyzed_news(news_ids, source_type, interests_file, prompt_hash, matched_ids, date)
  281. def get_analyzed_news_ids(self, source_type="hotlist", date=None, interests_file="ai_interests.txt"):
  282. """获取已分析过的新闻 ID 集合"""
  283. return self.get_backend().get_analyzed_news_ids(source_type, date, interests_file)
  284. def clear_analyzed_news(self, date=None, interests_file="ai_interests.txt"):
  285. """清除指定兴趣文件的所有已分析记录"""
  286. return self.get_backend().clear_analyzed_news(date, interests_file)
  287. def clear_unmatched_analyzed_news(self, date=None, interests_file="ai_interests.txt"):
  288. """清除不匹配的已分析记录"""
  289. return self.get_backend().clear_unmatched_analyzed_news(date, interests_file)
  290. def get_all_news_ids(self, date=None):
  291. """获取所有新闻 ID 和标题"""
  292. return self.get_backend().get_all_news_ids(date)
  293. def get_all_rss_ids(self, date=None):
  294. """获取所有 RSS ID 和标题"""
  295. return self.get_backend().get_all_rss_ids(date)
  296. def get_storage_manager(
  297. backend_type: str = "auto",
  298. data_dir: str = "output",
  299. enable_txt: bool = True,
  300. enable_html: bool = True,
  301. remote_config: Optional[dict] = None,
  302. local_retention_days: int = 0,
  303. remote_retention_days: int = 0,
  304. pull_enabled: bool = False,
  305. pull_days: int = 0,
  306. timezone: str = DEFAULT_TIMEZONE,
  307. force_new: bool = False,
  308. ) -> StorageManager:
  309. """
  310. 获取存储管理器单例
  311. Args:
  312. backend_type: 存储后端类型
  313. data_dir: 本地数据目录
  314. enable_txt: 是否启用 TXT 快照
  315. enable_html: 是否启用 HTML 报告
  316. remote_config: 远程存储配置
  317. local_retention_days: 本地数据保留天数(0 = 无限制)
  318. remote_retention_days: 远程数据保留天数(0 = 无限制)
  319. pull_enabled: 是否启用启动时自动拉取
  320. pull_days: 拉取最近 N 天的数据
  321. timezone: 时区配置
  322. force_new: 是否强制创建新实例
  323. Returns:
  324. StorageManager 实例
  325. """
  326. global _storage_manager
  327. if _storage_manager is None or force_new:
  328. _storage_manager = StorageManager(
  329. backend_type=backend_type,
  330. data_dir=data_dir,
  331. enable_txt=enable_txt,
  332. enable_html=enable_html,
  333. remote_config=remote_config,
  334. local_retention_days=local_retention_days,
  335. remote_retention_days=remote_retention_days,
  336. pull_enabled=pull_enabled,
  337. pull_days=pull_days,
  338. timezone=timezone,
  339. )
  340. return _storage_manager