context.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. # coding=utf-8
  2. """
  3. 应用上下文模块
  4. 提供配置上下文类,封装所有依赖配置的操作,消除全局状态和包装函数。
  5. """
  6. from datetime import datetime
  7. from pathlib import Path
  8. from typing import Any, Callable, Dict, List, Optional, Tuple
  9. from trendradar.utils.time import (
  10. get_configured_time,
  11. format_date_folder,
  12. format_time_filename,
  13. get_current_time_display,
  14. convert_time_for_display,
  15. )
  16. from trendradar.core import (
  17. load_frequency_words,
  18. matches_word_groups,
  19. save_titles_to_file,
  20. read_all_today_titles,
  21. detect_latest_new_titles,
  22. is_first_crawl_today,
  23. count_word_frequency,
  24. )
  25. from trendradar.report import (
  26. clean_title,
  27. prepare_report_data,
  28. generate_html_report,
  29. render_html_content,
  30. )
  31. from trendradar.notification import (
  32. render_feishu_content,
  33. render_dingtalk_content,
  34. split_content_into_batches,
  35. NotificationDispatcher,
  36. PushRecordManager,
  37. )
  38. from trendradar.storage import get_storage_manager
  39. class AppContext:
  40. """
  41. 应用上下文类
  42. 封装所有依赖配置的操作,提供统一的接口。
  43. 消除对全局 CONFIG 的依赖,提高可测试性。
  44. 使用示例:
  45. config = load_config()
  46. ctx = AppContext(config)
  47. # 时间操作
  48. now = ctx.get_time()
  49. date_folder = ctx.format_date()
  50. # 存储操作
  51. storage = ctx.get_storage_manager()
  52. # 报告生成
  53. html = ctx.generate_html_report(stats, total_titles, ...)
  54. """
  55. def __init__(self, config: Dict[str, Any]):
  56. """
  57. 初始化应用上下文
  58. Args:
  59. config: 完整的配置字典
  60. """
  61. self.config = config
  62. self._storage_manager = None
  63. # === 配置访问 ===
  64. @property
  65. def timezone(self) -> str:
  66. """获取配置的时区"""
  67. return self.config.get("TIMEZONE", "Asia/Shanghai")
  68. @property
  69. def rank_threshold(self) -> int:
  70. """获取排名阈值"""
  71. return self.config.get("RANK_THRESHOLD", 50)
  72. @property
  73. def weight_config(self) -> Dict:
  74. """获取权重配置"""
  75. return self.config.get("WEIGHT_CONFIG", {})
  76. @property
  77. def platforms(self) -> List[Dict]:
  78. """获取平台配置列表"""
  79. return self.config.get("PLATFORMS", [])
  80. @property
  81. def platform_ids(self) -> List[str]:
  82. """获取平台ID列表"""
  83. return [p["id"] for p in self.platforms]
  84. @property
  85. def rss_config(self) -> Dict:
  86. """获取 RSS 配置"""
  87. return self.config.get("RSS", {})
  88. @property
  89. def rss_enabled(self) -> bool:
  90. """RSS 是否启用"""
  91. return self.rss_config.get("ENABLED", False)
  92. @property
  93. def rss_feeds(self) -> List[Dict]:
  94. """获取 RSS 源列表"""
  95. return self.rss_config.get("FEEDS", [])
  96. # === 时间操作 ===
  97. def get_time(self) -> datetime:
  98. """获取当前配置时区的时间"""
  99. return get_configured_time(self.timezone)
  100. def format_date(self) -> str:
  101. """格式化日期文件夹 (YYYY-MM-DD)"""
  102. return format_date_folder(timezone=self.timezone)
  103. def format_time(self) -> str:
  104. """格式化时间文件名 (HH-MM)"""
  105. return format_time_filename(self.timezone)
  106. def get_time_display(self) -> str:
  107. """获取时间显示 (HH:MM)"""
  108. return get_current_time_display(self.timezone)
  109. @staticmethod
  110. def convert_time_display(time_str: str) -> str:
  111. """将 HH-MM 转换为 HH:MM"""
  112. return convert_time_for_display(time_str)
  113. # === 存储操作 ===
  114. def get_storage_manager(self):
  115. """获取存储管理器(延迟初始化,单例)"""
  116. if self._storage_manager is None:
  117. storage_config = self.config.get("STORAGE", {})
  118. remote_config = storage_config.get("REMOTE", {})
  119. local_config = storage_config.get("LOCAL", {})
  120. pull_config = storage_config.get("PULL", {})
  121. self._storage_manager = get_storage_manager(
  122. backend_type=storage_config.get("BACKEND", "auto"),
  123. data_dir=local_config.get("DATA_DIR", "output"),
  124. enable_txt=storage_config.get("FORMATS", {}).get("TXT", True),
  125. enable_html=storage_config.get("FORMATS", {}).get("HTML", True),
  126. remote_config={
  127. "bucket_name": remote_config.get("BUCKET_NAME", ""),
  128. "access_key_id": remote_config.get("ACCESS_KEY_ID", ""),
  129. "secret_access_key": remote_config.get("SECRET_ACCESS_KEY", ""),
  130. "endpoint_url": remote_config.get("ENDPOINT_URL", ""),
  131. "region": remote_config.get("REGION", ""),
  132. },
  133. local_retention_days=local_config.get("RETENTION_DAYS", 0),
  134. remote_retention_days=remote_config.get("RETENTION_DAYS", 0),
  135. pull_enabled=pull_config.get("ENABLED", False),
  136. pull_days=pull_config.get("DAYS", 7),
  137. timezone=self.timezone,
  138. )
  139. return self._storage_manager
  140. def get_output_path(self, subfolder: str, filename: str) -> str:
  141. """获取输出路径"""
  142. output_dir = Path("output") / self.format_date() / subfolder
  143. output_dir.mkdir(parents=True, exist_ok=True)
  144. return str(output_dir / filename)
  145. # === 数据处理 ===
  146. def save_titles(self, results: Dict, id_to_name: Dict, failed_ids: List) -> str:
  147. """保存标题到文件"""
  148. output_path = self.get_output_path("txt", f"{self.format_time()}.txt")
  149. return save_titles_to_file(results, id_to_name, failed_ids, output_path, clean_title)
  150. def read_today_titles(
  151. self, platform_ids: Optional[List[str]] = None, quiet: bool = False
  152. ) -> Tuple[Dict, Dict, Dict]:
  153. """读取当天所有标题"""
  154. return read_all_today_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
  155. def detect_new_titles(
  156. self, platform_ids: Optional[List[str]] = None, quiet: bool = False
  157. ) -> Dict:
  158. """检测最新批次的新增标题"""
  159. return detect_latest_new_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
  160. def is_first_crawl(self) -> bool:
  161. """检测是否是当天第一次爬取"""
  162. return self.get_storage_manager().is_first_crawl_today()
  163. # === 频率词处理 ===
  164. def load_frequency_words(
  165. self, frequency_file: Optional[str] = None
  166. ) -> Tuple[List[Dict], List[str], List[str]]:
  167. """加载频率词配置"""
  168. return load_frequency_words(frequency_file)
  169. def matches_word_groups(
  170. self,
  171. title: str,
  172. word_groups: List[Dict],
  173. filter_words: List[str],
  174. global_filters: Optional[List[str]] = None,
  175. ) -> bool:
  176. """检查标题是否匹配词组规则"""
  177. return matches_word_groups(title, word_groups, filter_words, global_filters)
  178. # === 统计分析 ===
  179. def count_frequency(
  180. self,
  181. results: Dict,
  182. word_groups: List[Dict],
  183. filter_words: List[str],
  184. id_to_name: Dict,
  185. title_info: Optional[Dict] = None,
  186. new_titles: Optional[Dict] = None,
  187. mode: str = "daily",
  188. global_filters: Optional[List[str]] = None,
  189. quiet: bool = False,
  190. ) -> Tuple[List[Dict], int]:
  191. """统计词频"""
  192. return count_word_frequency(
  193. results=results,
  194. word_groups=word_groups,
  195. filter_words=filter_words,
  196. id_to_name=id_to_name,
  197. title_info=title_info,
  198. rank_threshold=self.rank_threshold,
  199. new_titles=new_titles,
  200. mode=mode,
  201. global_filters=global_filters,
  202. weight_config=self.weight_config,
  203. max_news_per_keyword=self.config.get("MAX_NEWS_PER_KEYWORD", 0),
  204. sort_by_position_first=self.config.get("SORT_BY_POSITION_FIRST", False),
  205. is_first_crawl_func=self.is_first_crawl,
  206. convert_time_func=self.convert_time_display,
  207. quiet=quiet,
  208. )
  209. # === 报告生成 ===
  210. def prepare_report(
  211. self,
  212. stats: List[Dict],
  213. failed_ids: Optional[List] = None,
  214. new_titles: Optional[Dict] = None,
  215. id_to_name: Optional[Dict] = None,
  216. mode: str = "daily",
  217. ) -> Dict:
  218. """准备报告数据"""
  219. return prepare_report_data(
  220. stats=stats,
  221. failed_ids=failed_ids,
  222. new_titles=new_titles,
  223. id_to_name=id_to_name,
  224. mode=mode,
  225. rank_threshold=self.rank_threshold,
  226. matches_word_groups_func=self.matches_word_groups,
  227. load_frequency_words_func=self.load_frequency_words,
  228. )
  229. def generate_html(
  230. self,
  231. stats: List[Dict],
  232. total_titles: int,
  233. failed_ids: Optional[List] = None,
  234. new_titles: Optional[Dict] = None,
  235. id_to_name: Optional[Dict] = None,
  236. mode: str = "daily",
  237. is_daily_summary: bool = False,
  238. update_info: Optional[Dict] = None,
  239. ) -> str:
  240. """生成HTML报告"""
  241. return generate_html_report(
  242. stats=stats,
  243. total_titles=total_titles,
  244. failed_ids=failed_ids,
  245. new_titles=new_titles,
  246. id_to_name=id_to_name,
  247. mode=mode,
  248. is_daily_summary=is_daily_summary,
  249. update_info=update_info,
  250. rank_threshold=self.rank_threshold,
  251. output_dir="output",
  252. date_folder=self.format_date(),
  253. time_filename=self.format_time(),
  254. render_html_func=lambda *args, **kwargs: self.render_html(*args, **kwargs),
  255. matches_word_groups_func=self.matches_word_groups,
  256. load_frequency_words_func=self.load_frequency_words,
  257. enable_index_copy=True,
  258. )
  259. def render_html(
  260. self,
  261. report_data: Dict,
  262. total_titles: int,
  263. is_daily_summary: bool = False,
  264. mode: str = "daily",
  265. update_info: Optional[Dict] = None,
  266. ) -> str:
  267. """渲染HTML内容"""
  268. return render_html_content(
  269. report_data=report_data,
  270. total_titles=total_titles,
  271. is_daily_summary=is_daily_summary,
  272. mode=mode,
  273. update_info=update_info,
  274. reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
  275. get_time_func=self.get_time,
  276. )
  277. # === 通知内容渲染 ===
  278. def render_feishu(
  279. self,
  280. report_data: Dict,
  281. update_info: Optional[Dict] = None,
  282. mode: str = "daily",
  283. ) -> str:
  284. """渲染飞书内容"""
  285. return render_feishu_content(
  286. report_data=report_data,
  287. update_info=update_info,
  288. mode=mode,
  289. separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
  290. reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
  291. get_time_func=self.get_time,
  292. )
  293. def render_dingtalk(
  294. self,
  295. report_data: Dict,
  296. update_info: Optional[Dict] = None,
  297. mode: str = "daily",
  298. ) -> str:
  299. """渲染钉钉内容"""
  300. return render_dingtalk_content(
  301. report_data=report_data,
  302. update_info=update_info,
  303. mode=mode,
  304. reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
  305. get_time_func=self.get_time,
  306. )
  307. def split_content(
  308. self,
  309. report_data: Dict,
  310. format_type: str,
  311. update_info: Optional[Dict] = None,
  312. max_bytes: Optional[int] = None,
  313. mode: str = "daily",
  314. rss_items: Optional[list] = None,
  315. rss_new_items: Optional[list] = None,
  316. ) -> List[str]:
  317. """分批处理消息内容(支持热榜+RSS合并)
  318. Args:
  319. report_data: 报告数据
  320. format_type: 格式类型
  321. update_info: 更新信息
  322. max_bytes: 最大字节数
  323. mode: 报告模式
  324. rss_items: RSS 统计条目列表
  325. rss_new_items: RSS 新增条目列表
  326. Returns:
  327. 分批后的消息内容列表
  328. """
  329. return split_content_into_batches(
  330. report_data=report_data,
  331. format_type=format_type,
  332. update_info=update_info,
  333. max_bytes=max_bytes,
  334. mode=mode,
  335. batch_sizes={
  336. "dingtalk": self.config.get("DINGTALK_BATCH_SIZE", 20000),
  337. "feishu": self.config.get("FEISHU_BATCH_SIZE", 29000),
  338. "default": self.config.get("MESSAGE_BATCH_SIZE", 4000),
  339. },
  340. feishu_separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
  341. reverse_content_order=self.config.get("REVERSE_CONTENT_ORDER", False),
  342. get_time_func=self.get_time,
  343. rss_items=rss_items,
  344. rss_new_items=rss_new_items,
  345. timezone=self.config.get("TIMEZONE", "Asia/Shanghai"),
  346. )
  347. # === 通知发送 ===
  348. def create_notification_dispatcher(self) -> NotificationDispatcher:
  349. """创建通知调度器"""
  350. return NotificationDispatcher(
  351. config=self.config,
  352. get_time_func=self.get_time,
  353. split_content_func=self.split_content,
  354. )
  355. def create_push_manager(self) -> PushRecordManager:
  356. """创建推送记录管理器"""
  357. return PushRecordManager(
  358. storage_backend=self.get_storage_manager(),
  359. get_time_func=self.get_time,
  360. )
  361. # === 资源清理 ===
  362. def cleanup(self):
  363. """清理资源"""
  364. if self._storage_manager:
  365. self._storage_manager.cleanup_old_data()
  366. self._storage_manager.cleanup()
  367. self._storage_manager = None