| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481 |
- # coding=utf-8
- """
- 应用上下文模块
- 提供配置上下文类,封装所有依赖配置的操作,消除全局状态和包装函数。
- """
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple
- from trendradar.utils.time import (
- DEFAULT_TIMEZONE,
- get_configured_time,
- format_date_folder,
- format_time_filename,
- get_current_time_display,
- convert_time_for_display,
- )
- from trendradar.core import (
- load_frequency_words,
- matches_word_groups,
- read_all_today_titles,
- detect_latest_new_titles,
- count_word_frequency,
- Scheduler,
- )
- from trendradar.report import (
- clean_title,
- prepare_report_data,
- generate_html_report,
- render_html_content,
- )
- from trendradar.notification import (
- render_feishu_content,
- render_dingtalk_content,
- split_content_into_batches,
- NotificationDispatcher,
- )
- from trendradar.ai import AITranslator
- from trendradar.storage import get_storage_manager
- class AppContext:
- """
- 应用上下文类
- 封装所有依赖配置的操作,提供统一的接口。
- 消除对全局 CONFIG 的依赖,提高可测试性。
- 使用示例:
- config = load_config()
- ctx = AppContext(config)
- # 时间操作
- now = ctx.get_time()
- date_folder = ctx.format_date()
- # 存储操作
- storage = ctx.get_storage_manager()
- # 报告生成
- html = ctx.generate_html_report(stats, total_titles, ...)
- """
- def __init__(self, config: Dict[str, Any]):
- """
- 初始化应用上下文
- Args:
- config: 完整的配置字典
- """
- self.config = config
- self._storage_manager = None
- self._scheduler = None
- # === 配置访问 ===
- @property
- def timezone(self) -> str:
- """获取配置的时区"""
- return self.config.get("TIMEZONE", DEFAULT_TIMEZONE)
- @property
- def rank_threshold(self) -> int:
- """获取排名阈值"""
- return self.config.get("RANK_THRESHOLD", 50)
- @property
- def weight_config(self) -> Dict:
- """获取权重配置"""
- return self.config.get("WEIGHT_CONFIG", {})
- @property
- def platforms(self) -> List[Dict]:
- """获取平台配置列表"""
- return self.config.get("PLATFORMS", [])
- @property
- def platform_ids(self) -> List[str]:
- """获取平台ID列表"""
- return [p["id"] for p in self.platforms]
- @property
- def rss_config(self) -> Dict:
- """获取 RSS 配置"""
- return self.config.get("RSS", {})
- @property
- def rss_enabled(self) -> bool:
- """RSS 是否启用"""
- return self.rss_config.get("ENABLED", False)
- @property
- def rss_feeds(self) -> List[Dict]:
- """获取 RSS 源列表"""
- return self.rss_config.get("FEEDS", [])
- @property
- def display_mode(self) -> str:
- """获取显示模式 (keyword | platform)"""
- return self.config.get("DISPLAY_MODE", "keyword")
- @property
- def show_new_section(self) -> bool:
- """是否显示新增热点区域"""
- return self.config.get("DISPLAY", {}).get("REGIONS", {}).get("NEW_ITEMS", True)
- @property
- def region_order(self) -> List[str]:
- """获取区域显示顺序"""
- default_order = ["hotlist", "rss", "new_items", "standalone", "ai_analysis"]
- return self.config.get("DISPLAY", {}).get("REGION_ORDER", default_order)
- # === 时间操作 ===
- def get_time(self) -> datetime:
- """获取当前配置时区的时间"""
- return get_configured_time(self.timezone)
- def format_date(self) -> str:
- """格式化日期文件夹 (YYYY-MM-DD)"""
- return format_date_folder(timezone=self.timezone)
- def format_time(self) -> str:
- """格式化时间文件名 (HH-MM)"""
- return format_time_filename(self.timezone)
- def get_time_display(self) -> str:
- """获取时间显示 (HH:MM)"""
- return get_current_time_display(self.timezone)
- @staticmethod
- def convert_time_display(time_str: str) -> str:
- """将 HH-MM 转换为 HH:MM"""
- return convert_time_for_display(time_str)
- # === 存储操作 ===
- def get_storage_manager(self):
- """获取存储管理器(延迟初始化,单例)"""
- if self._storage_manager is None:
- storage_config = self.config.get("STORAGE", {})
- remote_config = storage_config.get("REMOTE", {})
- local_config = storage_config.get("LOCAL", {})
- pull_config = storage_config.get("PULL", {})
- self._storage_manager = get_storage_manager(
- backend_type=storage_config.get("BACKEND", "auto"),
- data_dir=local_config.get("DATA_DIR", "output"),
- enable_txt=storage_config.get("FORMATS", {}).get("TXT", True),
- enable_html=storage_config.get("FORMATS", {}).get("HTML", True),
- remote_config={
- "bucket_name": remote_config.get("BUCKET_NAME", ""),
- "access_key_id": remote_config.get("ACCESS_KEY_ID", ""),
- "secret_access_key": remote_config.get("SECRET_ACCESS_KEY", ""),
- "endpoint_url": remote_config.get("ENDPOINT_URL", ""),
- "region": remote_config.get("REGION", ""),
- },
- local_retention_days=local_config.get("RETENTION_DAYS", 0),
- remote_retention_days=remote_config.get("RETENTION_DAYS", 0),
- pull_enabled=pull_config.get("ENABLED", False),
- pull_days=pull_config.get("DAYS", 7),
- timezone=self.timezone,
- )
- return self._storage_manager
- def get_output_path(self, subfolder: str, filename: str) -> str:
- """获取输出路径(扁平化结构:output/类型/日期/文件名)"""
- output_dir = Path("output") / subfolder / self.format_date()
- output_dir.mkdir(parents=True, exist_ok=True)
- return str(output_dir / filename)
- # === 数据处理 ===
- def read_today_titles(
- self, platform_ids: Optional[List[str]] = None, quiet: bool = False
- ) -> Tuple[Dict, Dict, Dict]:
- """读取当天所有标题"""
- return read_all_today_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
- def detect_new_titles(
- self, platform_ids: Optional[List[str]] = None, quiet: bool = False
- ) -> Dict:
- """检测最新批次的新增标题"""
- return detect_latest_new_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
- def is_first_crawl(self) -> bool:
- """检测是否是当天第一次爬取"""
- return self.get_storage_manager().is_first_crawl_today()
- # === 频率词处理 ===
- def load_frequency_words(
- self, frequency_file: Optional[str] = None
- ) -> Tuple[List[Dict], List[str], List[str]]:
- """加载频率词配置"""
- return load_frequency_words(frequency_file)
- def matches_word_groups(
- self,
- title: str,
- word_groups: List[Dict],
- filter_words: List[str],
- global_filters: Optional[List[str]] = None,
- ) -> bool:
- """检查标题是否匹配词组规则"""
- return matches_word_groups(title, word_groups, filter_words, global_filters)
- # === 统计分析 ===
- def count_frequency(
- self,
- results: Dict,
- word_groups: List[Dict],
- filter_words: List[str],
- id_to_name: Dict,
- title_info: Optional[Dict] = None,
- new_titles: Optional[Dict] = None,
- mode: str = "daily",
- global_filters: Optional[List[str]] = None,
- quiet: bool = False,
- ) -> Tuple[List[Dict], int]:
- """统计词频"""
- return count_word_frequency(
- results=results,
- word_groups=word_groups,
- filter_words=filter_words,
- id_to_name=id_to_name,
- title_info=title_info,
- rank_threshold=self.rank_threshold,
- new_titles=new_titles,
- mode=mode,
- global_filters=global_filters,
- weight_config=self.weight_config,
- max_news_per_keyword=self.config.get("MAX_NEWS_PER_KEYWORD", 0),
- sort_by_position_first=self.config.get("SORT_BY_POSITION_FIRST", False),
- is_first_crawl_func=self.is_first_crawl,
- convert_time_func=self.convert_time_display,
- quiet=quiet,
- )
- # === 报告生成 ===
- def prepare_report(
- self,
- stats: List[Dict],
- failed_ids: Optional[List] = None,
- new_titles: Optional[Dict] = None,
- id_to_name: Optional[Dict] = None,
- mode: str = "daily",
- ) -> Dict:
- """准备报告数据"""
- return prepare_report_data(
- stats=stats,
- failed_ids=failed_ids,
- new_titles=new_titles,
- id_to_name=id_to_name,
- mode=mode,
- rank_threshold=self.rank_threshold,
- matches_word_groups_func=self.matches_word_groups,
- load_frequency_words_func=self.load_frequency_words,
- show_new_section=self.show_new_section,
- )
- def generate_html(
- self,
- stats: List[Dict],
- total_titles: int,
- failed_ids: Optional[List] = None,
- new_titles: Optional[Dict] = None,
- id_to_name: Optional[Dict] = None,
- mode: str = "daily",
- update_info: Optional[Dict] = None,
- rss_items: Optional[List[Dict]] = None,
- rss_new_items: Optional[List[Dict]] = None,
- ai_analysis: Optional[Any] = None,
- standalone_data: Optional[Dict] = None,
- ) -> str:
- """生成HTML报告"""
- return generate_html_report(
- stats=stats,
- total_titles=total_titles,
- failed_ids=failed_ids,
- new_titles=new_titles,
- id_to_name=id_to_name,
- mode=mode,
- update_info=update_info,
- rank_threshold=self.rank_threshold,
- output_dir="output",
- date_folder=self.format_date(),
- time_filename=self.format_time(),
- render_html_func=lambda *args, **kwargs: self.render_html(*args, rss_items=rss_items, rss_new_items=rss_new_items, ai_analysis=ai_analysis, standalone_data=standalone_data, **kwargs),
- matches_word_groups_func=self.matches_word_groups,
- load_frequency_words_func=self.load_frequency_words,
- )
- def render_html(
- self,
- report_data: Dict,
- total_titles: int,
- mode: str = "daily",
- update_info: Optional[Dict] = None,
- rss_items: Optional[List[Dict]] = None,
- rss_new_items: Optional[List[Dict]] = None,
- ai_analysis: Optional[Any] = None,
- standalone_data: Optional[Dict] = None,
- ) -> str:
- """渲染HTML内容"""
- return render_html_content(
- report_data=report_data,
- total_titles=total_titles,
- mode=mode,
- update_info=update_info,
- region_order=self.region_order,
- get_time_func=self.get_time,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- display_mode=self.display_mode,
- ai_analysis=ai_analysis,
- show_new_section=self.show_new_section,
- standalone_data=standalone_data,
- )
- # === 通知内容渲染 ===
- def render_feishu(
- self,
- report_data: Dict,
- update_info: Optional[Dict] = None,
- mode: str = "daily",
- ) -> str:
- """渲染飞书内容"""
- return render_feishu_content(
- report_data=report_data,
- update_info=update_info,
- mode=mode,
- separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
- region_order=self.region_order,
- get_time_func=self.get_time,
- show_new_section=self.show_new_section,
- )
- def render_dingtalk(
- self,
- report_data: Dict,
- update_info: Optional[Dict] = None,
- mode: str = "daily",
- ) -> str:
- """渲染钉钉内容"""
- return render_dingtalk_content(
- report_data=report_data,
- update_info=update_info,
- mode=mode,
- region_order=self.region_order,
- get_time_func=self.get_time,
- show_new_section=self.show_new_section,
- )
- def split_content(
- self,
- report_data: Dict,
- format_type: str,
- update_info: Optional[Dict] = None,
- max_bytes: Optional[int] = None,
- mode: str = "daily",
- rss_items: Optional[list] = None,
- rss_new_items: Optional[list] = None,
- ai_content: Optional[str] = None,
- standalone_data: Optional[Dict] = None,
- ai_stats: Optional[Dict] = None,
- report_type: str = "热点分析报告",
- ) -> List[str]:
- """分批处理消息内容(支持热榜+RSS合并+AI分析+独立展示区)
- Args:
- report_data: 报告数据
- format_type: 格式类型
- update_info: 更新信息
- max_bytes: 最大字节数
- mode: 报告模式
- rss_items: RSS 统计条目列表
- rss_new_items: RSS 新增条目列表
- ai_content: AI 分析内容(已渲染的字符串)
- standalone_data: 独立展示区数据
- ai_stats: AI 分析统计数据
- report_type: 报告类型
- Returns:
- 分批后的消息内容列表
- """
- return split_content_into_batches(
- report_data=report_data,
- format_type=format_type,
- update_info=update_info,
- max_bytes=max_bytes,
- mode=mode,
- batch_sizes={
- "dingtalk": self.config.get("DINGTALK_BATCH_SIZE", 20000),
- "feishu": self.config.get("FEISHU_BATCH_SIZE", 29000),
- "default": self.config.get("MESSAGE_BATCH_SIZE", 4000),
- },
- feishu_separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
- region_order=self.region_order,
- get_time_func=self.get_time,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- timezone=self.config.get("TIMEZONE", DEFAULT_TIMEZONE),
- display_mode=self.display_mode,
- ai_content=ai_content,
- standalone_data=standalone_data,
- rank_threshold=self.rank_threshold,
- ai_stats=ai_stats,
- report_type=report_type,
- show_new_section=self.show_new_section,
- )
- # === 通知发送 ===
- def create_notification_dispatcher(self) -> NotificationDispatcher:
- """创建通知调度器"""
- # 创建翻译器(如果启用)
- translator = None
- trans_config = self.config.get("AI_TRANSLATION", {})
- if trans_config.get("ENABLED", False):
- ai_config = self.config.get("AI", {})
- translator = AITranslator(trans_config, ai_config)
- return NotificationDispatcher(
- config=self.config,
- get_time_func=self.get_time,
- split_content_func=self.split_content,
- translator=translator,
- )
- def create_scheduler(self) -> Scheduler:
- """
- 创建调度器(延迟初始化,单例)
- 基于 config.yaml 的 schedule 段 + timeline.yaml 构建。
- """
- if self._scheduler is None:
- schedule_config = self.config.get("SCHEDULE", {})
- timeline_data = self.config.get("_TIMELINE_DATA", {})
- self._scheduler = Scheduler(
- schedule_config=schedule_config,
- timeline_data=timeline_data,
- storage_backend=self.get_storage_manager(),
- get_time_func=self.get_time,
- )
- return self._scheduler
- # === 资源清理 ===
- def cleanup(self):
- """清理资源"""
- if self._storage_manager:
- self._storage_manager.cleanup_old_data()
- self._storage_manager.cleanup()
- self._storage_manager = None
|