dispatcher.py 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152
  1. # coding=utf-8
  2. """
  3. 通知调度器模块
  4. 提供统一的通知分发接口。
  5. 支持所有通知渠道的多账号配置,使用 `;` 分隔多个账号。
  6. 使用示例:
  7. dispatcher = NotificationDispatcher(config, get_time_func, split_content_func)
  8. results = dispatcher.dispatch_all(report_data, report_type, ...)
  9. """
  10. from __future__ import annotations
  11. from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
  12. from trendradar.core.config import (
  13. get_account_at_index,
  14. limit_accounts,
  15. parse_multi_account_config,
  16. validate_paired_configs,
  17. )
  18. from .senders import (
  19. send_to_bark,
  20. send_to_dingtalk,
  21. send_to_email,
  22. send_to_feishu,
  23. send_to_ntfy,
  24. send_to_slack,
  25. send_to_telegram,
  26. send_to_wework,
  27. send_to_generic_webhook,
  28. )
  29. from .renderer import (
  30. render_rss_feishu_content,
  31. render_rss_dingtalk_content,
  32. render_rss_markdown_content,
  33. )
  34. # 类型检查时导入,运行时不导入(避免循环导入)
  35. if TYPE_CHECKING:
  36. from trendradar.ai import AIAnalysisResult, AITranslator
  37. class NotificationDispatcher:
  38. """
  39. 统一的多账号通知调度器
  40. 将多账号发送逻辑封装,提供简洁的 dispatch_all 接口。
  41. 内部处理账号解析、数量限制、配对验证等逻辑。
  42. """
  43. def __init__(
  44. self,
  45. config: Dict[str, Any],
  46. get_time_func: Callable,
  47. split_content_func: Callable,
  48. translator: Optional["AITranslator"] = None,
  49. ):
  50. """
  51. 初始化通知调度器
  52. Args:
  53. config: 完整的配置字典,包含所有通知渠道的配置
  54. get_time_func: 获取当前时间的函数
  55. split_content_func: 内容分批函数
  56. translator: AI 翻译器实例(可选)
  57. """
  58. self.config = config
  59. self.get_time_func = get_time_func
  60. self.split_content_func = split_content_func
  61. self.max_accounts = config.get("MAX_ACCOUNTS_PER_CHANNEL", 3)
  62. self.translator = translator
  63. def _translate_content(
  64. self,
  65. report_data: Dict,
  66. rss_items: Optional[List[Dict]] = None,
  67. rss_new_items: Optional[List[Dict]] = None,
  68. ) -> tuple:
  69. """
  70. 翻译推送内容
  71. Args:
  72. report_data: 报告数据
  73. rss_items: RSS 统计条目
  74. rss_new_items: RSS 新增条目
  75. Returns:
  76. tuple: (翻译后的 report_data, rss_items, rss_new_items)
  77. """
  78. if not self.translator or not self.translator.enabled:
  79. return report_data, rss_items, rss_new_items
  80. import copy
  81. print(f"[翻译] 开始翻译内容到 {self.translator.target_language}...")
  82. # 深拷贝避免修改原始数据
  83. report_data = copy.deepcopy(report_data)
  84. rss_items = copy.deepcopy(rss_items) if rss_items else None
  85. rss_new_items = copy.deepcopy(rss_new_items) if rss_new_items else None
  86. # 收集所有需要翻译的标题
  87. titles_to_translate = []
  88. title_locations = [] # 记录标题位置,用于回填
  89. # 1. 热榜标题
  90. for stat_idx, stat in enumerate(report_data.get("stats", [])):
  91. for title_idx, title_data in enumerate(stat.get("titles", [])):
  92. titles_to_translate.append(title_data.get("title", ""))
  93. title_locations.append(("stats", stat_idx, title_idx))
  94. # 2. 新增热点标题
  95. for source_idx, source in enumerate(report_data.get("new_titles", [])):
  96. for title_idx, title_data in enumerate(source.get("titles", [])):
  97. titles_to_translate.append(title_data.get("title", ""))
  98. title_locations.append(("new_titles", source_idx, title_idx))
  99. # 3. RSS 统计标题(结构与 stats 一致:[{word, count, titles: [{title, ...}]}])
  100. if rss_items:
  101. for stat_idx, stat in enumerate(rss_items):
  102. for title_idx, title_data in enumerate(stat.get("titles", [])):
  103. titles_to_translate.append(title_data.get("title", ""))
  104. title_locations.append(("rss_items", stat_idx, title_idx))
  105. # 4. RSS 新增标题(结构与 stats 一致)
  106. if rss_new_items:
  107. for stat_idx, stat in enumerate(rss_new_items):
  108. for title_idx, title_data in enumerate(stat.get("titles", [])):
  109. titles_to_translate.append(title_data.get("title", ""))
  110. title_locations.append(("rss_new_items", stat_idx, title_idx))
  111. if not titles_to_translate:
  112. print("[翻译] 没有需要翻译的内容")
  113. return report_data, rss_items, rss_new_items
  114. print(f"[翻译] 共 {len(titles_to_translate)} 条标题待翻译")
  115. # 批量翻译
  116. result = self.translator.translate_batch(titles_to_translate)
  117. if result.success_count == 0:
  118. print(f"[翻译] 翻译失败: {result.results[0].error if result.results else '未知错误'}")
  119. return report_data, rss_items, rss_new_items
  120. print(f"[翻译] 翻译完成: {result.success_count}/{result.total_count} 成功")
  121. # 回填翻译结果
  122. for i, (loc_type, idx1, idx2) in enumerate(title_locations):
  123. if i < len(result.results) and result.results[i].success:
  124. translated = result.results[i].translated_text
  125. if loc_type == "stats":
  126. report_data["stats"][idx1]["titles"][idx2]["title"] = translated
  127. elif loc_type == "new_titles":
  128. report_data["new_titles"][idx1]["titles"][idx2]["title"] = translated
  129. elif loc_type == "rss_items" and rss_items:
  130. rss_items[idx1]["titles"][idx2]["title"] = translated
  131. elif loc_type == "rss_new_items" and rss_new_items:
  132. rss_new_items[idx1]["titles"][idx2]["title"] = translated
  133. return report_data, rss_items, rss_new_items
  134. def dispatch_all(
  135. self,
  136. report_data: Dict,
  137. report_type: str,
  138. update_info: Optional[Dict] = None,
  139. proxy_url: Optional[str] = None,
  140. mode: str = "daily",
  141. html_file_path: Optional[str] = None,
  142. rss_items: Optional[List[Dict]] = None,
  143. rss_new_items: Optional[List[Dict]] = None,
  144. ai_analysis: Optional[AIAnalysisResult] = None,
  145. standalone_data: Optional[Dict] = None,
  146. ) -> Dict[str, bool]:
  147. """
  148. 分发通知到所有已配置的渠道(支持热榜+RSS合并推送+AI分析+独立展示区)
  149. Args:
  150. report_data: 报告数据(由 prepare_report_data 生成)
  151. report_type: 报告类型(如 "当日汇总"、"实时增量")
  152. update_info: 版本更新信息(可选)
  153. proxy_url: 代理 URL(可选)
  154. mode: 报告模式 (daily/current/incremental)
  155. html_file_path: HTML 报告文件路径(邮件使用)
  156. rss_items: RSS 统计条目列表(用于 RSS 统计区块)
  157. rss_new_items: RSS 新增条目列表(用于 RSS 新增区块)
  158. ai_analysis: AI 分析结果(可选)
  159. standalone_data: 独立展示区数据(可选)
  160. Returns:
  161. Dict[str, bool]: 每个渠道的发送结果,key 为渠道名,value 为是否成功
  162. """
  163. results = {}
  164. # 获取区域显示配置
  165. display_regions = self.config.get("DISPLAY", {}).get("REGIONS", {})
  166. # 执行翻译(如果启用)
  167. report_data, rss_items, rss_new_items = self._translate_content(
  168. report_data, rss_items, rss_new_items
  169. )
  170. # 飞书
  171. if self.config.get("FEISHU_WEBHOOK_URL"):
  172. results["feishu"] = self._send_feishu(
  173. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  174. ai_analysis, display_regions, standalone_data
  175. )
  176. # 钉钉
  177. if self.config.get("DINGTALK_WEBHOOK_URL"):
  178. results["dingtalk"] = self._send_dingtalk(
  179. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  180. ai_analysis, display_regions, standalone_data
  181. )
  182. # 企业微信
  183. if self.config.get("WEWORK_WEBHOOK_URL"):
  184. results["wework"] = self._send_wework(
  185. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  186. ai_analysis, display_regions, standalone_data
  187. )
  188. # Telegram(需要配对验证)
  189. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  190. results["telegram"] = self._send_telegram(
  191. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  192. ai_analysis, display_regions, standalone_data
  193. )
  194. # ntfy(需要配对验证)
  195. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  196. results["ntfy"] = self._send_ntfy(
  197. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  198. ai_analysis, display_regions, standalone_data
  199. )
  200. # Bark
  201. if self.config.get("BARK_URL"):
  202. results["bark"] = self._send_bark(
  203. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  204. ai_analysis, display_regions, standalone_data
  205. )
  206. # Slack
  207. if self.config.get("SLACK_WEBHOOK_URL"):
  208. results["slack"] = self._send_slack(
  209. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  210. ai_analysis, display_regions, standalone_data
  211. )
  212. # 通用 Webhook
  213. if self.config.get("GENERIC_WEBHOOK_URL"):
  214. results["generic_webhook"] = self._send_generic_webhook(
  215. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  216. ai_analysis, display_regions, standalone_data
  217. )
  218. # 邮件(保持原有逻辑,已支持多收件人,AI 分析已嵌入 HTML)
  219. if (
  220. self.config.get("EMAIL_FROM")
  221. and self.config.get("EMAIL_PASSWORD")
  222. and self.config.get("EMAIL_TO")
  223. ):
  224. results["email"] = self._send_email(report_type, html_file_path)
  225. return results
  226. def _send_to_multi_accounts(
  227. self,
  228. channel_name: str,
  229. config_value: str,
  230. send_func: Callable[..., bool],
  231. **kwargs,
  232. ) -> bool:
  233. """
  234. 通用多账号发送逻辑
  235. Args:
  236. channel_name: 渠道名称(用于日志和账号数量限制提示)
  237. config_value: 配置值(可能包含多个账号,用 ; 分隔)
  238. send_func: 发送函数,签名为 (account, account_label=..., **kwargs) -> bool
  239. **kwargs: 传递给发送函数的其他参数
  240. Returns:
  241. bool: 任一账号发送成功则返回 True
  242. """
  243. accounts = parse_multi_account_config(config_value)
  244. if not accounts:
  245. return False
  246. accounts = limit_accounts(accounts, self.max_accounts, channel_name)
  247. results = []
  248. for i, account in enumerate(accounts):
  249. if account:
  250. account_label = f"账号{i+1}" if len(accounts) > 1 else ""
  251. result = send_func(account, account_label=account_label, **kwargs)
  252. results.append(result)
  253. return any(results) if results else False
  254. def _send_feishu(
  255. self,
  256. report_data: Dict,
  257. report_type: str,
  258. update_info: Optional[Dict],
  259. proxy_url: Optional[str],
  260. mode: str,
  261. rss_items: Optional[List[Dict]] = None,
  262. rss_new_items: Optional[List[Dict]] = None,
  263. ai_analysis: Optional[AIAnalysisResult] = None,
  264. display_regions: Optional[Dict] = None,
  265. standalone_data: Optional[Dict] = None,
  266. ) -> bool:
  267. """发送到飞书(多账号,支持热榜+RSS合并+AI分析+独立展示区)"""
  268. display_regions = display_regions or {}
  269. # 根据区域开关决定是否发送对应内容
  270. if not display_regions.get("HOTLIST", True):
  271. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  272. return self._send_to_multi_accounts(
  273. channel_name="飞书",
  274. config_value=self.config["FEISHU_WEBHOOK_URL"],
  275. send_func=lambda url, account_label: send_to_feishu(
  276. webhook_url=url,
  277. report_data=report_data,
  278. report_type=report_type,
  279. update_info=update_info,
  280. proxy_url=proxy_url,
  281. mode=mode,
  282. account_label=account_label,
  283. batch_size=self.config.get("FEISHU_BATCH_SIZE", 29000),
  284. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  285. split_content_func=self.split_content_func,
  286. get_time_func=self.get_time_func,
  287. rss_items=rss_items if display_regions.get("RSS", True) else None,
  288. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  289. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  290. display_regions=display_regions,
  291. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  292. ),
  293. )
  294. def _send_dingtalk(
  295. self,
  296. report_data: Dict,
  297. report_type: str,
  298. update_info: Optional[Dict],
  299. proxy_url: Optional[str],
  300. mode: str,
  301. rss_items: Optional[List[Dict]] = None,
  302. rss_new_items: Optional[List[Dict]] = None,
  303. ai_analysis: Optional[AIAnalysisResult] = None,
  304. display_regions: Optional[Dict] = None,
  305. standalone_data: Optional[Dict] = None,
  306. ) -> bool:
  307. """发送到钉钉(多账号,支持热榜+RSS合并+AI分析+独立展示区)"""
  308. display_regions = display_regions or {}
  309. if not display_regions.get("HOTLIST", True):
  310. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  311. return self._send_to_multi_accounts(
  312. channel_name="钉钉",
  313. config_value=self.config["DINGTALK_WEBHOOK_URL"],
  314. send_func=lambda url, account_label: send_to_dingtalk(
  315. webhook_url=url,
  316. report_data=report_data,
  317. report_type=report_type,
  318. update_info=update_info,
  319. proxy_url=proxy_url,
  320. mode=mode,
  321. account_label=account_label,
  322. batch_size=self.config.get("DINGTALK_BATCH_SIZE", 20000),
  323. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  324. split_content_func=self.split_content_func,
  325. rss_items=rss_items if display_regions.get("RSS", True) else None,
  326. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  327. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  328. display_regions=display_regions,
  329. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  330. ),
  331. )
  332. def _send_wework(
  333. self,
  334. report_data: Dict,
  335. report_type: str,
  336. update_info: Optional[Dict],
  337. proxy_url: Optional[str],
  338. mode: str,
  339. rss_items: Optional[List[Dict]] = None,
  340. rss_new_items: Optional[List[Dict]] = None,
  341. ai_analysis: Optional[AIAnalysisResult] = None,
  342. display_regions: Optional[Dict] = None,
  343. standalone_data: Optional[Dict] = None,
  344. ) -> bool:
  345. """发送到企业微信(多账号,支持热榜+RSS合并+AI分析+独立展示区)"""
  346. display_regions = display_regions or {}
  347. if not display_regions.get("HOTLIST", True):
  348. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  349. return self._send_to_multi_accounts(
  350. channel_name="企业微信",
  351. config_value=self.config["WEWORK_WEBHOOK_URL"],
  352. send_func=lambda url, account_label: send_to_wework(
  353. webhook_url=url,
  354. report_data=report_data,
  355. report_type=report_type,
  356. update_info=update_info,
  357. proxy_url=proxy_url,
  358. mode=mode,
  359. account_label=account_label,
  360. batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
  361. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  362. msg_type=self.config.get("WEWORK_MSG_TYPE", "markdown"),
  363. split_content_func=self.split_content_func,
  364. rss_items=rss_items if display_regions.get("RSS", True) else None,
  365. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  366. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  367. display_regions=display_regions,
  368. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  369. ),
  370. )
  371. def _send_telegram(
  372. self,
  373. report_data: Dict,
  374. report_type: str,
  375. update_info: Optional[Dict],
  376. proxy_url: Optional[str],
  377. mode: str,
  378. rss_items: Optional[List[Dict]] = None,
  379. rss_new_items: Optional[List[Dict]] = None,
  380. ai_analysis: Optional[AIAnalysisResult] = None,
  381. display_regions: Optional[Dict] = None,
  382. standalone_data: Optional[Dict] = None,
  383. ) -> bool:
  384. """发送到 Telegram(多账号,需验证 token 和 chat_id 配对,支持热榜+RSS合并+AI分析+独立展示区)"""
  385. display_regions = display_regions or {}
  386. if not display_regions.get("HOTLIST", True):
  387. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  388. telegram_tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
  389. telegram_chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
  390. if not telegram_tokens or not telegram_chat_ids:
  391. return False
  392. # 验证配对
  393. valid, count = validate_paired_configs(
  394. {"bot_token": telegram_tokens, "chat_id": telegram_chat_ids},
  395. "Telegram",
  396. required_keys=["bot_token", "chat_id"],
  397. )
  398. if not valid or count == 0:
  399. return False
  400. # 限制账号数量
  401. telegram_tokens = limit_accounts(telegram_tokens, self.max_accounts, "Telegram")
  402. telegram_chat_ids = telegram_chat_ids[: len(telegram_tokens)]
  403. results = []
  404. for i in range(len(telegram_tokens)):
  405. token = telegram_tokens[i]
  406. chat_id = telegram_chat_ids[i]
  407. if token and chat_id:
  408. account_label = f"账号{i+1}" if len(telegram_tokens) > 1 else ""
  409. result = send_to_telegram(
  410. bot_token=token,
  411. chat_id=chat_id,
  412. report_data=report_data,
  413. report_type=report_type,
  414. update_info=update_info,
  415. proxy_url=proxy_url,
  416. mode=mode,
  417. account_label=account_label,
  418. batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
  419. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  420. split_content_func=self.split_content_func,
  421. rss_items=rss_items if display_regions.get("RSS", True) else None,
  422. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  423. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  424. display_regions=display_regions,
  425. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  426. )
  427. results.append(result)
  428. return any(results) if results else False
  429. def _send_ntfy(
  430. self,
  431. report_data: Dict,
  432. report_type: str,
  433. update_info: Optional[Dict],
  434. proxy_url: Optional[str],
  435. mode: str,
  436. rss_items: Optional[List[Dict]] = None,
  437. rss_new_items: Optional[List[Dict]] = None,
  438. ai_analysis: Optional[AIAnalysisResult] = None,
  439. display_regions: Optional[Dict] = None,
  440. standalone_data: Optional[Dict] = None,
  441. ) -> bool:
  442. """发送到 ntfy(多账号,需验证 topic 和 token 配对,支持热榜+RSS合并+AI分析+独立展示区)"""
  443. display_regions = display_regions or {}
  444. if not display_regions.get("HOTLIST", True):
  445. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  446. ntfy_server_url = self.config["NTFY_SERVER_URL"]
  447. ntfy_topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
  448. ntfy_tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))
  449. if not ntfy_server_url or not ntfy_topics:
  450. return False
  451. # 验证 token 和 topic 数量一致(如果配置了 token)
  452. if ntfy_tokens and len(ntfy_tokens) != len(ntfy_topics):
  453. print(
  454. f"❌ ntfy 配置错误:topic 数量({len(ntfy_topics)})与 token 数量({len(ntfy_tokens)})不一致,跳过 ntfy 推送"
  455. )
  456. return False
  457. # 限制账号数量
  458. ntfy_topics = limit_accounts(ntfy_topics, self.max_accounts, "ntfy")
  459. if ntfy_tokens:
  460. ntfy_tokens = ntfy_tokens[: len(ntfy_topics)]
  461. results = []
  462. for i, topic in enumerate(ntfy_topics):
  463. if topic:
  464. token = get_account_at_index(ntfy_tokens, i, "") if ntfy_tokens else ""
  465. account_label = f"账号{i+1}" if len(ntfy_topics) > 1 else ""
  466. result = send_to_ntfy(
  467. server_url=ntfy_server_url,
  468. topic=topic,
  469. token=token,
  470. report_data=report_data,
  471. report_type=report_type,
  472. update_info=update_info,
  473. proxy_url=proxy_url,
  474. mode=mode,
  475. account_label=account_label,
  476. batch_size=3800,
  477. split_content_func=self.split_content_func,
  478. rss_items=rss_items if display_regions.get("RSS", True) else None,
  479. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  480. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  481. display_regions=display_regions,
  482. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  483. )
  484. results.append(result)
  485. return any(results) if results else False
  486. def _send_bark(
  487. self,
  488. report_data: Dict,
  489. report_type: str,
  490. update_info: Optional[Dict],
  491. proxy_url: Optional[str],
  492. mode: str,
  493. rss_items: Optional[List[Dict]] = None,
  494. rss_new_items: Optional[List[Dict]] = None,
  495. ai_analysis: Optional[AIAnalysisResult] = None,
  496. display_regions: Optional[Dict] = None,
  497. standalone_data: Optional[Dict] = None,
  498. ) -> bool:
  499. """发送到 Bark(多账号,支持热榜+RSS合并+AI分析+独立展示区)"""
  500. display_regions = display_regions or {}
  501. if not display_regions.get("HOTLIST", True):
  502. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  503. return self._send_to_multi_accounts(
  504. channel_name="Bark",
  505. config_value=self.config["BARK_URL"],
  506. send_func=lambda url, account_label: send_to_bark(
  507. bark_url=url,
  508. report_data=report_data,
  509. report_type=report_type,
  510. update_info=update_info,
  511. proxy_url=proxy_url,
  512. mode=mode,
  513. account_label=account_label,
  514. batch_size=self.config.get("BARK_BATCH_SIZE", 3600),
  515. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  516. split_content_func=self.split_content_func,
  517. rss_items=rss_items if display_regions.get("RSS", True) else None,
  518. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  519. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  520. display_regions=display_regions,
  521. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  522. ),
  523. )
  524. def _send_slack(
  525. self,
  526. report_data: Dict,
  527. report_type: str,
  528. update_info: Optional[Dict],
  529. proxy_url: Optional[str],
  530. mode: str,
  531. rss_items: Optional[List[Dict]] = None,
  532. rss_new_items: Optional[List[Dict]] = None,
  533. ai_analysis: Optional[AIAnalysisResult] = None,
  534. display_regions: Optional[Dict] = None,
  535. standalone_data: Optional[Dict] = None,
  536. ) -> bool:
  537. """发送到 Slack(多账号,支持热榜+RSS合并+AI分析+独立展示区)"""
  538. display_regions = display_regions or {}
  539. if not display_regions.get("HOTLIST", True):
  540. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  541. return self._send_to_multi_accounts(
  542. channel_name="Slack",
  543. config_value=self.config["SLACK_WEBHOOK_URL"],
  544. send_func=lambda url, account_label: send_to_slack(
  545. webhook_url=url,
  546. report_data=report_data,
  547. report_type=report_type,
  548. update_info=update_info,
  549. proxy_url=proxy_url,
  550. mode=mode,
  551. account_label=account_label,
  552. batch_size=self.config.get("SLACK_BATCH_SIZE", 4000),
  553. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  554. split_content_func=self.split_content_func,
  555. rss_items=rss_items if display_regions.get("RSS", True) else None,
  556. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  557. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  558. display_regions=display_regions,
  559. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  560. ),
  561. )
  562. def _send_generic_webhook(
  563. self,
  564. report_data: Dict,
  565. report_type: str,
  566. update_info: Optional[Dict],
  567. proxy_url: Optional[str],
  568. mode: str,
  569. rss_items: Optional[List[Dict]] = None,
  570. rss_new_items: Optional[List[Dict]] = None,
  571. ai_analysis: Optional[AIAnalysisResult] = None,
  572. display_regions: Optional[Dict] = None,
  573. standalone_data: Optional[Dict] = None,
  574. ) -> bool:
  575. """发送到通用 Webhook(多账号,支持热榜+RSS合并+AI分析+独立展示区)"""
  576. display_regions = display_regions or {}
  577. if not display_regions.get("HOTLIST", True):
  578. report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}
  579. urls = parse_multi_account_config(self.config.get("GENERIC_WEBHOOK_URL", ""))
  580. templates = parse_multi_account_config(self.config.get("GENERIC_WEBHOOK_TEMPLATE", ""))
  581. if not urls:
  582. return False
  583. urls = limit_accounts(urls, self.max_accounts, "通用Webhook")
  584. results = []
  585. for i, url in enumerate(urls):
  586. if not url:
  587. continue
  588. template = ""
  589. if templates:
  590. if i < len(templates):
  591. template = templates[i]
  592. elif len(templates) == 1:
  593. template = templates[0] # 共用一个模板
  594. account_label = f"账号{i+1}" if len(urls) > 1 else ""
  595. result = send_to_generic_webhook(
  596. webhook_url=url,
  597. payload_template=template,
  598. report_data=report_data,
  599. report_type=report_type,
  600. update_info=update_info,
  601. proxy_url=proxy_url,
  602. mode=mode,
  603. account_label=account_label,
  604. batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
  605. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  606. split_content_func=self.split_content_func,
  607. rss_items=rss_items if display_regions.get("RSS", True) else None,
  608. rss_new_items=rss_new_items if display_regions.get("RSS", True) else None,
  609. ai_analysis=ai_analysis if display_regions.get("AI_ANALYSIS", True) else None,
  610. display_regions=display_regions,
  611. standalone_data=standalone_data if display_regions.get("STANDALONE", False) else None,
  612. )
  613. results.append(result)
  614. return any(results) if results else False
  615. def _send_email(
  616. self,
  617. report_type: str,
  618. html_file_path: Optional[str],
  619. ) -> bool:
  620. """发送邮件(保持原有逻辑,已支持多收件人)
  621. Note:
  622. AI 分析内容已在 HTML 生成时嵌入,无需在此传递
  623. """
  624. return send_to_email(
  625. from_email=self.config["EMAIL_FROM"],
  626. password=self.config["EMAIL_PASSWORD"],
  627. to_email=self.config["EMAIL_TO"],
  628. report_type=report_type,
  629. html_file_path=html_file_path,
  630. custom_smtp_server=self.config.get("EMAIL_SMTP_SERVER", ""),
  631. custom_smtp_port=self.config.get("EMAIL_SMTP_PORT", ""),
  632. get_time_func=self.get_time_func,
  633. )
  634. # === RSS 通知方法 ===
  635. def dispatch_rss(
  636. self,
  637. rss_items: List[Dict],
  638. feeds_info: Optional[Dict[str, str]] = None,
  639. proxy_url: Optional[str] = None,
  640. html_file_path: Optional[str] = None,
  641. ) -> Dict[str, bool]:
  642. """
  643. 分发 RSS 通知到所有已配置的渠道
  644. Args:
  645. rss_items: RSS 条目列表,每个条目包含:
  646. - title: 标题
  647. - feed_id: RSS 源 ID
  648. - feed_name: RSS 源名称
  649. - url: 链接
  650. - published_at: 发布时间
  651. - summary: 摘要(可选)
  652. - author: 作者(可选)
  653. feeds_info: RSS 源 ID 到名称的映射
  654. proxy_url: 代理 URL(可选)
  655. html_file_path: HTML 报告文件路径(邮件使用)
  656. Returns:
  657. Dict[str, bool]: 每个渠道的发送结果
  658. """
  659. if not rss_items:
  660. print("[RSS通知] 没有 RSS 内容,跳过通知")
  661. return {}
  662. results = {}
  663. report_type = "RSS 订阅更新"
  664. # 飞书
  665. if self.config.get("FEISHU_WEBHOOK_URL"):
  666. results["feishu"] = self._send_rss_feishu(
  667. rss_items, feeds_info, proxy_url
  668. )
  669. # 钉钉
  670. if self.config.get("DINGTALK_WEBHOOK_URL"):
  671. results["dingtalk"] = self._send_rss_dingtalk(
  672. rss_items, feeds_info, proxy_url
  673. )
  674. # 企业微信
  675. if self.config.get("WEWORK_WEBHOOK_URL"):
  676. results["wework"] = self._send_rss_markdown(
  677. rss_items, feeds_info, proxy_url, "wework"
  678. )
  679. # Telegram
  680. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  681. results["telegram"] = self._send_rss_markdown(
  682. rss_items, feeds_info, proxy_url, "telegram"
  683. )
  684. # ntfy
  685. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  686. results["ntfy"] = self._send_rss_markdown(
  687. rss_items, feeds_info, proxy_url, "ntfy"
  688. )
  689. # Bark
  690. if self.config.get("BARK_URL"):
  691. results["bark"] = self._send_rss_markdown(
  692. rss_items, feeds_info, proxy_url, "bark"
  693. )
  694. # Slack
  695. if self.config.get("SLACK_WEBHOOK_URL"):
  696. results["slack"] = self._send_rss_markdown(
  697. rss_items, feeds_info, proxy_url, "slack"
  698. )
  699. # 邮件
  700. if (
  701. self.config.get("EMAIL_FROM")
  702. and self.config.get("EMAIL_PASSWORD")
  703. and self.config.get("EMAIL_TO")
  704. ):
  705. results["email"] = self._send_email(report_type, html_file_path)
  706. return results
  707. def _send_rss_feishu(
  708. self,
  709. rss_items: List[Dict],
  710. feeds_info: Optional[Dict[str, str]],
  711. proxy_url: Optional[str],
  712. ) -> bool:
  713. """发送 RSS 到飞书"""
  714. import requests
  715. content = render_rss_feishu_content(
  716. rss_items=rss_items,
  717. feeds_info=feeds_info,
  718. get_time_func=self.get_time_func,
  719. )
  720. webhooks = parse_multi_account_config(self.config["FEISHU_WEBHOOK_URL"])
  721. webhooks = limit_accounts(webhooks, self.max_accounts, "飞书")
  722. results = []
  723. for i, webhook_url in enumerate(webhooks):
  724. if not webhook_url:
  725. continue
  726. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  727. try:
  728. # 分批发送
  729. batches = self.split_content_func(
  730. content, self.config.get("FEISHU_BATCH_SIZE", 29000)
  731. )
  732. for batch_idx, batch_content in enumerate(batches):
  733. payload = {
  734. "msg_type": "interactive",
  735. "card": {
  736. "header": {
  737. "title": {
  738. "tag": "plain_text",
  739. "content": f"📰 RSS 订阅更新 {f'({batch_idx + 1}/{len(batches)})' if len(batches) > 1 else ''}",
  740. },
  741. "template": "green",
  742. },
  743. "elements": [
  744. {"tag": "markdown", "content": batch_content}
  745. ],
  746. },
  747. }
  748. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  749. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  750. resp.raise_for_status()
  751. print(f"✅ 飞书{account_label} RSS 通知发送成功")
  752. results.append(True)
  753. except Exception as e:
  754. print(f"❌ 飞书{account_label} RSS 通知发送失败: {e}")
  755. results.append(False)
  756. return any(results) if results else False
  757. def _send_rss_dingtalk(
  758. self,
  759. rss_items: List[Dict],
  760. feeds_info: Optional[Dict[str, str]],
  761. proxy_url: Optional[str],
  762. ) -> bool:
  763. """发送 RSS 到钉钉"""
  764. import requests
  765. content = render_rss_dingtalk_content(
  766. rss_items=rss_items,
  767. feeds_info=feeds_info,
  768. get_time_func=self.get_time_func,
  769. )
  770. webhooks = parse_multi_account_config(self.config["DINGTALK_WEBHOOK_URL"])
  771. webhooks = limit_accounts(webhooks, self.max_accounts, "钉钉")
  772. results = []
  773. for i, webhook_url in enumerate(webhooks):
  774. if not webhook_url:
  775. continue
  776. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  777. try:
  778. batches = self.split_content_func(
  779. content, self.config.get("DINGTALK_BATCH_SIZE", 20000)
  780. )
  781. for batch_idx, batch_content in enumerate(batches):
  782. title = f"📰 RSS 订阅更新 {f'({batch_idx + 1}/{len(batches)})' if len(batches) > 1 else ''}"
  783. payload = {
  784. "msgtype": "markdown",
  785. "markdown": {
  786. "title": title,
  787. "text": batch_content,
  788. },
  789. }
  790. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  791. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  792. resp.raise_for_status()
  793. print(f"✅ 钉钉{account_label} RSS 通知发送成功")
  794. results.append(True)
  795. except Exception as e:
  796. print(f"❌ 钉钉{account_label} RSS 通知发送失败: {e}")
  797. results.append(False)
  798. return any(results) if results else False
  799. def _send_rss_markdown(
  800. self,
  801. rss_items: List[Dict],
  802. feeds_info: Optional[Dict[str, str]],
  803. proxy_url: Optional[str],
  804. channel: str,
  805. ) -> bool:
  806. """发送 RSS 到 Markdown 兼容渠道(企业微信、Telegram、ntfy、Bark、Slack)"""
  807. import requests
  808. content = render_rss_markdown_content(
  809. rss_items=rss_items,
  810. feeds_info=feeds_info,
  811. get_time_func=self.get_time_func,
  812. )
  813. try:
  814. if channel == "wework":
  815. return self._send_rss_wework(content, proxy_url)
  816. elif channel == "telegram":
  817. return self._send_rss_telegram(content, proxy_url)
  818. elif channel == "ntfy":
  819. return self._send_rss_ntfy(content, proxy_url)
  820. elif channel == "bark":
  821. return self._send_rss_bark(content, proxy_url)
  822. elif channel == "slack":
  823. return self._send_rss_slack(content, proxy_url)
  824. except Exception as e:
  825. print(f"❌ {channel} RSS 通知发送失败: {e}")
  826. return False
  827. return False
  828. def _send_rss_wework(self, content: str, proxy_url: Optional[str]) -> bool:
  829. """发送 RSS 到企业微信"""
  830. import requests
  831. webhooks = parse_multi_account_config(self.config["WEWORK_WEBHOOK_URL"])
  832. webhooks = limit_accounts(webhooks, self.max_accounts, "企业微信")
  833. results = []
  834. for i, webhook_url in enumerate(webhooks):
  835. if not webhook_url:
  836. continue
  837. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  838. try:
  839. batches = self.split_content_func(
  840. content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
  841. )
  842. for batch_content in batches:
  843. payload = {
  844. "msgtype": "markdown",
  845. "markdown": {"content": batch_content},
  846. }
  847. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  848. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  849. resp.raise_for_status()
  850. print(f"✅ 企业微信{account_label} RSS 通知发送成功")
  851. results.append(True)
  852. except Exception as e:
  853. print(f"❌ 企业微信{account_label} RSS 通知发送失败: {e}")
  854. results.append(False)
  855. return any(results) if results else False
  856. def _send_rss_telegram(self, content: str, proxy_url: Optional[str]) -> bool:
  857. """发送 RSS 到 Telegram"""
  858. import requests
  859. tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
  860. chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
  861. if not tokens or not chat_ids:
  862. return False
  863. results = []
  864. for i in range(min(len(tokens), len(chat_ids), self.max_accounts)):
  865. token = tokens[i]
  866. chat_id = chat_ids[i]
  867. if not token or not chat_id:
  868. continue
  869. account_label = f"账号{i+1}" if len(tokens) > 1 else ""
  870. try:
  871. batches = self.split_content_func(
  872. content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
  873. )
  874. for batch_content in batches:
  875. url = f"https://api.telegram.org/bot{token}/sendMessage"
  876. payload = {
  877. "chat_id": chat_id,
  878. "text": batch_content,
  879. "parse_mode": "Markdown",
  880. }
  881. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  882. resp = requests.post(url, json=payload, proxies=proxies, timeout=30)
  883. resp.raise_for_status()
  884. print(f"✅ Telegram{account_label} RSS 通知发送成功")
  885. results.append(True)
  886. except Exception as e:
  887. print(f"❌ Telegram{account_label} RSS 通知发送失败: {e}")
  888. results.append(False)
  889. return any(results) if results else False
  890. def _send_rss_ntfy(self, content: str, proxy_url: Optional[str]) -> bool:
  891. """发送 RSS 到 ntfy"""
  892. import requests
  893. server_url = self.config["NTFY_SERVER_URL"]
  894. topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
  895. tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))
  896. if not server_url or not topics:
  897. return False
  898. topics = limit_accounts(topics, self.max_accounts, "ntfy")
  899. results = []
  900. for i, topic in enumerate(topics):
  901. if not topic:
  902. continue
  903. token = tokens[i] if tokens and i < len(tokens) else ""
  904. account_label = f"账号{i+1}" if len(topics) > 1 else ""
  905. try:
  906. batches = self.split_content_func(content, 3800)
  907. for batch_content in batches:
  908. url = f"{server_url.rstrip('/')}/{topic}"
  909. headers = {"Title": "RSS 订阅更新", "Markdown": "yes"}
  910. if token:
  911. headers["Authorization"] = f"Bearer {token}"
  912. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  913. resp = requests.post(
  914. url, data=batch_content.encode("utf-8"),
  915. headers=headers, proxies=proxies, timeout=30
  916. )
  917. resp.raise_for_status()
  918. print(f"✅ ntfy{account_label} RSS 通知发送成功")
  919. results.append(True)
  920. except Exception as e:
  921. print(f"❌ ntfy{account_label} RSS 通知发送失败: {e}")
  922. results.append(False)
  923. return any(results) if results else False
  924. def _send_rss_bark(self, content: str, proxy_url: Optional[str]) -> bool:
  925. """发送 RSS 到 Bark"""
  926. import requests
  927. import urllib.parse
  928. urls = parse_multi_account_config(self.config["BARK_URL"])
  929. urls = limit_accounts(urls, self.max_accounts, "Bark")
  930. results = []
  931. for i, bark_url in enumerate(urls):
  932. if not bark_url:
  933. continue
  934. account_label = f"账号{i+1}" if len(urls) > 1 else ""
  935. try:
  936. batches = self.split_content_func(
  937. content, self.config.get("BARK_BATCH_SIZE", 3600)
  938. )
  939. for batch_content in batches:
  940. title = urllib.parse.quote("📰 RSS 订阅更新")
  941. body = urllib.parse.quote(batch_content)
  942. url = f"{bark_url.rstrip('/')}/{title}/{body}"
  943. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  944. resp = requests.get(url, proxies=proxies, timeout=30)
  945. resp.raise_for_status()
  946. print(f"✅ Bark{account_label} RSS 通知发送成功")
  947. results.append(True)
  948. except Exception as e:
  949. print(f"❌ Bark{account_label} RSS 通知发送失败: {e}")
  950. results.append(False)
  951. return any(results) if results else False
  952. def _send_rss_slack(self, content: str, proxy_url: Optional[str]) -> bool:
  953. """发送 RSS 到 Slack"""
  954. import requests
  955. webhooks = parse_multi_account_config(self.config["SLACK_WEBHOOK_URL"])
  956. webhooks = limit_accounts(webhooks, self.max_accounts, "Slack")
  957. results = []
  958. for i, webhook_url in enumerate(webhooks):
  959. if not webhook_url:
  960. continue
  961. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  962. try:
  963. batches = self.split_content_func(
  964. content, self.config.get("SLACK_BATCH_SIZE", 4000)
  965. )
  966. for batch_content in batches:
  967. payload = {
  968. "blocks": [
  969. {
  970. "type": "section",
  971. "text": {
  972. "type": "mrkdwn",
  973. "text": batch_content,
  974. },
  975. }
  976. ]
  977. }
  978. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  979. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  980. resp.raise_for_status()
  981. print(f"✅ Slack{account_label} RSS 通知发送成功")
  982. results.append(True)
  983. except Exception as e:
  984. print(f"❌ Slack{account_label} RSS 通知发送失败: {e}")
  985. results.append(False)
  986. return any(results) if results else False