batch.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. # coding=utf-8
  2. """
  3. 批次处理模块
  4. 提供消息分批发送的辅助函数
  5. """
  6. from typing import List
  7. def get_batch_header(format_type: str, batch_num: int, total_batches: int) -> str:
  8. """根据 format_type 生成对应格式的批次头部
  9. Args:
  10. format_type: 推送类型(telegram, slack, wework_text, bark, feishu, dingtalk, ntfy, wework)
  11. batch_num: 当前批次编号
  12. total_batches: 总批次数
  13. Returns:
  14. 格式化的批次头部字符串
  15. """
  16. if format_type == "telegram":
  17. return f"<b>[第 {batch_num}/{total_batches} 批次]</b>\n\n"
  18. elif format_type == "slack":
  19. return f"*[第 {batch_num}/{total_batches} 批次]*\n\n"
  20. elif format_type in ("wework_text", "bark"):
  21. # 企业微信文本模式和 Bark 使用纯文本格式
  22. return f"[第 {batch_num}/{total_batches} 批次]\n\n"
  23. else:
  24. # 飞书、钉钉、ntfy、企业微信 markdown 模式
  25. return f"**[第 {batch_num}/{total_batches} 批次]**\n\n"
  26. def get_max_batch_header_size(format_type: str) -> int:
  27. """估算批次头部的最大字节数(假设最多 99 批次)
  28. 用于在分批时预留空间,避免事后截断破坏内容完整性。
  29. Args:
  30. format_type: 推送类型
  31. Returns:
  32. 最大头部字节数
  33. """
  34. # 生成最坏情况的头部(99/99 批次)
  35. max_header = get_batch_header(format_type, 99, 99)
  36. return len(max_header.encode("utf-8"))
  37. def truncate_to_bytes(text: str, max_bytes: int) -> str:
  38. """安全截断字符串到指定字节数,避免截断多字节字符
  39. Args:
  40. text: 要截断的文本
  41. max_bytes: 最大字节数
  42. Returns:
  43. 截断后的文本
  44. """
  45. text_bytes = text.encode("utf-8")
  46. if len(text_bytes) <= max_bytes:
  47. return text
  48. truncated = text_bytes[:max_bytes]
  49. for i in range(min(4, len(truncated))):
  50. try:
  51. return truncated[: len(truncated) - i].decode("utf-8")
  52. except UnicodeDecodeError:
  53. continue
  54. return ""
  55. def truncate_at_line_boundary(text: str, max_bytes: int) -> str:
  56. """在行边界处截断,确保不在标题或内容中间断开
  57. 先按字节截断,再回退到最近的换行符位置,保证每一行都完整。
  58. Args:
  59. text: 要截断的文本
  60. max_bytes: 最大字节数
  61. Returns:
  62. 在最后一个完整行处结束的截断文本
  63. """
  64. if len(text.encode("utf-8")) <= max_bytes:
  65. return text
  66. rough_cut = truncate_to_bytes(text, max_bytes)
  67. last_newline = rough_cut.rfind("\n")
  68. if last_newline > 0:
  69. return rough_cut[:last_newline]
  70. return rough_cut
  71. def truncate_preserving_footer(content: str, max_bytes: int) -> str:
  72. """截断内容,优先保留尾部 footer(更新时间等),正文在行边界处截断
  73. 识别内容末尾的 footer 区域(更新时间、版本提示等),
  74. 对 footer 之前的正文部分在行边界处截断,再拼接完整 footer。
  75. Args:
  76. content: 完整内容(正文 + footer)
  77. max_bytes: 最大字节数
  78. Returns:
  79. 截断后的内容,footer 完整保留,正文在行边界处截断
  80. """
  81. if len(content.encode("utf-8")) <= max_bytes:
  82. return content
  83. # 各平台 footer 的常见开头模式
  84. footer_markers = ["\n\n\n> ", "\n\n> ", "\n\n<font", "\n\n_", "\n\n更新时间"]
  85. footer_start = -1
  86. for marker in footer_markers:
  87. pos = content.rfind(marker)
  88. if pos > 0:
  89. footer_start = pos
  90. break
  91. if footer_start <= 0:
  92. return truncate_at_line_boundary(content, max_bytes)
  93. footer = content[footer_start:]
  94. body = content[:footer_start]
  95. footer_size = len(footer.encode("utf-8"))
  96. if footer_size >= max_bytes:
  97. return truncate_at_line_boundary(content, max_bytes)
  98. truncated_body = truncate_at_line_boundary(body, max_bytes - footer_size)
  99. return truncated_body + footer
  100. def _split_oversized_batch(content: str, max_content_bytes: int) -> List[str]:
  101. """将超限批次按行边界拆分成多个子批次(保留 footer)
  102. Args:
  103. content: 超限的批次内容(含 footer)
  104. max_content_bytes: 每个子批次的最大字节数
  105. Returns:
  106. 拆分后的子批次列表
  107. """
  108. # 识别 footer
  109. footer_markers = ["\n\n\n> ", "\n\n> ", "\n\n<font", "\n\n_", "\n\n更新时间"]
  110. footer = ""
  111. body = content
  112. for marker in footer_markers:
  113. pos = content.rfind(marker)
  114. if pos > 0:
  115. footer = content[pos:]
  116. body = content[:pos]
  117. break
  118. footer_size = len(footer.encode("utf-8"))
  119. available = max_content_bytes - footer_size
  120. if available <= 0:
  121. return [truncate_at_line_boundary(content, max_content_bytes)]
  122. # 按行拆分 body
  123. lines = body.split("\n")
  124. sub_batches = []
  125. current = ""
  126. for line in lines:
  127. candidate = current + line + "\n"
  128. if len(candidate.encode("utf-8")) > available and current.strip():
  129. sub_batches.append(current + footer)
  130. current = line + "\n"
  131. else:
  132. current = candidate
  133. if current.strip():
  134. sub_batches.append(current + footer)
  135. return sub_batches if sub_batches else [content]
  136. def add_batch_headers(
  137. batches: List[str], format_type: str, max_bytes: int
  138. ) -> List[str]:
  139. """为批次添加头部,超限时拆分成多个子批次(不丢弃内容)
  140. Args:
  141. batches: 原始批次列表
  142. format_type: 推送类型(bark, telegram, feishu 等)
  143. max_bytes: 该推送类型的最大字节限制
  144. Returns:
  145. 添加头部后的批次列表
  146. """
  147. if len(batches) <= 1:
  148. return batches
  149. # 第一遍:拆分超限批次
  150. expanded = []
  151. max_header_size = get_max_batch_header_size(format_type)
  152. for content in batches:
  153. if len(content.encode("utf-8")) + max_header_size > max_bytes:
  154. expanded.extend(_split_oversized_batch(content, max_bytes - max_header_size))
  155. else:
  156. expanded.append(content)
  157. # 第二遍:添加头部
  158. if len(expanded) <= 1:
  159. return expanded
  160. total = len(expanded)
  161. result = []
  162. for i, content in enumerate(expanded, 1):
  163. header = get_batch_header(format_type, i, total)
  164. header_size = len(header.encode("utf-8"))
  165. max_content_size = max_bytes - header_size
  166. if len(content.encode("utf-8")) > max_content_size:
  167. # 仍超限(极端情况:单行过长),行边界截断
  168. content = truncate_preserving_footer(content, max_content_size)
  169. result.append(header + content)
  170. return result