context.py 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115
  1. # coding=utf-8
  2. """
  3. 应用上下文模块
  4. 提供配置上下文类,封装所有依赖配置的操作,消除全局状态和包装函数。
  5. """
  6. from datetime import datetime
  7. from pathlib import Path
  8. from typing import Any, Dict, List, Optional, Tuple
  9. from trendradar.utils.time import (
  10. DEFAULT_TIMEZONE,
  11. get_configured_time,
  12. format_date_folder,
  13. format_time_filename,
  14. get_current_time_display,
  15. convert_time_for_display,
  16. format_iso_time_friendly,
  17. is_within_days,
  18. )
  19. from trendradar.core import (
  20. load_frequency_words,
  21. matches_word_groups,
  22. read_all_today_titles,
  23. detect_latest_new_titles,
  24. count_word_frequency,
  25. Scheduler,
  26. )
  27. from trendradar.report import (
  28. prepare_report_data,
  29. generate_html_report,
  30. render_html_content,
  31. )
  32. from trendradar.notification import (
  33. render_feishu_content,
  34. render_dingtalk_content,
  35. split_content_into_batches,
  36. NotificationDispatcher,
  37. )
  38. from trendradar.ai import AITranslator
  39. from trendradar.ai.filter import AIFilter, AIFilterResult
  40. from trendradar.storage import get_storage_manager
  41. class AppContext:
  42. """
  43. 应用上下文类
  44. 封装所有依赖配置的操作,提供统一的接口。
  45. 消除对全局 CONFIG 的依赖,提高可测试性。
  46. 使用示例:
  47. config = load_config()
  48. ctx = AppContext(config)
  49. # 时间操作
  50. now = ctx.get_time()
  51. date_folder = ctx.format_date()
  52. # 存储操作
  53. storage = ctx.get_storage_manager()
  54. # 报告生成
  55. html = ctx.generate_html_report(stats, total_titles, ...)
  56. """
  57. def __init__(self, config: Dict[str, Any]):
  58. """
  59. 初始化应用上下文
  60. Args:
  61. config: 完整的配置字典
  62. """
  63. self.config = config
  64. self._storage_manager = None
  65. self._scheduler = None
  66. # === 配置访问 ===
  67. @property
  68. def timezone(self) -> str:
  69. """获取配置的时区"""
  70. return self.config.get("TIMEZONE", DEFAULT_TIMEZONE)
  71. @property
  72. def rank_threshold(self) -> int:
  73. """获取排名阈值"""
  74. return self.config.get("RANK_THRESHOLD", 50)
  75. @property
  76. def weight_config(self) -> Dict:
  77. """获取权重配置"""
  78. return self.config.get("WEIGHT_CONFIG", {})
  79. @property
  80. def platforms(self) -> List[Dict]:
  81. """获取平台配置列表"""
  82. return self.config.get("PLATFORMS", [])
  83. @property
  84. def platform_ids(self) -> List[str]:
  85. """获取平台ID列表"""
  86. return [p["id"] for p in self.platforms]
  87. @property
  88. def rss_config(self) -> Dict:
  89. """获取 RSS 配置"""
  90. return self.config.get("RSS", {})
  91. @property
  92. def rss_enabled(self) -> bool:
  93. """RSS 是否启用"""
  94. return self.rss_config.get("ENABLED", False)
  95. @property
  96. def rss_feeds(self) -> List[Dict]:
  97. """获取 RSS 源列表"""
  98. return self.rss_config.get("FEEDS", [])
  99. @property
  100. def display_mode(self) -> str:
  101. """获取显示模式 (keyword | platform)"""
  102. return self.config.get("DISPLAY_MODE", "keyword")
  103. @property
  104. def show_new_section(self) -> bool:
  105. """是否显示新增热点区域"""
  106. return self.config.get("DISPLAY", {}).get("REGIONS", {}).get("NEW_ITEMS", True)
  107. @property
  108. def region_order(self) -> List[str]:
  109. """获取区域显示顺序"""
  110. default_order = ["hotlist", "rss", "new_items", "standalone", "ai_analysis"]
  111. return self.config.get("DISPLAY", {}).get("REGION_ORDER", default_order)
  112. @property
  113. def filter_method(self) -> str:
  114. """获取筛选策略: keyword | ai"""
  115. return self.config.get("FILTER", {}).get("METHOD", "keyword")
  116. @property
  117. def ai_priority_sort_enabled(self) -> bool:
  118. """AI 模式标签排序开关(与 keyword 的 sort_by_position_first 解耦)"""
  119. return self.config.get("FILTER", {}).get("PRIORITY_SORT_ENABLED", False)
  120. @property
  121. def ai_filter_config(self) -> Dict:
  122. """获取 AI 筛选配置"""
  123. return self.config.get("AI_FILTER", {})
  124. @property
  125. def ai_filter_enabled(self) -> bool:
  126. """AI 筛选是否启用(基于 filter.method 判断)"""
  127. return self.filter_method == "ai"
  128. # === 时间操作 ===
  129. def get_time(self) -> datetime:
  130. """获取当前配置时区的时间"""
  131. return get_configured_time(self.timezone)
  132. def format_date(self) -> str:
  133. """格式化日期文件夹 (YYYY-MM-DD)"""
  134. return format_date_folder(timezone=self.timezone)
  135. def format_time(self) -> str:
  136. """格式化时间文件名 (HH-MM)"""
  137. return format_time_filename(self.timezone)
  138. def get_time_display(self) -> str:
  139. """获取时间显示 (HH:MM)"""
  140. return get_current_time_display(self.timezone)
  141. @staticmethod
  142. def convert_time_display(time_str: str) -> str:
  143. """将 HH-MM 转换为 HH:MM"""
  144. return convert_time_for_display(time_str)
  145. # === 存储操作 ===
  146. def get_storage_manager(self):
  147. """获取存储管理器(延迟初始化,单例)"""
  148. if self._storage_manager is None:
  149. storage_config = self.config.get("STORAGE", {})
  150. remote_config = storage_config.get("REMOTE", {})
  151. local_config = storage_config.get("LOCAL", {})
  152. pull_config = storage_config.get("PULL", {})
  153. self._storage_manager = get_storage_manager(
  154. backend_type=storage_config.get("BACKEND", "auto"),
  155. data_dir=local_config.get("DATA_DIR", "output"),
  156. enable_txt=storage_config.get("FORMATS", {}).get("TXT", True),
  157. enable_html=storage_config.get("FORMATS", {}).get("HTML", True),
  158. remote_config={
  159. "bucket_name": remote_config.get("BUCKET_NAME", ""),
  160. "access_key_id": remote_config.get("ACCESS_KEY_ID", ""),
  161. "secret_access_key": remote_config.get("SECRET_ACCESS_KEY", ""),
  162. "endpoint_url": remote_config.get("ENDPOINT_URL", ""),
  163. "region": remote_config.get("REGION", ""),
  164. },
  165. local_retention_days=local_config.get("RETENTION_DAYS", 0),
  166. remote_retention_days=remote_config.get("RETENTION_DAYS", 0),
  167. pull_enabled=pull_config.get("ENABLED", False),
  168. pull_days=pull_config.get("DAYS", 7),
  169. timezone=self.timezone,
  170. )
  171. return self._storage_manager
  172. def get_output_path(self, subfolder: str, filename: str) -> str:
  173. """获取输出路径(扁平化结构:output/类型/日期/文件名)"""
  174. output_dir = Path("output") / subfolder / self.format_date()
  175. output_dir.mkdir(parents=True, exist_ok=True)
  176. return str(output_dir / filename)
  177. # === 数据处理 ===
  178. def read_today_titles(
  179. self, platform_ids: Optional[List[str]] = None, quiet: bool = False
  180. ) -> Tuple[Dict, Dict, Dict]:
  181. """读取当天所有标题"""
  182. return read_all_today_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
  183. def detect_new_titles(
  184. self, platform_ids: Optional[List[str]] = None, quiet: bool = False
  185. ) -> Dict:
  186. """检测最新批次的新增标题"""
  187. return detect_latest_new_titles(self.get_storage_manager(), platform_ids, quiet=quiet)
  188. def is_first_crawl(self) -> bool:
  189. """检测是否是当天第一次爬取"""
  190. return self.get_storage_manager().is_first_crawl_today()
  191. # === 频率词处理 ===
  192. def load_frequency_words(
  193. self, frequency_file: Optional[str] = None
  194. ) -> Tuple[List[Dict], List[str], List[str]]:
  195. """加载频率词配置"""
  196. return load_frequency_words(frequency_file)
  197. def matches_word_groups(
  198. self,
  199. title: str,
  200. word_groups: List[Dict],
  201. filter_words: List[str],
  202. global_filters: Optional[List[str]] = None,
  203. ) -> bool:
  204. """检查标题是否匹配词组规则"""
  205. return matches_word_groups(title, word_groups, filter_words, global_filters)
  206. # === 统计分析 ===
  207. def count_frequency(
  208. self,
  209. results: Dict,
  210. word_groups: List[Dict],
  211. filter_words: List[str],
  212. id_to_name: Dict,
  213. title_info: Optional[Dict] = None,
  214. new_titles: Optional[Dict] = None,
  215. mode: str = "daily",
  216. global_filters: Optional[List[str]] = None,
  217. quiet: bool = False,
  218. ) -> Tuple[List[Dict], int]:
  219. """统计词频"""
  220. return count_word_frequency(
  221. results=results,
  222. word_groups=word_groups,
  223. filter_words=filter_words,
  224. id_to_name=id_to_name,
  225. title_info=title_info,
  226. rank_threshold=self.rank_threshold,
  227. new_titles=new_titles,
  228. mode=mode,
  229. global_filters=global_filters,
  230. weight_config=self.weight_config,
  231. max_news_per_keyword=self.config.get("MAX_NEWS_PER_KEYWORD", 0),
  232. sort_by_position_first=self.config.get("SORT_BY_POSITION_FIRST", False),
  233. is_first_crawl_func=self.is_first_crawl,
  234. convert_time_func=self.convert_time_display,
  235. quiet=quiet,
  236. )
  237. # === 报告生成 ===
  238. def prepare_report(
  239. self,
  240. stats: List[Dict],
  241. failed_ids: Optional[List] = None,
  242. new_titles: Optional[Dict] = None,
  243. id_to_name: Optional[Dict] = None,
  244. mode: str = "daily",
  245. frequency_file: Optional[str] = None,
  246. ) -> Dict:
  247. """准备报告数据"""
  248. return prepare_report_data(
  249. stats=stats,
  250. failed_ids=failed_ids,
  251. new_titles=new_titles,
  252. id_to_name=id_to_name,
  253. mode=mode,
  254. rank_threshold=self.rank_threshold,
  255. matches_word_groups_func=self.matches_word_groups,
  256. load_frequency_words_func=lambda: self.load_frequency_words(frequency_file),
  257. show_new_section=self.show_new_section,
  258. )
  259. def generate_html(
  260. self,
  261. stats: List[Dict],
  262. total_titles: int,
  263. failed_ids: Optional[List] = None,
  264. new_titles: Optional[Dict] = None,
  265. id_to_name: Optional[Dict] = None,
  266. mode: str = "daily",
  267. update_info: Optional[Dict] = None,
  268. rss_items: Optional[List[Dict]] = None,
  269. rss_new_items: Optional[List[Dict]] = None,
  270. ai_analysis: Optional[Any] = None,
  271. standalone_data: Optional[Dict] = None,
  272. frequency_file: Optional[str] = None,
  273. ) -> str:
  274. """生成HTML报告"""
  275. return generate_html_report(
  276. stats=stats,
  277. total_titles=total_titles,
  278. failed_ids=failed_ids,
  279. new_titles=new_titles,
  280. id_to_name=id_to_name,
  281. mode=mode,
  282. update_info=update_info,
  283. rank_threshold=self.rank_threshold,
  284. output_dir="output",
  285. date_folder=self.format_date(),
  286. time_filename=self.format_time(),
  287. render_html_func=lambda *args, **kwargs: self.render_html(*args, rss_items=rss_items, rss_new_items=rss_new_items, ai_analysis=ai_analysis, standalone_data=standalone_data, **kwargs),
  288. matches_word_groups_func=self.matches_word_groups,
  289. load_frequency_words_func=lambda: self.load_frequency_words(frequency_file),
  290. )
  291. def render_html(
  292. self,
  293. report_data: Dict,
  294. total_titles: int,
  295. mode: str = "daily",
  296. update_info: Optional[Dict] = None,
  297. rss_items: Optional[List[Dict]] = None,
  298. rss_new_items: Optional[List[Dict]] = None,
  299. ai_analysis: Optional[Any] = None,
  300. standalone_data: Optional[Dict] = None,
  301. ) -> str:
  302. """渲染HTML内容"""
  303. return render_html_content(
  304. report_data=report_data,
  305. total_titles=total_titles,
  306. mode=mode,
  307. update_info=update_info,
  308. region_order=self.region_order,
  309. get_time_func=self.get_time,
  310. rss_items=rss_items,
  311. rss_new_items=rss_new_items,
  312. display_mode=self.display_mode,
  313. ai_analysis=ai_analysis,
  314. show_new_section=self.show_new_section,
  315. standalone_data=standalone_data,
  316. )
  317. # === 通知内容渲染 ===
  318. def render_feishu(
  319. self,
  320. report_data: Dict,
  321. update_info: Optional[Dict] = None,
  322. mode: str = "daily",
  323. ) -> str:
  324. """渲染飞书内容"""
  325. return render_feishu_content(
  326. report_data=report_data,
  327. update_info=update_info,
  328. mode=mode,
  329. separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
  330. region_order=self.region_order,
  331. get_time_func=self.get_time,
  332. show_new_section=self.show_new_section,
  333. )
  334. def render_dingtalk(
  335. self,
  336. report_data: Dict,
  337. update_info: Optional[Dict] = None,
  338. mode: str = "daily",
  339. ) -> str:
  340. """渲染钉钉内容"""
  341. return render_dingtalk_content(
  342. report_data=report_data,
  343. update_info=update_info,
  344. mode=mode,
  345. region_order=self.region_order,
  346. get_time_func=self.get_time,
  347. show_new_section=self.show_new_section,
  348. )
  349. def split_content(
  350. self,
  351. report_data: Dict,
  352. format_type: str,
  353. update_info: Optional[Dict] = None,
  354. max_bytes: Optional[int] = None,
  355. mode: str = "daily",
  356. rss_items: Optional[list] = None,
  357. rss_new_items: Optional[list] = None,
  358. ai_content: Optional[str] = None,
  359. standalone_data: Optional[Dict] = None,
  360. ai_stats: Optional[Dict] = None,
  361. report_type: str = "热点分析报告",
  362. ) -> List[str]:
  363. """分批处理消息内容(支持热榜+RSS合并+AI分析+独立展示区)
  364. Args:
  365. report_data: 报告数据
  366. format_type: 格式类型
  367. update_info: 更新信息
  368. max_bytes: 最大字节数
  369. mode: 报告模式
  370. rss_items: RSS 统计条目列表
  371. rss_new_items: RSS 新增条目列表
  372. ai_content: AI 分析内容(已渲染的字符串)
  373. standalone_data: 独立展示区数据
  374. ai_stats: AI 分析统计数据
  375. report_type: 报告类型
  376. Returns:
  377. 分批后的消息内容列表
  378. """
  379. return split_content_into_batches(
  380. report_data=report_data,
  381. format_type=format_type,
  382. update_info=update_info,
  383. max_bytes=max_bytes,
  384. mode=mode,
  385. batch_sizes={
  386. "dingtalk": self.config.get("DINGTALK_BATCH_SIZE", 20000),
  387. "feishu": self.config.get("FEISHU_BATCH_SIZE", 29000),
  388. "default": self.config.get("MESSAGE_BATCH_SIZE", 4000),
  389. },
  390. feishu_separator=self.config.get("FEISHU_MESSAGE_SEPARATOR", "---"),
  391. region_order=self.region_order,
  392. get_time_func=self.get_time,
  393. rss_items=rss_items,
  394. rss_new_items=rss_new_items,
  395. timezone=self.config.get("TIMEZONE", DEFAULT_TIMEZONE),
  396. display_mode=self.display_mode,
  397. ai_content=ai_content,
  398. standalone_data=standalone_data,
  399. rank_threshold=self.rank_threshold,
  400. ai_stats=ai_stats,
  401. report_type=report_type,
  402. show_new_section=self.show_new_section,
  403. )
  404. # === 通知发送 ===
  405. def create_notification_dispatcher(self) -> NotificationDispatcher:
  406. """创建通知调度器"""
  407. # 创建翻译器(如果启用)
  408. translator = None
  409. trans_config = self.config.get("AI_TRANSLATION", {})
  410. if trans_config.get("ENABLED", False):
  411. ai_config = self.config.get("AI", {})
  412. translator = AITranslator(trans_config, ai_config)
  413. return NotificationDispatcher(
  414. config=self.config,
  415. get_time_func=self.get_time,
  416. split_content_func=self.split_content,
  417. translator=translator,
  418. )
  419. def create_scheduler(self) -> Scheduler:
  420. """
  421. 创建调度器(延迟初始化,单例)
  422. 基于 config.yaml 的 schedule 段 + timeline.yaml 构建。
  423. """
  424. if self._scheduler is None:
  425. schedule_config = self.config.get("SCHEDULE", {})
  426. timeline_data = self.config.get("_TIMELINE_DATA", {})
  427. self._scheduler = Scheduler(
  428. schedule_config=schedule_config,
  429. timeline_data=timeline_data,
  430. storage_backend=self.get_storage_manager(),
  431. get_time_func=self.get_time,
  432. fallback_report_mode=self.config.get("REPORT_MODE", "current"),
  433. )
  434. return self._scheduler
  435. # === AI 智能筛选 ===
  436. @staticmethod
  437. def _with_ordered_priorities(tags: List[Dict], start_priority: int = 1) -> List[Dict]:
  438. """按当前列表顺序补齐优先级(值越小优先级越高)"""
  439. normalized: List[Dict] = []
  440. priority = start_priority
  441. for tag_data in tags:
  442. if not isinstance(tag_data, dict):
  443. continue
  444. tag_name = str(tag_data.get("tag", "")).strip()
  445. if not tag_name:
  446. continue
  447. item = dict(tag_data)
  448. item["tag"] = tag_name
  449. item["priority"] = priority
  450. normalized.append(item)
  451. priority += 1
  452. return normalized
  453. def run_ai_filter(self, interests_file: Optional[str] = None) -> Optional[AIFilterResult]:
  454. """
  455. 执行 AI 智能筛选完整流程
  456. Args:
  457. interests_file: 兴趣描述文件名(位于 config/custom/ai/),None=使用默认 config/ai_interests.txt
  458. 1. 读取兴趣描述文件,计算 hash
  459. 2. 对比数据库 prompt_hash,决定是否重新提取标签
  460. 3. 收集待分类新闻(去重)
  461. 4. 按 batch_size 分组调用 AI 分类
  462. 5. 保存结果
  463. 6. 查询 active 结果,按标签分组返回
  464. Returns:
  465. AIFilterResult 或 None(未启用或出错)
  466. """
  467. if not self.ai_filter_enabled:
  468. return None
  469. filter_config = self.ai_filter_config
  470. ai_config = self.config.get("AI", {})
  471. debug = self.config.get("DEBUG", False)
  472. # 创建 AIFilter 实例
  473. ai_filter = AIFilter(ai_config, filter_config, self.get_time, debug)
  474. # 确定实际使用的兴趣文件名
  475. # None = 使用默认 config/ai_interests.txt,指定文件名 = config/custom/ai/{name}
  476. configured_interests = interests_file or filter_config.get("INTERESTS_FILE")
  477. effective_interests_file = configured_interests or "ai_interests.txt"
  478. if debug:
  479. print(f"[AI筛选][DEBUG] === 配置信息 ===")
  480. print(f"[AI筛选][DEBUG] 存储后端: {self.get_storage_manager().backend_name}")
  481. print(f"[AI筛选][DEBUG] batch_size={filter_config.get('BATCH_SIZE', 200)}, "
  482. f"batch_interval={filter_config.get('BATCH_INTERVAL', 5)}")
  483. print(f"[AI筛选][DEBUG] interests_file={effective_interests_file}")
  484. print(f"[AI筛选][DEBUG] prompt_file={filter_config.get('PROMPT_FILE', 'prompt.txt')}")
  485. print(f"[AI筛选][DEBUG] extract_prompt_file={filter_config.get('EXTRACT_PROMPT_FILE', 'extract_prompt.txt')}")
  486. # 1. 读取兴趣描述
  487. # 传 configured_interests(可能为 None)给 load_interests_content,
  488. # 让它区分"默认文件(config/ai_interests.txt)"和"自定义文件(config/custom/ai/)"
  489. interests_content = ai_filter.load_interests_content(configured_interests)
  490. if not interests_content:
  491. return AIFilterResult(success=False, error="兴趣描述文件为空或不存在")
  492. current_hash = ai_filter.compute_interests_hash(interests_content, effective_interests_file)
  493. storage = self.get_storage_manager()
  494. if debug:
  495. print(f"[AI筛选][DEBUG] 兴趣描述 hash: {current_hash}")
  496. print(f"[AI筛选][DEBUG] 兴趣描述内容 ({len(interests_content)} 字符):\n{interests_content}")
  497. # 2. 开启批量模式(远程后端延迟上传,所有写操作完成后统一上传)
  498. storage.begin_batch()
  499. # 3. 检查提示词是否变更
  500. stored_hash = storage.get_latest_prompt_hash(interests_file=effective_interests_file)
  501. if debug:
  502. print(f"[AI筛选][DEBUG] 数据库存储 hash: {stored_hash}")
  503. print(f"[AI筛选][DEBUG] hash 对比: stored={stored_hash} vs current={current_hash} → {'匹配' if stored_hash == current_hash else '不匹配'}")
  504. if stored_hash != current_hash:
  505. new_version = storage.get_latest_ai_filter_tag_version() + 1
  506. threshold = filter_config.get("RECLASSIFY_THRESHOLD", 0.6)
  507. if stored_hash is None:
  508. # 首次运行,直接提取并保存全部标签
  509. print(f"[AI筛选] 首次运行 ({effective_interests_file}),提取标签...")
  510. tags_data = ai_filter.extract_tags(interests_content)
  511. if not tags_data:
  512. storage.end_batch()
  513. return AIFilterResult(success=False, error="标签提取失败")
  514. tags_data = self._with_ordered_priorities(tags_data, start_priority=1)
  515. saved_count = storage.save_ai_filter_tags(tags_data, new_version, current_hash, interests_file=effective_interests_file)
  516. print(f"[AI筛选] 已保存 {saved_count} 个标签 (版本 {new_version})")
  517. else:
  518. # 兴趣描述已变更,让 AI 对比旧标签和新兴趣,给出更新方案
  519. old_tags = storage.get_active_ai_filter_tags(interests_file=effective_interests_file)
  520. update_result = ai_filter.update_tags(old_tags, interests_content)
  521. if update_result is None:
  522. # AI 标签更新失败,回退到重新提取全部标签
  523. print(f"[AI筛选] AI 标签更新失败,回退到重新提取")
  524. tags_data = ai_filter.extract_tags(interests_content)
  525. if not tags_data:
  526. storage.end_batch()
  527. return AIFilterResult(success=False, error="标签提取失败")
  528. tags_data = self._with_ordered_priorities(tags_data, start_priority=1)
  529. deprecated_count = storage.deprecate_all_ai_filter_tags(interests_file=effective_interests_file)
  530. storage.clear_analyzed_news(interests_file=effective_interests_file)
  531. saved_count = storage.save_ai_filter_tags(tags_data, new_version, current_hash, interests_file=effective_interests_file)
  532. print(f"[AI筛选] 废弃 {deprecated_count} 个旧标签, 保存 {saved_count} 个新标签 (版本 {new_version})")
  533. else:
  534. change_ratio = update_result["change_ratio"]
  535. keep_tags = update_result["keep"]
  536. add_tags = update_result["add"]
  537. remove_tags = update_result["remove"]
  538. if debug:
  539. print(f"[AI筛选][DEBUG] AI 标签更新: keep={len(keep_tags)}, add={len(add_tags)}, remove={len(remove_tags)}, change_ratio={change_ratio:.2f}, threshold={threshold:.2f}")
  540. if change_ratio >= threshold:
  541. # 全量重分类:废弃所有旧标签,用 extract_tags 重新提取
  542. print(f"[AI筛选] 兴趣文件变更: {effective_interests_file} (AI change_ratio={change_ratio:.2f} >= threshold={threshold:.2f} → 全量重分类)")
  543. tags_data = ai_filter.extract_tags(interests_content)
  544. if not tags_data:
  545. storage.end_batch()
  546. return AIFilterResult(success=False, error="标签提取失败")
  547. tags_data = self._with_ordered_priorities(tags_data, start_priority=1)
  548. deprecated_count = storage.deprecate_all_ai_filter_tags(interests_file=effective_interests_file)
  549. storage.clear_analyzed_news(interests_file=effective_interests_file)
  550. saved_count = storage.save_ai_filter_tags(tags_data, new_version, current_hash, interests_file=effective_interests_file)
  551. print(f"[AI筛选] 废弃 {deprecated_count} 个旧标签, 保存 {saved_count} 个新标签 (版本 {new_version})")
  552. else:
  553. # 增量更新:按 AI 指示操作
  554. print(f"[AI筛选] 兴趣文件变更: {effective_interests_file} (AI change_ratio={change_ratio:.2f} < threshold={threshold:.2f} → 增量更新)")
  555. print(f"[AI筛选] 保留 {len(keep_tags)} 个标签, 新增 {len(add_tags)} 个, 废弃 {len(remove_tags)} 个")
  556. # 废弃 AI 标记移除的标签
  557. if remove_tags:
  558. remove_set = set(remove_tags)
  559. removed_ids = [t["id"] for t in old_tags if t["tag"] in remove_set]
  560. if removed_ids:
  561. storage.deprecate_specific_ai_filter_tags(removed_ids)
  562. if debug:
  563. print(f"[AI筛选][DEBUG] 废弃标签 IDs: {removed_ids}")
  564. # 更新保留标签的描述
  565. keep_with_priority = []
  566. if keep_tags:
  567. storage.update_ai_filter_tag_descriptions(keep_tags, interests_file=effective_interests_file)
  568. keep_with_priority = self._with_ordered_priorities(keep_tags, start_priority=1)
  569. storage.update_ai_filter_tag_priorities(keep_with_priority, interests_file=effective_interests_file)
  570. # 保存新增标签
  571. if add_tags:
  572. add_start = keep_with_priority[-1]["priority"] + 1 if keep_with_priority else 1
  573. add_with_priority = self._with_ordered_priorities(add_tags, start_priority=add_start)
  574. saved_count = storage.save_ai_filter_tags(add_with_priority, new_version, current_hash, interests_file=effective_interests_file)
  575. if debug:
  576. print(f"[AI筛选][DEBUG] 新增保存 {saved_count} 个标签")
  577. # 更新保留标签的 hash(标记为已处理)
  578. storage.update_ai_filter_tags_hash(effective_interests_file, current_hash)
  579. # 增量更新:清除不匹配新闻的分析记录,让它们有机会被新标签集重新分析
  580. if add_tags:
  581. cleared = storage.clear_unmatched_analyzed_news(interests_file=effective_interests_file)
  582. if cleared > 0:
  583. print(f"[AI筛选] 清除 {cleared} 条不匹配记录,将在新标签下重新分析")
  584. # 3. 获取当前 active 标签
  585. active_tags = storage.get_active_ai_filter_tags(interests_file=effective_interests_file)
  586. if debug:
  587. print(f"[AI筛选][DEBUG] 从数据库获取 active 标签: {len(active_tags)} 个")
  588. for t in active_tags:
  589. print(f"[AI筛选][DEBUG] id={t['id']} tag={t['tag']} priority={t.get('priority', 9999)} version={t.get('version')} hash={t.get('prompt_hash', '')[:8]}...")
  590. if not active_tags:
  591. storage.end_batch()
  592. return AIFilterResult(success=False, error="没有可用的标签")
  593. print(f"[AI筛选] 使用 {len(active_tags)} 个标签")
  594. # 4. 收集待分类新闻
  595. # 热榜
  596. all_news = storage.get_all_news_ids()
  597. analyzed_hotlist = storage.get_analyzed_news_ids("hotlist", interests_file=effective_interests_file)
  598. pending_news = [n for n in all_news if n["id"] not in analyzed_hotlist]
  599. # RSS(先做新鲜度过滤,再去除已分类的)
  600. pending_rss = []
  601. freshness_filtered_rss = 0
  602. if self.rss_enabled:
  603. all_rss = storage.get_all_rss_ids()
  604. # 应用新鲜度过滤(与推送阶段一致)
  605. rss_config = self.rss_config
  606. freshness_config = rss_config.get("FRESHNESS_FILTER", {})
  607. freshness_enabled = freshness_config.get("ENABLED", True)
  608. default_max_age_days = freshness_config.get("MAX_AGE_DAYS", 3)
  609. timezone = self.config.get("TIMEZONE", DEFAULT_TIMEZONE)
  610. # 构建 feed_id -> max_age_days 的映射
  611. feed_max_age_map = {}
  612. for feed_cfg in self.rss_feeds:
  613. feed_id = feed_cfg.get("id", "")
  614. max_age = feed_cfg.get("max_age_days")
  615. if max_age is not None:
  616. try:
  617. feed_max_age_map[feed_id] = int(max_age)
  618. except (ValueError, TypeError):
  619. pass
  620. fresh_rss = []
  621. for n in all_rss:
  622. published_at = n.get("published_at", "")
  623. feed_id = n.get("source_id", "")
  624. max_days = feed_max_age_map.get(feed_id, default_max_age_days)
  625. if freshness_enabled and max_days > 0 and published_at:
  626. if not is_within_days(published_at, max_days, timezone):
  627. freshness_filtered_rss += 1
  628. continue
  629. fresh_rss.append(n)
  630. analyzed_rss = storage.get_analyzed_news_ids("rss", interests_file=effective_interests_file)
  631. pending_rss = [n for n in fresh_rss if n["id"] not in analyzed_rss]
  632. # 始终打印总量/已分析/待分析 的详细数据
  633. hotlist_total = len(all_news)
  634. hotlist_skipped = len(analyzed_hotlist)
  635. hotlist_pending = len(pending_news)
  636. print(f"[AI筛选] 热榜: 总计 {hotlist_total} 条, 已分析跳过 {hotlist_skipped} 条, 本次发送AI分析 {hotlist_pending} 条")
  637. if self.rss_enabled:
  638. rss_total = len(all_rss)
  639. rss_skipped = len(analyzed_rss)
  640. rss_pending = len(pending_rss)
  641. freshness_info = f", 新鲜度过滤 {freshness_filtered_rss} 条" if freshness_filtered_rss > 0 else ""
  642. print(f"[AI筛选] RSS: 总计 {rss_total} 条{freshness_info}, 已分析跳过 {rss_skipped} 条, 本次发送AI分析 {rss_pending} 条")
  643. total_pending = len(pending_news) + len(pending_rss)
  644. if total_pending == 0:
  645. print("[AI筛选] 没有新增新闻需要分类")
  646. # 5. 批量分类
  647. batch_size = filter_config.get("BATCH_SIZE", 200)
  648. batch_interval = filter_config.get("BATCH_INTERVAL", 5)
  649. total_results = []
  650. batch_count = 0 # 跨热榜和 RSS 的全局批次计数
  651. # 处理热榜
  652. for i in range(0, len(pending_news), batch_size):
  653. if batch_count > 0 and batch_interval > 0:
  654. import time
  655. print(f"[AI筛选] 批次间隔等待 {batch_interval} 秒...")
  656. time.sleep(batch_interval)
  657. batch = pending_news[i:i + batch_size]
  658. titles_for_ai = [
  659. {"id": n["id"], "title": n["title"], "source": n.get("source_name", "")}
  660. for n in batch
  661. ]
  662. batch_results = ai_filter.classify_batch(titles_for_ai, active_tags, interests_content)
  663. for r in batch_results:
  664. r["source_type"] = "hotlist"
  665. total_results.extend(batch_results)
  666. batch_count += 1
  667. print(f"[AI筛选] 热榜批次 {i // batch_size + 1}: {len(batch)} 条 → {len(batch_results)} 条匹配")
  668. # 处理 RSS
  669. for i in range(0, len(pending_rss), batch_size):
  670. if batch_count > 0 and batch_interval > 0:
  671. import time
  672. print(f"[AI筛选] 批次间隔等待 {batch_interval} 秒...")
  673. time.sleep(batch_interval)
  674. batch = pending_rss[i:i + batch_size]
  675. titles_for_ai = [
  676. {"id": n["id"], "title": n["title"], "source": n.get("source_name", "")}
  677. for n in batch
  678. ]
  679. batch_results = ai_filter.classify_batch(titles_for_ai, active_tags, interests_content)
  680. for r in batch_results:
  681. r["source_type"] = "rss"
  682. total_results.extend(batch_results)
  683. batch_count += 1
  684. print(f"[AI筛选] RSS 批次 {i // batch_size + 1}: {len(batch)} 条 → {len(batch_results)} 条匹配")
  685. # 6. 保存结果
  686. if total_results:
  687. saved = storage.save_ai_filter_results(total_results)
  688. print(f"[AI筛选] 保存 {saved} 条分类结果")
  689. if debug and saved != len(total_results):
  690. print(f"[AI筛选][DEBUG] !! 保存数量不一致: 期望 {len(total_results)}, 实际 {saved}(可能有重复记录被跳过)")
  691. # 6.5 记录所有已分析的新闻(匹配+不匹配,用于去重)
  692. matched_hotlist_ids = {r["news_item_id"] for r in total_results if r.get("source_type") == "hotlist"}
  693. matched_rss_ids = {r["news_item_id"] for r in total_results if r.get("source_type") == "rss"}
  694. if pending_news:
  695. hotlist_ids = [n["id"] for n in pending_news]
  696. storage.save_analyzed_news(
  697. hotlist_ids, "hotlist", effective_interests_file,
  698. current_hash, matched_hotlist_ids
  699. )
  700. if pending_rss:
  701. rss_ids = [n["id"] for n in pending_rss]
  702. storage.save_analyzed_news(
  703. rss_ids, "rss", effective_interests_file,
  704. current_hash, matched_rss_ids
  705. )
  706. if pending_news or pending_rss:
  707. total_analyzed = len(pending_news) + len(pending_rss)
  708. total_matched = len(matched_hotlist_ids) + len(matched_rss_ids)
  709. print(f"[AI筛选] 已记录 {total_analyzed} 条新闻分析状态 (匹配 {total_matched}, 不匹配 {total_analyzed - total_matched})")
  710. # 7. 结束批量模式(统一上传数据库到远程存储)
  711. storage.end_batch()
  712. # 8. 查询并组装返回结果
  713. all_results = storage.get_active_ai_filter_results(interests_file=effective_interests_file)
  714. if debug:
  715. print(f"[AI筛选][DEBUG] === 最终汇总 ===")
  716. print(f"[AI筛选][DEBUG] 数据库 active 分类结果: {len(all_results)} 条")
  717. # 按标签统计
  718. tag_counts: dict = {}
  719. for r in all_results:
  720. tag_name = r.get("tag", "?")
  721. src_type = r.get("source_type", "?")
  722. key = f"{tag_name}({src_type})"
  723. tag_counts[key] = tag_counts.get(key, 0) + 1
  724. for key, count in sorted(tag_counts.items()):
  725. print(f"[AI筛选][DEBUG] {key}: {count} 条")
  726. return self._build_filter_result(all_results, active_tags, total_pending)
  def _build_filter_result(
      self,
      raw_results: List[Dict],
      tags: List[Dict],
      total_processed: int,
  ) -> AIFilterResult:
      """
      Assemble stored AI-filter rows into an AIFilterResult grouped by tag.

      Args:
          raw_results: Stored classification rows. Each row must contain "tag"
              and "title"; every other field is optional and gets a default.
          tags: Active tag definitions. A definition's "priority" (or its
              1-based index in this list when "priority" is missing/invalid)
              is the fallback position for rows that carry no usable
              "tag_priority".
          total_processed: Passed through unchanged as
              AIFilterResult.total_processed.

      Returns:
          AIFilterResult whose ``tags`` is a list of groups
          ``{"tag", "description", "position", "count", "items"}``. Titles are
          deduplicated within each tag (first occurrence wins). Groups are
          sorted position-first when ``self.ai_priority_sort_enabled`` is
          truthy, otherwise count-first; the tag name breaks remaining ties.
      """
      # Fallback positions taken from the tag configuration.
      tag_priority_map: Dict[str, int] = {}
      for idx, tag_def in enumerate(tags, start=1):
          tag_name = str(tag_def.get("tag", "")).strip() if isinstance(tag_def, dict) else ""
          if not tag_name:
              continue
          try:
              tag_priority_map[tag_name] = int(tag_def.get("priority", idx))
          except (TypeError, ValueError):
              tag_priority_map[tag_name] = idx

      # Group rows by tag.
      tag_groups: Dict[str, Dict] = {}
      seen_titles: Dict[str, set] = {}  # per-tag title dedup
      for row in raw_results:
          tag_name = row["tag"]
          if tag_name not in tag_groups:
              fallback_position = tag_priority_map.get(tag_name, 9999)
              raw_priority = row.get("tag_priority")
              # A NULL/absent or unparseable stored priority falls back to the
              # configured priority instead of sinking the tag to 9999.
              try:
                  tag_position = fallback_position if raw_priority is None else int(raw_priority)
              except (TypeError, ValueError):
                  tag_position = fallback_position
              tag_groups[tag_name] = {
                  "tag": tag_name,
                  "description": row.get("tag_description", ""),
                  "position": tag_position,
                  "count": 0,
                  "items": [],
              }
              seen_titles[tag_name] = set()

          title = row["title"]
          if title in seen_titles[tag_name]:
              continue  # same title already listed under this tag
          seen_titles[tag_name].add(title)

          group = tag_groups[tag_name]
          group["items"].append({
              "title": title,
              "source_id": row.get("source_id", ""),
              "source_name": row.get("source_name", ""),
              "url": row.get("url", ""),
              "mobile_url": row.get("mobile_url", ""),
              "rank": row.get("rank", 0),
              "ranks": row.get("ranks", []),
              "first_time": row.get("first_time", ""),
              "last_time": row.get("last_time", ""),
              "count": row.get("count", 1),
              "relevance_score": row.get("relevance_score", 0),
              "source_type": row.get("source_type", "hotlist"),
          })
          group["count"] += 1

      # Sort by configuration: position first, or match count first.
      priority_first = self.ai_priority_sort_enabled

      def sort_key(group: Dict):
          position = group.get("position", 9999)
          if priority_first:
              return (position, -group["count"], group["tag"])
          return (-group["count"], position, group["tag"])

      sorted_tags = sorted(tag_groups.values(), key=sort_key)
      total_matched = sum(group["count"] for group in sorted_tags)
      return AIFilterResult(
          tags=sorted_tags,
          total_matched=total_matched,
          total_processed=total_processed,
          success=True,
      )
  def convert_ai_filter_to_report_data(
      self,
      ai_filter_result: AIFilterResult,
      mode: str = "daily",
      new_titles: Optional[Dict] = None,
      rss_new_urls: Optional[set] = None,
  ) -> tuple:
      """
      Convert an AI filter result into the data structure produced by keyword matching.

      Each tag in ``AIFilterResult.tags`` becomes one "word" (keyword group).
      Items with ``source_type == "rss"`` go into the RSS stats; every other
      item (``source_type`` defaults to "hotlist") goes into the hotlist stats.

      Per-item filters, applied in this order:
        1. "current" mode: hotlist items whose ``last_time`` differs from the
           newest hotlist ``last_time`` are dropped (aligned with
           count_word_frequency(mode="current")).
        2. MIN_SCORE (when > 0): items whose ``relevance_score`` is below it are
           dropped; a missing/None score counts as 0.
        3. RSS freshness (when enabled): RSS items older than the feed's
           ``max_age_days`` (or FRESHNESS_FILTER.MAX_AGE_DAYS) are dropped.
      Each group is then truncated to MAX_NEWS_PER_KEYWORD when that is > 0.

      Args:
          ai_filter_result: AI filter result to convert.
          mode: Report mode ("daily" | "current" | "incremental").
          new_titles: New hotlist titles ``{source_id: {title: data}}``, used
              to compute ``is_new``.
          rss_new_urls: URLs of new RSS entries, used to compute ``is_new``.

      Returns:
          (hotlist_stats, rss_stats):
            - hotlist_stats: same format as count_word_frequency() output
            - rss_stats: same format as rss_items
      """
      hotlist_stats: List[Dict] = []
      rss_stats: List[Dict] = []
      # `or 0`: an empty/None config value means "disabled" rather than a
      # TypeError on the `> 0` comparisons below.
      max_news = self.config.get("MAX_NEWS_PER_KEYWORD", 0) or 0
      min_score = self.ai_filter_config.get("MIN_SCORE", 0) or 0

      # current mode: find the newest hotlist time so off-board items can be dropped.
      latest_time = None
      if mode == "current":
          latest_time = self._latest_ai_hotlist_time(ai_filter_result.tags)
          if latest_time:
              print(f"[AI筛选] current 模式:最新时间 {latest_time},过滤已下榜新闻")

      # RSS freshness filter settings (same as the push stage).
      freshness_config = self.rss_config.get("FRESHNESS_FILTER", {})
      freshness_enabled = freshness_config.get("ENABLED", True)
      default_max_age_days = freshness_config.get("MAX_AGE_DAYS", 3)
      timezone = self.config.get("TIMEZONE", DEFAULT_TIMEZONE)
      feed_max_age_map = self._rss_feed_max_age_map()

      filtered_count = 0  # hotlist items dropped by the current-mode filter
      for tag_data in ai_filter_result.tags:
          tag_name = tag_data.get("tag", "")
          items = tag_data.get("items", [])
          if not items:
              continue

          hotlist_titles: List[Dict] = []
          rss_titles: List[Dict] = []
          for item in items:
              source_type = item.get("source_type", "hotlist")
              is_rss = source_type == "rss"
              first_time = item.get("first_time", "")
              last_time = item.get("last_time", "")

              # current mode: skip hotlist items that are no longer on the board.
              if mode == "current" and latest_time and source_type == "hotlist":
                  if last_time != latest_time:
                      filtered_count += 1
                      continue

              # Score threshold; a missing/None relevance_score counts as 0
              # instead of raising TypeError on the comparison.
              if min_score > 0 and (item.get("relevance_score") or 0) < min_score:
                  continue

              # RSS freshness: skip articles older than the feed's max age.
              if is_rss and freshness_enabled and first_time:
                  max_days = feed_max_age_map.get(item.get("source_id", ""), default_max_age_days)
                  if max_days > 0 and not is_within_days(first_time, max_days, timezone):
                      continue

              time_display = self._format_ai_time_display(source_type, first_time, last_time, timezone)
              is_new = self._is_new_ai_item(item, source_type, new_titles, rss_new_urls)
              title_entry = {
                  "title": item.get("title", ""),
                  "source_name": item.get("source_name", ""),
                  "url": item.get("url", ""),
                  "mobile_url": item.get("mobile_url", ""),
                  "ranks": item.get("ranks", []),
                  "rank_threshold": self.rank_threshold,
                  "count": item.get("count", 1),
                  "is_new": is_new,
                  "time_display": time_display,
                  "matched_keyword": tag_name,
              }
              if is_rss:
                  rss_titles.append(title_entry)
              else:
                  hotlist_titles.append(title_entry)

          position = tag_data.get("position", 9999)
          for titles, stats in ((hotlist_titles, hotlist_stats), (rss_titles, rss_stats)):
              if not titles:
                  continue
              if max_news > 0:
                  titles = titles[:max_news]
              stats.append({
                  "word": tag_name,
                  "count": len(titles),
                  "position": position,
                  "titles": titles,
              })

      if mode == "current" and filtered_count > 0:
          total_kept = sum(s["count"] for s in hotlist_stats)
          print(f"[AI筛选] current 模式:过滤 {filtered_count} 条已下榜新闻,保留 {total_kept} 条当前在榜")
      if min_score > 0:
          hotlist_kept = sum(s["count"] for s in hotlist_stats)
          rss_kept = sum(s["count"] for s in rss_stats)
          total_kept = hotlist_kept + rss_kept
          parts = [f"热榜 {hotlist_kept} 条"]
          if rss_kept > 0:
              parts.append(f"RSS {rss_kept} 条")
          print(f"[AI筛选] 分数过滤:min_score={min_score},保留 {total_kept} 条 score≥{min_score} ({', '.join(parts)})")

      # Sort by configuration: position first, or match count first.
      priority_first = self.ai_priority_sort_enabled

      def sort_key(stat: Dict):
          stat_position = stat.get("position", 9999)
          if priority_first:
              return (stat_position, -stat["count"], stat["word"])
          return (-stat["count"], stat_position, stat["word"])

      hotlist_stats.sort(key=sort_key)
      rss_stats.sort(key=sort_key)
      return hotlist_stats, rss_stats

  @staticmethod
  def _latest_ai_hotlist_time(tags: List[Dict]) -> Optional[str]:
      """Return the greatest non-empty ``last_time`` among hotlist items in *tags*, or None."""
      latest_time = None
      for tag_data in tags:
          for item in tag_data.get("items", []):
              if item.get("source_type", "hotlist") != "hotlist":
                  continue
              last_time = item.get("last_time", "")
              # String comparison, as in the keyword path; NOTE(review): assumes a
              # fixed, zero-padded time format so lexical order == time order.
              if last_time and (latest_time is None or last_time > latest_time):
                  latest_time = last_time
      return latest_time

  def _rss_feed_max_age_map(self) -> Dict[str, int]:
      """Map RSS feed id -> per-feed ``max_age_days`` override; unparseable values are ignored."""
      feed_max_age_map: Dict[str, int] = {}
      for feed_cfg in self.rss_feeds:
          max_age = feed_cfg.get("max_age_days")
          if max_age is None:
              continue
          try:
              feed_max_age_map[feed_cfg.get("id", "")] = int(max_age)
          except (ValueError, TypeError):
              pass  # invalid override: the global MAX_AGE_DAYS applies instead
      return feed_max_age_map

  @staticmethod
  def _format_ai_time_display(source_type: str, first_time: str, last_time: str, timezone) -> str:
      """Build the time label for one item: friendly date for RSS, [first ~ last] for hotlist."""
      if source_type == "rss":
          # RSS first_time is ISO formatted; show it in a friendly form.
          if first_time:
              return format_iso_time_friendly(first_time, timezone, include_date=True)
          return ""
      # Hotlist: "[HH:MM ~ HH:MM]" style, consistent with keyword mode.
      if first_time and last_time and first_time != last_time:
          first_display = convert_time_for_display(first_time)
          last_display = convert_time_for_display(last_time)
          return f"[{first_display} ~ {last_display}]"
      if first_time:
          return convert_time_for_display(first_time)
      return ""

  @staticmethod
  def _is_new_ai_item(
      item: Dict,
      source_type: str,
      new_titles: Optional[Dict],
      rss_new_urls: Optional[set],
  ) -> bool:
      """Return whether *item* is new: RSS by URL membership, hotlist by (source_id, title)."""
      if source_type == "rss":
          if not rss_new_urls:
              return False
          item_url = item.get("url", "")
          return bool(item_url) and item_url in rss_new_urls
      if not new_titles:
          return False
      item_source_id = item.get("source_id", "")
      if item_source_id not in new_titles:
          return False
      return item.get("title", "") in new_titles[item_source_id]
  964. # === 资源清理 ===
  def cleanup(self):
      """
      Release the storage manager, if one is held.

      Calls ``cleanup_old_data()`` and then ``cleanup()`` on it, and finally
      clears the reference. Does nothing when no storage manager is set.
      """
      storage = self._storage_manager
      if not storage:
          return
      storage.cleanup_old_data()
      storage.cleanup()
      self._storage_manager = None