analyzer.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. # coding=utf-8
  2. """
  3. AI 分析器模块
  4. 调用 AI 大模型对热点新闻进行深度分析
  5. 基于 LiteLLM 统一接口,支持 100+ AI 提供商
  6. """
  7. import json
  8. from dataclasses import dataclass, field
  9. from typing import Any, Callable, Dict, List, Optional
  10. from trendradar.ai.client import AIClient
  11. from trendradar.ai.prompt_loader import load_prompt_template
  12. @dataclass
  13. class AIAnalysisResult:
  14. """AI 分析结果"""
  15. # 新版 5 核心板块
  16. core_trends: str = "" # 核心热点与舆情态势
  17. sentiment_controversy: str = "" # 舆论风向与争议
  18. signals: str = "" # 异动与弱信号
  19. rss_insights: str = "" # RSS 深度洞察
  20. outlook_strategy: str = "" # 研判与策略建议
  21. standalone_summaries: Dict[str, str] = field(default_factory=dict) # 独立展示区概括 {源ID: 概括}
  22. # 基础元数据
  23. raw_response: str = "" # 原始响应
  24. success: bool = False # 是否成功
  25. skipped: bool = False # 是否因无内容跳过(非失败)
  26. error: str = "" # 错误信息
  27. # 新闻数量统计
  28. total_news: int = 0 # 总新闻数(热榜+RSS)
  29. analyzed_news: int = 0 # 实际分析的新闻数
  30. max_news_limit: int = 0 # 分析上限配置值
  31. hotlist_count: int = 0 # 热榜新闻数
  32. rss_count: int = 0 # RSS 新闻数
  33. ai_mode: str = "" # AI 分析使用的模式 (daily/current/incremental)
  34. class AIAnalyzer:
  35. """AI 分析器"""
  36. def __init__(
  37. self,
  38. ai_config: Dict[str, Any],
  39. analysis_config: Dict[str, Any],
  40. get_time_func: Callable,
  41. debug: bool = False,
  42. ):
  43. """
  44. 初始化 AI 分析器
  45. Args:
  46. ai_config: AI 模型配置(LiteLLM 格式)
  47. analysis_config: AI 分析功能配置(language, prompt_file 等)
  48. get_time_func: 获取当前时间的函数
  49. debug: 是否开启调试模式
  50. """
  51. self.ai_config = ai_config
  52. self.analysis_config = analysis_config
  53. self.get_time_func = get_time_func
  54. self.debug = debug
  55. # 创建 AI 客户端(基于 LiteLLM)
  56. self.client = AIClient(ai_config)
  57. # 验证配置
  58. valid, error = self.client.validate_config()
  59. if not valid:
  60. print(f"[AI] 配置警告: {error}")
  61. # 从分析配置获取功能参数
  62. self.max_news = analysis_config.get("MAX_NEWS_FOR_ANALYSIS", 50)
  63. self.include_rss = analysis_config.get("INCLUDE_RSS", True)
  64. self.include_rank_timeline = analysis_config.get("INCLUDE_RANK_TIMELINE", False)
  65. self.include_standalone = analysis_config.get("INCLUDE_STANDALONE", False)
  66. self.language = analysis_config.get("LANGUAGE", "Chinese")
  67. # 加载提示词模板
  68. self.system_prompt, self.user_prompt_template = load_prompt_template(
  69. analysis_config.get("PROMPT_FILE", "ai_analysis_prompt.txt"),
  70. label="AI",
  71. )
  72. def analyze(
  73. self,
  74. stats: List[Dict],
  75. rss_stats: Optional[List[Dict]] = None,
  76. report_mode: str = "daily",
  77. report_type: str = "当日汇总",
  78. platforms: Optional[List[str]] = None,
  79. keywords: Optional[List[str]] = None,
  80. standalone_data: Optional[Dict] = None,
  81. ) -> AIAnalysisResult:
  82. """
  83. 执行 AI 分析
  84. Args:
  85. stats: 热榜统计数据
  86. rss_stats: RSS 统计数据
  87. report_mode: 报告模式
  88. report_type: 报告类型
  89. platforms: 平台列表
  90. keywords: 关键词列表
  91. Returns:
  92. AIAnalysisResult: 分析结果
  93. """
  94. # 打印配置信息方便调试
  95. model = self.ai_config.get("MODEL", "unknown")
  96. api_key = self.client.api_key or ""
  97. api_base = self.ai_config.get("API_BASE", "")
  98. masked_key = f"{api_key[:5]}******" if len(api_key) >= 5 else "******"
  99. model_display = model.replace("/", "/\u200b") if model else "unknown"
  100. print(f"[AI] 模型: {model_display}")
  101. print(f"[AI] Key : {masked_key}")
  102. if api_base:
  103. print(f"[AI] 接口: 存在自定义 API 端点")
  104. timeout = self.ai_config.get("TIMEOUT", 120)
  105. max_tokens = self.ai_config.get("MAX_TOKENS", 5000)
  106. print(f"[AI] 参数: timeout={timeout}, max_tokens={max_tokens}")
  107. if not self.client.api_key:
  108. return AIAnalysisResult(
  109. success=False,
  110. error="未配置 AI API Key,请在 config.yaml 或环境变量 AI_API_KEY 中设置"
  111. )
  112. # 准备新闻内容并获取统计数据
  113. news_content, rss_content, hotlist_total, rss_total, analyzed_count = self._prepare_news_content(stats, rss_stats)
  114. total_news = hotlist_total + rss_total
  115. if not news_content and not rss_content:
  116. return AIAnalysisResult(
  117. success=False,
  118. skipped=True,
  119. error="本轮无新增热点内容,跳过 AI 分析",
  120. total_news=total_news,
  121. hotlist_count=hotlist_total,
  122. rss_count=rss_total,
  123. analyzed_news=0,
  124. max_news_limit=self.max_news
  125. )
  126. # 构建提示词
  127. current_time = self.get_time_func().strftime("%Y-%m-%d %H:%M:%S")
  128. # 提取关键词
  129. if not keywords:
  130. keywords = [s.get("word", "") for s in stats if s.get("word")] if stats else []
  131. # 使用安全的字符串替换,避免模板中其他花括号(如 JSON 示例)被误解析
  132. user_prompt = self.user_prompt_template
  133. user_prompt = user_prompt.replace("{report_mode}", report_mode)
  134. user_prompt = user_prompt.replace("{report_type}", report_type)
  135. user_prompt = user_prompt.replace("{current_time}", current_time)
  136. user_prompt = user_prompt.replace("{news_count}", str(hotlist_total))
  137. user_prompt = user_prompt.replace("{rss_count}", str(rss_total))
  138. user_prompt = user_prompt.replace("{platforms}", ", ".join(platforms) if platforms else "多平台")
  139. user_prompt = user_prompt.replace("{keywords}", ", ".join(keywords[:20]) if keywords else "无")
  140. user_prompt = user_prompt.replace("{news_content}", news_content)
  141. user_prompt = user_prompt.replace("{rss_content}", rss_content)
  142. user_prompt = user_prompt.replace("{language}", self.language)
  143. # 构建独立展示区内容
  144. standalone_content = ""
  145. if self.include_standalone and standalone_data:
  146. standalone_content = self._prepare_standalone_content(standalone_data)
  147. user_prompt = user_prompt.replace("{standalone_content}", standalone_content)
  148. if self.debug:
  149. print("\n" + "=" * 80)
  150. print("[AI 调试] 发送给 AI 的完整提示词")
  151. print("=" * 80)
  152. if self.system_prompt:
  153. print("\n--- System Prompt ---")
  154. print(self.system_prompt)
  155. print("\n--- User Prompt ---")
  156. print(user_prompt)
  157. print("=" * 80 + "\n")
  158. # 调用 AI API(使用 LiteLLM)
  159. try:
  160. response = self._call_ai(user_prompt)
  161. result = self._parse_response(response)
  162. # JSON 解析失败时的重试兜底(仅重试一次)
  163. if result.error and "JSON 解析错误" in result.error:
  164. print(f"[AI] JSON 解析失败,尝试让 AI 修复...")
  165. retry_result = self._retry_fix_json(response, result.error)
  166. if retry_result and retry_result.success and not retry_result.error:
  167. print("[AI] JSON 修复成功")
  168. retry_result.raw_response = response
  169. result = retry_result
  170. else:
  171. print("[AI] JSON 修复失败,使用原始文本兜底")
  172. # 如果配置未启用 RSS 分析,强制清空 AI 返回的 RSS 洞察
  173. if not self.include_rss:
  174. result.rss_insights = ""
  175. # 如果配置未启用 standalone 分析,强制清空
  176. if not self.include_standalone:
  177. result.standalone_summaries = {}
  178. # 填充统计数据
  179. result.total_news = total_news
  180. result.hotlist_count = hotlist_total
  181. result.rss_count = rss_total
  182. result.analyzed_news = analyzed_count
  183. result.max_news_limit = self.max_news
  184. return result
  185. except Exception as e:
  186. error_type = type(e).__name__
  187. error_msg = str(e)
  188. # 截断过长的错误消息
  189. if len(error_msg) > 200:
  190. error_msg = error_msg[:200] + "..."
  191. friendly_msg = f"AI 分析失败 ({error_type}): {error_msg}"
  192. return AIAnalysisResult(
  193. success=False,
  194. error=friendly_msg
  195. )
  196. def _prepare_news_content(
  197. self,
  198. stats: List[Dict],
  199. rss_stats: Optional[List[Dict]] = None,
  200. ) -> tuple:
  201. """
  202. 准备新闻内容文本(增强版)
  203. 热榜新闻包含:来源、标题、排名范围、时间范围、出现次数
  204. RSS 包含:来源、标题、发布时间
  205. Returns:
  206. tuple: (news_content, rss_content, hotlist_total, rss_total, analyzed_count)
  207. """
  208. news_lines = []
  209. rss_lines = []
  210. news_count = 0
  211. rss_count = 0
  212. # 计算总新闻数
  213. hotlist_total = sum(len(s.get("titles", [])) for s in stats) if stats else 0
  214. rss_total = sum(len(s.get("titles", [])) for s in rss_stats) if rss_stats else 0
  215. # 热榜内容
  216. if stats:
  217. for stat in stats:
  218. word = stat.get("word", "")
  219. titles = stat.get("titles", [])
  220. if word and titles:
  221. news_lines.append(f"\n**{word}** ({len(titles)}条)")
  222. for t in titles:
  223. if not isinstance(t, dict):
  224. continue
  225. title = t.get("title", "")
  226. if not title:
  227. continue
  228. # 来源
  229. source = t.get("source_name", t.get("source", ""))
  230. # 构建行
  231. if source:
  232. line = f"- [{source}] {title}"
  233. else:
  234. line = f"- {title}"
  235. # 始终显示简化格式:排名范围 + 时间范围 + 出现次数
  236. ranks = t.get("ranks", [])
  237. if ranks:
  238. min_rank = min(ranks)
  239. max_rank = max(ranks)
  240. rank_str = f"{min_rank}" if min_rank == max_rank else f"{min_rank}-{max_rank}"
  241. else:
  242. rank_str = "-"
  243. first_time = t.get("first_time", "")
  244. last_time = t.get("last_time", "")
  245. time_str = self._format_time_range(first_time, last_time)
  246. appear_count = t.get("count", 1)
  247. line += f" | 排名:{rank_str} | 时间:{time_str} | 出现:{appear_count}次"
  248. # 开启完整时间线时,额外添加轨迹
  249. if self.include_rank_timeline:
  250. rank_timeline = t.get("rank_timeline", [])
  251. timeline_str = self._format_rank_timeline(rank_timeline)
  252. line += f" | 轨迹:{timeline_str}"
  253. news_lines.append(line)
  254. news_count += 1
  255. if news_count >= self.max_news:
  256. break
  257. if news_count >= self.max_news:
  258. break
  259. # RSS 内容(仅在启用时构建)
  260. if self.include_rss and rss_stats:
  261. remaining = self.max_news - news_count
  262. for stat in rss_stats:
  263. if rss_count >= remaining:
  264. break
  265. word = stat.get("word", "")
  266. titles = stat.get("titles", [])
  267. if word and titles:
  268. rss_lines.append(f"\n**{word}** ({len(titles)}条)")
  269. for t in titles:
  270. if not isinstance(t, dict):
  271. continue
  272. title = t.get("title", "")
  273. if not title:
  274. continue
  275. # 来源
  276. source = t.get("source_name", t.get("feed_name", ""))
  277. # 发布时间
  278. time_display = t.get("time_display", "")
  279. # 构建行:[来源] 标题 | 发布时间
  280. if source:
  281. line = f"- [{source}] {title}"
  282. else:
  283. line = f"- {title}"
  284. if time_display:
  285. line += f" | {time_display}"
  286. rss_lines.append(line)
  287. rss_count += 1
  288. if rss_count >= remaining:
  289. break
  290. news_content = "\n".join(news_lines) if news_lines else ""
  291. rss_content = "\n".join(rss_lines) if rss_lines else ""
  292. total_count = news_count + rss_count
  293. return news_content, rss_content, hotlist_total, rss_total, total_count
  294. def _call_ai(self, user_prompt: str) -> str:
  295. """调用 AI API(使用 LiteLLM)"""
  296. messages = []
  297. if self.system_prompt:
  298. messages.append({"role": "system", "content": self.system_prompt})
  299. messages.append({"role": "user", "content": user_prompt})
  300. return self.client.chat(messages)
  301. def _retry_fix_json(self, original_response: str, error_msg: str) -> Optional[AIAnalysisResult]:
  302. """
  303. JSON 解析失败时,请求 AI 修复 JSON(仅重试一次)
  304. 使用轻量 prompt,不重复原始分析的 system prompt,节省 token。
  305. Args:
  306. original_response: AI 原始响应(JSON 格式有误)
  307. error_msg: JSON 解析的错误信息
  308. Returns:
  309. 修复后的分析结果,失败时返回 None
  310. """
  311. messages = [
  312. {
  313. "role": "system",
  314. "content": (
  315. "你是一个 JSON 修复助手。用户会提供一段格式有误的 JSON 和错误信息,"
  316. "你需要修复 JSON 格式错误并返回正确的 JSON。\n"
  317. "常见问题:字符串值内的双引号未转义、缺少逗号、字符串未正确闭合等。\n"
  318. "只返回纯 JSON,不要包含 markdown 代码块标记(如 ```json)或任何说明文字。"
  319. ),
  320. },
  321. {
  322. "role": "user",
  323. "content": (
  324. f"以下 JSON 解析失败:\n\n"
  325. f"错误:{error_msg}\n\n"
  326. f"原始内容:\n{original_response}\n\n"
  327. f"请修复以上 JSON 中的格式问题(如值中的双引号改用中文引号「」或转义 \\\"、"
  328. f"缺少逗号、不完整的字符串等),保持原始内容语义不变,只修复格式。"
  329. f"直接返回修复后的纯 JSON。"
  330. ),
  331. },
  332. ]
  333. try:
  334. response = self.client.chat(messages)
  335. return self._parse_response(response)
  336. except Exception as e:
  337. print(f"[AI] 重试修复 JSON 异常: {type(e).__name__}: {e}")
  338. return None
  339. def _format_time_range(self, first_time: str, last_time: str) -> str:
  340. """格式化时间范围(简化显示,只保留时分)"""
  341. def extract_time(time_str: str) -> str:
  342. if not time_str:
  343. return "-"
  344. # 尝试提取 HH:MM 部分
  345. if " " in time_str:
  346. parts = time_str.split(" ")
  347. if len(parts) >= 2:
  348. time_part = parts[1]
  349. if ":" in time_part:
  350. return time_part[:5] # HH:MM
  351. elif ":" in time_str:
  352. return time_str[:5]
  353. # 处理 HH-MM 格式
  354. result = time_str[:5] if len(time_str) >= 5 else time_str
  355. if len(result) == 5 and result[2] == '-':
  356. result = result.replace('-', ':')
  357. return result
  358. first = extract_time(first_time)
  359. last = extract_time(last_time)
  360. if first == last or last == "-":
  361. return first
  362. return f"{first}~{last}"
  363. def _format_rank_timeline(self, rank_timeline: List[Dict]) -> str:
  364. """格式化排名时间线"""
  365. if not rank_timeline:
  366. return "-"
  367. parts = []
  368. for item in rank_timeline:
  369. time_str = item.get("time", "")
  370. if len(time_str) == 5 and time_str[2] == '-':
  371. time_str = time_str.replace('-', ':')
  372. rank = item.get("rank")
  373. if rank is None:
  374. parts.append(f"0({time_str})")
  375. else:
  376. parts.append(f"{rank}({time_str})")
  377. return "→".join(parts)
  378. def _prepare_standalone_content(self, standalone_data: Dict) -> str:
  379. """
  380. 将独立展示区数据转为文本,注入 AI 分析 prompt
  381. Args:
  382. standalone_data: 独立展示区数据 {"platforms": [...], "rss_feeds": [...]}
  383. Returns:
  384. 格式化的文本内容
  385. """
  386. lines = []
  387. # 热榜平台
  388. for platform in standalone_data.get("platforms", []):
  389. platform_id = platform.get("id", "")
  390. platform_name = platform.get("name", platform_id)
  391. items = platform.get("items", [])
  392. if not items:
  393. continue
  394. lines.append(f"### [{platform_name}]")
  395. for item in items:
  396. title = item.get("title", "")
  397. if not title:
  398. continue
  399. line = f"- {title}"
  400. # 排名信息
  401. ranks = item.get("ranks", [])
  402. if ranks:
  403. min_rank = min(ranks)
  404. max_rank = max(ranks)
  405. rank_str = f"{min_rank}" if min_rank == max_rank else f"{min_rank}-{max_rank}"
  406. line += f" | 排名:{rank_str}"
  407. # 时间范围
  408. first_time = item.get("first_time", "")
  409. last_time = item.get("last_time", "")
  410. if first_time:
  411. time_str = self._format_time_range(first_time, last_time)
  412. line += f" | 时间:{time_str}"
  413. # 出现次数
  414. count = item.get("count", 1)
  415. if count > 1:
  416. line += f" | 出现:{count}次"
  417. # 排名轨迹(如果启用)
  418. if self.include_rank_timeline:
  419. rank_timeline = item.get("rank_timeline", [])
  420. if rank_timeline:
  421. timeline_str = self._format_rank_timeline(rank_timeline)
  422. line += f" | 轨迹:{timeline_str}"
  423. lines.append(line)
  424. lines.append("")
  425. # RSS 源
  426. for feed in standalone_data.get("rss_feeds", []):
  427. feed_id = feed.get("id", "")
  428. feed_name = feed.get("name", feed_id)
  429. items = feed.get("items", [])
  430. if not items:
  431. continue
  432. lines.append(f"### [{feed_name}]")
  433. for item in items:
  434. title = item.get("title", "")
  435. if not title:
  436. continue
  437. line = f"- {title}"
  438. published_at = item.get("published_at", "")
  439. if published_at:
  440. line += f" | {published_at}"
  441. lines.append(line)
  442. lines.append("")
  443. return "\n".join(lines)
  444. def _parse_response(self, response: str) -> AIAnalysisResult:
  445. """解析 AI 响应"""
  446. result = AIAnalysisResult(raw_response=response)
  447. if not response or not response.strip():
  448. result.error = "AI 返回空响应"
  449. return result
  450. # 提取 JSON 文本(去掉 markdown 代码块标记)
  451. json_str = response
  452. if "```json" in response:
  453. parts = response.split("```json", 1)
  454. if len(parts) > 1:
  455. code_block = parts[1]
  456. end_idx = code_block.find("```")
  457. if end_idx != -1:
  458. json_str = code_block[:end_idx]
  459. else:
  460. json_str = code_block
  461. elif "```" in response:
  462. parts = response.split("```", 2)
  463. if len(parts) >= 2:
  464. json_str = parts[1]
  465. json_str = json_str.strip()
  466. if not json_str:
  467. result.error = "提取的 JSON 内容为空"
  468. result.core_trends = response[:500] + "..." if len(response) > 500 else response
  469. result.success = True
  470. return result
  471. # 第一步:标准 JSON 解析
  472. data = None
  473. parse_error = None
  474. try:
  475. data = json.loads(json_str)
  476. except json.JSONDecodeError as e:
  477. parse_error = e
  478. # 第二步:json_repair 本地修复
  479. if data is None:
  480. try:
  481. from json_repair import repair_json
  482. repaired = repair_json(json_str, return_objects=True)
  483. if isinstance(repaired, dict):
  484. data = repaired
  485. print("[AI] JSON 本地修复成功(json_repair)")
  486. except Exception:
  487. pass
  488. # 两步都失败,记录错误(后续由 analyze 方法的重试机制处理)
  489. if data is None:
  490. if parse_error:
  491. error_context = json_str[max(0, parse_error.pos - 30):parse_error.pos + 30] if json_str and parse_error.pos else ""
  492. result.error = f"JSON 解析错误 (位置 {parse_error.pos}): {parse_error.msg}"
  493. if error_context:
  494. result.error += f",上下文: ...{error_context}..."
  495. else:
  496. result.error = "JSON 解析失败"
  497. # 兜底:使用已提取的 json_str(不含 markdown 标记),避免推送中出现 ```json
  498. result.core_trends = json_str[:500] + "..." if len(json_str) > 500 else json_str
  499. result.success = True
  500. return result
  501. # 解析成功,提取字段
  502. try:
  503. result.core_trends = data.get("core_trends", "")
  504. result.sentiment_controversy = data.get("sentiment_controversy", "")
  505. result.signals = data.get("signals", "")
  506. result.rss_insights = data.get("rss_insights", "")
  507. result.outlook_strategy = data.get("outlook_strategy", "")
  508. # 解析独立展示区概括
  509. summaries = data.get("standalone_summaries", {})
  510. if isinstance(summaries, dict):
  511. result.standalone_summaries = {
  512. str(k): str(v) for k, v in summaries.items()
  513. }
  514. result.success = True
  515. except (KeyError, TypeError, AttributeError) as e:
  516. result.error = f"字段提取错误: {type(e).__name__}: {e}"
  517. result.core_trends = json_str[:500] + "..." if len(json_str) > 500 else json_str
  518. result.success = True
  519. return result