# coding=utf-8 """ HTML 报告渲染模块 提供 HTML 格式的热点新闻报告生成功能 """ from datetime import datetime from typing import Any, Dict, List, Optional, Callable from trendradar.report.helpers import html_escape from trendradar.utils.time import convert_time_for_display from trendradar.ai.formatter import render_ai_analysis_html_rich def render_html_content( report_data: Dict, total_titles: int, mode: str = "daily", update_info: Optional[Dict] = None, *, region_order: Optional[List[str]] = None, get_time_func: Optional[Callable[[], datetime]] = None, rss_items: Optional[List[Dict]] = None, rss_new_items: Optional[List[Dict]] = None, display_mode: str = "keyword", standalone_data: Optional[Dict] = None, ai_analysis: Optional[Any] = None, show_new_section: bool = True, ) -> str: """渲染HTML内容 Args: report_data: 报告数据字典,包含 stats, new_titles, failed_ids, total_new_count total_titles: 新闻总数 mode: 报告模式 ("daily", "current", "incremental") update_info: 更新信息(可选) region_order: 区域显示顺序列表 get_time_func: 获取当前时间的函数(可选,默认使用 datetime.now) rss_items: RSS 统计条目列表(可选) rss_new_items: RSS 新增条目列表(可选) display_mode: 显示模式 ("keyword"=按关键词分组, "platform"=按平台分组) standalone_data: 独立展示区数据(可选),包含 platforms 和 rss_feeds ai_analysis: AI 分析结果对象(可选),AIAnalysisResult 实例 show_new_section: 是否显示新增热点区域 Returns: 渲染后的 HTML 字符串 """ # 默认区域顺序 default_region_order = ["hotlist", "rss", "new_items", "standalone", "ai_analysis"] if region_order is None: region_order = default_region_order html = """ 热点新闻分析
TrendRadar
热点新闻分析
报告类型 """ # 处理报告类型显示(根据 mode 直接显示) if mode == "current": html += "当前榜单" elif mode == "incremental": html += "增量分析" else: html += "全天汇总" html += """
新闻总数 """ html += f"{total_titles} 条" # 计算筛选后的热点新闻数量 hot_news_count = sum(len(stat["titles"]) for stat in report_data["stats"]) html += """
热点新闻 """ html += f"{hot_news_count} 条" html += """
生成时间 """ # 使用提供的时间函数或默认 datetime.now if get_time_func: now = get_time_func() else: now = datetime.now() html += now.strftime("%m-%d %H:%M") html += """
""" # 处理失败ID错误信息 if report_data["failed_ids"]: html += """
⚠️ 请求失败的平台
    """ for id_value in report_data["failed_ids"]: html += f'
  • {html_escape(id_value)}
  • ' html += """
