| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619 |
- # coding=utf-8
- """
- AI 分析器模块
- 调用 AI 大模型对热点新闻进行深度分析
- 基于 LiteLLM 统一接口,支持 100+ AI 提供商
- """
- import json
- from dataclasses import dataclass, field
- from typing import Any, Callable, Dict, List, Optional
- from trendradar.ai.client import AIClient
- from trendradar.ai.prompt_loader import load_prompt_template
- @dataclass
- class AIAnalysisResult:
- """AI 分析结果"""
- # 新版 5 核心板块
- core_trends: str = "" # 核心热点与舆情态势
- sentiment_controversy: str = "" # 舆论风向与争议
- signals: str = "" # 异动与弱信号
- rss_insights: str = "" # RSS 深度洞察
- outlook_strategy: str = "" # 研判与策略建议
- standalone_summaries: Dict[str, str] = field(default_factory=dict) # 独立展示区概括 {源ID: 概括}
- # 基础元数据
- raw_response: str = "" # 原始响应
- success: bool = False # 是否成功
- skipped: bool = False # 是否因无内容跳过(非失败)
- error: str = "" # 错误信息
- # 新闻数量统计
- total_news: int = 0 # 总新闻数(热榜+RSS)
- analyzed_news: int = 0 # 实际分析的新闻数
- max_news_limit: int = 0 # 分析上限配置值
- hotlist_count: int = 0 # 热榜新闻数
- rss_count: int = 0 # RSS 新闻数
- ai_mode: str = "" # AI 分析使用的模式 (daily/current/incremental)
- class AIAnalyzer:
- """AI 分析器"""
- def __init__(
- self,
- ai_config: Dict[str, Any],
- analysis_config: Dict[str, Any],
- get_time_func: Callable,
- debug: bool = False,
- ):
- """
- 初始化 AI 分析器
- Args:
- ai_config: AI 模型配置(LiteLLM 格式)
- analysis_config: AI 分析功能配置(language, prompt_file 等)
- get_time_func: 获取当前时间的函数
- debug: 是否开启调试模式
- """
- self.ai_config = ai_config
- self.analysis_config = analysis_config
- self.get_time_func = get_time_func
- self.debug = debug
- # 创建 AI 客户端(基于 LiteLLM)
- self.client = AIClient(ai_config)
- # 验证配置
- valid, error = self.client.validate_config()
- if not valid:
- print(f"[AI] 配置警告: {error}")
- # 从分析配置获取功能参数
- self.max_news = analysis_config.get("MAX_NEWS_FOR_ANALYSIS", 50)
- self.include_rss = analysis_config.get("INCLUDE_RSS", True)
- self.include_rank_timeline = analysis_config.get("INCLUDE_RANK_TIMELINE", False)
- self.include_standalone = analysis_config.get("INCLUDE_STANDALONE", False)
- self.language = analysis_config.get("LANGUAGE", "Chinese")
- # 加载提示词模板
- self.system_prompt, self.user_prompt_template = load_prompt_template(
- analysis_config.get("PROMPT_FILE", "ai_analysis_prompt.txt"),
- label="AI",
- )
- def analyze(
- self,
- stats: List[Dict],
- rss_stats: Optional[List[Dict]] = None,
- report_mode: str = "daily",
- report_type: str = "当日汇总",
- platforms: Optional[List[str]] = None,
- keywords: Optional[List[str]] = None,
- standalone_data: Optional[Dict] = None,
- ) -> AIAnalysisResult:
- """
- 执行 AI 分析
- Args:
- stats: 热榜统计数据
- rss_stats: RSS 统计数据
- report_mode: 报告模式
- report_type: 报告类型
- platforms: 平台列表
- keywords: 关键词列表
- Returns:
- AIAnalysisResult: 分析结果
- """
-
- # 打印配置信息方便调试
- model = self.ai_config.get("MODEL", "unknown")
- api_key = self.client.api_key or ""
- api_base = self.ai_config.get("API_BASE", "")
- masked_key = f"{api_key[:5]}******" if len(api_key) >= 5 else "******"
- model_display = model.replace("/", "/\u200b") if model else "unknown"
- print(f"[AI] 模型: {model_display}")
- print(f"[AI] Key : {masked_key}")
- if api_base:
- print(f"[AI] 接口: 存在自定义 API 端点")
- timeout = self.ai_config.get("TIMEOUT", 120)
- max_tokens = self.ai_config.get("MAX_TOKENS", 5000)
- print(f"[AI] 参数: timeout={timeout}, max_tokens={max_tokens}")
- if not self.client.api_key:
- return AIAnalysisResult(
- success=False,
- error="未配置 AI API Key,请在 config.yaml 或环境变量 AI_API_KEY 中设置"
- )
- # 准备新闻内容并获取统计数据
- news_content, rss_content, hotlist_total, rss_total, analyzed_count = self._prepare_news_content(stats, rss_stats)
- total_news = hotlist_total + rss_total
- if not news_content and not rss_content:
- return AIAnalysisResult(
- success=False,
- skipped=True,
- error="本轮无新增热点内容,跳过 AI 分析",
- total_news=total_news,
- hotlist_count=hotlist_total,
- rss_count=rss_total,
- analyzed_news=0,
- max_news_limit=self.max_news
- )
- # 构建提示词
- current_time = self.get_time_func().strftime("%Y-%m-%d %H:%M:%S")
- # 提取关键词
- if not keywords:
- keywords = [s.get("word", "") for s in stats if s.get("word")] if stats else []
- # 使用安全的字符串替换,避免模板中其他花括号(如 JSON 示例)被误解析
- user_prompt = self.user_prompt_template
- user_prompt = user_prompt.replace("{report_mode}", report_mode)
- user_prompt = user_prompt.replace("{report_type}", report_type)
- user_prompt = user_prompt.replace("{current_time}", current_time)
- user_prompt = user_prompt.replace("{news_count}", str(hotlist_total))
- user_prompt = user_prompt.replace("{rss_count}", str(rss_total))
- user_prompt = user_prompt.replace("{platforms}", ", ".join(platforms) if platforms else "多平台")
- user_prompt = user_prompt.replace("{keywords}", ", ".join(keywords[:20]) if keywords else "无")
- user_prompt = user_prompt.replace("{news_content}", news_content)
- user_prompt = user_prompt.replace("{rss_content}", rss_content)
- user_prompt = user_prompt.replace("{language}", self.language)
- # 构建独立展示区内容
- standalone_content = ""
- if self.include_standalone and standalone_data:
- standalone_content = self._prepare_standalone_content(standalone_data)
- user_prompt = user_prompt.replace("{standalone_content}", standalone_content)
- if self.debug:
- print("\n" + "=" * 80)
- print("[AI 调试] 发送给 AI 的完整提示词")
- print("=" * 80)
- if self.system_prompt:
- print("\n--- System Prompt ---")
- print(self.system_prompt)
- print("\n--- User Prompt ---")
- print(user_prompt)
- print("=" * 80 + "\n")
- # 调用 AI API(使用 LiteLLM)
- try:
- response = self._call_ai(user_prompt)
- result = self._parse_response(response)
- # JSON 解析失败时的重试兜底(仅重试一次)
- if result.error and "JSON 解析错误" in result.error:
- print(f"[AI] JSON 解析失败,尝试让 AI 修复...")
- retry_result = self._retry_fix_json(response, result.error)
- if retry_result and retry_result.success and not retry_result.error:
- print("[AI] JSON 修复成功")
- retry_result.raw_response = response
- result = retry_result
- else:
- print("[AI] JSON 修复失败,使用原始文本兜底")
- # 如果配置未启用 RSS 分析,强制清空 AI 返回的 RSS 洞察
- if not self.include_rss:
- result.rss_insights = ""
- # 如果配置未启用 standalone 分析,强制清空
- if not self.include_standalone:
- result.standalone_summaries = {}
- # 填充统计数据
- result.total_news = total_news
- result.hotlist_count = hotlist_total
- result.rss_count = rss_total
- result.analyzed_news = analyzed_count
- result.max_news_limit = self.max_news
- return result
- except Exception as e:
- error_type = type(e).__name__
- error_msg = str(e)
- # 截断过长的错误消息
- if len(error_msg) > 200:
- error_msg = error_msg[:200] + "..."
- friendly_msg = f"AI 分析失败 ({error_type}): {error_msg}"
- return AIAnalysisResult(
- success=False,
- error=friendly_msg
- )
- def _prepare_news_content(
- self,
- stats: List[Dict],
- rss_stats: Optional[List[Dict]] = None,
- ) -> tuple:
- """
- 准备新闻内容文本(增强版)
- 热榜新闻包含:来源、标题、排名范围、时间范围、出现次数
- RSS 包含:来源、标题、发布时间
- Returns:
- tuple: (news_content, rss_content, hotlist_total, rss_total, analyzed_count)
- """
- news_lines = []
- rss_lines = []
- news_count = 0
- rss_count = 0
- # 计算总新闻数
- hotlist_total = sum(len(s.get("titles", [])) for s in stats) if stats else 0
- rss_total = sum(len(s.get("titles", [])) for s in rss_stats) if rss_stats else 0
- # 热榜内容
- if stats:
- for stat in stats:
- word = stat.get("word", "")
- titles = stat.get("titles", [])
- if word and titles:
- news_lines.append(f"\n**{word}** ({len(titles)}条)")
- for t in titles:
- if not isinstance(t, dict):
- continue
- title = t.get("title", "")
- if not title:
- continue
- # 来源
- source = t.get("source_name", t.get("source", ""))
- # 构建行
- if source:
- line = f"- [{source}] {title}"
- else:
- line = f"- {title}"
- # 始终显示简化格式:排名范围 + 时间范围 + 出现次数
- ranks = t.get("ranks", [])
- if ranks:
- min_rank = min(ranks)
- max_rank = max(ranks)
- rank_str = f"{min_rank}" if min_rank == max_rank else f"{min_rank}-{max_rank}"
- else:
- rank_str = "-"
- first_time = t.get("first_time", "")
- last_time = t.get("last_time", "")
- time_str = self._format_time_range(first_time, last_time)
- appear_count = t.get("count", 1)
- line += f" | 排名:{rank_str} | 时间:{time_str} | 出现:{appear_count}次"
- # 开启完整时间线时,额外添加轨迹
- if self.include_rank_timeline:
- rank_timeline = t.get("rank_timeline", [])
- timeline_str = self._format_rank_timeline(rank_timeline)
- line += f" | 轨迹:{timeline_str}"
- news_lines.append(line)
- news_count += 1
- if news_count >= self.max_news:
- break
- if news_count >= self.max_news:
- break
- # RSS 内容(仅在启用时构建)
- if self.include_rss and rss_stats:
- remaining = self.max_news - news_count
- for stat in rss_stats:
- if rss_count >= remaining:
- break
- word = stat.get("word", "")
- titles = stat.get("titles", [])
- if word and titles:
- rss_lines.append(f"\n**{word}** ({len(titles)}条)")
- for t in titles:
- if not isinstance(t, dict):
- continue
- title = t.get("title", "")
- if not title:
- continue
- # 来源
- source = t.get("source_name", t.get("feed_name", ""))
- # 发布时间
- time_display = t.get("time_display", "")
- # 构建行:[来源] 标题 | 发布时间
- if source:
- line = f"- [{source}] {title}"
- else:
- line = f"- {title}"
- if time_display:
- line += f" | {time_display}"
- rss_lines.append(line)
- rss_count += 1
- if rss_count >= remaining:
- break
- news_content = "\n".join(news_lines) if news_lines else ""
- rss_content = "\n".join(rss_lines) if rss_lines else ""
- total_count = news_count + rss_count
- return news_content, rss_content, hotlist_total, rss_total, total_count
- def _call_ai(self, user_prompt: str) -> str:
- """调用 AI API(使用 LiteLLM)"""
- messages = []
- if self.system_prompt:
- messages.append({"role": "system", "content": self.system_prompt})
- messages.append({"role": "user", "content": user_prompt})
- return self.client.chat(messages)
- def _retry_fix_json(self, original_response: str, error_msg: str) -> Optional[AIAnalysisResult]:
- """
- JSON 解析失败时,请求 AI 修复 JSON(仅重试一次)
- 使用轻量 prompt,不重复原始分析的 system prompt,节省 token。
- Args:
- original_response: AI 原始响应(JSON 格式有误)
- error_msg: JSON 解析的错误信息
- Returns:
- 修复后的分析结果,失败时返回 None
- """
- messages = [
- {
- "role": "system",
- "content": (
- "你是一个 JSON 修复助手。用户会提供一段格式有误的 JSON 和错误信息,"
- "你需要修复 JSON 格式错误并返回正确的 JSON。\n"
- "常见问题:字符串值内的双引号未转义、缺少逗号、字符串未正确闭合等。\n"
- "只返回纯 JSON,不要包含 markdown 代码块标记(如 ```json)或任何说明文字。"
- ),
- },
- {
- "role": "user",
- "content": (
- f"以下 JSON 解析失败:\n\n"
- f"错误:{error_msg}\n\n"
- f"原始内容:\n{original_response}\n\n"
- f"请修复以上 JSON 中的格式问题(如值中的双引号改用中文引号「」或转义 \\\"、"
- f"缺少逗号、不完整的字符串等),保持原始内容语义不变,只修复格式。"
- f"直接返回修复后的纯 JSON。"
- ),
- },
- ]
- try:
- response = self.client.chat(messages)
- return self._parse_response(response)
- except Exception as e:
- print(f"[AI] 重试修复 JSON 异常: {type(e).__name__}: {e}")
- return None
- def _format_time_range(self, first_time: str, last_time: str) -> str:
- """格式化时间范围(简化显示,只保留时分)"""
- def extract_time(time_str: str) -> str:
- if not time_str:
- return "-"
- # 尝试提取 HH:MM 部分
- if " " in time_str:
- parts = time_str.split(" ")
- if len(parts) >= 2:
- time_part = parts[1]
- if ":" in time_part:
- return time_part[:5] # HH:MM
- elif ":" in time_str:
- return time_str[:5]
- # 处理 HH-MM 格式
- result = time_str[:5] if len(time_str) >= 5 else time_str
- if len(result) == 5 and result[2] == '-':
- result = result.replace('-', ':')
- return result
- first = extract_time(first_time)
- last = extract_time(last_time)
- if first == last or last == "-":
- return first
- return f"{first}~{last}"
- def _format_rank_timeline(self, rank_timeline: List[Dict]) -> str:
- """格式化排名时间线"""
- if not rank_timeline:
- return "-"
- parts = []
- for item in rank_timeline:
- time_str = item.get("time", "")
- if len(time_str) == 5 and time_str[2] == '-':
- time_str = time_str.replace('-', ':')
- rank = item.get("rank")
- if rank is None:
- parts.append(f"0({time_str})")
- else:
- parts.append(f"{rank}({time_str})")
- return "→".join(parts)
- def _prepare_standalone_content(self, standalone_data: Dict) -> str:
- """
- 将独立展示区数据转为文本,注入 AI 分析 prompt
- Args:
- standalone_data: 独立展示区数据 {"platforms": [...], "rss_feeds": [...]}
- Returns:
- 格式化的文本内容
- """
- lines = []
- # 热榜平台
- for platform in standalone_data.get("platforms", []):
- platform_id = platform.get("id", "")
- platform_name = platform.get("name", platform_id)
- items = platform.get("items", [])
- if not items:
- continue
- lines.append(f"### [{platform_name}]")
- for item in items:
- title = item.get("title", "")
- if not title:
- continue
- line = f"- {title}"
- # 排名信息
- ranks = item.get("ranks", [])
- if ranks:
- min_rank = min(ranks)
- max_rank = max(ranks)
- rank_str = f"{min_rank}" if min_rank == max_rank else f"{min_rank}-{max_rank}"
- line += f" | 排名:{rank_str}"
- # 时间范围
- first_time = item.get("first_time", "")
- last_time = item.get("last_time", "")
- if first_time:
- time_str = self._format_time_range(first_time, last_time)
- line += f" | 时间:{time_str}"
- # 出现次数
- count = item.get("count", 1)
- if count > 1:
- line += f" | 出现:{count}次"
- # 排名轨迹(如果启用)
- if self.include_rank_timeline:
- rank_timeline = item.get("rank_timeline", [])
- if rank_timeline:
- timeline_str = self._format_rank_timeline(rank_timeline)
- line += f" | 轨迹:{timeline_str}"
- lines.append(line)
- lines.append("")
- # RSS 源
- for feed in standalone_data.get("rss_feeds", []):
- feed_id = feed.get("id", "")
- feed_name = feed.get("name", feed_id)
- items = feed.get("items", [])
- if not items:
- continue
- lines.append(f"### [{feed_name}]")
- for item in items:
- title = item.get("title", "")
- if not title:
- continue
- line = f"- {title}"
- published_at = item.get("published_at", "")
- if published_at:
- line += f" | {published_at}"
- lines.append(line)
- lines.append("")
- return "\n".join(lines)
- def _parse_response(self, response: str) -> AIAnalysisResult:
- """解析 AI 响应"""
- result = AIAnalysisResult(raw_response=response)
- if not response or not response.strip():
- result.error = "AI 返回空响应"
- return result
- # 提取 JSON 文本(去掉 markdown 代码块标记)
- json_str = response
- if "```json" in response:
- parts = response.split("```json", 1)
- if len(parts) > 1:
- code_block = parts[1]
- end_idx = code_block.find("```")
- if end_idx != -1:
- json_str = code_block[:end_idx]
- else:
- json_str = code_block
- elif "```" in response:
- parts = response.split("```", 2)
- if len(parts) >= 2:
- json_str = parts[1]
- json_str = json_str.strip()
- if not json_str:
- result.error = "提取的 JSON 内容为空"
- result.core_trends = response[:500] + "..." if len(response) > 500 else response
- result.success = True
- return result
- # 第一步:标准 JSON 解析
- data = None
- parse_error = None
- try:
- data = json.loads(json_str)
- except json.JSONDecodeError as e:
- parse_error = e
- # 第二步:json_repair 本地修复
- if data is None:
- try:
- from json_repair import repair_json
- repaired = repair_json(json_str, return_objects=True)
- if isinstance(repaired, dict):
- data = repaired
- print("[AI] JSON 本地修复成功(json_repair)")
- except Exception:
- pass
- # 两步都失败,记录错误(后续由 analyze 方法的重试机制处理)
- if data is None:
- if parse_error:
- error_context = json_str[max(0, parse_error.pos - 30):parse_error.pos + 30] if json_str and parse_error.pos else ""
- result.error = f"JSON 解析错误 (位置 {parse_error.pos}): {parse_error.msg}"
- if error_context:
- result.error += f",上下文: ...{error_context}..."
- else:
- result.error = "JSON 解析失败"
- # 兜底:使用已提取的 json_str(不含 markdown 标记),避免推送中出现 ```json
- result.core_trends = json_str[:500] + "..." if len(json_str) > 500 else json_str
- result.success = True
- return result
- # 解析成功,提取字段
- try:
- result.core_trends = data.get("core_trends", "")
- result.sentiment_controversy = data.get("sentiment_controversy", "")
- result.signals = data.get("signals", "")
- result.rss_insights = data.get("rss_insights", "")
- result.outlook_strategy = data.get("outlook_strategy", "")
- # 解析独立展示区概括
- summaries = data.get("standalone_summaries", {})
- if isinstance(summaries, dict):
- result.standalone_summaries = {
- str(k): str(v) for k, v in summaries.items()
- }
- result.success = True
- except (KeyError, TypeError, AttributeError) as e:
- result.error = f"字段提取错误: {type(e).__name__}: {e}"
- result.core_trends = json_str[:500] + "..." if len(json_str) > 500 else json_str
- result.success = True
- return result
|