context.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. # coding=utf-8
  2. """
  3. 应用上下文模块
  4. 提供配置上下文类,封装所有依赖配置的操作,消除全局状态和包装函数。
  5. """
  6. from datetime import datetime
  7. from pathlib import Path
  8. from typing import Any, Dict, List, Optional, Tuple
  9. from trendradar.utils.time import (
  10. DEFAULT_TIMEZONE,
  11. get_configured_time,
  12. format_date_folder,
  13. format_time_filename,
  14. get_current_time_display,
  15. convert_time_for_display,
  16. )
  17. from trendradar.core import (
  18. load_frequency_words,
  19. matches_word_groups,
  20. read_all_today_titles,
  21. detect_latest_new_titles,
  22. count_word_frequency,
  23. Scheduler,
  24. )
  25. from trendradar.report import (
  26. clean_title,
  27. prepare_report_data,
  28. generate_html_report,
  29. render_html_content,
  30. )
  31. from trendradar.notification import (
  32. render_feishu_content,
  33. render_dingtalk_content,
  34. split_content_into_batches,
  35. NotificationDispatcher,
  36. )
  37. from trendradar.ai import AITranslator
  38. from trendradar.storage import get_storage_manager
  39. class AppContext:
  40. """
  41. 应用上下文类
  42. 封装所有依赖配置的操作,提供统一的接口。
  43. 消除对全局 CONFIG 的依赖,提高可测试性。
  44. 使用示例:
  45. config = load_config()
  46. ctx = AppContext(config)
  47. # 时间操作
  48. now = ctx.get_time()
  49. date_folder = ctx.format_date()
  50. # 存储操作
  51. storage = ctx.get_storage_manager()
  52. # 报告生成
  53. html = ctx.generate_html_report(stats, total_titles, ...)
  54. """
  55. def __init__(self, config: Dict[str, Any]):
  56. """
  57. 初始化应用上下文
  58. Args:
  59. config: 完整的配置字典
  60. """
  61. self.config = config
  62. self._storage_manager = None
  63. self._scheduler = None
  64. # === 配置访问 ===
  65. @property
  66. def timezone(self) -> str:
  67. """获取配置的时区"""
  68. return self.config.get("TIMEZONE", DEFAULT_TIMEZONE)
  69. @property
  70. def rank_threshold(self) -> int:
  71. """获取排名阈值"""
  72. return self.config.get("RANK_THRESHOLD", 50)
  73. @property
  74. def weight_config(self) -> Dict:
  75. """获取权重配置"""
  76. return self.config.get("WEIGHT_CONFIG", {})
  77. @property
  78. def platforms(self) -> List[Dict]:
  79. """获取平台配置列表"""
  80. return self.config.get("PLATFORMS", [])
  81. @property
  82. def platform_ids(self) -> List[str]:
  83. """获取平台ID列表"""
  84. return [p["id"] for p in self.platforms]
  85. @property
  86. def rss_config(self) -> Dict:
  87. """获取 RSS 配置"""
  88. return self.config.get("RSS", {})
  89. @property
  90. def rss_enabled(self) -> bool:
  91. """RSS 是否启用"""
  92. return self.rss_config.get("ENABLED", False)
  93. @property
  94. def rss_feeds(self) -> List[Dict]:
  95. """获取 RSS 源列表"""
  96. return self.rss_config.get("FEEDS", [])
  97. @property
  98. def display_mode(self) -> str:
  99. """获取显示模式 (keyword | platform)"""
  100. return self.config.get("DISPLAY_MODE", "keyword")
  101. @property
  102. def show_new_section(self) -> bool:
  103. """是否显示新增热点区域"""
  104. return self.config.get("DISPLAY", {}).get("REGIONS", {}).get("NEW_ITEMS", True)
  105. @property
  106. def region_order(self) -> List[str]:
  107. """获取区域显示顺序"""
  108. default_order = ["hotlist", "rss", "new_items", "standalone", "ai_analysis"]
  109. return self.config.get("DISPLAY", {}).get("REGION_ORDER", default_order)
  110. # === 时间操作 ===
  111. def get_time(self) -> datetime:
  112. """获取当前配置时区的时间"""
  113. return get_configured_time(self.timezone)
  114. def format_date(self) -> str:
  115. """格式化日期文件夹 (YYYY-MM-DD)"""
  116. return format_date_folder(timezone=self.timezone)
  117. def format_time(self) -> str:
  118. """格式化时间文件名 (HH-MM)"""
  119. return format_time_filename(self.timezone)
  120. def get_time_display(self) -> str:
  121. """获取时间显示 (HH:MM)"""
  122. return get_current_time_display(self.timezone)
  123. @staticmethod
  124. def convert_time_display(time_str: str) -> str:
  125. """将 HH-MM 转换为 HH:MM"""
  126. return convert_time_for_display(time_str)
  127. # === 存储操作 ===
  128. def get_storage_manager(self):
  129. """获取存储管理器(延迟初始化,单例)"""
  130. if self._storage_manager is None:
  131. storage_config = self.config.get("STORAGE", {})
  132. remote_config = storage_config.get("REMOTE", {})
  133. local_config = storage_config.get("LOCAL", {})
  134. pull_config = storage_config.get("PULL", {})
  135. self._storage_manager = get_storage_manager(
  136. backend_type=storage_config.get("BACKEND", "auto"),
  137. data_dir=local_config.get("DATA_DIR", "output"),
  138. enable_txt=storage_config.get("FORMATS", {}).get("TXT", True),
  139. enable_html=storage_config.get("FORMATS", {}).get("HTML", True),
  140. remote_config={
  141. "bucket_name": remote_config.get("BUCKET_NAME", ""),
  142. "access_key_id": remote_config.get("ACCESS_KEY_ID", ""),
  143. "secret_access_key": remote_config.get("SECRET_ACCESS_KEY", ""),
  144. "endpoint_url": remote_config.get("ENDPOINT_URL", ""),
  145. "region": remote_config.get("REGION", ""),
  146. },
  147. local_retention_days=local_config.get("RETENTION_DAYS", 0),
  148. remote_retention_days=remote_config.get("RETENTION_DAYS", 0),
  149. pull_enabled=pull_config.get("ENABLED", False),
  150. pull_days=pull_config.get("DAYS", 7),
  151. timezone=self.timezone,
  152. )
  153. return self._storage_manager
  154. def get_output_path(self, subfolder: str, filename: str) -> str:
  155. """获取输出路径(扁平化结构:output/类型/日期/文件名)"""
  156. output_dir = Path("output") / subfolder / self.format_date()
  157. output_dir.mkdir(parents=True, exist_ok=True)
  158. return str(output_dir / filename)
  159. # === 数据处理 ===
  160. def read_today_titles(
  161. self, platform_ids: Optional[List[str]] = None, quiet: bool = False
  162. ) -> Tuple[Dict, Dict, Dict]:
  163. """读取当天所有标题"""
  164. return read_all_today_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
  165. def detect_new_titles(
  166. self, platform_ids: Optional[List[str]] = None, quiet: bool = False
  167. ) -> Dict:
  168. """检测最新批次的新增标题"""
  169. return detect_latest_new_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
  170. def is_first_crawl(self) -> bool:
  171. """检测是否是当天第一次爬取"""
  172. return self.get_storage_manager().is_first_crawl_today()
  173. # === 频率词处理 ===
  174. def load_frequency_words(
  175. self, frequency_file: Optional[str] = None
  176. ) -> Tuple[List[Dict], List[str], List[str]]:
  177. """加载频率词配置"""
  178. return load_frequency_words(frequency_file)
  179. def matches_word_groups(
  180. self,
  181. title: str,
  182. word_groups: List[Dict],
  183. filter_words: List[str],
  184. global_filters: Optional[List[str]] = None,
  185. ) -> bool:
  186. """检查标题是否匹配词组规则"""
  187. return matches_word_groups(title, word_groups, filter_words, global_filters)
  188. # === 统计分析 ===
  189. def count_frequency(
  190. self,
  191. results: Dict,
  192. word_groups: List[Dict],
  193. filter_words: List[str],
  194. id_to_name: Dict,
  195. title_info: Optional[Dict] = None,
  196. new_titles: Optional[Dict] = None,
  197. mode: str = "daily",
  198. global_filters: Optional[List[str]] = None,
  199. quiet: bool = False,
  200. ) -> Tuple[List[Dict], int]:
  201. """统计词频"""
  202. return count_word_frequency(
  203. results=results,
  204. word_groups=word_groups,
  205. filter_words=filter_words,
  206. id_to_name=id_to_name,
  207. title_info=title_info,
  208. rank_threshold=self.rank_threshold,
  209. new_titles=new_titles,
  210. mode=mode,
  211. global_filters=global_filters,
  212. weight_config=self.weight_config,
  213. max_news_per_keyword=self.config.get("MAX_NEWS_PER_KEYWORD", 0),
  214. sort_by_position_first=self.config.get("SORT_BY_POSITION_FIRST", False),
  215. is_first_crawl_func=self.is_first_crawl,
  216. convert_time_func=self.convert_time_display,
  217. quiet=quiet,
  218. )
  219. # === 报告生成 ===
  220. def prepare_report(
  221. self,
  222. stats: List[Dict],
  223. failed_ids: Optional[List] = None,
  224. new_titles: Optional[Dict] = None,
  225. id_to_name: Optional[Dict] = None,
  226. mode: str = "daily",
  227. ) -> Dict:
  228. """准备报告数据"""
  229. return prepare_report_data(
  230. stats=stats,
  231. failed_ids=failed_ids,
  232. new_titles=new_titles,
  233. id_to_name=id_to_name,
  234. mode=mode,
  235. rank_threshold=self.rank_threshold,
  236. matches_word_groups_func=self.matches_word_groups,
  237. load_frequency_words_func=self.load_frequency_words,
  238. show_new_section=self.show_new_section,
  239. )
  240. def generate_html(
  241. self,
  242. stats: List[Dict],
  243. total_titles: int,
  244. failed_ids: Optional[List] = None,
  245. new_titles: Optional[Dict] = None,
  246. id_to_name: Optional[Dict] = None,
  247. mode: str = "daily",
  248. update_info: Optional[Dict] = None,
  249. rss_items: Optional[List[Dict]] = None,
  250. rss_new_items: Optional[List[Dict]] = None,
  251. ai_analysis: Optional[Any] = None,
  252. standalone_data: Optional[Dict] = None,
  253. ) -> str:
  254. """生成HTML报告"""
  255. return generate_html_report(
  256. stats=stats,
  257. total_titles=total_titles,
  258. failed_ids=failed_ids,
  259. new_titles=new_titles,
  260. id_to_name=id_to_name,
  261. mode=mode,
  262. update_info=update_info,
  263. rank_threshold=self.rank_threshold,
  264. output_dir="output",
  265. date_folder=self.format_date(),
  266. time_filename=self.format_time(),
  267. render_html_func=lambda *args, **kwargs: self.render_html(*args, rss_items=rss_items, rss_new_items=rss_new_items, ai_analysis=ai_analysis, standalone_data=standalone_data, **kwargs),
  268. matches_word_groups_func=self.matches_word_groups,
  269. load_frequency_words_func=self.load_frequency_words,
  270. )
  271. def render_html(
  272. self,
  273. report_data: Dict,
  274. total_titles: int,
  275. mode: str = "daily",
  276. update_info: Optional[Dict] = None,
  277. rss_items: Optional[List[Dict]] = None,
  278. rss_new_items: Optional[List[Dict]] = None,
  279. ai_analysis: Optional[Any] = None,
  280. standalone_data: Optional[Dict] = None,
  281. ) -> str:
  282. """渲染HTML内容"""
  283. return render_html_content(
  284. report_data=report_data,
  285. total_titles=total_titles,
  286. mode=mode,
  287. update_info=update_info,
  288. region_order=self.region_order,
  289. get_time_func=self.get_time,
  290. rss_items=rss_items,
  291. rss_new_items=rss_new_items,
  292. display_mode=self.display_mode,
  293. ai_analysis=ai_analysis,
  294. show_new_section=self.show_new_section,
  295. standalone_data=standalone_data,
  296. )
  297. # === 通知内容渲染 ===
  298. def render_feishu(
  299. self,
  300. report_data: Dict,
  301. update_info: Optional[Dict] = None,
  302. mode: str = "daily",
  303. ) -> str:
  304. """渲染飞书内容"""
  305. return render_feishu_content(
  306. report_data=report_data,
  307. update_info=update_info,
  308. mode=mode,
  309. separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
  310. region_order=self.region_order,
  311. get_time_func=self.get_time,
  312. show_new_section=self.show_new_section,
  313. )
  314. def render_dingtalk(
  315. self,
  316. report_data: Dict,
  317. update_info: Optional[Dict] = None,
  318. mode: str = "daily",
  319. ) -> str:
  320. """渲染钉钉内容"""
  321. return render_dingtalk_content(
  322. report_data=report_data,
  323. update_info=update_info,
  324. mode=mode,
  325. region_order=self.region_order,
  326. get_time_func=self.get_time,
  327. show_new_section=self.show_new_section,
  328. )
  329. def split_content(
  330. self,
  331. report_data: Dict,
  332. format_type: str,
  333. update_info: Optional[Dict] = None,
  334. max_bytes: Optional[int] = None,
  335. mode: str = "daily",
  336. rss_items: Optional[list] = None,
  337. rss_new_items: Optional[list] = None,
  338. ai_content: Optional[str] = None,
  339. standalone_data: Optional[Dict] = None,
  340. ai_stats: Optional[Dict] = None,
  341. report_type: str = "热点分析报告",
  342. ) -> List[str]:
  343. """分批处理消息内容(支持热榜+RSS合并+AI分析+独立展示区)
  344. Args:
  345. report_data: 报告数据
  346. format_type: 格式类型
  347. update_info: 更新信息
  348. max_bytes: 最大字节数
  349. mode: 报告模式
  350. rss_items: RSS 统计条目列表
  351. rss_new_items: RSS 新增条目列表
  352. ai_content: AI 分析内容(已渲染的字符串)
  353. standalone_data: 独立展示区数据
  354. ai_stats: AI 分析统计数据
  355. report_type: 报告类型
  356. Returns:
  357. 分批后的消息内容列表
  358. """
  359. return split_content_into_batches(
  360. report_data=report_data,
  361. format_type=format_type,
  362. update_info=update_info,
  363. max_bytes=max_bytes,
  364. mode=mode,
  365. batch_sizes={
  366. "dingtalk": self.config.get("DINGTALK_BATCH_SIZE", 20000),
  367. "feishu": self.config.get("FEISHU_BATCH_SIZE", 29000),
  368. "default": self.config.get("MESSAGE_BATCH_SIZE", 4000),
  369. },
  370. feishu_separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
  371. region_order=self.region_order,
  372. get_time_func=self.get_time,
  373. rss_items=rss_items,
  374. rss_new_items=rss_new_items,
  375. timezone=self.config.get("TIMEZONE", DEFAULT_TIMEZONE),
  376. display_mode=self.display_mode,
  377. ai_content=ai_content,
  378. standalone_data=standalone_data,
  379. rank_threshold=self.rank_threshold,
  380. ai_stats=ai_stats,
  381. report_type=report_type,
  382. show_new_section=self.show_new_section,
  383. )
  384. # === 通知发送 ===
  385. def create_notification_dispatcher(self) -> NotificationDispatcher:
  386. """创建通知调度器"""
  387. # 创建翻译器(如果启用)
  388. translator = None
  389. trans_config = self.config.get("AI_TRANSLATION", {})
  390. if trans_config.get("ENABLED", False):
  391. ai_config = self.config.get("AI", {})
  392. translator = AITranslator(trans_config, ai_config)
  393. return NotificationDispatcher(
  394. config=self.config,
  395. get_time_func=self.get_time,
  396. split_content_func=self.split_content,
  397. translator=translator,
  398. )
  399. def create_scheduler(self) -> Scheduler:
  400. """
  401. 创建调度器(延迟初始化,单例)
  402. 基于 config.yaml 的 schedule 段 + timeline.yaml 构建。
  403. """
  404. if self._scheduler is None:
  405. schedule_config = self.config.get("SCHEDULE", {})
  406. timeline_data = self.config.get("_TIMELINE_DATA", {})
  407. self._scheduler = Scheduler(
  408. schedule_config=schedule_config,
  409. timeline_data=timeline_data,
  410. storage_backend=self.get_storage_manager(),
  411. get_time_func=self.get_time,
  412. )
  413. return self._scheduler
  414. # === 资源清理 ===
  415. def cleanup(self):
  416. """清理资源"""
  417. if self._storage_manager:
  418. self._storage_manager.cleanup_old_data()
  419. self._storage_manager.cleanup()
  420. self._storage_manager = None