ソースを参照

fix(notification): 修复消息分批截断丢弃内容的问题

sansan 1 ヶ月 前
コミット
3194a7a7e8
2 ファイル変更343 行追加67 行削除
  1. 126 20
      trendradar/notification/batch.py
  2. 217 47
      trendradar/notification/splitter.py

+ 126 - 20
trendradar/notification/batch.py

@@ -61,24 +61,125 @@ def truncate_to_bytes(text: str, max_bytes: int) -> str:
     if len(text_bytes) <= max_bytes:
         return text
 
-    # 截断到指定字节数
     truncated = text_bytes[:max_bytes]
-
-    # 处理可能的不完整 UTF-8 字符
     for i in range(min(4, len(truncated))):
         try:
             return truncated[: len(truncated) - i].decode("utf-8")
         except UnicodeDecodeError:
             continue
-
-    # 极端情况:返回空字符串
     return ""
 
 
+def truncate_at_line_boundary(text: str, max_bytes: int) -> str:
+    """在行边界处截断,确保不在标题或内容中间断开
+
+    先按字节截断,再回退到最近的换行符位置,保证每一行都完整。
+
+    Args:
+        text: 要截断的文本
+        max_bytes: 最大字节数
+
+    Returns:
+        在最后一个完整行处结束的截断文本
+    """
+    if len(text.encode("utf-8")) <= max_bytes:
+        return text
+
+    rough_cut = truncate_to_bytes(text, max_bytes)
+    last_newline = rough_cut.rfind("\n")
+    if last_newline > 0:
+        return rough_cut[:last_newline]
+    return rough_cut
+
+
+def truncate_preserving_footer(content: str, max_bytes: int) -> str:
+    """截断内容,优先保留尾部 footer(更新时间等),正文在行边界处截断
+
+    识别内容末尾的 footer 区域(更新时间、版本提示等),
+    对 footer 之前的正文部分在行边界处截断,再拼接完整 footer。
+
+    Args:
+        content: 完整内容(正文 + footer)
+        max_bytes: 最大字节数
+
+    Returns:
+        截断后的内容,footer 完整保留,正文在行边界处截断
+    """
+    if len(content.encode("utf-8")) <= max_bytes:
+        return content
+
+    # 各平台 footer 的常见开头模式
+    footer_markers = ["\n\n\n> ", "\n\n> ", "\n\n<font", "\n\n_", "\n\n更新时间"]
+    footer_start = -1
+    for marker in footer_markers:
+        pos = content.rfind(marker)
+        if pos > 0:
+            footer_start = pos
+            break
+
+    if footer_start <= 0:
+        return truncate_at_line_boundary(content, max_bytes)
+
+    footer = content[footer_start:]
+    body = content[:footer_start]
+    footer_size = len(footer.encode("utf-8"))
+
+    if footer_size >= max_bytes:
+        return truncate_at_line_boundary(content, max_bytes)
+
+    truncated_body = truncate_at_line_boundary(body, max_bytes - footer_size)
+    return truncated_body + footer
+
+
+def _split_oversized_batch(content: str, max_content_bytes: int) -> List[str]:
+    """将超限批次按行边界拆分成多个子批次(保留 footer)
+
+    Args:
+        content: 超限的批次内容(含 footer)
+        max_content_bytes: 每个子批次的最大字节数
+
+    Returns:
+        拆分后的子批次列表
+    """
+    # 识别 footer
+    footer_markers = ["\n\n\n> ", "\n\n> ", "\n\n<font", "\n\n_", "\n\n更新时间"]
+    footer = ""
+    body = content
+    for marker in footer_markers:
+        pos = content.rfind(marker)
+        if pos > 0:
+            footer = content[pos:]
+            body = content[:pos]
+            break
+
+    footer_size = len(footer.encode("utf-8"))
+    available = max_content_bytes - footer_size
+    if available <= 0:
+        return [truncate_at_line_boundary(content, max_content_bytes)]
+
+    # 按行拆分 body
+    lines = body.split("\n")
+    sub_batches = []
+    current = ""
+
+    for line in lines:
+        candidate = current + line + "\n"
+        if len(candidate.encode("utf-8")) > available and current.strip():
+            sub_batches.append(current + footer)
+            current = line + "\n"
+        else:
+            current = candidate
+
+    if current.strip():
+        sub_batches.append(current + footer)
+
+    return sub_batches if sub_batches else [content]
+
+
 def add_batch_headers(
     batches: List[str], format_type: str, max_bytes: int
 ) -> List[str]:
