dispatcher.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891
  1. # coding=utf-8
  2. """
  3. 通知调度器模块
  4. 提供统一的通知分发接口。
  5. 支持所有通知渠道的多账号配置,使用 `;` 分隔多个账号。
  6. 使用示例:
  7. dispatcher = NotificationDispatcher(config, get_time_func, split_content_func)
  8. results = dispatcher.dispatch_all(report_data, report_type, ...)
  9. """
  10. from typing import Any, Callable, Dict, List, Optional
  11. from trendradar.core.config import (
  12. get_account_at_index,
  13. limit_accounts,
  14. parse_multi_account_config,
  15. validate_paired_configs,
  16. )
  17. from .senders import (
  18. send_to_bark,
  19. send_to_dingtalk,
  20. send_to_email,
  21. send_to_feishu,
  22. send_to_ntfy,
  23. send_to_slack,
  24. send_to_telegram,
  25. send_to_wework,
  26. )
  27. from .renderer import (
  28. render_rss_feishu_content,
  29. render_rss_dingtalk_content,
  30. render_rss_markdown_content,
  31. )
  32. class NotificationDispatcher:
  33. """
  34. 统一的多账号通知调度器
  35. 将多账号发送逻辑封装,提供简洁的 dispatch_all 接口。
  36. 内部处理账号解析、数量限制、配对验证等逻辑。
  37. """
  38. def __init__(
  39. self,
  40. config: Dict[str, Any],
  41. get_time_func: Callable,
  42. split_content_func: Callable,
  43. ):
  44. """
  45. 初始化通知调度器
  46. Args:
  47. config: 完整的配置字典,包含所有通知渠道的配置
  48. get_time_func: 获取当前时间的函数
  49. split_content_func: 内容分批函数
  50. """
  51. self.config = config
  52. self.get_time_func = get_time_func
  53. self.split_content_func = split_content_func
  54. self.max_accounts = config.get("MAX_ACCOUNTS_PER_CHANNEL", 3)
  55. def dispatch_all(
  56. self,
  57. report_data: Dict,
  58. report_type: str,
  59. update_info: Optional[Dict] = None,
  60. proxy_url: Optional[str] = None,
  61. mode: str = "daily",
  62. html_file_path: Optional[str] = None,
  63. rss_items: Optional[List[Dict]] = None,
  64. rss_new_items: Optional[List[Dict]] = None,
  65. ) -> Dict[str, bool]:
  66. """
  67. 分发通知到所有已配置的渠道(支持热榜+RSS合并推送)
  68. Args:
  69. report_data: 报告数据(由 prepare_report_data 生成)
  70. report_type: 报告类型(如 "当日汇总"、"实时增量")
  71. update_info: 版本更新信息(可选)
  72. proxy_url: 代理 URL(可选)
  73. mode: 报告模式 (daily/current/incremental)
  74. html_file_path: HTML 报告文件路径(邮件使用)
  75. rss_items: RSS 统计条目列表(用于 RSS 统计区块)
  76. rss_new_items: RSS 新增条目列表(用于 RSS 新增区块)
  77. Returns:
  78. Dict[str, bool]: 每个渠道的发送结果,key 为渠道名,value 为是否成功
  79. """
  80. results = {}
  81. # 飞书
  82. if self.config.get("FEISHU_WEBHOOK_URL"):
  83. results["feishu"] = self._send_feishu(
  84. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items
  85. )
  86. # 钉钉
  87. if self.config.get("DINGTALK_WEBHOOK_URL"):
  88. results["dingtalk"] = self._send_dingtalk(
  89. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items
  90. )
  91. # 企业微信
  92. if self.config.get("WEWORK_WEBHOOK_URL"):
  93. results["wework"] = self._send_wework(
  94. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items
  95. )
  96. # Telegram(需要配对验证)
  97. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  98. results["telegram"] = self._send_telegram(
  99. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items
  100. )
  101. # ntfy(需要配对验证)
  102. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  103. results["ntfy"] = self._send_ntfy(
  104. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items
  105. )
  106. # Bark
  107. if self.config.get("BARK_URL"):
  108. results["bark"] = self._send_bark(
  109. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items
  110. )
  111. # Slack
  112. if self.config.get("SLACK_WEBHOOK_URL"):
  113. results["slack"] = self._send_slack(
  114. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items
  115. )
  116. # 邮件(保持原有逻辑,已支持多收件人)
  117. if (
  118. self.config.get("EMAIL_FROM")
  119. and self.config.get("EMAIL_PASSWORD")
  120. and self.config.get("EMAIL_TO")
  121. ):
  122. results["email"] = self._send_email(report_type, html_file_path)
  123. return results
  124. def _send_to_multi_accounts(
  125. self,
  126. channel_name: str,
  127. config_value: str,
  128. send_func: Callable[..., bool],
  129. **kwargs,
  130. ) -> bool:
  131. """
  132. 通用多账号发送逻辑
  133. Args:
  134. channel_name: 渠道名称(用于日志和账号数量限制提示)
  135. config_value: 配置值(可能包含多个账号,用 ; 分隔)
  136. send_func: 发送函数,签名为 (account, account_label=..., **kwargs) -> bool
  137. **kwargs: 传递给发送函数的其他参数
  138. Returns:
  139. bool: 任一账号发送成功则返回 True
  140. """
  141. accounts = parse_multi_account_config(config_value)
  142. if not accounts:
  143. return False
  144. accounts = limit_accounts(accounts, self.max_accounts, channel_name)
  145. results = []
  146. for i, account in enumerate(accounts):
  147. if account:
  148. account_label = f"账号{i+1}" if len(accounts) > 1 else ""
  149. result = send_func(account, account_label=account_label, **kwargs)
  150. results.append(result)
  151. return any(results) if results else False
  152. def _send_feishu(
  153. self,
  154. report_data: Dict,
  155. report_type: str,
  156. update_info: Optional[Dict],
  157. proxy_url: Optional[str],
  158. mode: str,
  159. rss_items: Optional[List[Dict]] = None,
  160. rss_new_items: Optional[List[Dict]] = None,
  161. ) -> bool:
  162. """发送到飞书(多账号,支持热榜+RSS合并)"""
  163. return self._send_to_multi_accounts(
  164. channel_name="飞书",
  165. config_value=self.config["FEISHU_WEBHOOK_URL"],
  166. send_func=lambda url, account_label: send_to_feishu(
  167. webhook_url=url,
  168. report_data=report_data,
  169. report_type=report_type,
  170. update_info=update_info,
  171. proxy_url=proxy_url,
  172. mode=mode,
  173. account_label=account_label,
  174. batch_size=self.config.get("FEISHU_BATCH_SIZE", 29000),
  175. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  176. split_content_func=self.split_content_func,
  177. get_time_func=self.get_time_func,
  178. rss_items=rss_items,
  179. rss_new_items=rss_new_items,
  180. ),
  181. )
  182. def _send_dingtalk(
  183. self,
  184. report_data: Dict,
  185. report_type: str,
  186. update_info: Optional[Dict],
  187. proxy_url: Optional[str],
  188. mode: str,
  189. rss_items: Optional[List[Dict]] = None,
  190. rss_new_items: Optional[List[Dict]] = None,
  191. ) -> bool:
  192. """发送到钉钉(多账号,支持热榜+RSS合并)"""
  193. return self._send_to_multi_accounts(
  194. channel_name="钉钉",
  195. config_value=self.config["DINGTALK_WEBHOOK_URL"],
  196. send_func=lambda url, account_label: send_to_dingtalk(
  197. webhook_url=url,
  198. report_data=report_data,
  199. report_type=report_type,
  200. update_info=update_info,
  201. proxy_url=proxy_url,
  202. mode=mode,
  203. account_label=account_label,
  204. batch_size=self.config.get("DINGTALK_BATCH_SIZE", 20000),
  205. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  206. split_content_func=self.split_content_func,
  207. rss_items=rss_items,
  208. rss_new_items=rss_new_items,
  209. ),
  210. )
  211. def _send_wework(
  212. self,
  213. report_data: Dict,
  214. report_type: str,
  215. update_info: Optional[Dict],
  216. proxy_url: Optional[str],
  217. mode: str,
  218. rss_items: Optional[List[Dict]] = None,
  219. rss_new_items: Optional[List[Dict]] = None,
  220. ) -> bool:
  221. """发送到企业微信(多账号,支持热榜+RSS合并)"""
  222. return self._send_to_multi_accounts(
  223. channel_name="企业微信",
  224. config_value=self.config["WEWORK_WEBHOOK_URL"],
  225. send_func=lambda url, account_label: send_to_wework(
  226. webhook_url=url,
  227. report_data=report_data,
  228. report_type=report_type,
  229. update_info=update_info,
  230. proxy_url=proxy_url,
  231. mode=mode,
  232. account_label=account_label,
  233. batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
  234. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  235. msg_type=self.config.get("WEWORK_MSG_TYPE", "markdown"),
  236. split_content_func=self.split_content_func,
  237. rss_items=rss_items,
  238. rss_new_items=rss_new_items,
  239. ),
  240. )
  241. def _send_telegram(
  242. self,
  243. report_data: Dict,
  244. report_type: str,
  245. update_info: Optional[Dict],
  246. proxy_url: Optional[str],
  247. mode: str,
  248. rss_items: Optional[List[Dict]] = None,
  249. rss_new_items: Optional[List[Dict]] = None,
  250. ) -> bool:
  251. """发送到 Telegram(多账号,需验证 token 和 chat_id 配对,支持热榜+RSS合并)"""
  252. telegram_tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
  253. telegram_chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
  254. if not telegram_tokens or not telegram_chat_ids:
  255. return False
  256. # 验证配对
  257. valid, count = validate_paired_configs(
  258. {"bot_token": telegram_tokens, "chat_id": telegram_chat_ids},
  259. "Telegram",
  260. required_keys=["bot_token", "chat_id"],
  261. )
  262. if not valid or count == 0:
  263. return False
  264. # 限制账号数量
  265. telegram_tokens = limit_accounts(telegram_tokens, self.max_accounts, "Telegram")
  266. telegram_chat_ids = telegram_chat_ids[: len(telegram_tokens)]
  267. results = []
  268. for i in range(len(telegram_tokens)):
  269. token = telegram_tokens[i]
  270. chat_id = telegram_chat_ids[i]
  271. if token and chat_id:
  272. account_label = f"账号{i+1}" if len(telegram_tokens) > 1 else ""
  273. result = send_to_telegram(
  274. bot_token=token,
  275. chat_id=chat_id,
  276. report_data=report_data,
  277. report_type=report_type,
  278. update_info=update_info,
  279. proxy_url=proxy_url,
  280. mode=mode,
  281. account_label=account_label,
  282. batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
  283. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  284. split_content_func=self.split_content_func,
  285. rss_items=rss_items,
  286. rss_new_items=rss_new_items,
  287. )
  288. results.append(result)
  289. return any(results) if results else False
  290. def _send_ntfy(
  291. self,
  292. report_data: Dict,
  293. report_type: str,
  294. update_info: Optional[Dict],
  295. proxy_url: Optional[str],
  296. mode: str,
  297. rss_items: Optional[List[Dict]] = None,
  298. rss_new_items: Optional[List[Dict]] = None,
  299. ) -> bool:
  300. """发送到 ntfy(多账号,需验证 topic 和 token 配对,支持热榜+RSS合并)"""
  301. ntfy_server_url = self.config["NTFY_SERVER_URL"]
  302. ntfy_topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
  303. ntfy_tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))
  304. if not ntfy_server_url or not ntfy_topics:
  305. return False
  306. # 验证 token 和 topic 数量一致(如果配置了 token)
  307. if ntfy_tokens and len(ntfy_tokens) != len(ntfy_topics):
  308. print(
  309. f"❌ ntfy 配置错误:topic 数量({len(ntfy_topics)})与 token 数量({len(ntfy_tokens)})不一致,跳过 ntfy 推送"
  310. )
  311. return False
  312. # 限制账号数量
  313. ntfy_topics = limit_accounts(ntfy_topics, self.max_accounts, "ntfy")
  314. if ntfy_tokens:
  315. ntfy_tokens = ntfy_tokens[: len(ntfy_topics)]
  316. results = []
  317. for i, topic in enumerate(ntfy_topics):
  318. if topic:
  319. token = get_account_at_index(ntfy_tokens, i, "") if ntfy_tokens else ""
  320. account_label = f"账号{i+1}" if len(ntfy_topics) > 1 else ""
  321. result = send_to_ntfy(
  322. server_url=ntfy_server_url,
  323. topic=topic,
  324. token=token,
  325. report_data=report_data,
  326. report_type=report_type,
  327. update_info=update_info,
  328. proxy_url=proxy_url,
  329. mode=mode,
  330. account_label=account_label,
  331. batch_size=3800,
  332. split_content_func=self.split_content_func,
  333. rss_items=rss_items,
  334. rss_new_items=rss_new_items,
  335. )
  336. results.append(result)
  337. return any(results) if results else False
  338. def _send_bark(
  339. self,
  340. report_data: Dict,
  341. report_type: str,
  342. update_info: Optional[Dict],
  343. proxy_url: Optional[str],
  344. mode: str,
  345. rss_items: Optional[List[Dict]] = None,
  346. rss_new_items: Optional[List[Dict]] = None,
  347. ) -> bool:
  348. """发送到 Bark(多账号,支持热榜+RSS合并)"""
  349. return self._send_to_multi_accounts(
  350. channel_name="Bark",
  351. config_value=self.config["BARK_URL"],
  352. send_func=lambda url, account_label: send_to_bark(
  353. bark_url=url,
  354. report_data=report_data,
  355. report_type=report_type,
  356. update_info=update_info,
  357. proxy_url=proxy_url,
  358. mode=mode,
  359. account_label=account_label,
  360. batch_size=self.config.get("BARK_BATCH_SIZE", 3600),
  361. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  362. split_content_func=self.split_content_func,
  363. rss_items=rss_items,
  364. rss_new_items=rss_new_items,
  365. ),
  366. )
  367. def _send_slack(
  368. self,
  369. report_data: Dict,
  370. report_type: str,
  371. update_info: Optional[Dict],
  372. proxy_url: Optional[str],
  373. mode: str,
  374. rss_items: Optional[List[Dict]] = None,
  375. rss_new_items: Optional[List[Dict]] = None,
  376. ) -> bool:
  377. """发送到 Slack(多账号,支持热榜+RSS合并)"""
  378. return self._send_to_multi_accounts(
  379. channel_name="Slack",
  380. config_value=self.config["SLACK_WEBHOOK_URL"],
  381. send_func=lambda url, account_label: send_to_slack(
  382. webhook_url=url,
  383. report_data=report_data,
  384. report_type=report_type,
  385. update_info=update_info,
  386. proxy_url=proxy_url,
  387. mode=mode,
  388. account_label=account_label,
  389. batch_size=self.config.get("SLACK_BATCH_SIZE", 4000),
  390. batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
  391. split_content_func=self.split_content_func,
  392. rss_items=rss_items,
  393. rss_new_items=rss_new_items,
  394. ),
  395. )
  396. def _send_email(
  397. self,
  398. report_type: str,
  399. html_file_path: Optional[str],
  400. ) -> bool:
  401. """发送邮件(保持原有逻辑,已支持多收件人)"""
  402. return send_to_email(
  403. from_email=self.config["EMAIL_FROM"],
  404. password=self.config["EMAIL_PASSWORD"],
  405. to_email=self.config["EMAIL_TO"],
  406. report_type=report_type,
  407. html_file_path=html_file_path,
  408. custom_smtp_server=self.config.get("EMAIL_SMTP_SERVER", ""),
  409. custom_smtp_port=self.config.get("EMAIL_SMTP_PORT", ""),
  410. get_time_func=self.get_time_func,
  411. )
  412. # === RSS 通知方法 ===
  413. def dispatch_rss(
  414. self,
  415. rss_items: List[Dict],
  416. feeds_info: Optional[Dict[str, str]] = None,
  417. proxy_url: Optional[str] = None,
  418. html_file_path: Optional[str] = None,
  419. ) -> Dict[str, bool]:
  420. """
  421. 分发 RSS 通知到所有已配置的渠道
  422. Args:
  423. rss_items: RSS 条目列表,每个条目包含:
  424. - title: 标题
  425. - feed_id: RSS 源 ID
  426. - feed_name: RSS 源名称
  427. - url: 链接
  428. - published_at: 发布时间
  429. - summary: 摘要(可选)
  430. - author: 作者(可选)
  431. feeds_info: RSS 源 ID 到名称的映射
  432. proxy_url: 代理 URL(可选)
  433. html_file_path: HTML 报告文件路径(邮件使用)
  434. Returns:
  435. Dict[str, bool]: 每个渠道的发送结果
  436. """
  437. if not rss_items:
  438. print("[RSS通知] 没有 RSS 内容,跳过通知")
  439. return {}
  440. results = {}
  441. report_type = "RSS 订阅更新"
  442. # 飞书
  443. if self.config.get("FEISHU_WEBHOOK_URL"):
  444. results["feishu"] = self._send_rss_feishu(
  445. rss_items, feeds_info, proxy_url
  446. )
  447. # 钉钉
  448. if self.config.get("DINGTALK_WEBHOOK_URL"):
  449. results["dingtalk"] = self._send_rss_dingtalk(
  450. rss_items, feeds_info, proxy_url
  451. )
  452. # 企业微信
  453. if self.config.get("WEWORK_WEBHOOK_URL"):
  454. results["wework"] = self._send_rss_markdown(
  455. rss_items, feeds_info, proxy_url, "wework"
  456. )
  457. # Telegram
  458. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  459. results["telegram"] = self._send_rss_markdown(
  460. rss_items, feeds_info, proxy_url, "telegram"
  461. )
  462. # ntfy
  463. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  464. results["ntfy"] = self._send_rss_markdown(
  465. rss_items, feeds_info, proxy_url, "ntfy"
  466. )
  467. # Bark
  468. if self.config.get("BARK_URL"):
  469. results["bark"] = self._send_rss_markdown(
  470. rss_items, feeds_info, proxy_url, "bark"
  471. )
  472. # Slack
  473. if self.config.get("SLACK_WEBHOOK_URL"):
  474. results["slack"] = self._send_rss_markdown(
  475. rss_items, feeds_info, proxy_url, "slack"
  476. )
  477. # 邮件
  478. if (
  479. self.config.get("EMAIL_FROM")
  480. and self.config.get("EMAIL_PASSWORD")
  481. and self.config.get("EMAIL_TO")
  482. ):
  483. results["email"] = self._send_email(report_type, html_file_path)
  484. return results
  485. def _send_rss_feishu(
  486. self,
  487. rss_items: List[Dict],
  488. feeds_info: Optional[Dict[str, str]],
  489. proxy_url: Optional[str],
  490. ) -> bool:
  491. """发送 RSS 到飞书"""
  492. import requests
  493. content = render_rss_feishu_content(
  494. rss_items=rss_items,
  495. feeds_info=feeds_info,
  496. get_time_func=self.get_time_func,
  497. )
  498. webhooks = parse_multi_account_config(self.config["FEISHU_WEBHOOK_URL"])
  499. webhooks = limit_accounts(webhooks, self.max_accounts, "飞书")
  500. results = []
  501. for i, webhook_url in enumerate(webhooks):
  502. if not webhook_url:
  503. continue
  504. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  505. try:
  506. # 分批发送
  507. batches = self.split_content_func(
  508. content, self.config.get("FEISHU_BATCH_SIZE", 29000)
  509. )
  510. for batch_idx, batch_content in enumerate(batches):
  511. payload = {
  512. "msg_type": "interactive",
  513. "card": {
  514. "header": {
  515. "title": {
  516. "tag": "plain_text",
  517. "content": f"📰 RSS 订阅更新 {f'({batch_idx + 1}/{len(batches)})' if len(batches) > 1 else ''}",
  518. },
  519. "template": "green",
  520. },
  521. "elements": [
  522. {"tag": "markdown", "content": batch_content}
  523. ],
  524. },
  525. }
  526. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  527. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  528. resp.raise_for_status()
  529. print(f"✅ 飞书{account_label} RSS 通知发送成功")
  530. results.append(True)
  531. except Exception as e:
  532. print(f"❌ 飞书{account_label} RSS 通知发送失败: {e}")
  533. results.append(False)
  534. return any(results) if results else False
  535. def _send_rss_dingtalk(
  536. self,
  537. rss_items: List[Dict],
  538. feeds_info: Optional[Dict[str, str]],
  539. proxy_url: Optional[str],
  540. ) -> bool:
  541. """发送 RSS 到钉钉"""
  542. import requests
  543. content = render_rss_dingtalk_content(
  544. rss_items=rss_items,
  545. feeds_info=feeds_info,
  546. get_time_func=self.get_time_func,
  547. )
  548. webhooks = parse_multi_account_config(self.config["DINGTALK_WEBHOOK_URL"])
  549. webhooks = limit_accounts(webhooks, self.max_accounts, "钉钉")
  550. results = []
  551. for i, webhook_url in enumerate(webhooks):
  552. if not webhook_url:
  553. continue
  554. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  555. try:
  556. batches = self.split_content_func(
  557. content, self.config.get("DINGTALK_BATCH_SIZE", 20000)
  558. )
  559. for batch_idx, batch_content in enumerate(batches):
  560. title = f"📰 RSS 订阅更新 {f'({batch_idx + 1}/{len(batches)})' if len(batches) > 1 else ''}"
  561. payload = {
  562. "msgtype": "markdown",
  563. "markdown": {
  564. "title": title,
  565. "text": batch_content,
  566. },
  567. }
  568. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  569. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  570. resp.raise_for_status()
  571. print(f"✅ 钉钉{account_label} RSS 通知发送成功")
  572. results.append(True)
  573. except Exception as e:
  574. print(f"❌ 钉钉{account_label} RSS 通知发送失败: {e}")
  575. results.append(False)
  576. return any(results) if results else False
  577. def _send_rss_markdown(
  578. self,
  579. rss_items: List[Dict],
  580. feeds_info: Optional[Dict[str, str]],
  581. proxy_url: Optional[str],
  582. channel: str,
  583. ) -> bool:
  584. """发送 RSS 到 Markdown 兼容渠道(企业微信、Telegram、ntfy、Bark、Slack)"""
  585. import requests
  586. content = render_rss_markdown_content(
  587. rss_items=rss_items,
  588. feeds_info=feeds_info,
  589. get_time_func=self.get_time_func,
  590. )
  591. try:
  592. if channel == "wework":
  593. return self._send_rss_wework(content, proxy_url)
  594. elif channel == "telegram":
  595. return self._send_rss_telegram(content, proxy_url)
  596. elif channel == "ntfy":
  597. return self._send_rss_ntfy(content, proxy_url)
  598. elif channel == "bark":
  599. return self._send_rss_bark(content, proxy_url)
  600. elif channel == "slack":
  601. return self._send_rss_slack(content, proxy_url)
  602. except Exception as e:
  603. print(f"❌ {channel} RSS 通知发送失败: {e}")
  604. return False
  605. return False
  606. def _send_rss_wework(self, content: str, proxy_url: Optional[str]) -> bool:
  607. """发送 RSS 到企业微信"""
  608. import requests
  609. webhooks = parse_multi_account_config(self.config["WEWORK_WEBHOOK_URL"])
  610. webhooks = limit_accounts(webhooks, self.max_accounts, "企业微信")
  611. results = []
  612. for i, webhook_url in enumerate(webhooks):
  613. if not webhook_url:
  614. continue
  615. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  616. try:
  617. batches = self.split_content_func(
  618. content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
  619. )
  620. for batch_content in batches:
  621. payload = {
  622. "msgtype": "markdown",
  623. "markdown": {"content": batch_content},
  624. }
  625. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  626. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  627. resp.raise_for_status()
  628. print(f"✅ 企业微信{account_label} RSS 通知发送成功")
  629. results.append(True)
  630. except Exception as e:
  631. print(f"❌ 企业微信{account_label} RSS 通知发送失败: {e}")
  632. results.append(False)
  633. return any(results) if results else False
  634. def _send_rss_telegram(self, content: str, proxy_url: Optional[str]) -> bool:
  635. """发送 RSS 到 Telegram"""
  636. import requests
  637. tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
  638. chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
  639. if not tokens or not chat_ids:
  640. return False
  641. results = []
  642. for i in range(min(len(tokens), len(chat_ids), self.max_accounts)):
  643. token = tokens[i]
  644. chat_id = chat_ids[i]
  645. if not token or not chat_id:
  646. continue
  647. account_label = f"账号{i+1}" if len(tokens) > 1 else ""
  648. try:
  649. batches = self.split_content_func(
  650. content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
  651. )
  652. for batch_content in batches:
  653. url = f"https://api.telegram.org/bot{token}/sendMessage"
  654. payload = {
  655. "chat_id": chat_id,
  656. "text": batch_content,
  657. "parse_mode": "Markdown",
  658. }
  659. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  660. resp = requests.post(url, json=payload, proxies=proxies, timeout=30)
  661. resp.raise_for_status()
  662. print(f"✅ Telegram{account_label} RSS 通知发送成功")
  663. results.append(True)
  664. except Exception as e:
  665. print(f"❌ Telegram{account_label} RSS 通知发送失败: {e}")
  666. results.append(False)
  667. return any(results) if results else False
  668. def _send_rss_ntfy(self, content: str, proxy_url: Optional[str]) -> bool:
  669. """发送 RSS 到 ntfy"""
  670. import requests
  671. server_url = self.config["NTFY_SERVER_URL"]
  672. topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
  673. tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))
  674. if not server_url or not topics:
  675. return False
  676. topics = limit_accounts(topics, self.max_accounts, "ntfy")
  677. results = []
  678. for i, topic in enumerate(topics):
  679. if not topic:
  680. continue
  681. token = tokens[i] if tokens and i < len(tokens) else ""
  682. account_label = f"账号{i+1}" if len(topics) > 1 else ""
  683. try:
  684. batches = self.split_content_func(content, 3800)
  685. for batch_content in batches:
  686. url = f"{server_url.rstrip('/')}/{topic}"
  687. headers = {"Title": "RSS 订阅更新", "Markdown": "yes"}
  688. if token:
  689. headers["Authorization"] = f"Bearer {token}"
  690. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  691. resp = requests.post(
  692. url, data=batch_content.encode("utf-8"),
  693. headers=headers, proxies=proxies, timeout=30
  694. )
  695. resp.raise_for_status()
  696. print(f"✅ ntfy{account_label} RSS 通知发送成功")
  697. results.append(True)
  698. except Exception as e:
  699. print(f"❌ ntfy{account_label} RSS 通知发送失败: {e}")
  700. results.append(False)
  701. return any(results) if results else False
  702. def _send_rss_bark(self, content: str, proxy_url: Optional[str]) -> bool:
  703. """发送 RSS 到 Bark"""
  704. import requests
  705. import urllib.parse
  706. urls = parse_multi_account_config(self.config["BARK_URL"])
  707. urls = limit_accounts(urls, self.max_accounts, "Bark")
  708. results = []
  709. for i, bark_url in enumerate(urls):
  710. if not bark_url:
  711. continue
  712. account_label = f"账号{i+1}" if len(urls) > 1 else ""
  713. try:
  714. batches = self.split_content_func(
  715. content, self.config.get("BARK_BATCH_SIZE", 3600)
  716. )
  717. for batch_content in batches:
  718. title = urllib.parse.quote("📰 RSS 订阅更新")
  719. body = urllib.parse.quote(batch_content)
  720. url = f"{bark_url.rstrip('/')}/{title}/{body}"
  721. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  722. resp = requests.get(url, proxies=proxies, timeout=30)
  723. resp.raise_for_status()
  724. print(f"✅ Bark{account_label} RSS 通知发送成功")
  725. results.append(True)
  726. except Exception as e:
  727. print(f"❌ Bark{account_label} RSS 通知发送失败: {e}")
  728. results.append(False)
  729. return any(results) if results else False
  730. def _send_rss_slack(self, content: str, proxy_url: Optional[str]) -> bool:
  731. """发送 RSS 到 Slack"""
  732. import requests
  733. webhooks = parse_multi_account_config(self.config["SLACK_WEBHOOK_URL"])
  734. webhooks = limit_accounts(webhooks, self.max_accounts, "Slack")
  735. results = []
  736. for i, webhook_url in enumerate(webhooks):
  737. if not webhook_url:
  738. continue
  739. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  740. try:
  741. batches = self.split_content_func(
  742. content, self.config.get("SLACK_BATCH_SIZE", 4000)
  743. )
  744. for batch_content in batches:
  745. payload = {
  746. "blocks": [
  747. {
  748. "type": "section",
  749. "text": {
  750. "type": "mrkdwn",
  751. "text": batch_content,
  752. },
  753. }
  754. ]
  755. }
  756. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  757. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  758. resp.raise_for_status()
  759. print(f"✅ Slack{account_label} RSS 通知发送成功")
  760. results.append(True)
  761. except Exception as e:
  762. print(f"❌ Slack{account_label} RSS 通知发送失败: {e}")
  763. results.append(False)
  764. return any(results) if results else False