dispatcher.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049
  1. # coding=utf-8
  2. """
  3. 通知调度器模块
  4. 提供统一的通知分发接口。
  5. 支持所有通知渠道的多账号配置,使用 `;` 分隔多个账号。
  6. 使用示例:
  7. dispatcher = NotificationDispatcher(config, get_time_func, split_content_func)
  8. results = dispatcher.dispatch_all(report_data, report_type, ...)
  9. """
  10. from __future__ import annotations
  11. from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
  12. from trendradar.core.config import (
  13. get_account_at_index,
  14. limit_accounts,
  15. parse_multi_account_config,
  16. validate_paired_configs,
  17. )
  18. from .senders import (
  19. send_to_bark,
  20. send_to_dingtalk,
  21. send_to_email,
  22. send_to_feishu,
  23. send_to_ntfy,
  24. send_to_slack,
  25. send_to_telegram,
  26. send_to_wework,
  27. send_to_generic_webhook,
  28. )
  29. from .renderer import (
  30. render_rss_feishu_content,
  31. render_rss_dingtalk_content,
  32. render_rss_markdown_content,
  33. )
  34. # 类型检查时导入,运行时不导入(避免循环导入)
  35. if TYPE_CHECKING:
  36. from trendradar.ai import AIAnalysisResult
  37. class NotificationDispatcher:
  38. """
  39. 统一的多账号通知调度器
  40. 将多账号发送逻辑封装,提供简洁的 dispatch_all 接口。
  41. 内部处理账号解析、数量限制、配对验证等逻辑。
  42. """
  43. def __init__(
  44. self,
  45. config: Dict[str, Any],
  46. get_time_func: Callable,
  47. split_content_func: Callable,
  48. ):
  49. """
  50. 初始化通知调度器
  51. Args:
  52. config: 完整的配置字典,包含所有通知渠道的配置
  53. get_time_func: 获取当前时间的函数
  54. split_content_func: 内容分批函数
  55. """
  56. self.config = config
  57. self.get_time_func = get_time_func
  58. self.split_content_func = split_content_func
  59. self.max_accounts = config.get("MAX_ACCOUNTS_PER_CHANNEL", 3)
  60. def dispatch_all(
  61. self,
  62. report_data: Dict,
  63. report_type: str,
  64. update_info: Optional[Dict] = None,
  65. proxy_url: Optional[str] = None,
  66. mode: str = "daily",
  67. html_file_path: Optional[str] = None,
  68. rss_items: Optional[List[Dict]] = None,
  69. rss_new_items: Optional[List[Dict]] = None,
  70. ai_analysis: Optional[AIAnalysisResult] = None,
  71. standalone_data: Optional[Dict] = None,
  72. ) -> Dict[str, bool]:
  73. """
  74. 分发通知到所有已配置的渠道(支持热榜+RSS合并推送+AI分析+独立展示区)
  75. Args:
  76. report_data: 报告数据(由 prepare_report_data 生成)
  77. report_type: 报告类型(如 "当日汇总"、"实时增量")
  78. update_info: 版本更新信息(可选)
  79. proxy_url: 代理 URL(可选)
  80. mode: 报告模式 (daily/current/incremental)
  81. html_file_path: HTML 报告文件路径(邮件使用)
  82. rss_items: RSS 统计条目列表(用于 RSS 统计区块)
  83. rss_new_items: RSS 新增条目列表(用于 RSS 新增区块)
  84. ai_analysis: AI 分析结果(可选)
  85. standalone_data: 独立展示区数据(可选)
  86. Returns:
  87. Dict[str, bool]: 每个渠道的发送结果,key 为渠道名,value 为是否成功
  88. """
  89. results = {}
  90. # 获取 AI 推送模式
  91. ai_config = self.config.get("AI_ANALYSIS", {})
  92. ai_push_mode = ai_config.get("PUSH_MODE", "both")
  93. # 飞书
  94. if self.config.get("FEISHU_WEBHOOK_URL"):
  95. results["feishu"] = self._send_feishu(
  96. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  97. ai_analysis, ai_push_mode, standalone_data
  98. )
  99. # 钉钉
  100. if self.config.get("DINGTALK_WEBHOOK_URL"):
  101. results["dingtalk"] = self._send_dingtalk(
  102. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  103. ai_analysis, ai_push_mode, standalone_data
  104. )
  105. # 企业微信
  106. if self.config.get("WEWORK_WEBHOOK_URL"):
  107. results["wework"] = self._send_wework(
  108. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  109. ai_analysis, ai_push_mode, standalone_data
  110. )
  111. # Telegram(需要配对验证)
  112. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  113. results["telegram"] = self._send_telegram(
  114. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  115. ai_analysis, ai_push_mode, standalone_data
  116. )
  117. # ntfy(需要配对验证)
  118. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  119. results["ntfy"] = self._send_ntfy(
  120. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  121. ai_analysis, ai_push_mode, standalone_data
  122. )
  123. # Bark
  124. if self.config.get("BARK_URL"):
  125. results["bark"] = self._send_bark(
  126. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  127. ai_analysis, ai_push_mode, standalone_data
  128. )
  129. # Slack
  130. if self.config.get("SLACK_WEBHOOK_URL"):
  131. results["slack"] = self._send_slack(
  132. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  133. ai_analysis, ai_push_mode, standalone_data
  134. )
  135. # 通用 Webhook
  136. if self.config.get("GENERIC_WEBHOOK_URL"):
  137. results["generic_webhook"] = self._send_generic_webhook(
  138. report_data, report_type, update_info, proxy_url, mode, rss_items, rss_new_items,
  139. ai_analysis, ai_push_mode, standalone_data
  140. )
  141. # 邮件(保持原有逻辑,已支持多收件人)
  142. if (
  143. self.config.get("EMAIL_FROM")
  144. and self.config.get("EMAIL_PASSWORD")
  145. and self.config.get("EMAIL_TO")
  146. ):
  147. results["email"] = self._send_email(report_type, html_file_path, ai_analysis, ai_push_mode)
  148. return results
  def _send_to_multi_accounts(
      self,
      channel_name: str,
      config_value: str,
      send_func: Callable[..., bool],
      **kwargs,
  ) -> bool:
      """
      Shared multi-account send loop.

      Args:
          channel_name: Channel name, forwarded to limit_accounts.
          config_value: Raw config value, parsed by parse_multi_account_config.
          send_func: Called as send_func(account, account_label=..., **kwargs).
          **kwargs: Extra keyword arguments forwarded to send_func.

      Returns:
          bool: True when at least one send_func call returned a truthy
          value; False when there are no accounts or none succeeded.
      """
      parsed = parse_multi_account_config(config_value)
      if not parsed:
          return False

      capped = limit_accounts(parsed, self.max_accounts, channel_name)
      # A label is attached only when more than one account remains.
      labelled = len(capped) > 1

      outcomes = [
          send_func(
              account,
              account_label=f"账号{position}" if labelled else "",
              **kwargs,
          )
          for position, account in enumerate(capped, start=1)
          if account  # falsy entries are skipped but still consume a position
      ]
      return any(outcomes)
  def _send_feishu(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured Feishu webhook.

      When ai_push_mode is "only_analysis" and an analysis result is
      present, the report data is replaced by an empty skeleton. RSS items
      are withheld whenever ai_push_mode is "only_analysis".
      NOTE(review): RSS items are dropped in that mode even if ai_analysis
      is missing, while report_data is kept — confirm this is intended.

      Returns:
          bool: Result of _send_to_multi_accounts for the Feishu webhooks.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      def _deliver(url, account_label):
          # Config values are read at call time, once per account.
          return send_to_feishu(
              webhook_url=url,
              account_label=account_label,
              report_data=report_data,
              report_type=report_type,
              update_info=update_info,
              proxy_url=proxy_url,
              mode=mode,
              batch_size=self.config.get("FEISHU_BATCH_SIZE", 29000),
              batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
              split_content_func=self.split_content_func,
              get_time_func=self.get_time_func,
              rss_items=None if analysis_only else rss_items,
              rss_new_items=None if analysis_only else rss_new_items,
              ai_analysis=ai_analysis,
              ai_push_mode=ai_push_mode,
              standalone_data=standalone_data,
          )

      return self._send_to_multi_accounts(
          channel_name="飞书",
          config_value=self.config["FEISHU_WEBHOOK_URL"],
          send_func=_deliver,
      )
  def _send_dingtalk(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured DingTalk webhook.

      When ai_push_mode is "only_analysis" and an analysis result is
      present, the report data is replaced by an empty skeleton. RSS items
      are withheld whenever ai_push_mode is "only_analysis".

      Returns:
          bool: Result of _send_to_multi_accounts for the DingTalk webhooks.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      def _deliver(url, account_label):
          # Config values are read at call time, once per account.
          return send_to_dingtalk(
              webhook_url=url,
              account_label=account_label,
              report_data=report_data,
              report_type=report_type,
              update_info=update_info,
              proxy_url=proxy_url,
              mode=mode,
              batch_size=self.config.get("DINGTALK_BATCH_SIZE", 20000),
              batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
              split_content_func=self.split_content_func,
              rss_items=None if analysis_only else rss_items,
              rss_new_items=None if analysis_only else rss_new_items,
              ai_analysis=ai_analysis,
              ai_push_mode=ai_push_mode,
              standalone_data=standalone_data,
          )

      return self._send_to_multi_accounts(
          channel_name="钉钉",
          config_value=self.config["DINGTALK_WEBHOOK_URL"],
          send_func=_deliver,
      )
  def _send_wework(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured WeCom (WeWork) webhook.

      When ai_push_mode is "only_analysis" and an analysis result is
      present, the report data is replaced by an empty skeleton. RSS items
      are withheld whenever ai_push_mode is "only_analysis".

      Returns:
          bool: Result of _send_to_multi_accounts for the WeCom webhooks.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      def _deliver(url, account_label):
          # Config values are read at call time, once per account.
          return send_to_wework(
              webhook_url=url,
              account_label=account_label,
              report_data=report_data,
              report_type=report_type,
              update_info=update_info,
              proxy_url=proxy_url,
              mode=mode,
              batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
              batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
              msg_type=self.config.get("WEWORK_MSG_TYPE", "markdown"),
              split_content_func=self.split_content_func,
              rss_items=None if analysis_only else rss_items,
              rss_new_items=None if analysis_only else rss_new_items,
              ai_analysis=ai_analysis,
              ai_push_mode=ai_push_mode,
              standalone_data=standalone_data,
          )

      return self._send_to_multi_accounts(
          channel_name="企业微信",
          config_value=self.config["WEWORK_WEBHOOK_URL"],
          send_func=_deliver,
      )
  def _send_telegram(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured Telegram bot/chat pair.

      Bot tokens and chat ids are parsed separately, checked with
      validate_paired_configs, then capped to max_accounts. The i-th token
      is used with the i-th chat id.

      Returns:
          bool: True when at least one pair was sent successfully; False
          when the configuration is empty or invalid, or every send failed.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      bot_tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
      chat_targets = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
      if not (bot_tokens and chat_targets):
          return False

      # Pairing check between tokens and chat ids.
      paired_ok, pair_count = validate_paired_configs(
          {"bot_token": bot_tokens, "chat_id": chat_targets},
          "Telegram",
          required_keys=["bot_token", "chat_id"],
      )
      if not paired_ok or pair_count == 0:
          return False

      # Cap the number of accounts and keep the chat ids aligned.
      bot_tokens = limit_accounts(bot_tokens, self.max_accounts, "Telegram")
      chat_targets = chat_targets[: len(bot_tokens)]

      labelled = len(bot_tokens) > 1
      outcomes = []
      for idx, token in enumerate(bot_tokens):
          chat_id = chat_targets[idx]
          if not (token and chat_id):
              continue
          outcomes.append(
              send_to_telegram(
                  bot_token=token,
                  chat_id=chat_id,
                  report_data=report_data,
                  report_type=report_type,
                  update_info=update_info,
                  proxy_url=proxy_url,
                  mode=mode,
                  account_label=f"账号{idx+1}" if labelled else "",
                  batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
                  batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
                  split_content_func=self.split_content_func,
                  rss_items=None if analysis_only else rss_items,
                  rss_new_items=None if analysis_only else rss_new_items,
                  ai_analysis=ai_analysis,
                  ai_push_mode=ai_push_mode,
                  standalone_data=standalone_data,
              )
          )
      return any(outcomes)
  def _send_ntfy(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured ntfy topic on one server.

      Tokens are optional; when any are configured their count must equal
      the topic count, otherwise an error is printed and nothing is sent.

      Returns:
          bool: True when at least one topic was sent successfully; False
          when the configuration is empty or inconsistent, or every send
          failed.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      server = self.config["NTFY_SERVER_URL"]
      topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
      tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))
      if not (server and topics):
          return False

      # Token and topic counts must match when tokens are configured.
      if tokens and len(tokens) != len(topics):
          print(
              f"❌ ntfy 配置错误:topic 数量({len(topics)})与 token 数量({len(tokens)})不一致,跳过 ntfy 推送"
          )
          return False

      # Cap the number of accounts and keep the tokens aligned.
      topics = limit_accounts(topics, self.max_accounts, "ntfy")
      if tokens:
          tokens = tokens[: len(topics)]

      labelled = len(topics) > 1
      outcomes = []
      for idx, topic in enumerate(topics):
          if not topic:
              continue
          auth_token = get_account_at_index(tokens, idx, "") if tokens else ""
          outcomes.append(
              send_to_ntfy(
                  server_url=server,
                  topic=topic,
                  token=auth_token,
                  report_data=report_data,
                  report_type=report_type,
                  update_info=update_info,
                  proxy_url=proxy_url,
                  mode=mode,
                  account_label=f"账号{idx+1}" if labelled else "",
                  batch_size=3800,
                  split_content_func=self.split_content_func,
                  rss_items=None if analysis_only else rss_items,
                  rss_new_items=None if analysis_only else rss_new_items,
                  ai_analysis=ai_analysis,
                  ai_push_mode=ai_push_mode,
                  standalone_data=standalone_data,
              )
          )
      return any(outcomes)
  def _send_bark(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured Bark URL.

      When ai_push_mode is "only_analysis" and an analysis result is
      present, the report data is replaced by an empty skeleton. RSS items
      are withheld whenever ai_push_mode is "only_analysis".

      Returns:
          bool: Result of _send_to_multi_accounts for the Bark URLs.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      def _deliver(url, account_label):
          # Config values are read at call time, once per account.
          return send_to_bark(
              bark_url=url,
              account_label=account_label,
              report_data=report_data,
              report_type=report_type,
              update_info=update_info,
              proxy_url=proxy_url,
              mode=mode,
              batch_size=self.config.get("BARK_BATCH_SIZE", 3600),
              batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
              split_content_func=self.split_content_func,
              rss_items=None if analysis_only else rss_items,
              rss_new_items=None if analysis_only else rss_new_items,
              ai_analysis=ai_analysis,
              ai_push_mode=ai_push_mode,
              standalone_data=standalone_data,
          )

      return self._send_to_multi_accounts(
          channel_name="Bark",
          config_value=self.config["BARK_URL"],
          send_func=_deliver,
      )
  def _send_slack(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured Slack webhook.

      When ai_push_mode is "only_analysis" and an analysis result is
      present, the report data is replaced by an empty skeleton. RSS items
      are withheld whenever ai_push_mode is "only_analysis".

      Returns:
          bool: Result of _send_to_multi_accounts for the Slack webhooks.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      def _deliver(url, account_label):
          # Config values are read at call time, once per account.
          return send_to_slack(
              webhook_url=url,
              account_label=account_label,
              report_data=report_data,
              report_type=report_type,
              update_info=update_info,
              proxy_url=proxy_url,
              mode=mode,
              batch_size=self.config.get("SLACK_BATCH_SIZE", 4000),
              batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
              split_content_func=self.split_content_func,
              rss_items=None if analysis_only else rss_items,
              rss_new_items=None if analysis_only else rss_new_items,
              ai_analysis=ai_analysis,
              ai_push_mode=ai_push_mode,
              standalone_data=standalone_data,
          )

      return self._send_to_multi_accounts(
          channel_name="Slack",
          config_value=self.config["SLACK_WEBHOOK_URL"],
          send_func=_deliver,
      )
  def _send_generic_webhook(
      self,
      report_data: Dict,
      report_type: str,
      update_info: Optional[Dict],
      proxy_url: Optional[str],
      mode: str,
      rss_items: Optional[List[Dict]] = None,
      rss_new_items: Optional[List[Dict]] = None,
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
      standalone_data: Optional[Dict] = None,
  ) -> bool:
      """
      Send the report to every configured generic webhook URL.

      Payload templates are matched to URLs by position. A single
      configured template is shared by all URLs; otherwise a URL without a
      template at its position gets an empty template string.

      Returns:
          bool: True when at least one URL was sent successfully; False
          when no URL is configured or every send failed.
      """
      analysis_only = ai_push_mode == "only_analysis"
      if analysis_only and ai_analysis:
          report_data = {"stats": [], "failed_ids": [], "new_titles": {}, "id_to_name": {}}

      targets = parse_multi_account_config(self.config.get("GENERIC_WEBHOOK_URL", ""))
      payload_templates = parse_multi_account_config(
          self.config.get("GENERIC_WEBHOOK_TEMPLATE", "")
      )
      if not targets:
          return False

      targets = limit_accounts(targets, self.max_accounts, "通用Webhook")
      labelled = len(targets) > 1
      template_count = len(payload_templates)

      outcomes = []
      for idx, target_url in enumerate(targets):
          if not target_url:
              continue

          if idx < template_count:
              chosen_template = payload_templates[idx]
          elif template_count == 1:
              # One template shared by every URL.
              chosen_template = payload_templates[0]
          else:
              chosen_template = ""

          outcomes.append(
              send_to_generic_webhook(
                  webhook_url=target_url,
                  payload_template=chosen_template,
                  report_data=report_data,
                  report_type=report_type,
                  update_info=update_info,
                  proxy_url=proxy_url,
                  mode=mode,
                  account_label=f"账号{idx+1}" if labelled else "",
                  batch_size=self.config.get("MESSAGE_BATCH_SIZE", 4000),
                  batch_interval=self.config.get("BATCH_SEND_INTERVAL", 1.0),
                  split_content_func=self.split_content_func,
                  rss_items=None if analysis_only else rss_items,
                  rss_new_items=None if analysis_only else rss_new_items,
                  ai_analysis=ai_analysis,
                  ai_push_mode=ai_push_mode,
                  standalone_data=standalone_data,
              )
          )
      return any(outcomes)
  def _send_email(
      self,
      report_type: str,
      html_file_path: Optional[str],
      ai_analysis: Optional[AIAnalysisResult] = None,
      ai_push_mode: str = "both",
  ) -> bool:
      """
      Send the report by email through send_to_email.

      Sender, password and recipient are required config keys; the custom
      SMTP server and port default to empty strings when absent.

      Returns:
          bool: Whatever send_to_email returns.
      """
      settings = self.config
      sender_address = settings["EMAIL_FROM"]
      sender_password = settings["EMAIL_PASSWORD"]
      recipients = settings["EMAIL_TO"]
      smtp_server = settings.get("EMAIL_SMTP_SERVER", "")
      smtp_port = settings.get("EMAIL_SMTP_PORT", "")

      return send_to_email(
          from_email=sender_address,
          password=sender_password,
          to_email=recipients,
          report_type=report_type,
          html_file_path=html_file_path,
          custom_smtp_server=smtp_server,
          custom_smtp_port=smtp_port,
          get_time_func=self.get_time_func,
          ai_analysis=ai_analysis,
          ai_push_mode=ai_push_mode,
      )
  550. # === RSS 通知方法 ===
  551. def dispatch_rss(
  552. self,
  553. rss_items: List[Dict],
  554. feeds_info: Optional[Dict[str, str]] = None,
  555. proxy_url: Optional[str] = None,
  556. html_file_path: Optional[str] = None,
  557. ) -> Dict[str, bool]:
  558. """
  559. 分发 RSS 通知到所有已配置的渠道
  560. Args:
  561. rss_items: RSS 条目列表,每个条目包含:
  562. - title: 标题
  563. - feed_id: RSS 源 ID
  564. - feed_name: RSS 源名称
  565. - url: 链接
  566. - published_at: 发布时间
  567. - summary: 摘要(可选)
  568. - author: 作者(可选)
  569. feeds_info: RSS 源 ID 到名称的映射
  570. proxy_url: 代理 URL(可选)
  571. html_file_path: HTML 报告文件路径(邮件使用)
  572. Returns:
  573. Dict[str, bool]: 每个渠道的发送结果
  574. """
  575. if not rss_items:
  576. print("[RSS通知] 没有 RSS 内容,跳过通知")
  577. return {}
  578. results = {}
  579. report_type = "RSS 订阅更新"
  580. # 飞书
  581. if self.config.get("FEISHU_WEBHOOK_URL"):
  582. results["feishu"] = self._send_rss_feishu(
  583. rss_items, feeds_info, proxy_url
  584. )
  585. # 钉钉
  586. if self.config.get("DINGTALK_WEBHOOK_URL"):
  587. results["dingtalk"] = self._send_rss_dingtalk(
  588. rss_items, feeds_info, proxy_url
  589. )
  590. # 企业微信
  591. if self.config.get("WEWORK_WEBHOOK_URL"):
  592. results["wework"] = self._send_rss_markdown(
  593. rss_items, feeds_info, proxy_url, "wework"
  594. )
  595. # Telegram
  596. if self.config.get("TELEGRAM_BOT_TOKEN") and self.config.get("TELEGRAM_CHAT_ID"):
  597. results["telegram"] = self._send_rss_markdown(
  598. rss_items, feeds_info, proxy_url, "telegram"
  599. )
  600. # ntfy
  601. if self.config.get("NTFY_SERVER_URL") and self.config.get("NTFY_TOPIC"):
  602. results["ntfy"] = self._send_rss_markdown(
  603. rss_items, feeds_info, proxy_url, "ntfy"
  604. )
  605. # Bark
  606. if self.config.get("BARK_URL"):
  607. results["bark"] = self._send_rss_markdown(
  608. rss_items, feeds_info, proxy_url, "bark"
  609. )
  610. # Slack
  611. if self.config.get("SLACK_WEBHOOK_URL"):
  612. results["slack"] = self._send_rss_markdown(
  613. rss_items, feeds_info, proxy_url, "slack"
  614. )
  615. # 邮件
  616. if (
  617. self.config.get("EMAIL_FROM")
  618. and self.config.get("EMAIL_PASSWORD")
  619. and self.config.get("EMAIL_TO")
  620. ):
  621. results["email"] = self._send_email(report_type, html_file_path)
  622. return results
  def _send_rss_feishu(
      self,
      rss_items: List[Dict],
      feeds_info: Optional[Dict[str, str]],
      proxy_url: Optional[str],
  ) -> bool:
      """
      Send the RSS notification to every configured Feishu webhook.

      The rendered content is split into batches and each batch is posted
      as an interactive card. An account counts as successful only when
      every batch is accepted, both at HTTP level and at API level.

      Args:
          rss_items: RSS entries passed to render_rss_feishu_content.
          feeds_info: Mapping from RSS feed id to feed name.
          proxy_url: Proxy URL used for both http and https (optional).

      Returns:
          bool: True when at least one account was sent successfully.
      """
      import requests

      content = render_rss_feishu_content(
          rss_items=rss_items,
          feeds_info=feeds_info,
          get_time_func=self.get_time_func,
      )
      webhooks = parse_multi_account_config(self.config["FEISHU_WEBHOOK_URL"])
      webhooks = limit_accounts(webhooks, self.max_accounts, "飞书")
      # The proxy mapping is identical for every request, so build it once.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

      results = []
      for i, webhook_url in enumerate(webhooks):
          if not webhook_url:
              continue
          account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
          try:
              # Split into batches (kept inside the try so a splitter error
              # is reported per account, as before).
              batches = self.split_content_func(
                  content, self.config.get("FEISHU_BATCH_SIZE", 29000)
              )
              total = len(batches)
              for batch_idx, batch_content in enumerate(batches):
                  progress = f"({batch_idx + 1}/{total})" if total > 1 else ""
                  payload = {
                      "msg_type": "interactive",
                      "card": {
                          "header": {
                              "title": {
                                  "tag": "plain_text",
                                  "content": f"📰 RSS 订阅更新 {progress}",
                              },
                              "template": "green",
                          },
                          "elements": [
                              {"tag": "markdown", "content": batch_content}
                          ],
                      },
                  }
                  resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
                  resp.raise_for_status()
                  # The webhook can answer HTTP 200 while rejecting the
                  # message; the rejection is carried as a non-zero code in
                  # the JSON body, so the status check alone is not enough.
                  try:
                      body = resp.json()
                  except ValueError:
                      body = None  # non-JSON body: rely on the HTTP status
                  if isinstance(body, dict):
                      api_code = body.get("code", body.get("StatusCode"))
                      if api_code not in (None, 0):
                          raise RuntimeError(
                              f"code={api_code}, msg={body.get('msg') or body.get('StatusMessage')}"
                          )
              print(f"✅ 飞书{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # Best-effort per account: report and continue with the next.
              print(f"❌ 飞书{account_label} RSS 通知发送失败: {e}")
              results.append(False)
      return any(results)
  def _send_rss_dingtalk(
      self,
      rss_items: List[Dict],
      feeds_info: Optional[Dict[str, str]],
      proxy_url: Optional[str],
  ) -> bool:
      """
      Send the RSS notification to every configured DingTalk webhook.

      The rendered content is split into batches and each batch is posted
      as a markdown message. An account counts as successful only when
      every batch is accepted, both at HTTP level and at API level.

      Args:
          rss_items: RSS entries passed to render_rss_dingtalk_content.
          feeds_info: Mapping from RSS feed id to feed name.
          proxy_url: Proxy URL used for both http and https (optional).

      Returns:
          bool: True when at least one account was sent successfully.
      """
      import requests

      content = render_rss_dingtalk_content(
          rss_items=rss_items,
          feeds_info=feeds_info,
          get_time_func=self.get_time_func,
      )
      webhooks = parse_multi_account_config(self.config["DINGTALK_WEBHOOK_URL"])
      webhooks = limit_accounts(webhooks, self.max_accounts, "钉钉")
      # The proxy mapping is identical for every request, so build it once.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

      results = []
      for i, webhook_url in enumerate(webhooks):
          if not webhook_url:
              continue
          account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
          try:
              # Kept inside the try so a splitter error is reported per
              # account, as before.
              batches = self.split_content_func(
                  content, self.config.get("DINGTALK_BATCH_SIZE", 20000)
              )
              total = len(batches)
              for batch_idx, batch_content in enumerate(batches):
                  progress = f"({batch_idx + 1}/{total})" if total > 1 else ""
                  payload = {
                      "msgtype": "markdown",
                      "markdown": {
                          "title": f"📰 RSS 订阅更新 {progress}",
                          "text": batch_content,
                      },
                  }
                  resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
                  resp.raise_for_status()
                  # The robot API answers HTTP 200 even when it rejects the
                  # message; the rejection is carried as a non-zero errcode
                  # in the JSON body.
                  try:
                      body = resp.json()
                  except ValueError:
                      body = None  # non-JSON body: rely on the HTTP status
                  if isinstance(body, dict) and body.get("errcode") not in (None, 0):
                      raise RuntimeError(
                          f"errcode={body.get('errcode')}, errmsg={body.get('errmsg')}"
                      )
              print(f"✅ 钉钉{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # Best-effort per account: report and continue with the next.
              print(f"❌ 钉钉{account_label} RSS 通知发送失败: {e}")
              results.append(False)
      return any(results)
  def _send_rss_markdown(
      self,
      rss_items: List[Dict],
      feeds_info: Optional[Dict[str, str]],
      proxy_url: Optional[str],
      channel: str,
  ) -> bool:
      """
      Render RSS content as markdown and route it to one channel sender.

      Args:
          rss_items: RSS entries passed to render_rss_markdown_content.
          feeds_info: Mapping from RSS feed id to feed name.
          proxy_url: Proxy URL forwarded to the channel sender (optional).
          channel: One of "wework", "telegram", "ntfy", "bark", "slack".

      Returns:
          bool: The channel sender's result; False when the channel is not
          supported or the sender raised.
      """
      content = render_rss_markdown_content(
          rss_items=rss_items,
          feeds_info=feeds_info,
          get_time_func=self.get_time_func,
      )

      # Channel name -> name of the method that delivers it. Methods are
      # looked up lazily so only the requested sender has to exist.
      sender_names = {
          "wework": "_send_rss_wework",
          "telegram": "_send_rss_telegram",
          "ntfy": "_send_rss_ntfy",
          "bark": "_send_rss_bark",
          "slack": "_send_rss_slack",
      }
      sender_name = sender_names.get(channel)
      if sender_name is None:
          # Previously this path returned False without any diagnostic.
          print(f"❌ {channel} RSS 通知发送失败: 不支持的渠道")
          return False

      try:
          return getattr(self, sender_name)(content, proxy_url)
      except Exception as e:
          # Best-effort: one failing channel must not abort the dispatch.
          print(f"❌ {channel} RSS 通知发送失败: {e}")
          return False
  def _send_rss_wework(self, content: str, proxy_url: Optional[str]) -> bool:
      """
      Send rendered RSS markdown to every configured WeCom (WeWork) webhook.

      The content is split into batches and each batch is posted as a
      markdown message. An account counts as successful only when every
      batch is accepted, both at HTTP level and at API level.

      Args:
          content: Already rendered markdown content.
          proxy_url: Proxy URL used for both http and https (optional).

      Returns:
          bool: True when at least one account was sent successfully.
      """
      import requests

      webhooks = parse_multi_account_config(self.config["WEWORK_WEBHOOK_URL"])
      webhooks = limit_accounts(webhooks, self.max_accounts, "企业微信")
      # The proxy mapping is identical for every request, so build it once.
      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

      results = []
      for i, webhook_url in enumerate(webhooks):
          if not webhook_url:
              continue
          account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
          try:
              # Kept inside the try so a splitter error is reported per
              # account, as before.
              batches = self.split_content_func(
                  content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
              )
              for batch_content in batches:
                  payload = {
                      "msgtype": "markdown",
                      "markdown": {"content": batch_content},
                  }
                  resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
                  resp.raise_for_status()
                  # The robot API answers HTTP 200 even when it rejects the
                  # message; the rejection is carried as a non-zero errcode
                  # in the JSON body.
                  try:
                      body = resp.json()
                  except ValueError:
                      body = None  # non-JSON body: rely on the HTTP status
                  if isinstance(body, dict) and body.get("errcode") not in (None, 0):
                      raise RuntimeError(
                          f"errcode={body.get('errcode')}, errmsg={body.get('errmsg')}"
                      )
              print(f"✅ 企业微信{account_label} RSS 通知发送成功")
              results.append(True)
          except Exception as e:
              # Best-effort per account: report and continue with the next.
              print(f"❌ 企业微信{account_label} RSS 通知发送失败: {e}")
              results.append(False)
      return any(results)
  def _send_rss_telegram(self, content: str, proxy_url: Optional[str]) -> bool:
      """
      Send rendered RSS markdown to every configured Telegram bot/chat pair.

      The i-th bot token is paired with the i-th chat id; the number of
      pairs is bounded by the shorter list and by max_accounts. Each batch
      is posted to the Bot API sendMessage endpoint with Markdown parsing.

      Args:
          content: Already rendered markdown content.
          proxy_url: Proxy URL used for both http and https (optional).

      Returns:
          bool: True when at least one pair was sent successfully.
      """
      import requests

      tokens = parse_multi_account_config(self.config["TELEGRAM_BOT_TOKEN"])
      chat_ids = parse_multi_account_config(self.config["TELEGRAM_CHAT_ID"])
      if not (tokens and chat_ids):
          return False

      proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
      pair_limit = min(len(tokens), len(chat_ids), self.max_accounts)
      # The label depends on the configured token count, not the capped one.
      labelled = len(tokens) > 1

      outcomes = []
      for idx, token, chat_id in zip(range(pair_limit), tokens, chat_ids):
          if not (token and chat_id):
              continue
          account_label = f"账号{idx+1}" if labelled else ""
          try:
              chunks = self.split_content_func(
                  content, self.config.get("MESSAGE_BATCH_SIZE", 4000)
              )
              endpoint = f"https://api.telegram.org/bot{token}/sendMessage"
              for chunk in chunks:
                  message = {
                      "chat_id": chat_id,
                      "text": chunk,
                      "parse_mode": "Markdown",
                  }
                  reply = requests.post(endpoint, json=message, proxies=proxies, timeout=30)
                  reply.raise_for_status()
          except Exception as e:
              print(f"❌ Telegram{account_label} RSS 通知发送失败: {e}")
              outcomes.append(False)
          else:
              print(f"✅ Telegram{account_label} RSS 通知发送成功")
              outcomes.append(True)
      return any(outcomes)
  806. def _send_rss_ntfy(self, content: str, proxy_url: Optional[str]) -> bool:
  807. """发送 RSS 到 ntfy"""
  808. import requests
  809. server_url = self.config["NTFY_SERVER_URL"]
  810. topics = parse_multi_account_config(self.config["NTFY_TOPIC"])
  811. tokens = parse_multi_account_config(self.config.get("NTFY_TOKEN", ""))
  812. if not server_url or not topics:
  813. return False
  814. topics = limit_accounts(topics, self.max_accounts, "ntfy")
  815. results = []
  816. for i, topic in enumerate(topics):
  817. if not topic:
  818. continue
  819. token = tokens[i] if tokens and i < len(tokens) else ""
  820. account_label = f"账号{i+1}" if len(topics) > 1 else ""
  821. try:
  822. batches = self.split_content_func(content, 3800)
  823. for batch_content in batches:
  824. url = f"{server_url.rstrip('/')}/{topic}"
  825. headers = {"Title": "RSS 订阅更新", "Markdown": "yes"}
  826. if token:
  827. headers["Authorization"] = f"Bearer {token}"
  828. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  829. resp = requests.post(
  830. url, data=batch_content.encode("utf-8"),
  831. headers=headers, proxies=proxies, timeout=30
  832. )
  833. resp.raise_for_status()
  834. print(f"✅ ntfy{account_label} RSS 通知发送成功")
  835. results.append(True)
  836. except Exception as e:
  837. print(f"❌ ntfy{account_label} RSS 通知发送失败: {e}")
  838. results.append(False)
  839. return any(results) if results else False
  840. def _send_rss_bark(self, content: str, proxy_url: Optional[str]) -> bool:
  841. """发送 RSS 到 Bark"""
  842. import requests
  843. import urllib.parse
  844. urls = parse_multi_account_config(self.config["BARK_URL"])
  845. urls = limit_accounts(urls, self.max_accounts, "Bark")
  846. results = []
  847. for i, bark_url in enumerate(urls):
  848. if not bark_url:
  849. continue
  850. account_label = f"账号{i+1}" if len(urls) > 1 else ""
  851. try:
  852. batches = self.split_content_func(
  853. content, self.config.get("BARK_BATCH_SIZE", 3600)
  854. )
  855. for batch_content in batches:
  856. title = urllib.parse.quote("📰 RSS 订阅更新")
  857. body = urllib.parse.quote(batch_content)
  858. url = f"{bark_url.rstrip('/')}/{title}/{body}"
  859. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  860. resp = requests.get(url, proxies=proxies, timeout=30)
  861. resp.raise_for_status()
  862. print(f"✅ Bark{account_label} RSS 通知发送成功")
  863. results.append(True)
  864. except Exception as e:
  865. print(f"❌ Bark{account_label} RSS 通知发送失败: {e}")
  866. results.append(False)
  867. return any(results) if results else False
  868. def _send_rss_slack(self, content: str, proxy_url: Optional[str]) -> bool:
  869. """发送 RSS 到 Slack"""
  870. import requests
  871. webhooks = parse_multi_account_config(self.config["SLACK_WEBHOOK_URL"])
  872. webhooks = limit_accounts(webhooks, self.max_accounts, "Slack")
  873. results = []
  874. for i, webhook_url in enumerate(webhooks):
  875. if not webhook_url:
  876. continue
  877. account_label = f"账号{i+1}" if len(webhooks) > 1 else ""
  878. try:
  879. batches = self.split_content_func(
  880. content, self.config.get("SLACK_BATCH_SIZE", 4000)
  881. )
  882. for batch_content in batches:
  883. payload = {
  884. "blocks": [
  885. {
  886. "type": "section",
  887. "text": {
  888. "type": "mrkdwn",
  889. "text": batch_content,
  890. },
  891. }
  892. ]
  893. }
  894. proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None
  895. resp = requests.post(webhook_url, json=payload, proxies=proxies, timeout=30)
  896. resp.raise_for_status()
  897. print(f"✅ Slack{account_label} RSS 通知发送成功")
  898. results.append(True)
  899. except Exception as e:
  900. print(f"❌ Slack{account_label} RSS 通知发送失败: {e}")
  901. results.append(False)
  902. return any(results) if results else False