-    """为批次添加头部,动态计算确保总大小不超过限制
+    """为批次添加头部,超限时拆分成多个子批次(不丢弃内容)
 
     Args:
         batches: 原始批次列表
@@ -91,24 +192,29 @@ def add_batch_headers(
     if len(batches) <= 1:
         return batches
 
-    total = len(batches)
-    result = []
+    # 第一遍:拆分超限批次
+    expanded = []
+    max_header_size = get_max_batch_header_size(format_type)
+    for content in batches:
+        if len(content.encode("utf-8")) + max_header_size > max_bytes:
+            expanded.extend(_split_oversized_batch(content, max_bytes - max_header_size))
+        else:
+            expanded.append(content)
+
+    # 第二遍:添加头部
+    if len(expanded) <= 1:
+        return expanded
 
-    for i, content in enumerate(batches, 1):
-        # 生成批次头部
+    total = len(expanded)
+    result = []
+    for i, content in enumerate(expanded, 1):
         header = get_batch_header(format_type, i, total)
         header_size = len(header.encode("utf-8"))
-
-        # 动态计算允许的最大内容大小
         max_content_size = max_bytes - header_size
-        content_size = len(content.encode("utf-8"))
-
-        # 如果超出,截断到安全大小
-        if content_size > max_content_size:
-            print(
-                f"警告:{format_type} 第 {i}/{total} 批次内容({content_size}字节) + 头部({header_size}字节) 超出限制({max_bytes}字节),截断到 {max_content_size} 字节"
-            )
-            content = truncate_to_bytes(content, max_content_size)
+
+        if len(content.encode("utf-8")) > max_content_size:
+            # 仍超限(极端情况:单行过长),行边界截断
+            content = truncate_preserving_footer(content, max_content_size)
 
         result.append(header + content)
 

+ 217 - 47
trendradar/notification/splitter.py

@@ -11,6 +11,113 @@ from typing import Dict, List, Optional, Callable
 from trendradar.report.formatter import format_title_for_platform
 from trendradar.report.helpers import format_rank_display
 from trendradar.utils.time import DEFAULT_TIMEZONE, format_iso_time_friendly, convert_time_for_display
+from trendradar.notification.batch import truncate_at_line_boundary
+
+
+# === 分批安全辅助函数 ===
+
+def _split_content_by_lines(
+    content: str, footer: str, max_bytes: int, base_header: str
+) -> List[str]:
+    """将超长内容按行边界拆分成多个完整批次(每个批次带 footer)
+
+    不会丢弃任何内容,溢出部分自动分配到后续批次。
+
+    Args:
+        content: 正文内容(不含 footer,可能含 base_header)
+        footer: 尾部内容(更新时间等)
+        max_bytes: 单批次最大字节数
+        base_header: 后续批次的头部
+
+    Returns:
+        完整批次列表(每个元素 = 正文 + footer,大小 ≤ max_bytes)
+    """
+    footer_size = len(footer.encode("utf-8"))
+    result_batches = []
+    lines = content.split("\n")
+
+    current = ""
+    for line in lines:
+        candidate = current + line + "\n"
+        if len(candidate.encode("utf-8")) + footer_size > max_bytes and current.strip():
+            result_batches.append(current + footer)
+            current = base_header + line + "\n"
+        else:
+            current = candidate
+
+    if current.strip():
+        result_batches.append(current + footer)
+
+    return result_batches
+
+
+def _safe_append_batch(
+    batches: List[str], content: str, footer: str, max_bytes: int,
+    base_header: str = ""
+) -> None:
+    """安全追加批次,超限时按行拆分成多个批次(不丢弃内容)
+
+    Args:
+        batches: 批次列表(原地修改)
+        content: 正文内容(不含 footer)
+        footer: 尾部内容(更新时间等)
+        max_bytes: 最大字节数
+        base_header: 溢出时后续批次的头部
+    """
+    full = content + footer
+    if len(full.encode("utf-8")) <= max_bytes:
+        batches.append(full)
+        return
+
+    split_batches = _split_content_by_lines(content, footer, max_bytes, base_header)
+    if split_batches:
+        batches.extend(split_batches)
+    else:
+        # 极端情况:单行就超限,强制截断
+        batches.append(truncate_at_line_boundary(full, max_bytes))
+
+
+def _safe_new_batch(
+    new_content: str, footer: str, max_bytes: int, base_header: str,
+    batches: List[str] = None
+) -> str:
+    """安全创建新批次,超限时将溢出内容拆分到 batches 中,返回最后一段作为 current_batch
+
+    Args:
+        new_content: 新批次完整内容(含 base_header + section_header + ...)
+        footer: 尾部内容
+        max_bytes: 最大字节数
+        base_header: 基础头部
+        batches: 批次列表,溢出部分追加到此(可选)
+
+    Returns:
+        可安全继续追加内容的 current_batch(大小 + footer ≤ max_bytes)
+    """
+    if len((new_content + footer).encode("utf-8")) <= max_bytes:
+        return new_content
+
+    if batches is None:
+        # 无法拆分到 batches,退回行边界截断
+        footer_size = len(footer.encode("utf-8"))
+        available = max_bytes - footer_size
+        header_size = len(base_header.encode("utf-8"))
+        if available <= header_size:
+            return base_header
+        return truncate_at_line_boundary(new_content, available)
+
+    # 拆分:前面的部分存入 batches,最后一段作为 current_batch 返回
+    split_batches = _split_content_by_lines(new_content, footer, max_bytes, base_header)
+    if len(split_batches) <= 1:
+        # 无法再拆,直接返回(由后续 _safe_append_batch 兜底)
+        return new_content
+
+    # 前 N-1 个批次存入 batches
+    batches.extend(split_batches[:-1])
+    # 最后一个批次去掉 footer 作为 current_batch(后续还会追加内容)
+    last = split_batches[-1]
+    if last.endswith(footer):
+        return last[: -len(footer)]
+    return last
 
 
 # 默认批次大小配置
@@ -271,9 +378,10 @@ def split_content_into_batches(
             current_batch_has_content = True
         else:
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            # 新批次开头不需要分割线,使用原始 stats_header
-            current_batch = base_header + stats_header
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+            current_batch = _safe_new_batch(
+                base_header + stats_header, base_footer, max_bytes, base_header, batches
+            )
             current_batch_has_content = True
 
         # 逐个处理词组(确保词组标题+第一条新闻的原子性)
@@ -389,10 +497,12 @@ def split_content_into_batches(
                 len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
                 >= max_bytes
             ):
-                # 当前批次容纳不下,开启新批次
                 if current_batch_has_content:
-                    batches.append(current_batch + base_footer)
-                current_batch = base_header + stats_header + word_with_first_news
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                current_batch = _safe_new_batch(
+                    base_header + stats_header + word_with_first_news,
+                    base_footer, max_bytes, base_header, batches
+                )
                 current_batch_has_content = True
                 start_index = 1
             else:
@@ -440,8 +550,11 @@ def split_content_into_batches(
                     >= max_bytes
                 ):
                     if current_batch_has_content:
-                        batches.append(current_batch + base_footer)
-                    current_batch = base_header + stats_header + word_header + news_line
+                        _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                    current_batch = _safe_new_batch(
+                        base_header + stats_header + word_header + news_line,
+                        base_footer, max_bytes, base_header, batches
+                    )
                     current_batch_has_content = True
                 else:
                     current_batch = test_content
@@ -517,8 +630,10 @@ def split_content_into_batches(
             >= max_bytes
         ):
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            current_batch = base_header + new_header
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+            current_batch = _safe_new_batch(
+                base_header + new_header, base_footer, max_bytes, base_header, batches
+            )
             current_batch_has_content = True
         else:
             current_batch = test_content
@@ -581,8 +696,11 @@ def split_content_into_batches(
                 >= max_bytes
             ):
                 if current_batch_has_content:
-                    batches.append(current_batch + base_footer)
-                current_batch = base_header + new_header + source_with_first_news
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                current_batch = _safe_new_batch(
+                    base_header + new_header + source_with_first_news,
+                    base_footer, max_bytes, base_header, batches
+                )
                 current_batch_has_content = True
                 start_index = 1
             else:
@@ -627,8 +745,11 @@ def split_content_into_batches(
                     >= max_bytes
                 ):
                     if current_batch_has_content:
-                        batches.append(current_batch + base_footer)
-                    current_batch = base_header + new_header + source_header + news_line
+                        _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                    current_batch = _safe_new_batch(
+                        base_header + new_header + source_header + news_line,
+                        base_footer, max_bytes, base_header, batches
+                    )
                     current_batch_has_content = True
                 else:
                     current_batch = test_content
@@ -668,13 +789,27 @@ def split_content_into_batches(
             current_batch = test_content
             current_batch_has_content = True
         else:
-            # 当前批次容纳不下,开启新批次
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            # AI 内容可能很长,需要考虑是否需要进一步分割
-            ai_with_header = base_header + ai_content
-            current_batch = ai_with_header
-            current_batch_has_content = True
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+
+            # AI 内容可能很长,按行拆分成多个批次
+            footer_size = len(base_footer.encode("utf-8"))
+            header_size = len(base_header.encode("utf-8"))
+            available = max_bytes - footer_size - header_size
+
+            ai_lines = ai_content.split("\n")
+            current_batch = base_header
+            current_batch_has_content = False
+
+            for line in ai_lines:
+                test_line = line + "\n" if not line.endswith("\n") else line
+                test_content = current_batch + test_line
+                if len(test_content.encode("utf-8")) + footer_size >= max_bytes and current_batch_has_content:
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                    current_batch = base_header + test_line
+                else:
+                    current_batch = test_content
+                current_batch_has_content = True
 
         return current_batch, current_batch_has_content, batches
 
@@ -789,8 +924,10 @@ def split_content_into_batches(
             >= max_bytes
         ):
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            current_batch = base_header + failed_header
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+            current_batch = _safe_new_batch(
+                base_header + failed_header, base_footer, max_bytes, base_header, batches
+            )
             current_batch_has_content = True
         else:
             current_batch = test_content
@@ -810,8 +947,11 @@ def split_content_into_batches(
                 >= max_bytes
             ):
                 if current_batch_has_content:
-                    batches.append(current_batch + base_footer)
-                current_batch = base_header + failed_header + failed_line
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                current_batch = _safe_new_batch(
+                    base_header + failed_header + failed_line,
+                    base_footer, max_bytes, base_header, batches
+                )
                 current_batch_has_content = True
             else:
                 current_batch = test_content
@@ -819,7 +959,7 @@ def split_content_into_batches(
 
     # 完成最后批次
     if current_batch_has_content:
-        batches.append(current_batch + base_footer)
+        _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
 
     return batches
 
@@ -899,8 +1039,10 @@ def _process_rss_stats_section(
         current_batch_has_content = True
     else:
         if current_batch_has_content:
-            batches.append(current_batch + base_footer)
-        current_batch = base_header + rss_header
+            _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+        current_batch = _safe_new_batch(
+            base_header + rss_header, base_footer, max_bytes, base_header, batches
+        )
         current_batch_has_content = True
 
     # 逐个处理关键词组(与热榜一致)
@@ -983,8 +1125,11 @@ def _process_rss_stats_section(
 
         if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            current_batch = base_header + rss_header + word_with_first_news
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+            current_batch = _safe_new_batch(
+                base_header + rss_header + word_with_first_news,
+                base_footer, max_bytes, base_header, batches
+            )
             current_batch_has_content = True
             start_index = 1
         else:
@@ -1017,8 +1162,11 @@ def _process_rss_stats_section(
             test_content = current_batch + news_line
             if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
                 if current_batch_has_content:
-                    batches.append(current_batch + base_footer)
-                current_batch = base_header + rss_header + word_header + news_line
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                current_batch = _safe_new_batch(
+                    base_header + rss_header + word_header + news_line,
+                    base_footer, max_bytes, base_header, batches
+                )
                 current_batch_has_content = True
             else:
                 current_batch = test_content
@@ -1132,8 +1280,10 @@ def _process_rss_new_titles_section(
     test_content = current_batch + new_header
     if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
         if current_batch_has_content:
-            batches.append(current_batch + base_footer)
-        current_batch = base_header + new_header
+            _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+        current_batch = _safe_new_batch(
+            base_header + new_header, base_footer, max_bytes, base_header, batches
+        )
         current_batch_has_content = True
     else:
         current_batch = test_content
@@ -1187,8 +1337,11 @@ def _process_rss_new_titles_section(
 
         if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            current_batch = base_header + new_header + source_with_first_news
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+            current_batch = _safe_new_batch(
+                base_header + new_header + source_with_first_news,
+                base_footer, max_bytes, base_header, batches
+            )
             current_batch_has_content = True
             start_index = 1
         else:
@@ -1220,8 +1373,11 @@ def _process_rss_new_titles_section(
             test_content = current_batch + news_line
             if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
                 if current_batch_has_content:
-                    batches.append(current_batch + base_footer)
-                current_batch = base_header + new_header + source_header + news_line
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                current_batch = _safe_new_batch(
+                    base_header + new_header + source_header + news_line,
+                    base_footer, max_bytes, base_header, batches
+                )
                 current_batch_has_content = True
             else:
                 current_batch = test_content
@@ -1377,8 +1533,10 @@ def _process_standalone_section(
         current_batch_has_content = True
     else:
         if current_batch_has_content:
-            batches.append(current_batch + base_footer)
-        current_batch = base_header + section_header
+            _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+        current_batch = _safe_new_batch(
+            base_header + section_header, base_footer, max_bytes, base_header, batches
+        )
         current_batch_has_content = True
 
     # 处理热榜平台
@@ -1414,8 +1572,11 @@ def _process_standalone_section(
 
         if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            current_batch = base_header + section_header + platform_with_first
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+            current_batch = _safe_new_batch(
+                base_header + section_header + platform_with_first,
+                base_footer, max_bytes, base_header, batches
+            )
             current_batch_has_content = True
             start_index = 1
         else:
@@ -1430,8 +1591,11 @@ def _process_standalone_section(
             test_content = current_batch + item_line
             if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
                 if current_batch_has_content:
-                    batches.append(current_batch + base_footer)
-                current_batch = base_header + section_header + platform_header + item_line
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                current_batch = _safe_new_batch(
+                    base_header + section_header + platform_header + item_line,
+                    base_footer, max_bytes, base_header, batches
+                )
                 current_batch_has_content = True
             else:
                 current_batch = test_content
@@ -1472,8 +1636,11 @@ def _process_standalone_section(
 
         if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
             if current_batch_has_content:
-                batches.append(current_batch + base_footer)
-            current_batch = base_header + section_header + feed_with_first
+                _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+            current_batch = _safe_new_batch(
+                base_header + section_header + feed_with_first,
+                base_footer, max_bytes, base_header, batches
+            )
             current_batch_has_content = True
             start_index = 1
         else:
@@ -1488,8 +1655,11 @@ def _process_standalone_section(
             test_content = current_batch + item_line
             if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
                 if current_batch_has_content:
-                    batches.append(current_batch + base_footer)
-                current_batch = base_header + section_header + feed_header + item_line
+                    _safe_append_batch(batches, current_batch, base_footer, max_bytes, base_header)
+                current_batch = _safe_new_batch(
+                    base_header + section_header + feed_header + item_line,
+                    base_footer, max_bytes, base_header, batches
+                )
                 current_batch_has_content = True
             else:
                 current_batch = test_content