dispatcher.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150
  1. # coding=utf-8
  2. """
  3. 通知调度器模块
  4. 提供统一的通知分发接口。
  5. 支持所有通知渠道的多账号配置,使用 `;` 分隔多个账号。
  6. 使用示例:
  7. dispatcher = NotificationDispatcher(config, get_time_func, split_content_func)
  8. results = dispatcher.dispatch_all(report_data, report_type, ...)
  9. """
  10. from __future__ import annotations
  11. from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
  12. from trendradar.core.config import (
  13. get_account_at_index,
  14. limit_accounts,
  15. parse_multi_account_config,
  16. validate_paired_configs,
  17. )
  18. from .senders import (
  19. send_to_bark,
  20. send_to_dingtalk,
  21. send_to_email,
  22. send_to_feishu,
  23. send_to_ntfy,
  24. send_to_slack,
  25. send_to_telegram,
  26. send_to_wework,
  27. send_to_generic_webhook,
  28. )
  29. from .renderer import (
  30. render_rss_feishu_content,
  31. render_rss_dingtalk_content,
  32. render_rss_markdown_content,
  33. )
  34. # 类型检查时导入,运行时不导入(避免循环导入)
  35. if TYPE_CHECKING:
  36. from trendradar.ai import AIAnalysisResult, AITranslator
  37. class NotificationDispatcher:
  38. """
  39. 统一的多账号通知调度器
  40. 将多账号发送逻辑封装,提供简洁的 dispatch_all 接口。
  41. 内部处理账号解析、数量限制、配对验证等逻辑。
  42. """
  43. def __init__(
  44. self,
  45. config: Dict[str, Any],
  46. get_time_func: Callable,
  47. split_content_func: Callable,
  48. translator: Optional["AITranslator"] = None,
  49. ):
  50. """
  51. 初始化通知调度器
  52. Args:
  53. config: 完整的配置字典,包含所有通知渠道的配置
  54. get_time_func: 获取当前时间的函数
  55. split_content_func: 内容分批函数
  56. translator: AI 翻译器实例(可选)
  57. """
  58. self.config = config
  59. self.get_time_func = get_time_func
  60. self.split_content_func = split_content_func
  61. self.max_accounts = config.get("MAX_ACCOUNTS_PER_CHANNEL", 3)
  62. self.translator = translator
  63. def _translate_content(
  64. self,
  65. report_data: Dict,
  66. rss_items: Optional[List[Dict]] = None,
  67. rss_new_items: Optional[List[Dict]] = None,
  68. ) -> tuple:
  69. """
  70. 翻译推送内容
  71. Args:
  72. report_data: 报告数据
  73. rss_items: RSS 统计条目
  74. rss_new_items: RSS 新增条目
  75. Returns:
  76. tuple: (翻译后的 report_data, rss_items, rss_new_items)
  77. """
  78. if not self.translator or not self.translator.enabled:
  79. return report_data, rss_items, rss_new_items
  80. import copy
  81. print(f"[翻译] 开始翻译内容到 {self.translator.target_language}...")
  82. # 深拷贝避免修改原始数据
  83. report_data = copy.deepcopy(report_data)
  84. rss_items = copy.deepcopy(rss_items) if rss_items else None
  85. rss_new_items = copy.deepcopy(rss_new_items) if rss_new_items else None
  86. # 收集所有需要翻译的标题
  87. titles_to_translate = []
  88. title_locations = [] # 记录标题位置,用于回填
  89. # 1. 热榜标题
  90. for stat_idx, stat in enumerate(report_data.get("stats", [])):
  91. for title_idx, title_data in enumerate(stat.get("titles", [])):
  92. titles_to_translate.append(title_data.get("title", ""))
  93. title_locations.append(("stats", stat_idx, title_idx))
  94. # 2. 新增热点标题
  95. for source_idx, source in enumerate(report_data.get("new_titles", [])):
  96. for title_idx, title_data in enumerate(source.get("titles", [])):
  97. titles_to_translate.append(title_data.get("title", ""))
  98. title_locations.append(("new_titles", source_idx, title_idx))
  99. # 3. RSS 统计标题
  100. if rss_items:
  101. for item_idx, item in enumerate(rss_items):
  102. titles_to_translate.append(item.get("title", ""))
  103. title_locations.append(("rss_items", item_idx, None))
  104. # 4. RSS 新增标题
  105. if rss_new_items:
  106. for item_idx, item in enumerate(rss_new_items):
  107. titles_to_translate.append(item.get("title", ""))
  108. title_locations.append(("rss_new_items", item_idx, None))
  109. if not titles_to_translate:
  110. print("[翻译] 没有需要翻译的内容")
  111. return report_data, rss_items, rss_new_items
  112. print(f"[翻译] 共 {len(titles_to_translate)} 条标题待翻译")
  113. # 批量翻译
  114. result = self.translator.translate_batch(titles_to_translate)
  115. if result.success_count == 0:
  116. print(f"[翻译] 翻译失败: {result.results[0].error if result.results else '未知错误'}")
  117. return report_data, rss_items, rss_new_items
  118. print(f"[翻译] 翻译完成: {result.success_count}/{result.total_count} 成功")
  119. # 回填翻译结果
  120. for i, (loc_type, idx1, idx2) in enumerate(title_locations):
  121. if i < len(result.results) and result.results[i].success:
  122. translated = result.results[i].translated_text
  123. if loc_type == "stats":
  124. report_data["stats"][idx1]["titles"][idx2]["title"] = translated
  125. elif loc_type == "new_titles":
  126. report_data["new_titles"][idx1]["titles"][idx2]["title"] = translated
  127. elif loc_type == "rss_items" and rss_items:
  128. rss_items[idx1]["title"] = translated
  129. elif loc_type == "rss_new_items" and rss_new_items:
  130. rss_new_items[idx1]["title"] = translated
  131. return report_data, rss_items, rss_new_items
  132. def dispatch_all(
  133. self,
  134. report_data: Dict,
  135. report_type: str,
  136. update_info: Optional[Dict] = None,
  137. proxy_url: Optional[str] = None,
  138. mode: str = "daily",
  139. html_file_path: Optional[str] = None,
  140. rss_items: Optional[List[Dict]] = None,
  141. rss_new_items: Optional[List[Dict]] = None,
  142. ai_analysis: Optional[AIAnalysisResult] = None,
  143. standalone_data: Optional[Dict] = None,
  144. ) -> Dict[str, bool]:
  145. """
  146. 分发通知到所有已配置的渠道(支持热榜+RSS合并推送+AI分析+独立展示区)
  147. Args:
  148. report_data: 报告数据(由 prepare_report_data 生成)
  149. report_type: 报告类型(如 "当日汇总"、"实时增量")
  150. update_info: 版本更新信息(可选)
  151. proxy_url: 代理 URL(可选)
  152. mode: 报告模式 (daily/current/incremental)
  153. html_file_path: HTML 报告文件路径(邮件使用)
  154. rss_items: RSS 统计条目列表(用于 RSS 统计区块)
  155. rss_new_items: RSS 新增条目列表(用于 RSS 新增区块)
  156. ai_analysis: AI 分析结果(可选)
  157. standalone_data: 独立展示区数据(可选)
  158. Returns:
  159. Dict[str, bool]: 每个渠道的发送结果,key 为渠道名,value 为是否成功
  160. """
  161. results = {}
  162. # 获取区域显示配置
  163. display_regions = self.config.get("DISPLAY", {}).get("REGIONS", {})
  164. # 执行翻译(如果启用)
  165. report_data, rss_items, rss_new_items = self._translate_content(
  166. report_data, rss_items, rss_new_items
  167. )
  168. # 飞书
  169. if self.config.get("FEISHU_WEBHOOK_URL"):
  170. results["feishu"] = self._send_feishu(
  171. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  172. ai_analysis, display_regions, standalone_data
  173. )
  174. # 钉钉
  175. if self.config.get("DINGTALK_WEBHOOK_URL"):
  176. results["dingtalk"] = self._send_dingtalk(
  177. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  178. ai_analysis, display_regions, standalone_data
  179. )
  180. # 企业微信
  181. if self.config.get("WEWORK_WEBHOOK_URL"):
  182. results["wework"] = self._send_wework(
  183. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  184. ai_analysis, display_regions, standalone_data
  185. )
  186. # Telegram(需要配对验证)
  187. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  188. results["telegram"] = self._send_telegram(
  189. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  190. ai_analysis, display_regions, standalone_data
  191. )
  192. # ntfy(需要配对验证)
  193. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  194. results["ntfy"] = self._send_ntfy(
  195. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  196. ai_analysis, display_regions, standalone_data
  197. )
  198. # Bark
  199. if self.config.get("BARK_URL"):
  200. results["bark"] = self._send_bark(
  201. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  202. ai_analysis, display_regions, standalone_data
  203. )
  204. # Slack
  205. if self.config.get("SLACK_WEBHOOK_URL"):
  206. results["slack"] = self._send_slack(
  207. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  208. ai_analysis, display_regions, standalone_data
  209. )
  210. # 通用 Webhook
  211. if self.config.get("GENERIC_WEBHOOK_URL"):
  212. results["generic_webhook"] = self._send_generic_webhook(
  213. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  214. ai_analysis, display_regions, standalone_data
  215. )
  216. # 邮件(保持原有逻辑,已支持多收件人,AI 分析已嵌入 HTML)
  217. if (
  218. self.config.get("EMAIL_FROM")
  219. and self.config.get("EMAIL_PASSWORD")
  220. and self.config.get("EMAIL_TO")
  221. ):
  222. results["email"] = self._send_email(report_type, html_file_path)
  223. return results
  def _send_to_multi_accounts(
      self,
      channel_name: str,
      config_value: str,
      send_func: Callable[..., bool],
      **kwargs,
  ) -> bool:
      """
      Shared send loop for channels addressed by a single config value.

      Args:
          channel_name: Channel name, passed to ``limit_accounts``.
          config_value: Raw config value, parsed into accounts by
              ``parse_multi_account_config``.
          send_func: Called as ``send_func(account, account_label=..., **kwargs)``.
          **kwargs: Extra keyword arguments forwarded to ``send_func``.

      Returns:
          bool: True when at least one call to ``send_func`` returned a
          truthy value; False otherwise, including when there are no accounts.
      """
      accounts = parse_multi_account_config(config_value)
      if not accounts:
          return False

      accounts = limit_accounts(accounts, self.max_accounts, channel_name)
      # Labels are only attached when more than one account is in play.
      labelled = len(accounts) > 1

      # A list (not a generator) so every account is attempted even after
      # an earlier one has succeeded. Empty entries are skipped but still
      # consume a position number.
      outcomes = [
          send_func(
              account,
              account_label=f"账号{position}" if labelled else "",
              **kwargs,
          )
          for position, account in enumerate(accounts, start=1)
          if account
      ]
      return any(outcomes)
  def _send_feishu(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to Feishu (multi-account).

      The region switches in ``display_regions`` decide which sections are
      forwarded: HOTLIST, RSS and AI_ANALYSIS default to on, STANDALONE
      defaults to off.

      Returns:
          bool: Whatever ``_send_to_multi_accounts`` reports for this channel.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      rss_enabled = regions.get("RSS", True)
      # Everything except the webhook URL and label is the same per account.
      shared = {
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": self.config.get("FEISHU_BATCH_SIZE", 29000),
          "batch_interval": self.config.get("BATCH_SEND_INTERVAL", 1.0),
          "split_content_func": self.split_content_func,
          "get_time_func": self.get_time_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      def deliver(url, account_label):
          return send_to_feishu(webhook_url=url, account_label=account_label, **shared)

      return self._send_to_multi_accounts(
          channel_name="飞书",
          config_value=self.config["FEISHU_WEBHOOK_URL"],
          send_func=deliver,
      )
  def _send_dingtalk(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to DingTalk (multi-account).

      The region switches in ``display_regions`` decide which sections are
      forwarded: HOTLIST, RSS and AI_ANALYSIS default to on, STANDALONE
      defaults to off.

      Returns:
          bool: Whatever ``_send_to_multi_accounts`` reports for this channel.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      rss_enabled = regions.get("RSS", True)
      # Everything except the webhook URL and label is the same per account.
      shared = {
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": self.config.get("DINGTALK_BATCH_SIZE", 20000),
          "batch_interval": self.config.get("BATCH_SEND_INTERVAL", 1.0),
          "split_content_func": self.split_content_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      def deliver(url, account_label):
          return send_to_dingtalk(webhook_url=url, account_label=account_label, **shared)

      return self._send_to_multi_accounts(
          channel_name="钉钉",
          config_value=self.config["DINGTALK_WEBHOOK_URL"],
          send_func=deliver,
      )
  def _send_wework(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to WeWork (multi-account).

      The region switches in ``display_regions`` decide which sections are
      forwarded: HOTLIST, RSS and AI_ANALYSIS default to on, STANDALONE
      defaults to off. The message type comes from ``WEWORK_MSG_TYPE``
      (default "markdown").

      Returns:
          bool: Whatever ``_send_to_multi_accounts`` reports for this channel.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      rss_enabled = regions.get("RSS", True)
      # Everything except the webhook URL and label is the same per account.
      shared = {
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": self.config.get("MESSAGE_BATCH_SIZE", 4000),
          "batch_interval": self.config.get("BATCH_SEND_INTERVAL", 1.0),
          "msg_type": self.config.get("WEWORK_MSG_TYPE", "markdown"),
          "split_content_func": self.split_content_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      def deliver(url, account_label):
          return send_to_wework(webhook_url=url, account_label=account_label, **shared)

      return self._send_to_multi_accounts(
          channel_name="企业微信",
          config_value=self.config["WEWORK_WEBHOOK_URL"],
          send_func=deliver,
      )
  def _send_telegram(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to Telegram (multi-account, token/chat_id pairs).

      Bot tokens and chat ids are parsed from ``TELEGRAM_BOT_TOKEN`` and
      ``TELEGRAM_CHAT_ID`` and checked with ``validate_paired_configs``
      before anything is sent. The region switches in ``display_regions``
      decide which sections are forwarded: HOTLIST, RSS and AI_ANALYSIS
      default to on, STANDALONE defaults to off.

      Returns:
          bool: True when at least one account was sent successfully;
          False when the configuration is empty or invalid, or every send
          failed.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
      chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
      if not tokens or not chat_ids:
          return False

      # Check that tokens and chat ids pair up.
      valid, count = validate_paired_configs(
          {"bot_token": tokens, "chat_id": chat_ids},
          "Telegram",
          required_keys=["bot_token", "chat_id"],
      )
      if not valid or count == 0:
          return False

      # Apply the per-channel account cap, keeping chat ids aligned.
      tokens = limit_accounts(tokens, self.max_accounts, "Telegram")
      chat_ids = chat_ids[: len(tokens)]

      labelled = len(tokens) > 1
      rss_enabled = regions.get("RSS", True)
      # Loop-invariant arguments, computed once instead of per account.
      shared = {
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": self.config.get("MESSAGE_BATCH_SIZE", 4000),
          "batch_interval": self.config.get("BATCH_SEND_INTERVAL", 1.0),
          "split_content_func": self.split_content_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      results = []
      # zip() walks the pairs in lockstep and cannot index past the end of
      # the shorter list, unlike indexing both lists by position.
      for position, (token, chat_id) in enumerate(zip(tokens, chat_ids), start=1):
          if not (token and chat_id):
              continue
          results.append(
              send_to_telegram(
                  bot_token=token,
                  chat_id=chat_id,
                  account_label=f"账号{position}" if labelled else "",
                  **shared,
              )
          )

      return any(results)
  def _send_ntfy(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to ntfy (multi-account, topics with optional tokens).

      Topics come from ``NTFY_TOPIC`` and tokens from the optional
      ``NTFY_TOKEN``. When tokens are configured their count must equal the
      topic count, otherwise nothing is sent. The region switches in
      ``display_regions`` decide which sections are forwarded: HOTLIST, RSS
      and AI_ANALYSIS default to on, STANDALONE defaults to off.

      Returns:
          bool: True when at least one topic was sent successfully.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      server_url = self.config["NTFY_SERVER_URL"]
      topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
      tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))

      if not (server_url and topics):
          return False

      # Tokens are optional, but if given they must match topics one-to-one.
      if tokens and len(tokens) != len(topics):
          print(
              f"❌ ntfy 配置错误:topic 数量({len(topics)})与 token 数量({len(tokens)})不一致,跳过 ntfy 推送"
          )
          return False

      # Apply the per-channel account cap, keeping tokens aligned.
      topics = limit_accounts(topics, self.max_accounts, "ntfy")
      if tokens:
          tokens = tokens[: len(topics)]

      labelled = len(topics) > 1
      rss_enabled = regions.get("RSS", True)
      shared = {
          "server_url": server_url,
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": 3800,
          "split_content_func": self.split_content_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      outcomes = []
      for position, topic in enumerate(topics):
          if not topic:
              continue
          token = get_account_at_index(tokens, position, "") if tokens else ""
          label = f"账号{position + 1}" if labelled else ""
          outcomes.append(
              send_to_ntfy(topic=topic, token=token, account_label=label, **shared)
          )

      return any(outcomes)
  def _send_bark(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to Bark (multi-account).

      The region switches in ``display_regions`` decide which sections are
      forwarded: HOTLIST, RSS and AI_ANALYSIS default to on, STANDALONE
      defaults to off.

      Returns:
          bool: Whatever ``_send_to_multi_accounts`` reports for this channel.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      rss_enabled = regions.get("RSS", True)
      # Everything except the Bark URL and label is the same per account.
      shared = {
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": self.config.get("BARK_BATCH_SIZE", 3600),
          "batch_interval": self.config.get("BATCH_SEND_INTERVAL", 1.0),
          "split_content_func": self.split_content_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      def deliver(url, account_label):
          return send_to_bark(bark_url=url, account_label=account_label, **shared)

      return self._send_to_multi_accounts(
          channel_name="Bark",
          config_value=self.config["BARK_URL"],
          send_func=deliver,
      )
  def _send_slack(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to Slack (multi-account).

      The region switches in ``display_regions`` decide which sections are
      forwarded: HOTLIST, RSS and AI_ANALYSIS default to on, STANDALONE
      defaults to off.

      Returns:
          bool: Whatever ``_send_to_multi_accounts`` reports for this channel.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      rss_enabled = regions.get("RSS", True)
      # Everything except the webhook URL and label is the same per account.
      shared = {
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": self.config.get("SLACK_BATCH_SIZE", 4000),
          "batch_interval": self.config.get("BATCH_SEND_INTERVAL", 1.0),
          "split_content_func": self.split_content_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      def deliver(url, account_label):
          return send_to_slack(webhook_url=url, account_label=account_label, **shared)

      return self._send_to_multi_accounts(
          channel_name="Slack",
          config_value=self.config["SLACK_WEBHOOK_URL"],
          send_func=deliver,
      )
  def _send_generic_webhook(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      display_regions: Optional[Dict] = None,
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """Send the report to the generic webhook URLs (multi-account).

      Each URL from ``GENERIC_WEBHOOK_URL`` is paired with the payload
      template at the same position in ``GENERIC_WEBHOOK_TEMPLATE``. A
      single configured template is shared by all URLs; a URL without a
      matching template gets an empty template string. The region switches
      in ``display_regions`` decide which sections are forwarded: HOTLIST,
      RSS and AI_ANALYSIS default to on, STANDALONE defaults to off.

      Returns:
          bool: True when at least one URL was sent successfully.
      """
      regions = display_regions or {}

      if not regions.get("HOTLIST", True):
          # Hot list switched off: send an empty report skeleton instead.
          report_data = {"stats": [], "failed_ids": [], "new_titles": [], "id_to_name": {}}

      urls = parse_multi_account_config(self.config.get("GENERIC_WEBHOOK_URL", ""))
      templates = parse_multi_account_config(self.config.get("GENERIC_WEBHOOK_TEMPLATE", ""))
      if not urls:
          return False

      urls = limit_accounts(urls, self.max_accounts, "通用Webhook")

      labelled = len(urls) > 1
      rss_enabled = regions.get("RSS", True)
      shared = {
          "report_data": report_data,
          "report_type": report_type,
          "update_info": update_info,
          "proxy_url": proxy_url,
          "mode": mode,
          "batch_size": self.config.get("MESSAGE_BATCH_SIZE", 4000),
          "batch_interval": self.config.get("BATCH_SEND_INTERVAL", 1.0),
          "split_content_func": self.split_content_func,
          "rss_items": rss_items if rss_enabled else None,
          "rss_new_items": rss_new_items if rss_enabled else None,
          "ai_analysis": ai_analysis if regions.get("AI_ANALYSIS", True) else None,
          "display_regions": regions,
          "standalone_data": standalone_data if regions.get("STANDALONE", False) else None,
      }

      outcomes = []
      for position, url in enumerate(urls):
          if url:
              if position < len(templates):
                  template = templates[position]
              elif len(templates) == 1:
                  # A lone template is shared by every URL.
                  template = templates[0]
              else:
                  template = ""

              outcomes.append(
                  send_to_generic_webhook(
                      webhook_url=url,
                      payload_template=template,
                      account_label=f"账号{position + 1}" if labelled else "",
                      **shared,
                  )
              )

      return any(outcomes)
  def _send_email(
      self,
      report_type: str,
      html_file_path: Optional[str],
  ) -> bool:
      """Send the HTML report by email.

      Sender, password and recipient are read from the required
      ``EMAIL_FROM``, ``EMAIL_PASSWORD`` and ``EMAIL_TO`` config keys; the
      SMTP server and port are optional and default to empty strings.

      Note:
          AI analysis is already embedded in the HTML when it is generated,
          so it is not passed here.

      Returns:
          bool: The value returned by ``send_to_email``.
      """
      cfg = self.config
      # Required keys are read first, in order, so a missing one raises
      # KeyError before any optional lookup happens.
      sender = cfg["EMAIL_FROM"]
      secret = cfg["EMAIL_PASSWORD"]
      recipients = cfg["EMAIL_TO"]
      smtp_server = cfg.get("EMAIL_SMTP_SERVER", "")
      smtp_port = cfg.get("EMAIL_SMTP_PORT", "")

      return send_to_email(
          from_email=sender,
          password=secret,
          to_email=recipients,
          report_type=report_type,
          html_file_path=html_file_path,
          custom_smtp_server=smtp_server,
          custom_smtp_port=smtp_port,
          get_time_func=self.get_time_func,
      )
  632. # === RSS 通知方法 ===
  633. def dispatch_rss(
  634. self,
  635. rss_items: List[Dict],
  636. feeds_info: Optional[Dict[str, str]] = None,
  637. proxy_url: Optional[str] = None,
  638. html_file_path: Optional[str] = None,
  639. ) -> Dict[str, bool]:
  640. """
  641. 分发 RSS 通知到所有已配置的渠道
  642. Args:
  643. rss_items: RSS 条目列表,每个条目包含:
  644. - title: 标题
  645. - feed_id: RSS 源 ID
  646. - feed_name: RSS 源名称
  647. - url: 链接
  648. - published_at: 发布时间
  649. - summary: 摘要(可选)
  650. - author: 作者(可选)
  651. feeds_info: RSS 源 ID 到名称的映射
  652. proxy_url: 代理 URL(可选)
  653. html_file_path: HTML 报告文件路径(邮件使用)
  654. Returns:
  655. Dict[str, bool]: 每个渠道的发送结果
  656. """
  657. if not rss_items:
  658. print("[RSS通知] 没有 RSS 内容,跳过通知")
  659. return {}
  660. results = {}
  661. report_type = "RSS 订阅更新"
  662. # 飞书
  663. if self.config.get("FEISHU_WEBHOOK_URL"):
  664. results["feishu"] = self._send_rss_feishu(
  665. rss_items, feeds_info, proxy_url
  666. )
  667. # 钉钉
  668. if self.config.get("DINGTALK_WEBHOOK_URL"):
  669. results["dingtalk"] = self._send_rss_dingtalk(
  670. rss_items, feeds_info, proxy_url
  671. )
  672. # 企业微信
  673. if self.config.get("WEWORK_WEBHOOK_URL"):
  674. results["wework"] = self._send_rss_markdown(
  675. rss_items, feeds_info, proxy_url, "wework"
  676. )
  677. # Telegram
  678. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  679. results["telegram"] = self._send_rss_markdown(
  680. rss_items, feeds_info, proxy_url, "telegram"
  681. )
  682. # ntfy
  683. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  684. results["ntfy"] = self._send_rss_markdown(
  685. rss_items, feeds_info, proxy_url, "ntfy"
  686. )
  687. # Bark
  688. if self.config.get("BARK_URL"):
  689. results["bark"] = self._send_rss_markdown(
  690. rss_items, feeds_info, proxy_url, "bark"
  691. )
  692. # Slack
  693. if self.config.get("SLACK_WEBHOOK_URL"):
  694. results["slack"] = self._send_rss_markdown(
  695. rss_items, feeds_info, proxy_url, "slack"
  696. )
  697. # 邮件
  698. if (
  699. self.config.get("EMAIL_FROM")
  700. and self.config.get("EMAIL_PASSWORD")
  701. and self.config.get("EMAIL_TO")
  702. ):
  703. results["email"] = self._send_email(report_type, html_file_path)
  704. return results
  def _send_rss_feishu(
      self,
      rss_items: List[Dict],
      feeds_info: Optional[Dict[str, str]],
      proxy_url: Optional[str],
  ) -> bool:
      """Send the RSS notification to every configured Feishu webhook.

      The rendered content is split into batches of at most
      ``FEISHU_BATCH_SIZE`` (default 29000) and each batch is posted as an
      interactive card. A webhook counts as failed as soon as any of its
      batches raises.

      Returns:
          bool: True when at least one webhook received all of its batches.
      """
      import requests

      content = render_rss_feishu_content(
          rss_items=rss_items,
          feeds_info=feeds_info,
          get_time_func=self.get_time_func,
      )

      webhooks = limit_accounts(
          parse_multi_account_config(self.config["FEISHU_WEBHOOK_URL"]),
          self.max_accounts,
          "飞书",
      )

      # Same proxy mapping for every request; None means no proxy.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
      labelled = len(webhooks) > 1

      outcomes = []
      for position, webhook_url in enumerate(webhooks, start=1):
          if not webhook_url:
              continue

          account_label = f"账号{position}" if labelled else ""
          try:
              # Send in batches.
              batches = self.split_content_func(
                  content, self.config.get("FEISHU_BATCH_SIZE", 29000)
              )
              total = len(batches)
              for number, batch_content in enumerate(batches, start=1):
                  # "(n/total)" is appended only when there are several batches.
                  suffix = f"({number}/{total})" if total > 1 else ""
                  card = {
                      "header": {
                          "title": {
                              "tag": "plain_text",
                              "content": f"📰 RSS 订阅更新 {suffix}",
                          },
                          "template": "green",
                      },
                      "elements": [{"tag": "markdown", "content": batch_content}],
                  }
                  resp = requests.post(
                      webhook_url,
                      json={"msg_type": "interactive", "card": card},
                      proxies=proxies,
                      timeout=30,
                  )
                  resp.raise_for_status()
              print(f"✅ 飞书{account_label} RSS 通知发送成功")
              outcomes.append(True)
          except Exception as e:
              print(f"❌ 飞书{account_label} RSS 通知发送失败: {e}")
              outcomes.append(False)

      return any(outcomes)
  def _send_rss_dingtalk(
      self,
      rss_items: List[Dict],
      feeds_info: Optional[Dict[str, str]],
      proxy_url: Optional[str],
  ) -> bool:
      """Send the RSS update to every configured DingTalk webhook.

      The rendered content is split into batches (size taken from
      ``DINGTALK_BATCH_SIZE``, default 20000) and each batch is posted as a
      markdown message. An account counts as successful only when all of its
      batches were accepted.

      Args:
          rss_items: RSS entries passed through to the DingTalk renderer.
          feeds_info: Optional feed metadata passed through to the renderer.
          proxy_url: Proxy used for both HTTP and HTTPS, or a falsy value
              for a direct connection.

      Returns:
          True if at least one account received every batch, else False
          (including when no usable webhook is configured).
      """
      import requests

      content = render_rss_dingtalk_content(
          rss_items=rss_items,
          feeds_info=feeds_info,
          get_time_func=self.get_time_func,
      )

      webhooks = parse_multi_account_config(self.config["DINGTALK_WEBHOOK_URL"])
      webhooks = limit_accounts(webhooks, self.max_accounts, "钉钉")

      # The proxy mapping does not depend on the account or the batch.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

      results = []
      for i, webhook_url in enumerate(webhooks):
          if not webhook_url:
              continue

          account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
          try:
              batches = self.split_content_func(
                  content, self.config.get("DINGTALK_BATCH_SIZE", 20000)
              )
              total = len(batches)

              for batch_idx, batch_content in enumerate(batches):
                  suffix = f"({batch_idx + 1}/{total})" if total > 1 else ""
                  title = f"📰 RSS 订阅更新 {suffix}"
                  payload = {
                      "msgtype": "markdown",
                      "markdown": {
                          "title": title,
                          "text": batch_content,
                      },
                  }

                  resp = requests.post(
                      webhook_url, json=payload, proxies=proxies, timeout=30
                  )
                  resp.raise_for_status()

                  # DingTalk reports failures (bad token, keyword/signature
                  # mismatch, rate limit) as HTTP 200 with a non-zero
                  # "errcode", so the HTTP status alone is not enough.
                  # A body that is not JSON is still treated as success.
                  try:
                      reply = resp.json()
                  except ValueError:
                      reply = None
                  if isinstance(reply, dict):
                      errcode = reply.get("errcode")
                      if errcode not in (None, 0):
                          raise RuntimeError(
                              f"errcode={errcode}, errmsg={reply.get('errmsg')}"
                          )

              print(f"✅ 钉钉{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # Best-effort per account: log and continue with the next one.
              print(f"❌ 钉钉{account_label} RSS 通知发送失败: {e}")
              results.append(False)

      return any(results)
  def _send_rss_markdown(
      self,
      rss_items: List[Dict],
      feeds_info: Optional[Dict[str, str]],
      proxy_url: Optional[str],
      channel: str,
  ) -> bool:
      """Render the RSS update as markdown and hand it to one channel sender.

      Supported ``channel`` values are "wework", "telegram", "ntfy", "bark"
      and "slack". Any other value sends nothing and yields False.

      Args:
          rss_items: RSS entries passed through to the markdown renderer.
          feeds_info: Optional feed metadata passed through to the renderer.
          proxy_url: Proxy URL forwarded unchanged to the channel sender.
          channel: Name of the target channel.

      Returns:
          The selected sender's result; False when the channel is unknown or
          the sender raised.
      """
      # NOTE(review): not used directly in this method; retained so that
      # import behaviour stays exactly as before.
      import requests

      content = render_rss_markdown_content(
          rss_items=rss_items,
          feeds_info=feeds_info,
          get_time_func=self.get_time_func,
      )

      # Channel name -> name of the method that delivers to it.
      routes = (
          ("wework", "_send_rss_wework"),
          ("telegram", "_send_rss_telegram"),
          ("ntfy", "_send_rss_ntfy"),
          ("bark", "_send_rss_bark"),
          ("slack", "_send_rss_slack"),
      )

      try:
          for route_name, method_name in routes:
              if channel == route_name:
                  sender = getattr(self, method_name)
                  return sender(content, proxy_url)
      except Exception as e:
          print(f"❌ {channel} RSS 通知发送失败: {e}")
          return False

      # No route matched.
      return False
  def _send_rss_wework(self, content: str, proxy_url: Optional[str]) -> bool:
      """Send pre-rendered RSS markdown to every configured WeCom webhook.

      The content is split into batches (size taken from
      ``MESSAGE_BATCH_SIZE``, default 4000) and each batch is posted as a
      markdown message. An account counts as successful only when all of its
      batches were accepted.

      Args:
          content: Markdown text to deliver.
          proxy_url: Proxy used for both HTTP and HTTPS, or a falsy value
              for a direct connection.

      Returns:
          True if at least one account received every batch, else False
          (including when no usable webhook is configured).
      """
      import requests

      webhooks = parse_multi_account_config(self.config["WEWORK_WEBHOOK_URL"])
      webhooks = limit_accounts(webhooks, self.max_accounts, "企业微信")

      # The proxy mapping does not depend on the account or the batch.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

      results = []
      for i, webhook_url in enumerate(webhooks):
          if not webhook_url:
              continue

          account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
          try:
              batches = self.split_content_func(
                  content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
              )

              for batch_content in batches:
                  payload = {
                      "msgtype": "markdown",
                      "markdown": {"content": batch_content},
                  }
                  resp = requests.post(
                      webhook_url, json=payload, proxies=proxies, timeout=30
                  )
                  resp.raise_for_status()

                  # The WeCom webhook reports failures (invalid key, message
                  # too long, rate limit) as HTTP 200 with a non-zero
                  # "errcode", so the HTTP status alone is not enough.
                  # A body that is not JSON is still treated as success.
                  try:
                      reply = resp.json()
                  except ValueError:
                      reply = None
                  if isinstance(reply, dict):
                      errcode = reply.get("errcode")
                      if errcode not in (None, 0):
                          raise RuntimeError(
                              f"errcode={errcode}, errmsg={reply.get('errmsg')}"
                          )

              print(f"✅ 企业微信{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # Best-effort per account: log and continue with the next one.
              print(f"❌ 企业微信{account_label} RSS 通知发送失败: {e}")
              results.append(False)

      return any(results)
  def _send_rss_telegram(self, content: str, proxy_url: Optional[str]) -> bool:
      """Send pre-rendered RSS markdown through the configured Telegram bots.

      Bot tokens and chat ids are paired by position; only as many pairs as
      the shorter list (capped by ``self.max_accounts``) are used. The
      content is split into batches (size taken from ``MESSAGE_BATCH_SIZE``,
      default 4000) and posted via the ``sendMessage`` endpoint.

      Args:
          content: Markdown text to deliver.
          proxy_url: Proxy used for both HTTP and HTTPS, or a falsy value
              for a direct connection.

      Returns:
          True if at least one account received every batch, else False
          (including when tokens or chat ids are missing).
      """
      import requests

      tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
      chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])

      if not tokens or not chat_ids:
          return False

      # The proxy mapping does not depend on the account or the batch.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

      results = []
      account_count = min(len(tokens), len(chat_ids), self.max_accounts)
      for i in range(account_count):
          token = tokens[i]
          chat_id = chat_ids[i]
          if not token or not chat_id:
              continue

          account_label = f"账号{i+1}" if len(tokens) > 1 else ""
          try:
              batches = self.split_content_func(
                  content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
              )
              url = f"https://api.telegram.org/bot{token}/sendMessage"

              for batch_content in batches:
                  payload = {
                      "chat_id": chat_id,
                      "text": batch_content,
                      "parse_mode": "Markdown",
                  }
                  resp = requests.post(
                      url, json=payload, proxies=proxies, timeout=30
                  )
                  resp.raise_for_status()

              print(f"✅ Telegram{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # The bot token is part of the request URL, and HTTP/connection
              # error messages include that URL. Mask the token so the secret
              # never reaches stdout or log files.
              safe_error = str(e).replace(token, "***")
              print(f"❌ Telegram{account_label} RSS 通知发送失败: {safe_error}")
              results.append(False)

      return any(results)
  def _send_rss_ntfy(self, content: str, proxy_url: Optional[str]) -> bool:
      """Send pre-rendered RSS markdown to every configured ntfy topic.

      Topics come from ``NTFY_TOPIC``; optional access tokens from
      ``NTFY_TOKEN`` are matched to topics by position. The content is split
      into batches of 3800 and each batch is posted as the request body with
      markdown rendering enabled.

      Args:
          content: Markdown text to deliver.
          proxy_url: Proxy used for both HTTP and HTTPS, or a falsy value
              for a direct connection.

      Returns:
          True if at least one topic received every batch, else False
          (including when the server URL or topics are missing).
      """
      import requests

      server_url = self.config["NTFY_SERVER_URL"]
      topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
      tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))

      if not server_url or not topics:
          return False

      topics = limit_accounts(topics, self.max_accounts, "ntfy")

      # The proxy mapping does not depend on the topic or the batch.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

      # http.client encodes str header values as latin-1, which cannot
      # represent this title and made every request fail with
      # UnicodeEncodeError. Sending the value as UTF-8 bytes avoids that;
      # ntfy accepts UTF-8 header values.
      title_header = "RSS 订阅更新".encode("utf-8")

      results = []
      for i, topic in enumerate(topics):
          if not topic:
              continue

          token = tokens[i] if tokens and i < len(tokens) else ""
          account_label = f"账号{i+1}" if len(topics) > 1 else ""

          try:
              batches = self.split_content_func(content, 3800)

              # URL and headers are the same for every batch of this topic.
              url = f"{server_url.rstrip('/')}/{topic}"
              headers = {"Title": title_header, "Markdown": "yes"}
              if token:
                  headers["Authorization"] = f"Bearer {token}"

              for batch_content in batches:
                  resp = requests.post(
                      url,
                      data=batch_content.encode("utf-8"),
                      headers=headers,
                      proxies=proxies,
                      timeout=30,
                  )
                  resp.raise_for_status()

              print(f"✅ ntfy{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # Best-effort per topic: log and continue with the next one.
              print(f"❌ ntfy{account_label} RSS 通知发送失败: {e}")
              results.append(False)

      return any(results)
  def _send_rss_bark(self, content: str, proxy_url: Optional[str]) -> bool:
      """Send pre-rendered RSS markdown to every configured Bark endpoint.

      The content is split into batches (size taken from ``BARK_BATCH_SIZE``,
      default 3600). Each batch is delivered with a GET request whose path is
      ``<bark_url>/<title>/<body>``.

      Args:
          content: Text to deliver.
          proxy_url: Proxy used for both HTTP and HTTPS, or a falsy value
              for a direct connection.

      Returns:
          True if at least one endpoint received every batch, else False
          (including when no usable endpoint is configured).
      """
      import requests
      import urllib.parse

      urls = parse_multi_account_config(self.config["BARK_URL"])
      urls = limit_accounts(urls, self.max_accounts, "Bark")

      # Neither the proxy mapping nor the encoded title varies per batch.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
      title = urllib.parse.quote("📰 RSS 订阅更新", safe="")

      results = []
      for i, bark_url in enumerate(urls):
          if not bark_url:
              continue

          account_label = f"账号{i+1}" if len(urls) > 1 else ""
          try:
              batches = self.split_content_func(
                  content, self.config.get("BARK_BATCH_SIZE", 3600)
              )
              base_url = bark_url.rstrip("/")

              for batch_content in batches:
                  # quote() leaves "/" unescaped by default. The body is a
                  # single path segment and RSS content contains links, so
                  # every slash must be percent-encoded or the path would be
                  # split into extra segments.
                  body = urllib.parse.quote(batch_content, safe="")
                  url = f"{base_url}/{title}/{body}"
                  resp = requests.get(url, proxies=proxies, timeout=30)
                  resp.raise_for_status()

              print(f"✅ Bark{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # Best-effort per endpoint: log and continue with the next one.
              print(f"❌ Bark{account_label} RSS 通知发送失败: {e}")
              results.append(False)

      return any(results)
  def _send_rss_slack(self, content: str, proxy_url: Optional[str]) -> bool:
      """Deliver pre-rendered RSS text to each configured Slack webhook.

      The text is cut into batches (size from ``SLACK_BATCH_SIZE``, default
      4000); every batch is posted as a single ``mrkdwn`` section block.

      Args:
          content: Text to deliver.
          proxy_url: Proxy used for both HTTP and HTTPS, or a falsy value
              for a direct connection.

      Returns:
          True when at least one webhook accepted all batches, else False.
      """
      import requests

      hooks = limit_accounts(
          parse_multi_account_config(self.config["SLACK_WEBHOOK_URL"]),
          self.max_accounts,
          "Slack",
      )

      outcomes = []
      for position, hook in enumerate(hooks):
          if hook:
              # Accounts are only numbered when more than one is configured.
              if len(hooks) > 1:
                  label = f"账号{position+1}"
              else:
                  label = ""

              try:
                  batch_limit = self.config.get("SLACK_BATCH_SIZE", 4000)
                  for chunk in self.split_content_func(content, batch_limit):
                      section = {
                          "type": "section",
                          "text": {"type": "mrkdwn", "text": chunk},
                      }
                      proxy_map = None
                      if proxy_url:
                          proxy_map = {"http": proxy_url, "https": proxy_url}
                      response = requests.post(
                          hook,
                          json={"blocks": [section]},
                          proxies=proxy_map,
                          timeout=30,
                      )
                      response.raise_for_status()

                  print(f"✅ Slack{label} RSS 通知发送成功")
                  outcomes.append(True)
              except Exception as e:
                  print(f"❌ Slack{label} RSS 通知发送失败: {e}")
                  outcomes.append(False)

      return any(outcomes)