""" # 生成热点词汇统计部分的HTML stats_html = "" tab_bar_html = "" if report_data["stats"]: total_count = len(report_data["stats"]) # 生成 Tab 栏 HTML tab_bar_html = '
' for tab_i, tab_stat in enumerate(report_data["stats"]): escaped_tab_word = html_escape(tab_stat["word"]) tab_count = tab_stat["count"] tab_bar_html += f'' tab_bar_html += '' tab_bar_html += '
' for i, stat in enumerate(report_data["stats"], 1): count = stat["count"] # 确定热度等级 if count >= 10: count_class = "hot" elif count >= 5: count_class = "warm" else: count_class = "" escaped_word = html_escape(stat["word"]) stats_html += f"""
{escaped_word}
{count} 条
{i}/{total_count}
""" # 处理每个词组下的新闻标题,给每条新闻标上序号 for j, title_data in enumerate(stat["titles"], 1): is_new = title_data.get("is_new", False) new_class = "new" if is_new else "" stats_html += f"""
{j}
""" # 根据 display_mode 决定显示来源还是关键词 if display_mode == "keyword": # keyword 模式:显示来源 stats_html += f'{html_escape(title_data["source_name"])}' else: # platform 模式:显示关键词 matched_keyword = title_data.get("matched_keyword", "") if matched_keyword: stats_html += f'[{html_escape(matched_keyword)}]' # 处理排名显示 ranks = title_data.get("ranks", []) if ranks: min_rank = min(ranks) max_rank = max(ranks) rank_threshold = title_data.get("rank_threshold", 10) # 确定排名等级 if min_rank <= 3: rank_class = "top" elif min_rank <= rank_threshold: rank_class = "high" else: rank_class = "" if min_rank == max_rank: rank_text = str(min_rank) else: rank_text = f"{min_rank}-{max_rank}" stats_html += f'{rank_text}' # 处理时间显示 time_display = title_data.get("time_display", "") if time_display: # 简化时间显示格式,将波浪线替换为~ simplified_time = ( time_display.replace(" ~ ", "~") .replace("[", "") .replace("]", "") ) stats_html += ( f'{html_escape(simplified_time)}' ) # 处理出现次数 count_info = title_data.get("count", 1) if count_info > 1: stats_html += f'{count_info}次' stats_html += """
""" # 处理标题和链接 escaped_title = html_escape(title_data["title"]) link_url = title_data.get("mobile_url") or title_data.get("url", "") if link_url: escaped_url = html_escape(link_url) stats_html += f'{escaped_title}' else: stats_html += escaped_title stats_html += """
""" stats_html += """
""" # 给热榜统计添加外层包装 if stats_html: stats_html = f"""
{tab_bar_html}{stats_html}
""" # 生成新增新闻区域的HTML new_titles_html = "" if show_new_section and report_data["new_titles"]: new_titles_html += f"""
本次新增热点 (共 {report_data['total_new_count']} 条)
""" for source_data in report_data["new_titles"]: escaped_source = html_escape(source_data["source_name"]) titles_count = len(source_data["titles"]) new_titles_html += f"""
{escaped_source} · {titles_count}条
""" # 为新增新闻也添加序号 for idx, title_data in enumerate(source_data["titles"], 1): ranks = title_data.get("ranks", []) # 处理新增新闻的排名显示 rank_class = "" if ranks: min_rank = min(ranks) if min_rank <= 3: rank_class = "top" elif min_rank <= title_data.get("rank_threshold", 10): rank_class = "high" if len(ranks) == 1: rank_text = str(ranks[0]) else: rank_text = f"{min(ranks)}-{max(ranks)}" else: rank_text = "?" new_titles_html += f"""
{idx}
{rank_text}
""" # 处理新增新闻的链接 escaped_title = html_escape(title_data["title"]) link_url = title_data.get("mobile_url") or title_data.get("url", "") if link_url: escaped_url = html_escape(link_url) new_titles_html += f'{escaped_title}' else: new_titles_html += escaped_title new_titles_html += """
""" new_titles_html += """
""" new_titles_html += """
""" # 生成 RSS 统计内容 def render_rss_stats_html(stats: List[Dict], title: str = "RSS 订阅更新") -> str: """渲染 RSS 统计区块 HTML Args: stats: RSS 分组统计列表,格式与热榜一致: [ { "word": "关键词", "count": 5, "titles": [ { "title": "标题", "source_name": "Feed 名称", "time_display": "12-29 08:20", "url": "...", "is_new": True/False } ] } ] title: 区块标题 Returns: 渲染后的 HTML 字符串 """ if not stats: return "" # 计算总条目数 total_count = sum(stat.get("count", 0) for stat in stats) if total_count == 0: return "" rss_html = f"""
{title}
{total_count} 条
""" # 按关键词分组渲染(与热榜格式一致) for stat in stats: keyword = stat.get("word", "") titles = stat.get("titles", []) if not titles: continue keyword_count = len(titles) rss_html += f"""
{html_escape(keyword)}
{keyword_count} 条
""" for title_data in titles: item_title = title_data.get("title", "") url = title_data.get("url", "") time_display = title_data.get("time_display", "") source_name = title_data.get("source_name", "") is_new = title_data.get("is_new", False) rss_html += """
""" if time_display: rss_html += f'{html_escape(time_display)}' if source_name: rss_html += f'{html_escape(source_name)}' if is_new: rss_html += 'NEW' rss_html += """
""" escaped_title = html_escape(item_title) if url: escaped_url = html_escape(url) rss_html += f'{escaped_title}' else: rss_html += escaped_title rss_html += """
""" rss_html += """
""" rss_html += """
""" return rss_html # 生成独立展示区内容 def render_standalone_html(data: Optional[Dict]) -> str: """渲染独立展示区 HTML(复用热点词汇统计区样式) Args: data: 独立展示数据,格式: { "platforms": [ { "id": "zhihu", "name": "知乎热榜", "items": [ { "title": "标题", "url": "链接", "rank": 1, "ranks": [1, 2, 1], "first_time": "08:00", "last_time": "12:30", "count": 3, } ] } ], "rss_feeds": [ { "id": "hacker-news", "name": "Hacker News", "items": [ { "title": "标题", "url": "链接", "published_at": "2025-01-07T08:00:00", "author": "作者", } ] } ] } Returns: 渲染后的 HTML 字符串 """ if not data: return "" platforms = data.get("platforms", []) rss_feeds = data.get("rss_feeds", []) if not platforms and not rss_feeds: return "" # 计算总条目数 total_platform_items = sum(len(p.get("items", [])) for p in platforms) total_rss_items = sum(len(f.get("items", [])) for f in rss_feeds) total_count = total_platform_items + total_rss_items if total_count == 0: return "" # 收集所有分组信息用于生成 tab all_groups = [] for p in platforms: items = p.get("items", []) if items: all_groups.append({"name": p.get("name", p.get("id", "")), "count": len(items)}) for f in rss_feeds: items = f.get("items", []) if items: all_groups.append({"name": f.get("name", f.get("id", "")), "count": len(items)}) standalone_html = f"""
独立展示区
{total_count} 条
""" # 生成 tab 栏(2+ 分组时) if len(all_groups) >= 2: standalone_html += """
""" for idx, g in enumerate(all_groups): active = ' active' if idx == 0 else '' standalone_html += f""" """ standalone_html += f"""
""" standalone_html += """
""" group_idx = 0 # 渲染热榜平台(复用 word-group 结构) for platform in platforms: platform_name = platform.get("name", platform.get("id", "")) items = platform.get("items", []) if not items: continue standalone_html += f"""
{html_escape(platform_name)}
{len(items)} 条
""" # 渲染每个条目(复用 news-item 结构) for j, item in enumerate(items, 1): title = item.get("title", "") url = item.get("url", "") or item.get("mobileUrl", "") rank = item.get("rank", 0) ranks = item.get("ranks", []) first_time = item.get("first_time", "") last_time = item.get("last_time", "") count = item.get("count", 1) standalone_html += f"""
{j}
""" # 排名显示(复用 rank-num 样式,无 # 前缀) if ranks: min_rank = min(ranks) max_rank = max(ranks) # 确定排名等级 if min_rank <= 3: rank_class = "top" elif min_rank <= 10: rank_class = "high" else: rank_class = "" if min_rank == max_rank: rank_text = str(min_rank) else: rank_text = f"{min_rank}-{max_rank}" standalone_html += f'{rank_text}' elif rank > 0: if rank <= 3: rank_class = "top" elif rank <= 10: rank_class = "high" else: rank_class = "" standalone_html += f'{rank}' # 时间显示(复用 time-info 样式,将 HH-MM 转换为 HH:MM) if first_time and last_time and first_time != last_time: first_time_display = convert_time_for_display(first_time) last_time_display = convert_time_for_display(last_time) standalone_html += f'{html_escape(first_time_display)}~{html_escape(last_time_display)}' elif first_time: first_time_display = convert_time_for_display(first_time) standalone_html += f'{html_escape(first_time_display)}' # 出现次数(复用 count-info 样式) if count > 1: standalone_html += f'{count}次' standalone_html += """
""" # 标题和链接(复用 news-link 样式) escaped_title = html_escape(title) if url: escaped_url = html_escape(url) standalone_html += f'{escaped_title}' else: standalone_html += escaped_title standalone_html += """
""" standalone_html += """
""" group_idx += 1 # 渲染 RSS 源(复用相同结构) for feed in rss_feeds: feed_name = feed.get("name", feed.get("id", "")) items = feed.get("items", []) if not items: continue standalone_html += f"""
{html_escape(feed_name)}
{len(items)} 条
""" for j, item in enumerate(items, 1): title = item.get("title", "") url = item.get("url", "") published_at = item.get("published_at", "") author = item.get("author", "") standalone_html += f"""
{j}
""" # 时间显示(格式化 ISO 时间) if published_at: try: from datetime import datetime as dt if "T" in published_at: dt_obj = dt.fromisoformat(published_at.replace("Z", "+00:00")) time_display = dt_obj.strftime("%m-%d %H:%M") else: time_display = published_at except: time_display = published_at standalone_html += f'{html_escape(time_display)}' # 作者显示 if author: standalone_html += f'{html_escape(author)}' standalone_html += """
""" escaped_title = html_escape(title) if url: escaped_url = html_escape(url) standalone_html += f'{escaped_title}' else: standalone_html += escaped_title standalone_html += """
""" standalone_html += """
""" group_idx += 1 standalone_html += """
""" return standalone_html # 生成 RSS 统计和新增 HTML rss_stats_html = render_rss_stats_html(rss_items, "RSS 订阅更新") if rss_items else "" rss_new_html = render_rss_stats_html(rss_new_items, "RSS 新增更新") if rss_new_items else "" # 生成独立展示区 HTML standalone_html = render_standalone_html(standalone_data) # 生成 AI 分析 HTML ai_html = render_ai_analysis_html_rich(ai_analysis) if ai_analysis else "" # 准备各区域内容映射 region_contents = { "hotlist": stats_html, "rss": rss_stats_html, "new_items": (new_titles_html, rss_new_html), # 元组,分别处理 "standalone": standalone_html, "ai_analysis": ai_html, } def add_section_divider(content: str) -> str: """为内容的外层 div 添加 section-divider 类""" if not content or 'class="' not in content: return content first_class_pos = content.find('class="') if first_class_pos != -1: insert_pos = first_class_pos + len('class="') return content[:insert_pos] + "section-divider " + content[insert_pos:] return content # 按 region_order 顺序组装内容,动态添加分割线 has_previous_content = False for region in region_order: content = region_contents.get(region, "") if region == "new_items": # 特殊处理 new_items 区域(包含热榜新增和 RSS 新增两部分) new_html, rss_new = content if new_html: if has_previous_content: new_html = add_section_divider(new_html) html += new_html has_previous_content = True if rss_new: if has_previous_content: rss_new = add_section_divider(rss_new) html += rss_new has_previous_content = True elif content: if has_previous_content: content = add_section_divider(content) html += content has_previous_content = True html += """
""" return html