| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052 |
- # coding=utf-8
- """
- 消息分批处理模块
- 提供消息内容分批拆分功能,确保消息大小不超过各平台限制
- """
- from datetime import datetime
- from typing import Dict, List, Optional, Callable
- from trendradar.report.formatter import format_title_for_platform
- from trendradar.utils.time import format_iso_time_friendly
- # 默认批次大小配置
- DEFAULT_BATCH_SIZES = {
- "dingtalk": 20000,
- "feishu": 29000,
- "ntfy": 3800,
- "default": 4000,
- }
- def split_content_into_batches(
- report_data: Dict,
- format_type: str,
- update_info: Optional[Dict] = None,
- max_bytes: Optional[int] = None,
- mode: str = "daily",
- batch_sizes: Optional[Dict[str, int]] = None,
- feishu_separator: str = "---",
- reverse_content_order: bool = False,
- get_time_func: Optional[Callable[[], datetime]] = None,
- rss_items: Optional[list] = None,
- rss_new_items: Optional[list] = None,
- timezone: str = "Asia/Shanghai",
- display_mode: str = "keyword",
- ) -> List[str]:
- """分批处理消息内容,确保词组标题+至少第一条新闻的完整性(支持热榜+RSS合并)
- 热榜统计与RSS统计并列显示,热榜新增与RSS新增并列显示。
- reverse_content_order 控制统计和新增的前后顺序。
- Args:
- report_data: 报告数据字典,包含 stats, new_titles, failed_ids, total_new_count
- format_type: 格式类型 (feishu, dingtalk, wework, telegram, ntfy, bark, slack)
- update_info: 版本更新信息(可选)
- max_bytes: 最大字节数(可选,如果不指定则使用默认配置)
- mode: 报告模式 (daily, incremental, current)
- batch_sizes: 批次大小配置字典(可选)
- feishu_separator: 飞书消息分隔符
- reverse_content_order: 是否反转内容顺序(新增在前,统计在后)
- get_time_func: 获取当前时间的函数(可选)
- rss_items: RSS 统计条目列表(按源分组,用于合并推送)
- rss_new_items: RSS 新增条目列表(可选,用于新增区块)
- timezone: 时区名称(用于 RSS 时间格式化)
- display_mode: 显示模式 (keyword=按关键词分组, platform=按平台分组)
- Returns:
- 分批后的消息内容列表
- """
- # 合并批次大小配置
- sizes = {**DEFAULT_BATCH_SIZES, **(batch_sizes or {})}
- if max_bytes is None:
- if format_type == "dingtalk":
- max_bytes = sizes.get("dingtalk", 20000)
- elif format_type == "feishu":
- max_bytes = sizes.get("feishu", 29000)
- elif format_type == "ntfy":
- max_bytes = sizes.get("ntfy", 3800)
- else:
- max_bytes = sizes.get("default", 4000)
- batches = []
- total_titles = sum(
- len(stat["titles"]) for stat in report_data["stats"] if stat["count"] > 0
- )
- now = get_time_func() if get_time_func else datetime.now()
- base_header = ""
- if format_type in ("wework", "bark"):
- base_header = f"**总新闻数:** {total_titles}\n\n\n\n"
- elif format_type == "telegram":
- base_header = f"总新闻数: {total_titles}\n\n"
- elif format_type == "ntfy":
- base_header = f"**总新闻数:** {total_titles}\n\n"
- elif format_type == "feishu":
- base_header = ""
- elif format_type == "dingtalk":
- base_header = f"**总新闻数:** {total_titles}\n\n"
- base_header += f"**时间:** {now.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
- base_header += f"**类型:** 热点分析报告\n\n"
- base_header += "---\n\n"
- elif format_type == "slack":
- base_header = f"*总新闻数:* {total_titles}\n\n"
- base_footer = ""
- if format_type in ("wework", "bark"):
- base_footer = f"\n\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
- elif format_type == "telegram":
- base_footer = f"\n\n更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\nTrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}"
- elif format_type == "ntfy":
- base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
- elif format_type == "feishu":
- base_footer = f"\n\n<font color='grey'>更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}</font>"
- if update_info:
- base_footer += f"\n<font color='grey'>TrendRadar 发现新版本 {update_info['remote_version']},当前 {update_info['current_version']}</font>"
- elif format_type == "dingtalk":
- base_footer = f"\n\n> 更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}"
- if update_info:
- base_footer += f"\n> TrendRadar 发现新版本 **{update_info['remote_version']}**,当前 **{update_info['current_version']}**"
- elif format_type == "slack":
- base_footer = f"\n\n_更新时间:{now.strftime('%Y-%m-%d %H:%M:%S')}_"
- if update_info:
- base_footer += f"\n_TrendRadar 发现新版本 *{update_info['remote_version']}*,当前 *{update_info['current_version']}_"
- # 根据 display_mode 选择统计标题
- stats_title = "热点词汇统计" if display_mode == "keyword" else "热点新闻统计"
- stats_header = ""
- if report_data["stats"]:
- if format_type in ("wework", "bark"):
- stats_header = f"📊 **{stats_title}**\n\n"
- elif format_type == "telegram":
- stats_header = f"📊 {stats_title}\n\n"
- elif format_type == "ntfy":
- stats_header = f"📊 **{stats_title}**\n\n"
- elif format_type == "feishu":
- stats_header = f"📊 **{stats_title}**\n\n"
- elif format_type == "dingtalk":
- stats_header = f"📊 **{stats_title}**\n\n"
- elif format_type == "slack":
- stats_header = f"📊 *{stats_title}*\n\n"
- current_batch = base_header
- current_batch_has_content = False
- if (
- not report_data["stats"]
- and not report_data["new_titles"]
- and not report_data["failed_ids"]
- ):
- if mode == "incremental":
- mode_text = "增量模式下暂无新增匹配的热点词汇"
- elif mode == "current":
- mode_text = "当前榜单模式下暂无匹配的热点词汇"
- else:
- mode_text = "暂无匹配的热点词汇"
- simple_content = f"📭 {mode_text}\n\n"
- final_content = base_header + simple_content + base_footer
- batches.append(final_content)
- return batches
- # 定义处理热点词汇统计的函数
- def process_stats_section(current_batch, current_batch_has_content, batches):
- """处理热点词汇统计"""
- if not report_data["stats"]:
- return current_batch, current_batch_has_content, batches
- total_count = len(report_data["stats"])
- # 添加统计标题
- test_content = current_batch + stats_header
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- < max_bytes
- ):
- current_batch = test_content
- current_batch_has_content = True
- else:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + stats_header
- current_batch_has_content = True
- # 逐个处理词组(确保词组标题+第一条新闻的原子性)
- for i, stat in enumerate(report_data["stats"]):
- word = stat["word"]
- count = stat["count"]
- sequence_display = f"[{i + 1}/{total_count}]"
- # 构建词组标题
- word_header = ""
- if format_type in ("wework", "bark"):
- if count >= 10:
- word_header = (
- f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- elif count >= 5:
- word_header = (
- f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "telegram":
- if count >= 10:
- word_header = f"🔥 {sequence_display} {word} : {count} 条\n\n"
- elif count >= 5:
- word_header = f"📈 {sequence_display} {word} : {count} 条\n\n"
- else:
- word_header = f"📌 {sequence_display} {word} : {count} 条\n\n"
- elif format_type == "ntfy":
- if count >= 10:
- word_header = (
- f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- elif count >= 5:
- word_header = (
- f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "feishu":
- if count >= 10:
- word_header = f"🔥 <font color='grey'>{sequence_display}</font> **{word}** : <font color='red'>{count}</font> 条\n\n"
- elif count >= 5:
- word_header = f"📈 <font color='grey'>{sequence_display}</font> **{word}** : <font color='orange'>{count}</font> 条\n\n"
- else:
- word_header = f"📌 <font color='grey'>{sequence_display}</font> **{word}** : {count} 条\n\n"
- elif format_type == "dingtalk":
- if count >= 10:
- word_header = (
- f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- elif count >= 5:
- word_header = (
- f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- )
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "slack":
- if count >= 10:
- word_header = (
- f"🔥 {sequence_display} *{word}* : *{count}* 条\n\n"
- )
- elif count >= 5:
- word_header = (
- f"📈 {sequence_display} *{word}* : *{count}* 条\n\n"
- )
- else:
- word_header = f"📌 {sequence_display} *{word}* : {count} 条\n\n"
- # 构建第一条新闻
- # display_mode: keyword=显示来源, platform=显示关键词
- show_source = display_mode == "keyword"
- show_keyword = display_mode == "platform"
- first_news_line = ""
- if stat["titles"]:
- first_title_data = stat["titles"][0]
- if format_type in ("wework", "bark"):
- formatted_title = format_title_for_platform(
- "wework", first_title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", first_title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform(
- "ntfy", first_title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", first_title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", first_title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "slack":
- formatted_title = format_title_for_platform(
- "slack", first_title_data, show_source=show_source, show_keyword=show_keyword
- )
- else:
- formatted_title = f"{first_title_data['title']}"
- first_news_line = f" 1. {formatted_title}\n"
- if len(stat["titles"]) > 1:
- first_news_line += "\n"
- # 原子性检查:词组标题+第一条新闻必须一起处理
- word_with_first_news = word_header + first_news_line
- test_content = current_batch + word_with_first_news
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- # 当前批次容纳不下,开启新批次
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + stats_header + word_with_first_news
- current_batch_has_content = True
- start_index = 1
- else:
- current_batch = test_content
- current_batch_has_content = True
- start_index = 1
- # 处理剩余新闻条目
- for j in range(start_index, len(stat["titles"])):
- title_data = stat["titles"][j]
- if format_type in ("wework", "bark"):
- formatted_title = format_title_for_platform(
- "wework", title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform(
- "ntfy", title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", title_data, show_source=show_source, show_keyword=show_keyword
- )
- elif format_type == "slack":
- formatted_title = format_title_for_platform(
- "slack", title_data, show_source=show_source, show_keyword=show_keyword
- )
- else:
- formatted_title = f"{title_data['title']}"
- news_line = f" {j + 1}. {formatted_title}\n"
- if j < len(stat["titles"]) - 1:
- news_line += "\n"
- test_content = current_batch + news_line
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + stats_header + word_header + news_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 词组间分隔符
- if i < len(report_data["stats"]) - 1:
- separator = ""
- if format_type in ("wework", "bark"):
- separator = f"\n\n\n\n"
- elif format_type == "telegram":
- separator = f"\n\n"
- elif format_type == "ntfy":
- separator = f"\n\n"
- elif format_type == "feishu":
- separator = f"\n{feishu_separator}\n\n"
- elif format_type == "dingtalk":
- separator = f"\n---\n\n"
- elif format_type == "slack":
- separator = f"\n\n"
- test_content = current_batch + separator
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- < max_bytes
- ):
- current_batch = test_content
- return current_batch, current_batch_has_content, batches
- # 定义处理新增新闻的函数
- def process_new_titles_section(current_batch, current_batch_has_content, batches):
- """处理新增新闻"""
- if not report_data["new_titles"]:
- return current_batch, current_batch_has_content, batches
- new_header = ""
- if format_type in ("wework", "bark"):
- new_header = f"\n\n\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- elif format_type == "telegram":
- new_header = (
- f"\n\n🆕 本次新增热点新闻 (共 {report_data['total_new_count']} 条)\n\n"
- )
- elif format_type == "ntfy":
- new_header = f"\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- elif format_type == "feishu":
- new_header = f"\n{feishu_separator}\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- elif format_type == "dingtalk":
- new_header = f"\n---\n\n🆕 **本次新增热点新闻** (共 {report_data['total_new_count']} 条)\n\n"
- elif format_type == "slack":
- new_header = f"\n\n🆕 *本次新增热点新闻* (共 {report_data['total_new_count']} 条)\n\n"
- test_content = current_batch + new_header
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 逐个处理新增新闻来源
- for source_data in report_data["new_titles"]:
- source_header = ""
- if format_type in ("wework", "bark"):
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "telegram":
- source_header = f"{source_data['source_name']} ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "ntfy":
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "feishu":
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "dingtalk":
- source_header = f"**{source_data['source_name']}** ({len(source_data['titles'])} 条):\n\n"
- elif format_type == "slack":
- source_header = f"*{source_data['source_name']}* ({len(source_data['titles'])} 条):\n\n"
- # 构建第一条新增新闻
- first_news_line = ""
- if source_data["titles"]:
- first_title_data = source_data["titles"][0]
- title_data_copy = first_title_data.copy()
- title_data_copy["is_new"] = False
- if format_type in ("wework", "bark"):
- formatted_title = format_title_for_platform(
- "wework", title_data_copy, show_source=False
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", title_data_copy, show_source=False
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", title_data_copy, show_source=False
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", title_data_copy, show_source=False
- )
- elif format_type == "slack":
- formatted_title = format_title_for_platform(
- "slack", title_data_copy, show_source=False
- )
- else:
- formatted_title = f"{title_data_copy['title']}"
- first_news_line = f" 1. {formatted_title}\n"
- # 原子性检查:来源标题+第一条新闻
- source_with_first_news = source_header + first_news_line
- test_content = current_batch + source_with_first_news
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header + source_with_first_news
- current_batch_has_content = True
- start_index = 1
- else:
- current_batch = test_content
- current_batch_has_content = True
- start_index = 1
- # 处理剩余新增新闻
- for j in range(start_index, len(source_data["titles"])):
- title_data = source_data["titles"][j]
- title_data_copy = title_data.copy()
- title_data_copy["is_new"] = False
- if format_type == "wework":
- formatted_title = format_title_for_platform(
- "wework", title_data_copy, show_source=False
- )
- elif format_type == "telegram":
- formatted_title = format_title_for_platform(
- "telegram", title_data_copy, show_source=False
- )
- elif format_type == "feishu":
- formatted_title = format_title_for_platform(
- "feishu", title_data_copy, show_source=False
- )
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform(
- "dingtalk", title_data_copy, show_source=False
- )
- elif format_type == "slack":
- formatted_title = format_title_for_platform(
- "slack", title_data_copy, show_source=False
- )
- else:
- formatted_title = f"{title_data_copy['title']}"
- news_line = f" {j + 1}. {formatted_title}\n"
- test_content = current_batch + news_line
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header + source_header + news_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- current_batch += "\n"
- return current_batch, current_batch_has_content, batches
- # 根据配置决定处理顺序
- if reverse_content_order:
- # 新增热点在前,热点词汇统计在后
- # 1. 处理热榜新增
- current_batch, current_batch_has_content, batches = process_new_titles_section(
- current_batch, current_batch_has_content, batches
- )
- # 2. 处理 RSS 新增(如果有)
- if rss_new_items:
- current_batch, current_batch_has_content, batches = _process_rss_new_titles_section(
- rss_new_items, format_type, feishu_separator, base_header, base_footer,
- max_bytes, current_batch, current_batch_has_content, batches, timezone
- )
- # 3. 处理热榜统计
- current_batch, current_batch_has_content, batches = process_stats_section(
- current_batch, current_batch_has_content, batches
- )
- # 4. 处理 RSS 统计(如果有)
- if rss_items:
- current_batch, current_batch_has_content, batches = _process_rss_stats_section(
- rss_items, format_type, feishu_separator, base_header, base_footer,
- max_bytes, current_batch, current_batch_has_content, batches, timezone
- )
- else:
- # 默认:热点词汇统计在前,新增热点在后
- # 1. 处理热榜统计
- current_batch, current_batch_has_content, batches = process_stats_section(
- current_batch, current_batch_has_content, batches
- )
- # 2. 处理 RSS 统计(如果有)
- if rss_items:
- current_batch, current_batch_has_content, batches = _process_rss_stats_section(
- rss_items, format_type, feishu_separator, base_header, base_footer,
- max_bytes, current_batch, current_batch_has_content, batches, timezone
- )
- # 3. 处理热榜新增
- current_batch, current_batch_has_content, batches = process_new_titles_section(
- current_batch, current_batch_has_content, batches
- )
- # 4. 处理 RSS 新增(如果有)
- if rss_new_items:
- current_batch, current_batch_has_content, batches = _process_rss_new_titles_section(
- rss_new_items, format_type, feishu_separator, base_header, base_footer,
- max_bytes, current_batch, current_batch_has_content, batches, timezone
- )
- if report_data["failed_ids"]:
- failed_header = ""
- if format_type == "wework":
- failed_header = f"\n\n\n\n⚠️ **数据获取失败的平台:**\n\n"
- elif format_type == "telegram":
- failed_header = f"\n\n⚠️ 数据获取失败的平台:\n\n"
- elif format_type == "ntfy":
- failed_header = f"\n\n⚠️ **数据获取失败的平台:**\n\n"
- elif format_type == "feishu":
- failed_header = f"\n{feishu_separator}\n\n⚠️ **数据获取失败的平台:**\n\n"
- elif format_type == "dingtalk":
- failed_header = f"\n---\n\n⚠️ **数据获取失败的平台:**\n\n"
- test_content = current_batch + failed_header
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + failed_header
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- for i, id_value in enumerate(report_data["failed_ids"], 1):
- if format_type == "feishu":
- failed_line = f" • <font color='red'>{id_value}</font>\n"
- elif format_type == "dingtalk":
- failed_line = f" • **{id_value}**\n"
- else:
- failed_line = f" • {id_value}\n"
- test_content = current_batch + failed_line
- if (
- len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8"))
- >= max_bytes
- ):
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + failed_header + failed_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 完成最后批次
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- return batches
- def _process_rss_stats_section(
- rss_stats: list,
- format_type: str,
- feishu_separator: str,
- base_header: str,
- base_footer: str,
- max_bytes: int,
- current_batch: str,
- current_batch_has_content: bool,
- batches: List[str],
- timezone: str = "Asia/Shanghai",
- ) -> tuple:
- """处理 RSS 统计区块(按关键词分组,与热榜统计格式一致)
- Args:
- rss_stats: RSS 关键词统计列表,格式与热榜 stats 一致:
- [{"word": "AI", "count": 5, "titles": [...]}]
- format_type: 格式类型
- feishu_separator: 飞书分隔符
- base_header: 基础头部
- base_footer: 基础尾部
- max_bytes: 最大字节数
- current_batch: 当前批次内容
- current_batch_has_content: 当前批次是否有内容
- batches: 已完成的批次列表
- timezone: 时区名称
- Returns:
- (current_batch, current_batch_has_content, batches) 元组
- """
- if not rss_stats:
- return current_batch, current_batch_has_content, batches
- # 计算总条目数
- total_items = sum(stat["count"] for stat in rss_stats)
- total_keywords = len(rss_stats)
- # RSS 统计区块标题
- rss_header = ""
- if format_type == "feishu":
- rss_header = f"\n{feishu_separator}\n\n📰 **RSS 订阅统计** (共 {total_items} 条)\n\n"
- elif format_type == "dingtalk":
- rss_header = f"\n---\n\n📰 **RSS 订阅统计** (共 {total_items} 条)\n\n"
- elif format_type == "telegram":
- rss_header = f"\n\n📰 RSS 订阅统计 (共 {total_items} 条)\n\n"
- elif format_type == "slack":
- rss_header = f"\n\n📰 *RSS 订阅统计* (共 {total_items} 条)\n\n"
- else:
- rss_header = f"\n\n📰 **RSS 订阅统计** (共 {total_items} 条)\n\n"
- # 添加 RSS 标题
- test_content = current_batch + rss_header
- if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) < max_bytes:
- current_batch = test_content
- current_batch_has_content = True
- else:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + rss_header
- current_batch_has_content = True
- # 逐个处理关键词组(与热榜一致)
- for i, stat in enumerate(rss_stats):
- word = stat["word"]
- count = stat["count"]
- sequence_display = f"[{i + 1}/{total_keywords}]"
- # 构建关键词标题(与热榜格式一致)
- word_header = ""
- if format_type in ("wework", "bark"):
- if count >= 10:
- word_header = f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- elif count >= 5:
- word_header = f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "telegram":
- if count >= 10:
- word_header = f"🔥 {sequence_display} {word} : {count} 条\n\n"
- elif count >= 5:
- word_header = f"📈 {sequence_display} {word} : {count} 条\n\n"
- else:
- word_header = f"📌 {sequence_display} {word} : {count} 条\n\n"
- elif format_type == "ntfy":
- if count >= 10:
- word_header = f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- elif count >= 5:
- word_header = f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "feishu":
- if count >= 10:
- word_header = f"🔥 <font color='grey'>{sequence_display}</font> **{word}** : <font color='red'>{count}</font> 条\n\n"
- elif count >= 5:
- word_header = f"📈 <font color='grey'>{sequence_display}</font> **{word}** : <font color='orange'>{count}</font> 条\n\n"
- else:
- word_header = f"📌 <font color='grey'>{sequence_display}</font> **{word}** : {count} 条\n\n"
- elif format_type == "dingtalk":
- if count >= 10:
- word_header = f"🔥 {sequence_display} **{word}** : **{count}** 条\n\n"
- elif count >= 5:
- word_header = f"📈 {sequence_display} **{word}** : **{count}** 条\n\n"
- else:
- word_header = f"📌 {sequence_display} **{word}** : {count} 条\n\n"
- elif format_type == "slack":
- if count >= 10:
- word_header = f"🔥 {sequence_display} *{word}* : *{count}* 条\n\n"
- elif count >= 5:
- word_header = f"📈 {sequence_display} *{word}* : *{count}* 条\n\n"
- else:
- word_header = f"📌 {sequence_display} *{word}* : {count} 条\n\n"
- # 构建第一条新闻(使用 format_title_for_platform)
- first_news_line = ""
- if stat["titles"]:
- first_title_data = stat["titles"][0]
- if format_type in ("wework", "bark"):
- formatted_title = format_title_for_platform("wework", first_title_data, show_source=True)
- elif format_type == "telegram":
- formatted_title = format_title_for_platform("telegram", first_title_data, show_source=True)
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform("ntfy", first_title_data, show_source=True)
- elif format_type == "feishu":
- formatted_title = format_title_for_platform("feishu", first_title_data, show_source=True)
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform("dingtalk", first_title_data, show_source=True)
- elif format_type == "slack":
- formatted_title = format_title_for_platform("slack", first_title_data, show_source=True)
- else:
- formatted_title = f"{first_title_data['title']}"
- first_news_line = f" 1. {formatted_title}\n"
- if len(stat["titles"]) > 1:
- first_news_line += "\n"
- # 原子性检查:关键词标题 + 第一条新闻必须一起处理
- word_with_first_news = word_header + first_news_line
- test_content = current_batch + word_with_first_news
- if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + rss_header + word_with_first_news
- current_batch_has_content = True
- start_index = 1
- else:
- current_batch = test_content
- current_batch_has_content = True
- start_index = 1
- # 处理剩余新闻条目
- for j in range(start_index, len(stat["titles"])):
- title_data = stat["titles"][j]
- if format_type in ("wework", "bark"):
- formatted_title = format_title_for_platform("wework", title_data, show_source=True)
- elif format_type == "telegram":
- formatted_title = format_title_for_platform("telegram", title_data, show_source=True)
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform("ntfy", title_data, show_source=True)
- elif format_type == "feishu":
- formatted_title = format_title_for_platform("feishu", title_data, show_source=True)
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform("dingtalk", title_data, show_source=True)
- elif format_type == "slack":
- formatted_title = format_title_for_platform("slack", title_data, show_source=True)
- else:
- formatted_title = f"{title_data['title']}"
- news_line = f" {j + 1}. {formatted_title}\n"
- if j < len(stat["titles"]) - 1:
- news_line += "\n"
- test_content = current_batch + news_line
- if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + rss_header + word_header + news_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 关键词间分隔符
- if i < len(rss_stats) - 1:
- separator = ""
- if format_type in ("wework", "bark"):
- separator = "\n\n\n\n"
- elif format_type == "telegram":
- separator = "\n\n"
- elif format_type == "ntfy":
- separator = "\n\n"
- elif format_type == "feishu":
- separator = f"\n{feishu_separator}\n\n"
- elif format_type == "dingtalk":
- separator = "\n---\n\n"
- elif format_type == "slack":
- separator = "\n\n"
- test_content = current_batch + separator
- if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) < max_bytes:
- current_batch = test_content
- return current_batch, current_batch_has_content, batches
- def _process_rss_new_titles_section(
- rss_new_stats: list,
- format_type: str,
- feishu_separator: str,
- base_header: str,
- base_footer: str,
- max_bytes: int,
- current_batch: str,
- current_batch_has_content: bool,
- batches: List[str],
- timezone: str = "Asia/Shanghai",
- ) -> tuple:
- """处理 RSS 新增区块(按来源分组,与热榜新增格式一致)
- Args:
- rss_new_stats: RSS 新增关键词统计列表,格式与热榜 stats 一致:
- [{"word": "AI", "count": 5, "titles": [...]}]
- format_type: 格式类型
- feishu_separator: 飞书分隔符
- base_header: 基础头部
- base_footer: 基础尾部
- max_bytes: 最大字节数
- current_batch: 当前批次内容
- current_batch_has_content: 当前批次是否有内容
- batches: 已完成的批次列表
- timezone: 时区名称
- Returns:
- (current_batch, current_batch_has_content, batches) 元组
- """
- if not rss_new_stats:
- return current_batch, current_batch_has_content, batches
- # 从关键词分组中提取所有条目,重新按来源分组
- source_map = {}
- for stat in rss_new_stats:
- for title_data in stat.get("titles", []):
- source_name = title_data.get("source_name", "未知来源")
- if source_name not in source_map:
- source_map[source_name] = []
- source_map[source_name].append(title_data)
- if not source_map:
- return current_batch, current_batch_has_content, batches
- # 计算总条目数
- total_items = sum(len(titles) for titles in source_map.values())
- # RSS 新增区块标题
- new_header = ""
- if format_type in ("wework", "bark"):
- new_header = f"\n\n\n\n🆕 **RSS 本次新增** (共 {total_items} 条)\n\n"
- elif format_type == "telegram":
- new_header = f"\n\n🆕 RSS 本次新增 (共 {total_items} 条)\n\n"
- elif format_type == "ntfy":
- new_header = f"\n\n🆕 **RSS 本次新增** (共 {total_items} 条)\n\n"
- elif format_type == "feishu":
- new_header = f"\n{feishu_separator}\n\n🆕 **RSS 本次新增** (共 {total_items} 条)\n\n"
- elif format_type == "dingtalk":
- new_header = f"\n---\n\n🆕 **RSS 本次新增** (共 {total_items} 条)\n\n"
- elif format_type == "slack":
- new_header = f"\n\n🆕 *RSS 本次新增* (共 {total_items} 条)\n\n"
- # 添加 RSS 新增标题
- test_content = current_batch + new_header
- if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 按来源分组显示(与热榜新增格式一致)
- source_list = list(source_map.items())
- for i, (source_name, titles) in enumerate(source_list):
- count = len(titles)
- # 构建来源标题(与热榜新增格式一致)
- source_header = ""
- if format_type in ("wework", "bark"):
- source_header = f"**{source_name}** ({count} 条):\n\n"
- elif format_type == "telegram":
- source_header = f"{source_name} ({count} 条):\n\n"
- elif format_type == "ntfy":
- source_header = f"**{source_name}** ({count} 条):\n\n"
- elif format_type == "feishu":
- source_header = f"**{source_name}** ({count} 条):\n\n"
- elif format_type == "dingtalk":
- source_header = f"**{source_name}** ({count} 条):\n\n"
- elif format_type == "slack":
- source_header = f"*{source_name}* ({count} 条):\n\n"
- # 构建第一条新闻(不显示来源,禁用 new emoji)
- first_news_line = ""
- if titles:
- first_title_data = titles[0].copy()
- first_title_data["is_new"] = False
- if format_type in ("wework", "bark"):
- formatted_title = format_title_for_platform("wework", first_title_data, show_source=False)
- elif format_type == "telegram":
- formatted_title = format_title_for_platform("telegram", first_title_data, show_source=False)
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform("ntfy", first_title_data, show_source=False)
- elif format_type == "feishu":
- formatted_title = format_title_for_platform("feishu", first_title_data, show_source=False)
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform("dingtalk", first_title_data, show_source=False)
- elif format_type == "slack":
- formatted_title = format_title_for_platform("slack", first_title_data, show_source=False)
- else:
- formatted_title = f"{first_title_data['title']}"
- first_news_line = f" 1. {formatted_title}\n"
- # 原子性检查:来源标题 + 第一条新闻必须一起处理
- source_with_first_news = source_header + first_news_line
- test_content = current_batch + source_with_first_news
- if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header + source_with_first_news
- current_batch_has_content = True
- start_index = 1
- else:
- current_batch = test_content
- current_batch_has_content = True
- start_index = 1
- # 处理剩余新闻条目(禁用 new emoji)
- for j in range(start_index, len(titles)):
- title_data = titles[j].copy()
- title_data["is_new"] = False
- if format_type in ("wework", "bark"):
- formatted_title = format_title_for_platform("wework", title_data, show_source=False)
- elif format_type == "telegram":
- formatted_title = format_title_for_platform("telegram", title_data, show_source=False)
- elif format_type == "ntfy":
- formatted_title = format_title_for_platform("ntfy", title_data, show_source=False)
- elif format_type == "feishu":
- formatted_title = format_title_for_platform("feishu", title_data, show_source=False)
- elif format_type == "dingtalk":
- formatted_title = format_title_for_platform("dingtalk", title_data, show_source=False)
- elif format_type == "slack":
- formatted_title = format_title_for_platform("slack", title_data, show_source=False)
- else:
- formatted_title = f"{title_data['title']}"
- news_line = f" {j + 1}. {formatted_title}\n"
- test_content = current_batch + news_line
- if len(test_content.encode("utf-8")) + len(base_footer.encode("utf-8")) >= max_bytes:
- if current_batch_has_content:
- batches.append(current_batch + base_footer)
- current_batch = base_header + new_header + source_header + news_line
- current_batch_has_content = True
- else:
- current_batch = test_content
- current_batch_has_content = True
- # 来源间添加空行(与热榜新增格式一致)
- current_batch += "\n"
- return current_batch, current_batch_has_content, batches
- def _format_rss_item_line(
- item: Dict,
- index: int,
- format_type: str,
- timezone: str = "Asia/Shanghai",
- ) -> str:
- """格式化单条 RSS 条目
- Args:
- item: RSS 条目字典
- index: 序号
- format_type: 格式类型
- timezone: 时区名称
- Returns:
- 格式化后的条目行字符串
- """
- title = item.get("title", "")
- url = item.get("url", "")
- published_at = item.get("published_at", "")
- # 使用友好时间格式
- if published_at:
- friendly_time = format_iso_time_friendly(published_at, timezone, include_date=True)
- else:
- friendly_time = ""
- # 构建条目行
- if format_type == "feishu":
- if url:
- item_line = f" {index}. [{title}]({url})"
- else:
- item_line = f" {index}. {title}"
- if friendly_time:
- item_line += f" <font color='grey'>- {friendly_time}</font>"
- elif format_type == "telegram":
- if url:
- item_line = f" {index}. {title} ({url})"
- else:
- item_line = f" {index}. {title}"
- if friendly_time:
- item_line += f" - {friendly_time}"
- else:
- if url:
- item_line = f" {index}. [{title}]({url})"
- else:
- item_line = f" {index}. {title}"
- if friendly_time:
- item_line += f" `{friendly_time}`"
- item_line += "\n"
- return item_line
|