# coding=utf-8 """ 批次处理模块 提供消息分批发送的辅助函数 """ from typing import List def get_batch_header(format_type: str, batch_num: int, total_batches: int) -> str: """根据 format_type 生成对应格式的批次头部 Args: format_type: 推送类型(telegram, slack, wework_text, bark, feishu, dingtalk, ntfy, wework) batch_num: 当前批次编号 total_batches: 总批次数 Returns: 格式化的批次头部字符串 """ if format_type == "telegram": return f"[第 {batch_num}/{total_batches} 批次]\n\n" elif format_type == "slack": return f"*[第 {batch_num}/{total_batches} 批次]*\n\n" elif format_type in ("wework_text", "bark"): # 企业微信文本模式和 Bark 使用纯文本格式 return f"[第 {batch_num}/{total_batches} 批次]\n\n" else: # 飞书、钉钉、ntfy、企业微信 markdown 模式 return f"**[第 {batch_num}/{total_batches} 批次]**\n\n" def get_max_batch_header_size(format_type: str) -> int: """估算批次头部的最大字节数(假设最多 99 批次) 用于在分批时预留空间,避免事后截断破坏内容完整性。 Args: format_type: 推送类型 Returns: 最大头部字节数 """ # 生成最坏情况的头部(99/99 批次) max_header = get_batch_header(format_type, 99, 99) return len(max_header.encode("utf-8")) def truncate_to_bytes(text: str, max_bytes: int) -> str: """安全截断字符串到指定字节数,避免截断多字节字符 Args: text: 要截断的文本 max_bytes: 最大字节数 Returns: 截断后的文本 """ text_bytes = text.encode("utf-8") if len(text_bytes) <= max_bytes: return text truncated = text_bytes[:max_bytes] for i in range(min(4, len(truncated))): try: return truncated[: len(truncated) - i].decode("utf-8") except UnicodeDecodeError: continue return "" def truncate_at_line_boundary(text: str, max_bytes: int) -> str: """在行边界处截断,确保不在标题或内容中间断开 先按字节截断,再回退到最近的换行符位置,保证每一行都完整。 Args: text: 要截断的文本 max_bytes: 最大字节数 Returns: 在最后一个完整行处结束的截断文本 """ if len(text.encode("utf-8")) <= max_bytes: return text rough_cut = truncate_to_bytes(text, max_bytes) last_newline = rough_cut.rfind("\n") if last_newline > 0: return rough_cut[:last_newline] return rough_cut def truncate_preserving_footer(content: str, max_bytes: int) -> str: """截断内容,优先保留尾部 footer(更新时间等),正文在行边界处截断 识别内容末尾的 footer 区域(更新时间、版本提示等), 对 footer 之前的正文部分在行边界处截断,再拼接完整 footer。 Args: content: 完整内容(正文 + footer) max_bytes: 最大字节数 Returns: 截断后的内容,footer 完整保留,正文在行边界处截断 """ if len(content.encode("utf-8")) <= max_bytes: return content # 各平台 footer 的常见开头模式 footer_markers = ["\n\n\n> ", "\n\n> ", "\n\n 0: footer_start = pos break if footer_start <= 0: return truncate_at_line_boundary(content, max_bytes) footer = content[footer_start:] body = content[:footer_start] footer_size = len(footer.encode("utf-8")) if footer_size >= max_bytes: return truncate_at_line_boundary(content, max_bytes) truncated_body = truncate_at_line_boundary(body, max_bytes - footer_size) return truncated_body + footer def _split_oversized_batch(content: str, max_content_bytes: int) -> List[str]: """将超限批次按行边界拆分成多个子批次(保留 footer) Args: content: 超限的批次内容(含 footer) max_content_bytes: 每个子批次的最大字节数 Returns: 拆分后的子批次列表 """ # 识别 footer footer_markers = ["\n\n\n> ", "\n\n> ", "\n\n 0: footer = content[pos:] body = content[:pos] break footer_size = len(footer.encode("utf-8")) available = max_content_bytes - footer_size if available <= 0: return [truncate_at_line_boundary(content, max_content_bytes)] # 按行拆分 body lines = body.split("\n") sub_batches = [] current = "" for line in lines: candidate = current + line + "\n" if len(candidate.encode("utf-8")) > available and current.strip(): sub_batches.append(current + footer) current = line + "\n" else: current = candidate if current.strip(): sub_batches.append(current + footer) return sub_batches if sub_batches else [content] def add_batch_headers( batches: List[str], format_type: str, max_bytes: int ) -> List[str]: """为批次添加头部,超限时拆分成多个子批次(不丢弃内容) Args: batches: 原始批次列表 format_type: 推送类型(bark, telegram, feishu 等) max_bytes: 该推送类型的最大字节限制 Returns: 添加头部后的批次列表 """ if len(batches) <= 1: return batches # 第一遍:拆分超限批次 expanded = [] max_header_size = get_max_batch_header_size(format_type) for content in batches: if len(content.encode("utf-8")) + max_header_size > max_bytes: expanded.extend(_split_oversized_batch(content, max_bytes - max_header_size)) else: expanded.append(content) # 第二遍:添加头部 if len(expanded) <= 1: return expanded total = len(expanded) result = [] for i, content in enumerate(expanded, 1): header = get_batch_header(format_type, i, total) header_size = len(header.encode("utf-8")) max_content_size = max_bytes - header_size if len(content.encode("utf-8")) > max_content_size: # 仍超限(极端情况:单行过长),行边界截断 content = truncate_preserving_footer(content, max_content_size) result.append(header + content) return result