dispatcher.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. # coding=utf-8
  2. """
  3. 通知调度器模块
  4. 提供统一的通知分发接口。
  5. 支持所有通知渠道的多账号配置,使用 `;` 分隔多个账号。
  6. 使用示例:
  7. dispatcher = NotificationDispatcher(config, get_time_func, split_content_func)
  8. results = dispatcher.dispatch_all(report_data, report_type, ...)
  9. """
  10. from typing import Any, Callable, Dict, List, Optional
  11. from trendradar.core.config import (
  12. get_account_at_index,
  13. limit_accounts,
  14. parse_multi_account_config,
  15. validate_paired_configs,
  16. )
  17. from .senders import (
  18. send_to_bark,
  19. send_to_dingtalk,
  20. send_to_email,
  21. send_to_feishu,
  22. send_to_ntfy,
  23. send_to_slack,
  24. send_to_telegram,
  25. send_to_wework,
  26. )
  27. class NotificationDispatcher:
  28. """
  29. 统一的多账号通知调度器
  30. 将多账号发送逻辑封装,提供简洁的 dispatch_all 接口。
  31. 内部处理账号解析、数量限制、配对验证等逻辑。
  32. """
  33. def __init__(
  34. self,
  35. config: Dict[str, Any],
  36. get_time_func: Callable,
  37. split_content_func: Callable,
  38. ):
  39. """
  40. 初始化通知调度器
  41. Args:
  42. config: 完整的配置字典,包含所有通知渠道的配置
  43. get_time_func: 获取当前时间的函数
  44. split_content_func: 内容分批函数
  45. """
  46. self.config = config
  47. self.get_time_func = get_time_func
  48. self.split_content_func = split_content_func
  49. self.max_accounts = config.get("MAX_ACCOUNTS_PER_CHANNEL", 3)
  50. def dispatch_all(
  51. self,
  52. report_data: Dict,
  53. report_type: str,
  54. update_info: Optional[Dict] = None,
  55. proxy_url: Optional[str] = None,
  56. mode: str = "daily",
  57. html_file_path: Optional[str] = None,
  58. ) -> Dict[str, bool]:
  59. """
  60. 分发通知到所有已配置的渠道
  61. Args:
  62. report_data: 报告数据(由 prepare_report_data 生成)
  63. report_type: 报告类型(如 "当日汇总"、"实时增量")
  64. update_info: 版本更新信息(可选)
  65. proxy_url: 代理 URL(可选)
  66. mode: 报告模式 (daily/current/incremental)
  67. html_file_path: HTML 报告文件路径(邮件使用)
  68. Returns:
  69. Dict[str, bool]: 每个渠道的发送结果,key 为渠道名,value 为是否成功
  70. """
  71. results = {}
  72. # 飞书
  73. if self.config.get("FEISHU_WEBHOOK_URL"):
  74. results["feishu"] = self._send_feishu(
  75. report_data, report_type, update_info, proxy_url, mode
  76. )
  77. # 钉钉
  78. if self.config.get("DINGTALK_WEBHOOK_URL"):
  79. results["dingtalk"] = self._send_dingtalk(
  80. report_data, report_type, update_info, proxy_url, mode
  81. )
  82. # 企业微信
  83. if self.config.get("WEWORK_WEBHOOK_URL"):
  84. results["wework"] = self._send_wework(
  85. report_data, report_type, update_info, proxy_url, mode
  86. )
  87. # Telegram(需要配对验证)
  88. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  89. results["telegram"] = self._send_telegram(
  90. report_data, report_type, update_info, proxy_url, mode
  91. )
  92. # ntfy(需要配对验证)
  93. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  94. results["ntfy"] = self._send_ntfy(
  95. report_data, report_type, update_info, proxy_url, mode
  96. )
  97. # Bark
  98. if self.config.get("BARK_URL"):
  99. results["bark"] = self._send_bark(
  100. report_data, report_type, update_info, proxy_url, mode
  101. )
  102. # Slack
  103. if self.config.get("SLACK_WEBHOOK_URL"):
  104. results["slack"] = self._send_slack(
  105. report_data, report_type, update_info, proxy_url, mode
  106. )
  107. # 邮件(保持原有逻辑,已支持多收件人)
  108. if (
  109. self.config.get("EMAIL_FROM")
  110. and self.config.get("EMAIL_PASSWORD")
  111. and self.config.get("EMAIL_TO")
  112. ):
  113. results["email"] = self._send_email(report_type, html_file_path)
  114. return results
  115. def _send_to_multi_accounts(
  116. self,
  117. channel_name: str,
  118. config_value: str,
  119. send_func: Callable[..., bool],
  120. **kwargs,
  121. ) -> bool:
  122. """
  123. 通用多账号发送逻辑
  124. Args:
  125. channel_name: 渠道名称(用于日志和账号数量限制提示)
  126. config_value: 配置值(可能包含多个账号,用 ; 分隔)
  127. send_func: 发送函数,签名为 (account, account_label=..., **kwargs) -> bool
  128. **kwargs: 传递给发送函数的其他参数
  129. Returns:
  130. bool: 任一账号发送成功则返回 True
  131. """
  132. accounts = parse_multi_account_config(config_value)
  133. if not accounts:
  134. return False
  135. accounts = limit_accounts(accounts, self.max_accounts, channel_name)
  136. results = []
  137. for i, account in enumerate(accounts):
  138. if account:
  139. account_label = f"账号{i+1}" if len(accounts) > 1 else ""
  140. result = send_func(account, account_label=account_label, **kwargs)
  141. results.append(result)
  142. return any(results) if results else False
  143. def _send_feishu(
  144. self,
  145. report_data: Dict,
  146. report_type: str,
  147. update_info: Optional[Dict],
  148. proxy_url: Optional[str],
  149. mode: str,
  150. ) -> bool:
  151. """发送到飞书(多账号)"""
  152. return self._send_to_multi_accounts(
  153. channel_name="飞书",
  154. config_value=self.config["FEISHU_WEBHOOK_URL"],
  155. send_func=lambda url, account_label: send_to_feishu(
  156. webhook_url=url,
  157. report_data=report_data,
  158. report_type=report_type,
  159. update_info=update_info,
  160. proxy_url=proxy_url,
  161. mode=mode,
  162. account_label=account_label,
  163. batch_size=self.config.get("FEISHU_BATCH_SIZE", 29000),
  164. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  165. split_content_func=self.split_content_func,
  166. get_time_func=self.get_time_func,
  167. ),
  168. )
  169. def _send_dingtalk(
  170. self,
  171. report_data: Dict,
  172. report_type: str,
  173. update_info: Optional[Dict],
  174. proxy_url: Optional[str],
  175. mode: str,
  176. ) -> bool:
  177. """发送到钉钉(多账号)"""
  178. return self._send_to_multi_accounts(
  179. channel_name="钉钉",
  180. config_value=self.config["DINGTALK_WEBHOOK_URL"],
  181. send_func=lambda url, account_label: send_to_dingtalk(
  182. webhook_url=url,
  183. report_data=report_data,
  184. report_type=report_type,
  185. update_info=update_info,
  186. proxy_url=proxy_url,
  187. mode=mode,
  188. account_label=account_label,
  189. batch_size=self.config.get("DINGTALK_BATCH_SIZE", 20000),
  190. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  191. split_content_func=self.split_content_func,
  192. ),
  193. )
  194. def _send_wework(
  195. self,
  196. report_data: Dict,
  197. report_type: str,
  198. update_info: Optional[Dict],
  199. proxy_url: Optional[str],
  200. mode: str,
  201. ) -> bool:
  202. """发送到企业微信(多账号)"""
  203. return self._send_to_multi_accounts(
  204. channel_name="企业微信",
  205. config_value=self.config["WEWORK_WEBHOOK_URL"],
  206. send_func=lambda url, account_label: send_to_wework(
  207. webhook_url=url,
  208. report_data=report_data,
  209. report_type=report_type,
  210. update_info=update_info,
  211. proxy_url=proxy_url,
  212. mode=mode,
  213. account_label=account_label,
  214. batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
  215. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  216. msg_type=self.config.get("WEWORK_MSG_TYPE", "markdown"),
  217. split_content_func=self.split_content_func,
  218. ),
  219. )
  220. def _send_telegram(
  221. self,
  222. report_data: Dict,
  223. report_type: str,
  224. update_info: Optional[Dict],
  225. proxy_url: Optional[str],
  226. mode: str,
  227. ) -> bool:
  228. """发送到 Telegram(多账号,需验证 token 和 chat_id 配对)"""
  229. telegram_tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
  230. telegram_chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
  231. if not telegram_tokens or not telegram_chat_ids:
  232. return False
  233. # 验证配对
  234. valid, count = validate_paired_configs(
  235. {"bot_token": telegram_tokens, "chat_id": telegram_chat_ids},
  236. "Telegram",
  237. required_keys=["bot_token", "chat_id"],
  238. )
  239. if not valid or count == 0:
  240. return False
  241. # 限制账号数量
  242. telegram_tokens = limit_accounts(telegram_tokens, self.max_accounts, "Telegram")
  243. telegram_chat_ids = telegram_chat_ids[: len(telegram_tokens)]
  244. results = []
  245. for i in range(len(telegram_tokens)):
  246. token = telegram_tokens[i]
  247. chat_id = telegram_chat_ids[i]
  248. if token and chat_id:
  249. account_label = f"账号{i+1}" if len(telegram_tokens) > 1 else ""
  250. result = send_to_telegram(
  251. bot_token=token,
  252. chat_id=chat_id,
  253. report_data=report_data,
  254. report_type=report_type,
  255. update_info=update_info,
  256. proxy_url=proxy_url,
  257. mode=mode,
  258. account_label=account_label,
  259. batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
  260. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  261. split_content_func=self.split_content_func,
  262. )
  263. results.append(result)
  264. return any(results) if results else False
  265. def _send_ntfy(
  266. self,
  267. report_data: Dict,
  268. report_type: str,
  269. update_info: Optional[Dict],
  270. proxy_url: Optional[str],
  271. mode: str,
  272. ) -> bool:
  273. """发送到 ntfy(多账号,需验证 topic 和 token 配对)"""
  274. ntfy_server_url = self.config["NTFY_SERVER_URL"]
  275. ntfy_topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
  276. ntfy_tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))
  277. if not ntfy_server_url or not ntfy_topics:
  278. return False
  279. # 验证 token 和 topic 数量一致(如果配置了 token)
  280. if ntfy_tokens and len(ntfy_tokens) != len(ntfy_topics):
  281. print(
  282. f"❌ ntfy 配置错误:topic 数量({len(ntfy_topics)})与 token 数量({len(ntfy_tokens)})不一致,跳过 ntfy 推送"
  283. )
  284. return False
  285. # 限制账号数量
  286. ntfy_topics = limit_accounts(ntfy_topics, self.max_accounts, "ntfy")
  287. if ntfy_tokens:
  288. ntfy_tokens = ntfy_tokens[: len(ntfy_topics)]
  289. results = []
  290. for i, topic in enumerate(ntfy_topics):
  291. if topic:
  292. token = get_account_at_index(ntfy_tokens, i, "") if ntfy_tokens else ""
  293. account_label = f"账号{i+1}" if len(ntfy_topics) > 1 else ""
  294. result = send_to_ntfy(
  295. server_url=ntfy_server_url,
  296. topic=topic,
  297. token=token,
  298. report_data=report_data,
  299. report_type=report_type,
  300. update_info=update_info,
  301. proxy_url=proxy_url,
  302. mode=mode,
  303. account_label=account_label,
  304. batch_size=3800,
  305. split_content_func=self.split_content_func,
  306. )
  307. results.append(result)
  308. return any(results) if results else False
  309. def _send_bark(
  310. self,
  311. report_data: Dict,
  312. report_type: str,
  313. update_info: Optional[Dict],
  314. proxy_url: Optional[str],
  315. mode: str,
  316. ) -> bool:
  317. """发送到 Bark(多账号)"""
  318. return self._send_to_multi_accounts(
  319. channel_name="Bark",
  320. config_value=self.config["BARK_URL"],
  321. send_func=lambda url, account_label: send_to_bark(
  322. bark_url=url,
  323. report_data=report_data,
  324. report_type=report_type,
  325. update_info=update_info,
  326. proxy_url=proxy_url,
  327. mode=mode,
  328. account_label=account_label,
  329. batch_size=self.config.get("BARK_BATCH_SIZE", 3600),
  330. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  331. split_content_func=self.split_content_func,
  332. ),
  333. )
  334. def _send_slack(
  335. self,
  336. report_data: Dict,
  337. report_type: str,
  338. update_info: Optional[Dict],
  339. proxy_url: Optional[str],
  340. mode: str,
  341. ) -> bool:
  342. """发送到 Slack(多账号)"""
  343. return self._send_to_multi_accounts(
  344. channel_name="Slack",
  345. config_value=self.config["SLACK_WEBHOOK_URL"],
  346. send_func=lambda url, account_label: send_to_slack(
  347. webhook_url=url,
  348. report_data=report_data,
  349. report_type=report_type,
  350. update_info=update_info,
  351. proxy_url=proxy_url,
  352. mode=mode,
  353. account_label=account_label,
  354. batch_size=self.config.get("SLACK_BATCH_SIZE", 4000),
  355. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  356. split_content_func=self.split_content_func,
  357. ),
  358. )
  359. def _send_email(
  360. self,
  361. report_type: str,
  362. html_file_path: Optional[str],
  363. ) -> bool:
  364. """发送邮件(保持原有逻辑,已支持多收件人)"""
  365. return send_to_email(
  366. from_email=self.config["EMAIL_FROM"],
  367. password=self.config["EMAIL_PASSWORD"],
  368. to_email=self.config["EMAIL_TO"],
  369. report_type=report_type,
  370. html_file_path=html_file_path,
  371. custom_smtp_server=self.config.get("EMAIL_SMTP_SERVER", ""),
  372. custom_smtp_port=self.config.get("EMAIL_SMTP_PORT", ""),
  373. get_time_func=self.get_time_func,
  374. )