splitter.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. # coding=utf-8
  2. """
  3. 消息分批处理模块
  4. 提供消息内容分批拆分功能,确保消息大小不超过各平台限制
  5. """
  6. from datetime import datetime
  7. from typing import Dict, List, Optional, Callable
  8. from trendradar.report.formatter import format_title_for_platform
  9. # 默认批次大小配置
  10. DEFAULT_BATCH_SIZES = {
  11. "dingtalk": 20000,
  12. "feishu": 29000,
  13. "ntfy": 3800,
  14. "default": 4000,
  15. }
  16. def split_content_into_batches(
  17. report_data: Dict,
  18. format_type: str,
  19. update_info: Optional[Dict] = None,
  20. max_bytes: Optional[int] = None,
  21. mode: str = "daily",
  22. batch_sizes: Optional[Dict[str, int]] = None,
  23. feishu_separator: str = "---",
  24. reverse_content_order: bool = False,
  25. get_time_func: Optional[Callable[[], datetime]] = None,
  26. ) -> List[str]:
  27. """分批处理消息内容,确保词组标题+至少第一条新闻的完整性
  28. Args:
  29. report_data: 报告数据字典,包含 stats, new_titles, failed_ids, total_new_count
  30. format_type: 格式类型 (feishu, dingtalk, wework, telegram, ntfy, bark, slack)
  31. update_info: 版本更新信息(可选)
  32. max_bytes: 最大字节数(可选,如果不指定则使用默认配置)
  33. mode: 报告模式 (daily, incremental, current)
  34. batch_sizes: 批次大小配置字典(可选)
  35. feishu_separator: 飞书消息分隔符
  36. reverse_content_order: 是否反转内容顺序(新增在前)
  37. get_time_func: 获取当前时间的函数(可选)
  38. Returns:
  39. 分批后的消息内容列表
  40. """
  41. # 合并批次大小配置
  42. sizes = {**DEFAULT_BATCH_SIZES, **(batch_sizes or {})}
  43. if max_bytes is None:
  44. if format_type == "dingtalk":
  45. max_bytes = sizes.get("dingtalk", 20000)
  46. elif format_type == "feishu":
  47. max_bytes = sizes.get("feishu", 29000)
  48. elif format_type == "ntfy":
  49. max_bytes = sizes.get("ntfy", 3800)
  50. else:
  51. max_bytes = sizes.get("default", 4000)
  52. batches = []
  53. total_titles = sum(
  54. len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
  55. )
  56. now = get_time_func() if get_time_func else datetime.now()
  57. base_header = ""
  58. if format_type in ("wework", "bark"):
  59. base_header = f"**总新闻数:** {total_titles}\n\n\n\n"
  60. elif format_type == "telegram":
  61. base_header = f"总新闻数: {total_titles}\n\n"
  62. elif format_type == "ntfy":
  63. base_header = f"**总新闻数:** {total_titles}\n\n"
  64. elif format_type == "feishu":
  65. base_header = ""
  66. elif format_type == "dingtalk":
  67. base_header = f"**总新闻数:** {total_titles}\n\n"
  68. base_header += f"**时间:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
  69. base_header += f"**类型:** 热点分析报告\n\n"
  70. base_header += "---\n\n"
  71. elif format_type == "slack":
  72. base_header = f"*总新闻数:* {total_titles}\n\n"
  73. base_footer = ""
  74. if format_type in ("wework", "bark"):
  75. base_footer = f"\n\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
  76. if update_info:
  77. base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
  78. elif format_type == "telegram":
  79. base_footer = f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
  80. if update_info:
  81. base_footer += f"\nTrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}"
  82. elif format_type == "ntfy":
  83. base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
  84. if update_info:
  85. base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
  86. elif format_type == "feishu":
  87. base_footer = f"\n\n<font color='grey'>更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}</font>"
  88. if update_info:
  89. base_footer += f"\n<font color='grey'>TrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}</font>"
  90. elif format_type == "dingtalk":
  91. base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
  92. if update_info:
  93. base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
  94. elif format_type == "slack":
  95. base_footer = f"\n\n_更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}_"
  96. if update_info:
  97. base_footer += f"\n_TrendRadar 发现新版本 *{update_info['remote_version']}*,当前 *{update_info['current_version']}_"
  98. stats_header = ""
  99. if report_data["stats"]:
  100. if format_type in ("wework", "bark"):
  101. stats_header = f"📊 **热点词汇统计**\n\n"
  102. elif format_type == "telegram":
  103. stats_header = f"📊 热点词汇统计\n\n"
  104. elif format_type == "ntfy":
  105. stats_header = f"📊 **热点词汇统计**\n\n"
  106. elif format_type == "feishu":
  107. stats_header = f"📊 **热点词汇统计**\n\n"
  108. elif format_type == "dingtalk":
  109. stats_header = f"📊 **热点词汇统计**\n\n"
  110. elif format_type == "slack":
  111. stats_header = f"📊 *热点词汇统计*\n\n"
  112. current_batch = base_header
  113. current_batch_has_content = False
  114. if (
  115. not report_data["stats"]
  116. and not report_data["new_titles"]
  117. and not report_data["failed_ids"]
  118. ):
  119. if mode == "incremental":
  120. mode_text = "增量模式下暂无新增匹配的热点词汇"
  121. elif mode == "current":
  122. mode_text = "当前榜单模式下暂无匹配的热点词汇"
  123. else:
  124. mode_text = "暂无匹配的热点词汇"
  125. simple_content = f"📭 {mode_text}\n\n"
  126. final_content = base_header + simple_content + base_footer
  127. batches.append(final_content)
  128. return batches
  129. # 定义处理热点词汇统计的函数
  130. def process_stats_section(current_batch, current_batch_has_content, batches):
  131. """处理热点词汇统计"""
  132. if not report_data["stats"]:
  133. return current_batch, current_batch_has_content, batches
  134. total_count = len(report_data["stats"])
  135. # 添加统计标题
  136. test_content = current_batch + stats_header
  137. if (
  138. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  139. < max_bytes
  140. ):
  141. current_batch = test_content
  142. current_batch_has_content = True
  143. else:
  144. if current_batch_has_content:
  145. batches.append(current_batch + base_footer)
  146. current_batch = base_header + stats_header
  147. current_batch_has_content = True
  148. # 逐个处理词组(确保词组标题+第一条新闻的原子性)
  149. for i, stat in enumerate(report_data["stats"]):
  150. word = stat["word"]
  151. count = stat["count"]
  152. sequence_display = f"[{i + 1}/{total_count}]"
  153. # 构建词组标题
  154. word_header = ""
  155. if format_type in ("wework", "bark"):
  156. if count >= 10:
  157. word_header = (
  158. f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
  159. )
  160. elif count >= 5:
  161. word_header = (
  162. f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
  163. )
  164. else:
  165. word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
  166. elif format_type == "telegram":
  167. if count >= 10:
  168. word_header = f"🔥 {sequence_display} {word} : {count} 条\n\n"
  169. elif count >= 5:
  170. word_header = f"📈 {sequence_display} {word} : {count} 条\n\n"
  171. else:
  172. word_header = f"📌 {sequence_display} {word} : {count} 条\n\n"
  173. elif format_type == "ntfy":
  174. if count >= 10:
  175. word_header = (
  176. f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
  177. )
  178. elif count >= 5:
  179. word_header = (
  180. f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
  181. )
  182. else:
  183. word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
  184. elif format_type == "feishu":
  185. if count >= 10:
  186. word_header = f"🔥 <font color='grey'>{sequence_display}</font> **{word}** : <font color='red'>{count}</font> 条\n\n"
  187. elif count >= 5:
  188. word_header = f"📈 <font color='grey'>{sequence_display}</font> **{word}** : <font color='orange'>{count}</font> 条\n\n"
  189. else:
  190. word_header = f"📌 <font color='grey'>{sequence_display}</font> **{word}** : {count} 条\n\n"
  191. elif format_type == "dingtalk":
  192. if count >= 10:
  193. word_header = (
  194. f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
  195. )
  196. elif count >= 5:
  197. word_header = (
  198. f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
  199. )
  200. else:
  201. word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
  202. elif format_type == "slack":
  203. if count >= 10:
  204. word_header = (
  205. f"🔥 {sequence_display} *{word}* : *{count}* 条\n\n"
  206. )
  207. elif count >= 5:
  208. word_header = (
  209. f"📈 {sequence_display} *{word}* : *{count}* 条\n\n"
  210. )
  211. else:
  212. word_header = f"📌 {sequence_display} *{word}* : {count} 条\n\n"
  213. # 构建第一条新闻
  214. first_news_line = ""
  215. if stat["titles"]:
  216. first_title_data = stat["titles"][0]
  217. if format_type in ("wework", "bark"):
  218. formatted_title = format_title_for_platform(
  219. "wework", first_title_data, show_source=True
  220. )
  221. elif format_type == "telegram":
  222. formatted_title = format_title_for_platform(
  223. "telegram", first_title_data, show_source=True
  224. )
  225. elif format_type == "ntfy":
  226. formatted_title = format_title_for_platform(
  227. "ntfy", first_title_data, show_source=True
  228. )
  229. elif format_type == "feishu":
  230. formatted_title = format_title_for_platform(
  231. "feishu", first_title_data, show_source=True
  232. )
  233. elif format_type == "dingtalk":
  234. formatted_title = format_title_for_platform(
  235. "dingtalk", first_title_data, show_source=True
  236. )
  237. elif format_type == "slack":
  238. formatted_title = format_title_for_platform(
  239. "slack", first_title_data, show_source=True
  240. )
  241. else:
  242. formatted_title = f"{first_title_data['title']}"
  243. first_news_line = f" 1. {formatted_title}\n"
  244. if len(stat["titles"]) > 1:
  245. first_news_line += "\n"
  246. # 原子性检查:词组标题+第一条新闻必须一起处理
  247. word_with_first_news = word_header + first_news_line
  248. test_content = current_batch + word_with_first_news
  249. if (
  250. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  251. >= max_bytes
  252. ):
  253. # 当前批次容纳不下,开启新批次
  254. if current_batch_has_content:
  255. batches.append(current_batch + base_footer)
  256. current_batch = base_header + stats_header + word_with_first_news
  257. current_batch_has_content = True
  258. start_index = 1
  259. else:
  260. current_batch = test_content
  261. current_batch_has_content = True
  262. start_index = 1
  263. # 处理剩余新闻条目
  264. for j in range(start_index, len(stat["titles"])):
  265. title_data = stat["titles"][j]
  266. if format_type in ("wework", "bark"):
  267. formatted_title = format_title_for_platform(
  268. "wework", title_data, show_source=True
  269. )
  270. elif format_type == "telegram":
  271. formatted_title = format_title_for_platform(
  272. "telegram", title_data, show_source=True
  273. )
  274. elif format_type == "ntfy":
  275. formatted_title = format_title_for_platform(
  276. "ntfy", title_data, show_source=True
  277. )
  278. elif format_type == "feishu":
  279. formatted_title = format_title_for_platform(
  280. "feishu", title_data, show_source=True
  281. )
  282. elif format_type == "dingtalk":
  283. formatted_title = format_title_for_platform(
  284. "dingtalk", title_data, show_source=True
  285. )
  286. elif format_type == "slack":
  287. formatted_title = format_title_for_platform(
  288. "slack", title_data, show_source=True
  289. )
  290. else:
  291. formatted_title = f"{title_data['title']}"
  292. news_line = f" {j + 1}. {formatted_title}\n"
  293. if j < len(stat["titles"]) - 1:
  294. news_line += "\n"
  295. test_content = current_batch + news_line
  296. if (
  297. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  298. >= max_bytes
  299. ):
  300. if current_batch_has_content:
  301. batches.append(current_batch + base_footer)
  302. current_batch = base_header + stats_header + word_header + news_line
  303. current_batch_has_content = True
  304. else:
  305. current_batch = test_content
  306. current_batch_has_content = True
  307. # 词组间分隔符
  308. if i < len(report_data["stats"]) - 1:
  309. separator = ""
  310. if format_type in ("wework", "bark"):
  311. separator = f"\n\n\n\n"
  312. elif format_type == "telegram":
  313. separator = f"\n\n"
  314. elif format_type == "ntfy":
  315. separator = f"\n\n"
  316. elif format_type == "feishu":
  317. separator = f"\n{feishu_separator}\n\n"
  318. elif format_type == "dingtalk":
  319. separator = f"\n---\n\n"
  320. elif format_type == "slack":
  321. separator = f"\n\n"
  322. test_content = current_batch + separator
  323. if (
  324. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  325. < max_bytes
  326. ):
  327. current_batch = test_content
  328. return current_batch, current_batch_has_content, batches
  329. # 定义处理新增新闻的函数
  330. def process_new_titles_section(current_batch, current_batch_has_content, batches):
  331. """处理新增新闻"""
  332. if not report_data["new_titles"]:
  333. return current_batch, current_batch_has_content, batches
  334. new_header = ""
  335. if format_type in ("wework", "bark"):
  336. new_header = f"\n\n\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
  337. elif format_type == "telegram":
  338. new_header = (
  339. f"\n\n🆕 本次新增热点新闻 (共 {report_data['total_new_count']} 条)\n\n"
  340. )
  341. elif format_type == "ntfy":
  342. new_header = f"\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
  343. elif format_type == "feishu":
  344. new_header = f"\n{feishu_separator}\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
  345. elif format_type == "dingtalk":
  346. new_header = f"\n---\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
  347. elif format_type == "slack":
  348. new_header = f"\n\n🆕 *本次新增热点新闻* (共 {report_data['total_new_count']} 条)\n\n"
  349. test_content = current_batch + new_header
  350. if (
  351. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  352. >= max_bytes
  353. ):
  354. if current_batch_has_content:
  355. batches.append(current_batch + base_footer)
  356. current_batch = base_header + new_header
  357. current_batch_has_content = True
  358. else:
  359. current_batch = test_content
  360. current_batch_has_content = True
  361. # 逐个处理新增新闻来源
  362. for source_data in report_data["new_titles"]:
  363. source_header = ""
  364. if format_type in ("wework", "bark"):
  365. source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
  366. elif format_type == "telegram":
  367. source_header = f"{source_data['source_name']} ({len(source_data['titles'])} 条):\n\n"
  368. elif format_type == "ntfy":
  369. source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
  370. elif format_type == "feishu":
  371. source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
  372. elif format_type == "dingtalk":
  373. source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
  374. elif format_type == "slack":
  375. source_header = f"*{source_data['source_name']}* ({len(source_data['titles'])} 条):\n\n"
  376. # 构建第一条新增新闻
  377. first_news_line = ""
  378. if source_data["titles"]:
  379. first_title_data = source_data["titles"][0]
  380. title_data_copy = first_title_data.copy()
  381. title_data_copy["is_new"] = False
  382. if format_type in ("wework", "bark"):
  383. formatted_title = format_title_for_platform(
  384. "wework", title_data_copy, show_source=False
  385. )
  386. elif format_type == "telegram":
  387. formatted_title = format_title_for_platform(
  388. "telegram", title_data_copy, show_source=False
  389. )
  390. elif format_type == "feishu":
  391. formatted_title = format_title_for_platform(
  392. "feishu", title_data_copy, show_source=False
  393. )
  394. elif format_type == "dingtalk":
  395. formatted_title = format_title_for_platform(
  396. "dingtalk", title_data_copy, show_source=False
  397. )
  398. elif format_type == "slack":
  399. formatted_title = format_title_for_platform(
  400. "slack", title_data_copy, show_source=False
  401. )
  402. else:
  403. formatted_title = f"{title_data_copy['title']}"
  404. first_news_line = f" 1. {formatted_title}\n"
  405. # 原子性检查:来源标题+第一条新闻
  406. source_with_first_news = source_header + first_news_line
  407. test_content = current_batch + source_with_first_news
  408. if (
  409. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  410. >= max_bytes
  411. ):
  412. if current_batch_has_content:
  413. batches.append(current_batch + base_footer)
  414. current_batch = base_header + new_header + source_with_first_news
  415. current_batch_has_content = True
  416. start_index = 1
  417. else:
  418. current_batch = test_content
  419. current_batch_has_content = True
  420. start_index = 1
  421. # 处理剩余新增新闻
  422. for j in range(start_index, len(source_data["titles"])):
  423. title_data = source_data["titles"][j]
  424. title_data_copy = title_data.copy()
  425. title_data_copy["is_new"] = False
  426. if format_type == "wework":
  427. formatted_title = format_title_for_platform(
  428. "wework", title_data_copy, show_source=False
  429. )
  430. elif format_type == "telegram":
  431. formatted_title = format_title_for_platform(
  432. "telegram", title_data_copy, show_source=False
  433. )
  434. elif format_type == "feishu":
  435. formatted_title = format_title_for_platform(
  436. "feishu", title_data_copy, show_source=False
  437. )
  438. elif format_type == "dingtalk":
  439. formatted_title = format_title_for_platform(
  440. "dingtalk", title_data_copy, show_source=False
  441. )
  442. elif format_type == "slack":
  443. formatted_title = format_title_for_platform(
  444. "slack", title_data_copy, show_source=False
  445. )
  446. else:
  447. formatted_title = f"{title_data_copy['title']}"
  448. news_line = f" {j + 1}. {formatted_title}\n"
  449. test_content = current_batch + news_line
  450. if (
  451. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  452. >= max_bytes
  453. ):
  454. if current_batch_has_content:
  455. batches.append(current_batch + base_footer)
  456. current_batch = base_header + new_header + source_header + news_line
  457. current_batch_has_content = True
  458. else:
  459. current_batch = test_content
  460. current_batch_has_content = True
  461. current_batch += "\n"
  462. return current_batch, current_batch_has_content, batches
  463. # 根据配置决定处理顺序
  464. if reverse_content_order:
  465. # 新增热点在前,热点词汇统计在后
  466. current_batch, current_batch_has_content, batches = process_new_titles_section(
  467. current_batch, current_batch_has_content, batches
  468. )
  469. current_batch, current_batch_has_content, batches = process_stats_section(
  470. current_batch, current_batch_has_content, batches
  471. )
  472. else:
  473. # 默认:热点词汇统计在前,新增热点在后
  474. current_batch, current_batch_has_content, batches = process_stats_section(
  475. current_batch, current_batch_has_content, batches
  476. )
  477. current_batch, current_batch_has_content, batches = process_new_titles_section(
  478. current_batch, current_batch_has_content, batches
  479. )
  480. if report_data["failed_ids"]:
  481. failed_header = ""
  482. if format_type == "wework":
  483. failed_header = f"\n\n\n\n⚠️ **数据获取失败的平台:**\n\n"
  484. elif format_type == "telegram":
  485. failed_header = f"\n\n⚠️ 数据获取失败的平台:\n\n"
  486. elif format_type == "ntfy":
  487. failed_header = f"\n\n⚠️ **数据获取失败的平台:**\n\n"
  488. elif format_type == "feishu":
  489. failed_header = f"\n{feishu_separator}\n\n⚠️ **数据获取失败的平台:**\n\n"
  490. elif format_type == "dingtalk":
  491. failed_header = f"\n---\n\n⚠️ **数据获取失败的平台:**\n\n"
  492. test_content = current_batch + failed_header
  493. if (
  494. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  495. >= max_bytes
  496. ):
  497. if current_batch_has_content:
  498. batches.append(current_batch + base_footer)
  499. current_batch = base_header + failed_header
  500. current_batch_has_content = True
  501. else:
  502. current_batch = test_content
  503. current_batch_has_content = True
  504. for i, id_value in enumerate(report_data["failed_ids"], 1):
  505. if format_type == "feishu":
  506. failed_line = f" • <font color='red'>{id_value}</font>\n"
  507. elif format_type == "dingtalk":
  508. failed_line = f" • **{id_value}**\n"
  509. else:
  510. failed_line = f" • {id_value}\n"
  511. test_content = current_batch + failed_line
  512. if (
  513. len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
  514. >= max_bytes
  515. ):
  516. if current_batch_has_content:
  517. batches.append(current_batch + base_footer)
  518. current_batch = base_header + failed_header + failed_line
  519. current_batch_has_content = True
  520. else:
  521. current_batch = test_content
  522. current_batch_has_content = True
  523. # 完成最后批次
  524. if current_batch_has_content:
  525. batches.append(current_batch + base_footer)
  526. return batches