# coding=utf-8 """ 通知调度器模块 提供统一的通知分发接口。 支持所有通知渠道的多账号配置,使用 `;` 分隔多个账号。 使用示例: dispatcher = NotificationDispatcher(config, get_time_func, split_content_func) results = dispatcher.dispatch_all(report_data, report_type, ...) """ from __future__ import annotations from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional from trendradar.core.config import ( get_account_at_index, limit_accounts, parse_multi_account_config, validate_paired_configs, ) from .senders import ( send_to_bark, send_to_dingtalk, send_to_email, send_to_feishu, send_to_ntfy, send_to_slack, send_to_telegram, send_to_wework, send_to_generic_webhook, ) from .renderer import ( render_rss_feishu_content, render_rss_dingtalk_content, render_rss_markdown_content, ) # 类型检查时导入,运行时不导入(避免循环导入) if TYPE_CHECKING: from trendradar.ai import AIAnalysisResult, AITranslator class NotificationDispatcher: """ 统一的多账号通知调度器 将多账号发送逻辑封装,提供简洁的 dispatch_all 接口。 内部处理账号解析、数量限制、配对验证等逻辑。 """ def __init__( self, config: Dict[str, Any], get_time_func: Callable, split_content_func: Callable, translator: Optional["AITranslator"] = None, ): """ 初始化通知调度器 Args: config: 完整的配置字典,包含所有通知渠道的配置 get_time_func: 获取当前时间的函数 split_content_func: 内容分批函数 translator: AI 翻译器实例(可选) """ self.config = config self.get_time_func = get_time_func self.split_content_func = split_content_func self.max_accounts = config.get("MAX_ACCOUNTS_PER_CHANNEL", 3) self.translator = translator def _translate_content( self, report_data: Dict, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ) -> tuple: """ 翻译推送内容 Args: report_data: 报告数据 rss_items: RSS 统计条目 rss_new_items: RSS 新增条目 Returns: tuple: (翻译后的 report_data, rss_items, rss_new_items) """ if not self.translator or not self.translator.enabled: return report_data, rss_items, rss_new_items import copy print(f"[翻译] 开始翻译内容到 {self.translator.target_language}...") # 深拷贝避免修改原始数据 report_data = copy.deepcopy(report_data) rss_items = copy.deepcopy(rss_items) if rss_items else None rss_new_items = copy.deepcopy(rss_new_items) if rss_new_items else None # 收集所有需要翻译的标题 titles_to_translate = [] title_locations = [] # 记录标题位置,用于回填 # 1. 热榜标题 for stat_idx, stat in enumerate(report_data.get("stats", [])): for title_idx, title_data in enumerate(stat.get("titles", [])): titles_to_translate.append(title_data.get("title", "")) title_locations.append(("stats", stat_idx, title_idx)) # 2. 新增热点标题 for source_idx, source in enumerate(report_data.get("new_titles", [])): for title_idx, title_data in enumerate(source.get("titles", [])): titles_to_translate.append(title_data.get("title", "")) title_locations.append(("new_titles", source_idx, title_idx)) # 3. RSS 统计标题(结构与 stats 一致:[{word, count, titles: [{title, ...}]}]) if rss_items: for stat_idx, stat in enumerate(rss_items): for title_idx, title_data in enumerate(stat.get("titles", [])): titles_to_translate.append(title_data.get("title", "")) title_locations.append(("rss_items", stat_idx, title_idx)) # 4. RSS 新增标题(结构与 stats 一致) if rss_new_items: for stat_idx, stat in enumerate(rss_new_items): for title_idx, title_data in enumerate(stat.get("titles", [])): titles_to_translate.append(title_data.get("title", "")) title_locations.append(("rss_new_items", stat_idx, title_idx)) if not titles_to_translate: print("[翻译] 没有需要翻译的内容") return report_data, rss_items, rss_new_items print(f"[翻译] 共 {len(titles_to_translate)} 条标题待翻译") # 批量翻译 result = self.translator.translate_batch(titles_to_translate) if result.success_count == 0: print(f"[翻译] 翻译失败: {result.results[0].error if result.results else '未知错误'}") return report_data, rss_items, rss_new_items print(f"[翻译] 翻译完成: {result.success_count}/{result.total_count} 成功") # 回填翻译结果 for i, (loc_type, idx1, idx2) in enumerate(title_locations): if i < len(result.results) and result.results[i].success: translated = result.results[i].translated_text if loc_type == "stats": report_data["stats"][idx1]["titles"][idx2]["title"] = translated elif loc_type == "new_titles": report_data["new_titles"][idx1]["titles"][idx2]["title"] = translated elif loc_type == "rss_items" and rss_items: rss_items[idx1]["titles"][idx2]["title"] = translated elif loc_type == "rss_new_items" and rss_new_items: rss_new_items[idx1]["titles"][idx2]["title"] = translated return report_data, rss_items, rss_new_items def dispatch_all( self, report_data: Dict, report_type: str, update_info: Optional[Dict] = None, proxy_url: Optional[str] = None, mode: str = "daily", html_file_path: Optional[str] = None, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, standalone_data: Optional[Dict] = None, ) -> Dict[str, bool]: """ 分发通知到所有已配置的渠道(支持热榜+RSS合并推送+AI分析+独立展示区) Args: report_data: 报告数据(由 prepare_report_data 生成) report_type: 报告类型(如 "当日汇总"、"实时增量") update_info: 版本更新信息(可选) proxy_url: 代理 URL(可选) mode: 报告模式 (daily/current/incremental) html_file_path: HTML 报告文件路径(邮件使用) rss_items: RSS 统计条目列表(用于 RSS 统计区块) rss_new_items: RSS 新增条目列表(用于 RSS 新增区块) ai_analysis: AI 分析结果(可选) standalone_data: 独立展示区数据(可选) Returns: Dict[str, bool]: 每个渠道的发送结果,key 为渠道名,value 为是否成功 """ results = {} # 获取区域显示配置 display_regions = self.config.get("DISPLAY", {}).get("REGIONS", {}) # 执行翻译(如果启用) report_data, rss_items, rss_new_items = self._translate_content( report_data, rss_items, rss_new_items ) # 飞书 if self.config.get("FEISHU_WEBHOOK_URL"): results["feishu"] = self._send_feishu( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # 钉钉 if self.config.get("DINGTALK_WEBHOOK_URL"): results["dingtalk"] = self._send_dingtalk( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # 企业微信 if self.config.get("WEWORK_WEBHOOK_URL"): results["wework"] = self._send_wework( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # Telegram(需要配对验证) if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"): results["telegram"] = self._send_telegram( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # ntfy(需要配对验证) if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"): results["ntfy"] = self._send_ntfy( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # Bark if self.config.get("BARK_URL"): results["bark"] = self._send_bark( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # Slack if self.config.get("SLACK_WEBHOOK_URL"): results["slack"] = self._send_slack( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # 通用 Webhook if self.config.get("GENERIC_WEBHOOK_URL"): results["generic_webhook"] = self._send_generic_webhook( report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items, ai_analysis, display_regions, standalone_data ) # 邮件(保持原有逻辑,已支持多收件人,AI 分析已嵌入 HTML) if ( self.config.get("EMAIL_FROM") and self.config.get("EMAIL_PASSWORD") and self.config.get("EMAIL_TO") ): results["email"] = self._send_email(report_type, html_file_path) return results def _send_to_multi_accounts( self, channel_name: str, config_value: str, send_func: Callable[..., bool], **kwargs, ) -> bool: """ 通用多账号发送逻辑 Args: channel_name: 渠道名称(用于日志和账号数量限制提示) config_value: 配置值(可能包含多个账号,用 ; 分隔) send_func: 发送函数,签名为 (account, account_label=..., **kwargs) -> bool **kwargs: 传递给发送函数的其他参数 Returns: bool: 任一账号发送成功则返回 True """ accounts = parse_multi_account_config(config_value) if not accounts: return False accounts = limit_accounts(accounts, self.max_accounts, channel_name) results = [] for i, account in enumerate(accounts): if account: account_label = f"账号{i+1}" if len(accounts) > 1 else "" result = send_func(account, account_label=account_label, **kwargs) results.append(result) return any(results) if results else False def _send_feishu( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到飞书(多账号,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} # 根据区域开关决定是否发送对应内容 if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} return self._send_to_multi_accounts( channel_name="飞书", config_value=self.config["FEISHU_WEBHOOK_URL"], send_func=lambda url, account_label: send_to_feishu( webhook_url=url, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=self.config.get("FEISHU_BATCH_SIZE", 29000), batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0), split_content_func=self.split_content_func, get_time_func=self.get_time_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ), ) def _send_dingtalk( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到钉钉(多账号,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} return self._send_to_multi_accounts( channel_name="钉钉", config_value=self.config["DINGTALK_WEBHOOK_URL"], send_func=lambda url, account_label: send_to_dingtalk( webhook_url=url, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=self.config.get("DINGTALK_BATCH_SIZE", 20000), batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0), split_content_func=self.split_content_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ), ) def _send_wework( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到企业微信(多账号,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} return self._send_to_multi_accounts( channel_name="企业微信", config_value=self.config["WEWORK_WEBHOOK_URL"], send_func=lambda url, account_label: send_to_wework( webhook_url=url, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000), batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0), msg_type=self.config.get("WEWORK_MSG_TYPE", "markdown"), split_content_func=self.split_content_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ), ) def _send_telegram( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到 Telegram(多账号,需验证 token 和 chat_id 配对,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} telegram_tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"]) telegram_chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"]) if not telegram_tokens or not telegram_chat_ids: return False # 验证配对 valid, count = validate_paired_configs( {"bot_token": telegram_tokens, "chat_id": telegram_chat_ids}, "Telegram", required_keys=["bot_token", "chat_id"], ) if not valid or count == 0: return False # 限制账号数量 telegram_tokens = limit_accounts(telegram_tokens, self.max_accounts, "Telegram") telegram_chat_ids = telegram_chat_ids[: len(telegram_tokens)] results = [] for i in range(len(telegram_tokens)): token = telegram_tokens[i] chat_id = telegram_chat_ids[i] if token and chat_id: account_label = f"账号{i+1}" if len(telegram_tokens) > 1 else "" result = send_to_telegram( bot_token=token, chat_id=chat_id, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000), batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0), split_content_func=self.split_content_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ) results.append(result) return any(results) if results else False def _send_ntfy( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到 ntfy(多账号,需验证 topic 和 token 配对,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} ntfy_server_url = self.config["NTFY_SERVER_URL"] ntfy_topics = parse_multi_account_config(self.config["NTFY_TOPIC"]) ntfy_tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", "")) if not ntfy_server_url or not ntfy_topics: return False # 验证 token 和 topic 数量一致(如果配置了 token) if ntfy_tokens and len(ntfy_tokens) != len(ntfy_topics): print( f"❌ ntfy 配置错误:topic 数量({len(ntfy_topics)})与 token 数量({len(ntfy_tokens)})不一致,跳过 ntfy 推送" ) return False # 限制账号数量 ntfy_topics = limit_accounts(ntfy_topics, self.max_accounts, "ntfy") if ntfy_tokens: ntfy_tokens = ntfy_tokens[: len(ntfy_topics)] results = [] for i, topic in enumerate(ntfy_topics): if topic: token = get_account_at_index(ntfy_tokens, i, "") if ntfy_tokens else "" account_label = f"账号{i+1}" if len(ntfy_topics) > 1 else "" result = send_to_ntfy( server_url=ntfy_server_url, topic=topic, token=token, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=3800, split_content_func=self.split_content_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ) results.append(result) return any(results) if results else False def _send_bark( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到 Bark(多账号,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} return self._send_to_multi_accounts( channel_name="Bark", config_value=self.config["BARK_URL"], send_func=lambda url, account_label: send_to_bark( bark_url=url, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=self.config.get("BARK_BATCH_SIZE", 3600), batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0), split_content_func=self.split_content_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ), ) def _send_slack( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到 Slack(多账号,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} return self._send_to_multi_accounts( channel_name="Slack", config_value=self.config["SLACK_WEBHOOK_URL"], send_func=lambda url, account_label: send_to_slack( webhook_url=url, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=self.config.get("SLACK_BATCH_SIZE", 4000), batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0), split_content_func=self.split_content_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ), ) def _send_generic_webhook( self, report_data: Dict, report_type: str, update_info: Optional[Dict], proxy_url: Optional[str], mode: str, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, ai_analysis: Optional[AIAnalysisResult] = None, display_regions: Optional[Dict] = None, standalone_data: Optional[Dict] = None, ) -> bool: """发送到通用 Webhook(多账号,支持热榜+RSS合并+AI分析+独立展示区)""" display_regions = display_regions or {} if not display_regions.get("HOTLIST", True): report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}} urls = parse_multi_account_config(self.config.get("GENERIC_WEBHOOK_URL", "")) templates = parse_multi_account_config(self.config.get("GENERIC_WEBHOOK_TEMPLATE", "")) if not urls: return False urls = limit_accounts(urls, self.max_accounts, "通用Webhook") results = [] for i, url in enumerate(urls): if not url: continue template = "" if templates: if i < len(templates): template = templates[i] elif len(templates) == 1: template = templates[0] # 共用一个模板 account_label = f"账号{i+1}" if len(urls) > 1 else "" result = send_to_generic_webhook( webhook_url=url, payload_template=template, report_data=report_data, report_type=report_type, update_info=update_info, proxy_url=proxy_url, mode=mode, account_label=account_label, batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000), batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0), split_content_func=self.split_content_func, rss_items=rss_items if display_regions.get("RSS", True) else None, rss_new_items=rss_new_items if display_regions.get("RSS", True) else None, ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None, display_regions=display_regions, standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None, ) results.append(result) return any(results) if results else False def _send_email( self, report_type: str, html_file_path: Optional[str], ) -> bool: """发送邮件(保持原有逻辑,已支持多收件人) Note: AI 分析内容已在 HTML 生成时嵌入,无需在此传递 """ return send_to_email( from_email=self.config["EMAIL_FROM"], password=self.config["EMAIL_PASSWORD"], to_email=self.config["EMAIL_TO"], report_type=report_type, html_file_path=html_file_path, custom_smtp_server=self.config.get("EMAIL_SMTP_SERVER", ""), custom_smtp_port=self.config.get("EMAIL_SMTP_PORT", ""), get_time_func=self.get_time_func, ) # === RSS 通知方法 === def dispatch_rss( self, rss_items: List[Dict], feeds_info: Optional[Dict[str, str]] = None, proxy_url: Optional[str] = None, html_file_path: Optional[str] = None, ) -> Dict[str, bool]: """ 分发 RSS 通知到所有已配置的渠道 Args: rss_items: RSS 条目列表,每个条目包含: - title: 标题 - feed_id: RSS 源 ID - feed_name: RSS 源名称 - url: 链接 - published_at: 发布时间 - summary: 摘要(可选) - author: 作者(可选) feeds_info: RSS 源 ID 到名称的映射 proxy_url: 代理 URL(可选) html_file_path: HTML 报告文件路径(邮件使用) Returns: Dict[str, bool]: 每个渠道的发送结果 """ if not rss_items: print("[RSS通知] 没有 RSS 内容,跳过通知") return {} results = {} report_type = "RSS 订阅更新" # 飞书 if self.config.get("FEISHU_WEBHOOK_URL"): results["feishu"] = self._send_rss_feishu( rss_items, feeds_info, proxy_url ) # 钉钉 if self.config.get("DINGTALK_WEBHOOK_URL"): results["dingtalk"] = self._send_rss_dingtalk( rss_items, feeds_info, proxy_url ) # 企业微信 if self.config.get("WEWORK_WEBHOOK_URL"): results["wework"] = self._send_rss_markdown( rss_items, feeds_info, proxy_url, "wework" ) # Telegram if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"): results["telegram"] = self._send_rss_markdown( rss_items, feeds_info, proxy_url, "telegram" ) # ntfy if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"): results["ntfy"] = self._send_rss_markdown( rss_items, feeds_info, proxy_url, "ntfy" ) # Bark if self.config.get("BARK_URL"): results["bark"] = self._send_rss_markdown( rss_items, feeds_info, proxy_url, "bark" ) # Slack if self.config.get("SLACK_WEBHOOK_URL"): results["slack"] = self._send_rss_markdown( rss_items, feeds_info, proxy_url, "slack" ) # 邮件 if ( self.config.get("EMAIL_FROM") and self.config.get("EMAIL_PASSWORD") and self.config.get("EMAIL_TO") ): results["email"] = self._send_email(report_type, html_file_path) return results def _send_rss_feishu( self, rss_items: List[Dict], feeds_info: Optional[Dict[str, str]], proxy_url: Optional[str], ) -> bool: """发送 RSS 到飞书""" import requests content = render_rss_feishu_content( rss_items=rss_items, feeds_info=feeds_info, get_time_func=self.get_time_func, ) webhooks = parse_multi_account_config(self.config["FEISHU_WEBHOOK_URL"]) webhooks = limit_accounts(webhooks, self.max_accounts, "飞书") results = [] for i, webhook_url in enumerate(webhooks): if not webhook_url: continue account_label = f"账号{i+1}" if len(webhooks) > 1 else "" try: # 分批发送 batches = self.split_content_func( content, self.config.get("FEISHU_BATCH_SIZE", 29000) ) for batch_idx, batch_content in enumerate(batches): payload = { "msg_type": "interactive", "card": { "header": { "title": { "tag": "plain_text", "content": f"📰 RSS 订阅更新 {f'({batch_idx + 1}/{len(batches)})' if len(batches) > 1 else ''}", }, "template": "green", }, "elements": [ {"tag": "markdown", "content": batch_content} ], }, } proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30) resp.raise_for_status() print(f"✅ 飞书{account_label} RSS 通知发送成功") results.append(True) except Exception as e: print(f"❌ 飞书{account_label} RSS 通知发送失败: {e}") results.append(False) return any(results) if results else False def _send_rss_dingtalk( self, rss_items: List[Dict], feeds_info: Optional[Dict[str, str]], proxy_url: Optional[str], ) -> bool: """发送 RSS 到钉钉""" import requests content = render_rss_dingtalk_content( rss_items=rss_items, feeds_info=feeds_info, get_time_func=self.get_time_func, ) webhooks = parse_multi_account_config(self.config["DINGTALK_WEBHOOK_URL"]) webhooks = limit_accounts(webhooks, self.max_accounts, "钉钉") results = [] for i, webhook_url in enumerate(webhooks): if not webhook_url: continue account_label = f"账号{i+1}" if len(webhooks) > 1 else "" try: batches = self.split_content_func( content, self.config.get("DINGTALK_BATCH_SIZE", 20000) ) for batch_idx, batch_content in enumerate(batches): title = f"📰 RSS 订阅更新 {f'({batch_idx + 1}/{len(batches)})' if len(batches) > 1 else ''}" payload = { "msgtype": "markdown", "markdown": { "title": title, "text": batch_content, }, } proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30) resp.raise_for_status() print(f"✅ 钉钉{account_label} RSS 通知发送成功") results.append(True) except Exception as e: print(f"❌ 钉钉{account_label} RSS 通知发送失败: {e}") results.append(False) return any(results) if results else False def _send_rss_markdown( self, rss_items: List[Dict], feeds_info: Optional[Dict[str, str]], proxy_url: Optional[str], channel: str, ) -> bool: """发送 RSS 到 Markdown 兼容渠道(企业微信、Telegram、ntfy、Bark、Slack)""" import requests content = render_rss_markdown_content( rss_items=rss_items, feeds_info=feeds_info, get_time_func=self.get_time_func, ) try: if channel == "wework": return self._send_rss_wework(content, proxy_url) elif channel == "telegram": return self._send_rss_telegram(content, proxy_url) elif channel == "ntfy": return self._send_rss_ntfy(content, proxy_url) elif channel == "bark": return self._send_rss_bark(content, proxy_url) elif channel == "slack": return self._send_rss_slack(content, proxy_url) except Exception as e: print(f"❌ {channel} RSS 通知发送失败: {e}") return False return False def _send_rss_wework(self, content: str, proxy_url: Optional[str]) -> bool: """发送 RSS 到企业微信""" import requests webhooks = parse_multi_account_config(self.config["WEWORK_WEBHOOK_URL"]) webhooks = limit_accounts(webhooks, self.max_accounts, "企业微信") results = [] for i, webhook_url in enumerate(webhooks): if not webhook_url: continue account_label = f"账号{i+1}" if len(webhooks) > 1 else "" try: batches = self.split_content_func( content, self.config.get("MESSAGE_BATCH_SIZE", 4000) ) for batch_content in batches: payload = { "msgtype": "markdown", "markdown": {"content": batch_content}, } proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30) resp.raise_for_status() print(f"✅ 企业微信{account_label} RSS 通知发送成功") results.append(True) except Exception as e: print(f"❌ 企业微信{account_label} RSS 通知发送失败: {e}") results.append(False) return any(results) if results else False def _send_rss_telegram(self, content: str, proxy_url: Optional[str]) -> bool: """发送 RSS 到 Telegram""" import requests tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"]) chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"]) if not tokens or not chat_ids: return False results = [] for i in range(min(len(tokens), len(chat_ids), self.max_accounts)): token = tokens[i] chat_id = chat_ids[i] if not token or not chat_id: continue account_label = f"账号{i+1}" if len(tokens) > 1 else "" try: batches = self.split_content_func( content, self.config.get("MESSAGE_BATCH_SIZE", 4000) ) for batch_content in batches: url = f"https://api.telegram.org/bot{token}/sendMessage" payload = { "chat_id": chat_id, "text": batch_content, "parse_mode": "Markdown", } proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None resp = requests.post(url, json=payload, proxies=proxies, timeout=30) resp.raise_for_status() print(f"✅ Telegram{account_label} RSS 通知发送成功") results.append(True) except Exception as e: print(f"❌ Telegram{account_label} RSS 通知发送失败: {e}") results.append(False) return any(results) if results else False def _send_rss_ntfy(self, content: str, proxy_url: Optional[str]) -> bool: """发送 RSS 到 ntfy""" import requests server_url = self.config["NTFY_SERVER_URL"] topics = parse_multi_account_config(self.config["NTFY_TOPIC"]) tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", "")) if not server_url or not topics: return False topics = limit_accounts(topics, self.max_accounts, "ntfy") results = [] for i, topic in enumerate(topics): if not topic: continue token = tokens[i] if tokens and i < len(tokens) else "" account_label = f"账号{i+1}" if len(topics) > 1 else "" try: batches = self.split_content_func(content, 3800) for batch_content in batches: url = f"{server_url.rstrip('/')}/{topic}" headers = {"Title": "RSS 订阅更新", "Markdown": "yes"} if token: headers["Authorization"] = f"Bearer {token}" proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None resp = requests.post( url, data=batch_content.encode("utf-8"), headers=headers, proxies=proxies, timeout=30 ) resp.raise_for_status() print(f"✅ ntfy{account_label} RSS 通知发送成功") results.append(True) except Exception as e: print(f"❌ ntfy{account_label} RSS 通知发送失败: {e}") results.append(False) return any(results) if results else False def _send_rss_bark(self, content: str, proxy_url: Optional[str]) -> bool: """发送 RSS 到 Bark""" import requests import urllib.parse urls = parse_multi_account_config(self.config["BARK_URL"]) urls = limit_accounts(urls, self.max_accounts, "Bark") results = [] for i, bark_url in enumerate(urls): if not bark_url: continue account_label = f"账号{i+1}" if len(urls) > 1 else "" try: batches = self.split_content_func( content, self.config.get("BARK_BATCH_SIZE", 3600) ) for batch_content in batches: title = urllib.parse.quote("📰 RSS 订阅更新") body = urllib.parse.quote(batch_content) url = f"{bark_url.rstrip('/')}/{title}/{body}" proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None resp = requests.get(url, proxies=proxies, timeout=30) resp.raise_for_status() print(f"✅ Bark{account_label} RSS 通知发送成功") results.append(True) except Exception as e: print(f"❌ Bark{account_label} RSS 通知发送失败: {e}") results.append(False) return any(results) if results else False def _send_rss_slack(self, content: str, proxy_url: Optional[str]) -> bool: """发送 RSS 到 Slack""" import requests webhooks = parse_multi_account_config(self.config["SLACK_WEBHOOK_URL"]) webhooks = limit_accounts(webhooks, self.max_accounts, "Slack") results = [] for i, webhook_url in enumerate(webhooks): if not webhook_url: continue account_label = f"账号{i+1}" if len(webhooks) > 1 else "" try: batches = self.split_content_func( content, self.config.get("SLACK_BATCH_SIZE", 4000) ) for batch_content in batches: payload = { "blocks": [ { "type": "section", "text": { "type": "mrkdwn", "text": batch_content, }, } ] } proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30) resp.raise_for_status() print(f"✅ Slack{account_label} RSS 通知发送成功") results.append(True) except Exception as e: print(f"❌ Slack{account_label} RSS 通知发送失败: {e}") results.append(False) return any(results) if results else False