__main__.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171
  1. # coding=utf-8
  2. """
  3. TrendRadar 主程序
  4. 热点新闻聚合与分析工具
  5. 支持: python -m trendradar
  6. """
  7. import os
  8. import webbrowser
  9. from pathlib import Path
  10. from typing import Dict, List, Tuple, Optional
  11. import requests
  12. from trendradar.context import AppContext
  13. from trendradar import __version__
  14. from trendradar.core import load_config
  15. from trendradar.core.analyzer import convert_keyword_stats_to_platform_stats
  16. from trendradar.crawler import DataFetcher
  17. from trendradar.storage import convert_crawl_results_to_news_data
  18. from trendradar.utils.time import is_within_days
  19. def check_version_update(
  20. current_version: str, version_url: str, proxy_url: Optional[str] = None
  21. ) -> Tuple[bool, Optional[str]]:
  22. """检查版本更新"""
  23. try:
  24. proxies = None
  25. if proxy_url:
  26. proxies = {"http": proxy_url, "https": proxy_url}
  27. headers = {
  28. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
  29. "Accept": "text/plain, */*",
  30. "Cache-Control": "no-cache",
  31. }
  32. response = requests.get(
  33. version_url, proxies=proxies, headers=headers, timeout=10
  34. )
  35. response.raise_for_status()
  36. remote_version = response.text.strip()
  37. print(f"当前版本: {current_version}, 远程版本: {remote_version}")
  38. # 比较版本
  39. def parse_version(version_str):
  40. try:
  41. parts = version_str.strip().split(".")
  42. if len(parts) != 3:
  43. raise ValueError("版本号格式不正确")
  44. return int(parts[0]), int(parts[1]), int(parts[2])
  45. except:
  46. return 0, 0, 0
  47. current_tuple = parse_version(current_version)
  48. remote_tuple = parse_version(remote_version)
  49. need_update = current_tuple < remote_tuple
  50. return need_update, remote_version if need_update else None
  51. except Exception as e:
  52. print(f"版本检查失败: {e}")
  53. return False, None
  54. # === 主分析器 ===
  55. class NewsAnalyzer:
  56. """新闻分析器"""
    # Report-mode strategy table, keyed by the REPORT_MODE config value.
    # Each entry carries:
    #   mode_name / description  - human-readable labels printed in logs
    #   realtime_report_type     - report title used for realtime pushes
    #   summary_report_type      - report title used for summary pushes
    #   should_send_realtime     - flag: realtime push enabled for this mode
    #   should_generate_summary  - flag: summary report enabled for this mode
    #   summary_mode             - analysis mode passed on when building the summary
    MODE_STRATEGIES = {
        "incremental": {
            "mode_name": "增量模式",
            "description": "增量模式(只关注新增新闻,无新增时不推送)",
            "realtime_report_type": "实时增量",
            "summary_report_type": "当日汇总",
            "should_send_realtime": True,
            "should_generate_summary": True,
            "summary_mode": "daily",
        },
        "current": {
            "mode_name": "当前榜单模式",
            "description": "当前榜单模式(当前榜单匹配新闻 + 新增新闻区域 + 按时推送)",
            "realtime_report_type": "实时当前榜单",
            "summary_report_type": "当前榜单汇总",
            "should_send_realtime": True,
            "should_generate_summary": True,
            "summary_mode": "current",
        },
        "daily": {
            "mode_name": "当日汇总模式",
            "description": "当日汇总模式(所有匹配新闻 + 新增新闻区域 + 按时推送)",
            "realtime_report_type": "",
            "summary_report_type": "当日汇总",
            "should_send_realtime": False,
            "should_generate_summary": True,
            "summary_mode": "daily",
        },
    }
  87. def __init__(self):
  88. # 加载配置
  89. print("正在加载配置...")
  90. config = load_config()
  91. print(f"TrendRadar v{__version__} 配置加载完成")
  92. print(f"监控平台数量: {len(config['PLATFORMS'])}")
  93. print(f"时区: {config.get('TIMEZONE', 'Asia/Shanghai')}")
  94. # 创建应用上下文
  95. self.ctx = AppContext(config)
  96. self.request_interval = self.ctx.config["REQUEST_INTERVAL"]
  97. self.report_mode = self.ctx.config["REPORT_MODE"]
  98. self.rank_threshold = self.ctx.rank_threshold
  99. self.is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true"
  100. self.is_docker_container = self._detect_docker_environment()
  101. self.update_info = None
  102. self.proxy_url = None
  103. self._setup_proxy()
  104. self.data_fetcher = DataFetcher(self.proxy_url)
  105. # 初始化存储管理器(使用 AppContext)
  106. self._init_storage_manager()
  107. if self.is_github_actions:
  108. self._check_version_update()
  109. def _init_storage_manager(self) -> None:
  110. """初始化存储管理器(使用 AppContext)"""
  111. # 获取数据保留天数(支持环境变量覆盖)
  112. env_retention = os.environ.get("STORAGE_RETENTION_DAYS", "").strip()
  113. if env_retention:
  114. # 环境变量覆盖配置
  115. self.ctx.config["STORAGE"]["RETENTION_DAYS"] = int(env_retention)
  116. self.storage_manager = self.ctx.get_storage_manager()
  117. print(f"存储后端: {self.storage_manager.backend_name}")
  118. retention_days = self.ctx.config.get("STORAGE", {}).get("RETENTION_DAYS", 0)
  119. if retention_days > 0:
  120. print(f"数据保留天数: {retention_days} 天")
  121. def _detect_docker_environment(self) -> bool:
  122. """检测是否运行在 Docker 容器中"""
  123. try:
  124. if os.environ.get("DOCKER_CONTAINER") == "true":
  125. return True
  126. if os.path.exists("/.dockerenv"):
  127. return True
  128. return False
  129. except Exception:
  130. return False
  131. def _should_open_browser(self) -> bool:
  132. """判断是否应该打开浏览器"""
  133. return not self.is_github_actions and not self.is_docker_container
  134. def _setup_proxy(self) -> None:
  135. """设置代理配置"""
  136. if not self.is_github_actions and self.ctx.config["USE_PROXY"]:
  137. self.proxy_url = self.ctx.config["DEFAULT_PROXY"]
  138. print("本地环境,使用代理")
  139. elif not self.is_github_actions and not self.ctx.config["USE_PROXY"]:
  140. print("本地环境,未启用代理")
  141. else:
  142. print("GitHub Actions环境,不使用代理")
  143. def _check_version_update(self) -> None:
  144. """检查版本更新"""
  145. try:
  146. need_update, remote_version = check_version_update(
  147. __version__, self.ctx.config["VERSION_CHECK_URL"], self.proxy_url
  148. )
  149. if need_update and remote_version:
  150. self.update_info = {
  151. "current_version": __version__,
  152. "remote_version": remote_version,
  153. }
  154. print(f"发现新版本: {remote_version} (当前: {__version__})")
  155. else:
  156. print("版本检查完成,当前为最新版本")
  157. except Exception as e:
  158. print(f"版本检查出错: {e}")
  159. def _get_mode_strategy(self) -> Dict:
  160. """获取当前模式的策略配置"""
  161. return self.MODE_STRATEGIES.get(self.report_mode, self.MODE_STRATEGIES["daily"])
  162. def _has_notification_configured(self) -> bool:
  163. """检查是否配置了任何通知渠道"""
  164. cfg = self.ctx.config
  165. return any(
  166. [
  167. cfg["FEISHU_WEBHOOK_URL"],
  168. cfg["DINGTALK_WEBHOOK_URL"],
  169. cfg["WEWORK_WEBHOOK_URL"],
  170. (cfg["TELEGRAM_BOT_TOKEN"] and cfg["TELEGRAM_CHAT_ID"]),
  171. (
  172. cfg["EMAIL_FROM"]
  173. and cfg["EMAIL_PASSWORD"]
  174. and cfg["EMAIL_TO"]
  175. ),
  176. (cfg["NTFY_SERVER_URL"] and cfg["NTFY_TOPIC"]),
  177. cfg["BARK_URL"],
  178. cfg["SLACK_WEBHOOK_URL"],
  179. ]
  180. )
  181. def _has_valid_content(
  182. self, stats: List[Dict], new_titles: Optional[Dict] = None
  183. ) -> bool:
  184. """检查是否有有效的新闻内容"""
  185. if self.report_mode == "incremental":
  186. # 增量模式:必须有新增标题且匹配了关键词才推送
  187. has_new_titles = bool(
  188. new_titles and any(len(titles) > 0 for titles in new_titles.values())
  189. )
  190. has_matched_news = any(stat["count"] > 0 for stat in stats)
  191. return has_new_titles and has_matched_news
  192. elif self.report_mode == "current":
  193. # current模式:只要stats有内容就说明有匹配的新闻
  194. return any(stat["count"] > 0 for stat in stats)
  195. else:
  196. # 当日汇总模式下,检查是否有匹配的频率词新闻或新增新闻
  197. has_matched_news = any(stat["count"] > 0 for stat in stats)
  198. has_new_news = bool(
  199. new_titles and any(len(titles) > 0 for titles in new_titles.values())
  200. )
  201. return has_matched_news or has_new_news
  202. def _load_analysis_data(
  203. self,
  204. quiet: bool = False,
  205. ) -> Optional[Tuple[Dict, Dict, Dict, Dict, List, List]]:
  206. """统一的数据加载和预处理,使用当前监控平台列表过滤历史数据"""
  207. try:
  208. # 获取当前配置的监控平台ID列表
  209. current_platform_ids = self.ctx.platform_ids
  210. if not quiet:
  211. print(f"当前监控平台: {current_platform_ids}")
  212. all_results, id_to_name, title_info = self.ctx.read_today_titles(
  213. current_platform_ids, quiet=quiet
  214. )
  215. if not all_results:
  216. print("没有找到当天的数据")
  217. return None
  218. total_titles = sum(len(titles) for titles in all_results.values())
  219. if not quiet:
  220. print(f"读取到 {total_titles} 个标题(已按当前监控平台过滤)")
  221. new_titles = self.ctx.detect_new_titles(current_platform_ids, quiet=quiet)
  222. word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
  223. return (
  224. all_results,
  225. id_to_name,
  226. title_info,
  227. new_titles,
  228. word_groups,
  229. filter_words,
  230. global_filters,
  231. )
  232. except Exception as e:
  233. print(f"数据加载失败: {e}")
  234. return None
  235. def _prepare_current_title_info(self, results: Dict, time_info: str) -> Dict:
  236. """从当前抓取结果构建标题信息"""
  237. title_info = {}
  238. for source_id, titles_data in results.items():
  239. title_info[source_id] = {}
  240. for title, title_data in titles_data.items():
  241. ranks = title_data.get("ranks", [])
  242. url = title_data.get("url", "")
  243. mobile_url = title_data.get("mobileUrl", "")
  244. title_info[source_id][title] = {
  245. "first_time": time_info,
  246. "last_time": time_info,
  247. "count": 1,
  248. "ranks": ranks,
  249. "url": url,
  250. "mobileUrl": mobile_url,
  251. }
  252. return title_info
  253. def _run_analysis_pipeline(
  254. self,
  255. data_source: Dict,
  256. mode: str,
  257. title_info: Dict,
  258. new_titles: Dict,
  259. word_groups: List[Dict],
  260. filter_words: List[str],
  261. id_to_name: Dict,
  262. failed_ids: Optional[List] = None,
  263. is_daily_summary: bool = False,
  264. global_filters: Optional[List[str]] = None,
  265. quiet: bool = False,
  266. rss_items: Optional[List[Dict]] = None,
  267. rss_new_items: Optional[List[Dict]] = None,
  268. ) -> Tuple[List[Dict], Optional[str]]:
  269. """统一的分析流水线:数据处理 → 统计计算 → HTML生成"""
  270. # 统计计算(使用 AppContext)
  271. stats, total_titles = self.ctx.count_frequency(
  272. data_source,
  273. word_groups,
  274. filter_words,
  275. id_to_name,
  276. title_info,
  277. new_titles,
  278. mode=mode,
  279. global_filters=global_filters,
  280. quiet=quiet,
  281. )
  282. # 如果是 platform 模式,转换数据结构
  283. if self.ctx.display_mode == "platform" and stats:
  284. stats = convert_keyword_stats_to_platform_stats(
  285. stats,
  286. self.ctx.weight_config,
  287. self.ctx.rank_threshold,
  288. )
  289. # HTML生成(如果启用)
  290. html_file = None
  291. if self.ctx.config["STORAGE"]["FORMATS"]["HTML"]:
  292. html_file = self.ctx.generate_html(
  293. stats,
  294. total_titles,
  295. failed_ids=failed_ids,
  296. new_titles=new_titles,
  297. id_to_name=id_to_name,
  298. mode=mode,
  299. is_daily_summary=is_daily_summary,
  300. update_info=self.update_info if self.ctx.config["SHOW_VERSION_UPDATE"] else None,
  301. rss_items=rss_items,
  302. rss_new_items=rss_new_items,
  303. )
  304. return stats, html_file
  305. def _send_notification_if_needed(
  306. self,
  307. stats: List[Dict],
  308. report_type: str,
  309. mode: str,
  310. failed_ids: Optional[List] = None,
  311. new_titles: Optional[Dict] = None,
  312. id_to_name: Optional[Dict] = None,
  313. html_file_path: Optional[str] = None,
  314. rss_items: Optional[List[Dict]] = None,
  315. rss_new_items: Optional[List[Dict]] = None,
  316. ) -> bool:
  317. """统一的通知发送逻辑,包含所有判断条件,支持热榜+RSS合并推送"""
  318. has_notification = self._has_notification_configured()
  319. cfg = self.ctx.config
  320. # 检查是否有有效内容(热榜或RSS)
  321. has_news_content = self._has_valid_content(stats, new_titles)
  322. has_rss_content = bool(rss_items and len(rss_items) > 0)
  323. has_any_content = has_news_content or has_rss_content
  324. # 计算热榜匹配条数
  325. news_count = sum(len(stat.get("titles", [])) for stat in stats) if stats else 0
  326. rss_count = len(rss_items) if rss_items else 0
  327. if (
  328. cfg["ENABLE_NOTIFICATION"]
  329. and has_notification
  330. and has_any_content
  331. ):
  332. # 输出推送内容统计
  333. content_parts = []
  334. if news_count > 0:
  335. content_parts.append(f"热榜 {news_count} 条")
  336. if rss_count > 0:
  337. content_parts.append(f"RSS {rss_count} 条")
  338. total_count = news_count + rss_count
  339. print(f"[推送] 准备发送:{' + '.join(content_parts)},合计 {total_count} 条")
  340. # 推送窗口控制
  341. if cfg["PUSH_WINDOW"]["ENABLED"]:
  342. push_manager = self.ctx.create_push_manager()
  343. time_range_start = cfg["PUSH_WINDOW"]["TIME_RANGE"]["START"]
  344. time_range_end = cfg["PUSH_WINDOW"]["TIME_RANGE"]["END"]
  345. if not push_manager.is_in_time_range(time_range_start, time_range_end):
  346. now = self.ctx.get_time()
  347. print(
  348. f"推送窗口控制:当前时间 {now.strftime('%H:%M')} 不在推送时间窗口 {time_range_start}-{time_range_end} 内,跳过推送"
  349. )
  350. return False
  351. if cfg["PUSH_WINDOW"]["ONCE_PER_DAY"]:
  352. if push_manager.has_pushed_today():
  353. print(f"推送窗口控制:今天已推送过,跳过本次推送")
  354. return False
  355. else:
  356. print(f"推送窗口控制:今天首次推送")
  357. # 准备报告数据
  358. report_data = self.ctx.prepare_report(stats, failed_ids, new_titles, id_to_name, mode)
  359. # 是否发送版本更新信息
  360. update_info_to_send = self.update_info if cfg["SHOW_VERSION_UPDATE"] else None
  361. # 使用 NotificationDispatcher 发送到所有渠道(合并热榜+RSS)
  362. dispatcher = self.ctx.create_notification_dispatcher()
  363. results = dispatcher.dispatch_all(
  364. report_data=report_data,
  365. report_type=report_type,
  366. update_info=update_info_to_send,
  367. proxy_url=self.proxy_url,
  368. mode=mode,
  369. html_file_path=html_file_path,
  370. rss_items=rss_items,
  371. rss_new_items=rss_new_items,
  372. )
  373. if not results:
  374. print("未配置任何通知渠道,跳过通知发送")
  375. return False
  376. # 如果成功发送了任何通知,且启用了每天只推一次,则记录推送
  377. if (
  378. cfg["PUSH_WINDOW"]["ENABLED"]
  379. and cfg["PUSH_WINDOW"]["ONCE_PER_DAY"]
  380. and any(results.values())
  381. ):
  382. push_manager = self.ctx.create_push_manager()
  383. push_manager.record_push(report_type)
  384. return True
  385. elif cfg["ENABLE_NOTIFICATION"] and not has_notification:
  386. print("⚠️ 警告:通知功能已启用但未配置任何通知渠道,将跳过通知发送")
  387. elif not cfg["ENABLE_NOTIFICATION"]:
  388. print(f"跳过{report_type}通知:通知功能已禁用")
  389. elif (
  390. cfg["ENABLE_NOTIFICATION"]
  391. and has_notification
  392. and not has_any_content
  393. ):
  394. mode_strategy = self._get_mode_strategy()
  395. if "实时" in report_type:
  396. if self.report_mode == "incremental":
  397. has_new = bool(
  398. new_titles and any(len(titles) > 0 for titles in new_titles.values())
  399. )
  400. if not has_new and not has_rss_content:
  401. print("跳过实时推送通知:增量模式下未检测到新增的新闻和RSS")
  402. elif not has_new:
  403. print("跳过实时推送通知:增量模式下新增新闻未匹配到关键词")
  404. else:
  405. print(
  406. f"跳过实时推送通知:{mode_strategy['mode_name']}下未检测到匹配的新闻"
  407. )
  408. else:
  409. print(
  410. f"跳过{mode_strategy['summary_report_type']}通知:未匹配到有效的新闻内容"
  411. )
  412. return False
  413. def _generate_summary_report(
  414. self,
  415. mode_strategy: Dict,
  416. rss_items: Optional[List[Dict]] = None,
  417. rss_new_items: Optional[List[Dict]] = None,
  418. ) -> Optional[str]:
  419. """生成汇总报告(带通知,支持RSS合并)"""
  420. summary_type = (
  421. "当前榜单汇总" if mode_strategy["summary_mode"] == "current" else "当日汇总"
  422. )
  423. print(f"生成{summary_type}报告...")
  424. # 加载分析数据
  425. analysis_data = self._load_analysis_data()
  426. if not analysis_data:
  427. return None
  428. all_results, id_to_name, title_info, new_titles, word_groups, filter_words, global_filters = (
  429. analysis_data
  430. )
  431. # 运行分析流水线
  432. stats, html_file = self._run_analysis_pipeline(
  433. all_results,
  434. mode_strategy["summary_mode"],
  435. title_info,
  436. new_titles,
  437. word_groups,
  438. filter_words,
  439. id_to_name,
  440. is_daily_summary=True,
  441. global_filters=global_filters,
  442. rss_items=rss_items,
  443. rss_new_items=rss_new_items,
  444. )
  445. if html_file:
  446. print(f"{summary_type}报告已生成: {html_file}")
  447. # 发送通知(合并RSS)
  448. self._send_notification_if_needed(
  449. stats,
  450. mode_strategy["summary_report_type"],
  451. mode_strategy["summary_mode"],
  452. failed_ids=[],
  453. new_titles=new_titles,
  454. id_to_name=id_to_name,
  455. html_file_path=html_file,
  456. rss_items=rss_items,
  457. rss_new_items=rss_new_items,
  458. )
  459. return html_file
  460. def _generate_summary_html(
  461. self,
  462. mode: str = "daily",
  463. rss_items: Optional[List[Dict]] = None,
  464. rss_new_items: Optional[List[Dict]] = None,
  465. ) -> Optional[str]:
  466. """生成汇总HTML"""
  467. summary_type = "当前榜单汇总" if mode == "current" else "当日汇总"
  468. print(f"生成{summary_type}HTML...")
  469. # 加载分析数据(静默模式,避免重复输出日志)
  470. analysis_data = self._load_analysis_data(quiet=True)
  471. if not analysis_data:
  472. return None
  473. all_results, id_to_name, title_info, new_titles, word_groups, filter_words, global_filters = (
  474. analysis_data
  475. )
  476. # 运行分析流水线(静默模式,避免重复输出日志)
  477. _, html_file = self._run_analysis_pipeline(
  478. all_results,
  479. mode,
  480. title_info,
  481. new_titles,
  482. word_groups,
  483. filter_words,
  484. id_to_name,
  485. is_daily_summary=True,
  486. global_filters=global_filters,
  487. quiet=True,
  488. rss_items=rss_items,
  489. rss_new_items=rss_new_items,
  490. )
  491. if html_file:
  492. print(f"{summary_type}HTML已生成: {html_file}")
  493. return html_file
  494. def _initialize_and_check_config(self) -> None:
  495. """通用初始化和配置检查"""
  496. now = self.ctx.get_time()
  497. print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
  498. if not self.ctx.config["ENABLE_CRAWLER"]:
  499. print("爬虫功能已禁用(ENABLE_CRAWLER=False),程序退出")
  500. return
  501. has_notification = self._has_notification_configured()
  502. if not self.ctx.config["ENABLE_NOTIFICATION"]:
  503. print("通知功能已禁用(ENABLE_NOTIFICATION=False),将只进行数据抓取")
  504. elif not has_notification:
  505. print("未配置任何通知渠道,将只进行数据抓取,不发送通知")
  506. else:
  507. print("通知功能已启用,将发送通知")
  508. mode_strategy = self._get_mode_strategy()
  509. print(f"报告模式: {self.report_mode}")
  510. print(f"运行模式: {mode_strategy['description']}")
  511. def _crawl_data(self) -> Tuple[Dict, Dict, List]:
  512. """执行数据爬取"""
  513. ids = []
  514. for platform in self.ctx.platforms:
  515. if "name" in platform:
  516. ids.append((platform["id"], platform["name"]))
  517. else:
  518. ids.append(platform["id"])
  519. print(
  520. f"配置的监控平台: {[p.get('name', p['id']) for p in self.ctx.platforms]}"
  521. )
  522. print(f"开始爬取数据,请求间隔 {self.request_interval} 毫秒")
  523. Path("output").mkdir(parents=True, exist_ok=True)
  524. results, id_to_name, failed_ids = self.data_fetcher.crawl_websites(
  525. ids, self.request_interval
  526. )
  527. # 转换为 NewsData 格式并保存到存储后端
  528. crawl_time = self.ctx.format_time()
  529. crawl_date = self.ctx.format_date()
  530. news_data = convert_crawl_results_to_news_data(
  531. results, id_to_name, failed_ids, crawl_time, crawl_date
  532. )
  533. # 保存到存储后端(SQLite)
  534. if self.storage_manager.save_news_data(news_data):
  535. print(f"数据已保存到存储后端: {self.storage_manager.backend_name}")
  536. # 保存 TXT 快照(如果启用)
  537. txt_file = self.storage_manager.save_txt_snapshot(news_data)
  538. if txt_file:
  539. print(f"TXT 快照已保存: {txt_file}")
  540. # 兼容:同时保存到原有 TXT 格式(确保向后兼容)
  541. if self.ctx.config["STORAGE"]["FORMATS"]["TXT"]:
  542. title_file = self.ctx.save_titles(results, id_to_name, failed_ids)
  543. print(f"标题已保存到: {title_file}")
  544. return results, id_to_name, failed_ids
  545. def _crawl_rss_data(self) -> Tuple[Optional[List[Dict]], Optional[List[Dict]]]:
  546. """
  547. 执行 RSS 数据抓取
  548. Returns:
  549. (rss_items, rss_new_items) 元组:
  550. - rss_items: 统计条目列表(按模式处理,用于统计区块)
  551. - rss_new_items: 新增条目列表(用于新增区块)
  552. 如果未启用或失败返回 (None, None)
  553. """
  554. if not self.ctx.rss_enabled:
  555. return None, None
  556. rss_feeds = self.ctx.rss_feeds
  557. if not rss_feeds:
  558. print("[RSS] 未配置任何 RSS 源")
  559. return None, None
  560. try:
  561. from trendradar.crawler.rss import RSSFetcher, RSSFeedConfig
  562. # 构建 RSS 源配置
  563. feeds = []
  564. for feed_config in rss_feeds:
  565. # 读取并验证单个 feed 的 max_age_days(可选)
  566. max_age_days_raw = feed_config.get("max_age_days")
  567. max_age_days = None
  568. if max_age_days_raw is not None:
  569. try:
  570. max_age_days = int(max_age_days_raw)
  571. if max_age_days < 0:
  572. feed_id = feed_config.get("id", "unknown")
  573. print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 为负数,将使用全局默认值")
  574. max_age_days = None
  575. except (ValueError, TypeError):
  576. feed_id = feed_config.get("id", "unknown")
  577. print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 格式错误:{max_age_days_raw}")
  578. max_age_days = None
  579. feed = RSSFeedConfig(
  580. id=feed_config.get("id", ""),
  581. name=feed_config.get("name", ""),
  582. url=feed_config.get("url", ""),
  583. max_items=feed_config.get("max_items", 50),
  584. enabled=feed_config.get("enabled", True),
  585. max_age_days=max_age_days, # None=使用全局,0=禁用,>0=覆盖
  586. )
  587. if feed.id and feed.url and feed.enabled:
  588. feeds.append(feed)
  589. if not feeds:
  590. print("[RSS] 没有启用的 RSS 源")
  591. return None, None
  592. # 创建抓取器
  593. rss_config = self.ctx.rss_config
  594. # RSS 代理:优先使用 RSS 专属代理,否则使用爬虫默认代理
  595. rss_proxy_url = rss_config.get("PROXY_URL", "") or self.proxy_url or ""
  596. # 获取配置的时区
  597. timezone = self.ctx.config.get("TIMEZONE", "Asia/Shanghai")
  598. # 获取新鲜度过滤配置
  599. freshness_config = rss_config.get("FRESHNESS_FILTER", {})
  600. freshness_enabled = freshness_config.get("ENABLED", True)
  601. default_max_age_days = freshness_config.get("MAX_AGE_DAYS", 3)
  602. fetcher = RSSFetcher(
  603. feeds=feeds,
  604. request_interval=rss_config.get("REQUEST_INTERVAL", 2000),
  605. timeout=rss_config.get("TIMEOUT", 15),
  606. use_proxy=rss_config.get("USE_PROXY", False),
  607. proxy_url=rss_proxy_url,
  608. timezone=timezone,
  609. freshness_enabled=freshness_enabled,
  610. default_max_age_days=default_max_age_days,
  611. )
  612. # 抓取数据
  613. rss_data = fetcher.fetch_all()
  614. # 保存到存储后端
  615. if self.storage_manager.save_rss_data(rss_data):
  616. print(f"[RSS] 数据已保存到存储后端")
  617. # 处理 RSS 数据(按模式过滤)并返回用于合并推送
  618. return self._process_rss_data_by_mode(rss_data)
  619. else:
  620. print(f"[RSS] 数据保存失败")
  621. return None, None
  622. except ImportError as e:
  623. print(f"[RSS] 缺少依赖: {e}")
  624. print("[RSS] 请安装 feedparser: pip install feedparser")
  625. return None, None
  626. except Exception as e:
  627. print(f"[RSS] 抓取失败: {e}")
  628. return None, None
  629. def _process_rss_data_by_mode(self, rss_data) -> Tuple[Optional[List[Dict]], Optional[List[Dict]]]:
  630. """
  631. 按报告模式处理 RSS 数据,返回与热榜相同格式的统计结构
  632. 三种模式:
  633. - daily: 当日汇总,统计=当天所有条目,新增=本次新增条目
  634. - current: 当前榜单,统计=当前榜单条目,新增=本次新增条目
  635. - incremental: 增量模式,统计=新增条目,新增=无
  636. Args:
  637. rss_data: 当前抓取的 RSSData 对象
  638. Returns:
  639. (rss_stats, rss_new_stats) 元组:
  640. - rss_stats: RSS 关键词统计列表(与热榜 stats 格式一致)
  641. - rss_new_stats: RSS 新增关键词统计列表(与热榜 stats 格式一致)
  642. """
  643. from trendradar.core.analyzer import count_rss_frequency
  644. rss_config = self.ctx.rss_config
  645. # 检查是否启用 RSS 通知
  646. if not rss_config.get("NOTIFICATION", {}).get("ENABLED", False):
  647. return None, None
  648. # 加载关键词配置
  649. try:
  650. word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
  651. except FileNotFoundError:
  652. word_groups, filter_words, global_filters = [], [], []
  653. timezone = self.ctx.timezone
  654. max_news_per_keyword = self.ctx.config.get("MAX_NEWS_PER_KEYWORD", 0)
  655. sort_by_position_first = self.ctx.config.get("SORT_BY_POSITION_FIRST", False)
  656. rss_stats = None
  657. rss_new_stats = None
  658. # 1. 首先获取新增条目(所有模式都需要)
  659. new_items_dict = self.storage_manager.detect_new_rss_items(rss_data)
  660. new_items_list = None
  661. if new_items_dict:
  662. new_items_list = self._convert_rss_items_to_list(new_items_dict, rss_data.id_to_name)
  663. if new_items_list:
  664. print(f"[RSS] 检测到 {len(new_items_list)} 条新增")
  665. # 2. 根据模式获取统计条目
  666. if self.report_mode == "incremental":
  667. # 增量模式:统计条目就是新增条目
  668. if not new_items_list:
  669. print("[RSS] 增量模式:没有新增 RSS 条目")
  670. return None, None
  671. rss_stats, total = count_rss_frequency(
  672. rss_items=new_items_list,
  673. word_groups=word_groups,
  674. filter_words=filter_words,
  675. global_filters=global_filters,
  676. new_items=new_items_list, # 增量模式所有都是新增
  677. max_news_per_keyword=max_news_per_keyword,
  678. sort_by_position_first=sort_by_position_first,
  679. timezone=timezone,
  680. rank_threshold=self.rank_threshold,
  681. quiet=False,
  682. )
  683. if not rss_stats:
  684. print("[RSS] 增量模式:关键词匹配后没有内容")
  685. return None, None
  686. elif self.report_mode == "current":
  687. # 当前榜单模式:统计=当前榜单所有条目
  688. latest_data = self.storage_manager.get_latest_rss_data(rss_data.date)
  689. if not latest_data:
  690. print("[RSS] 当前榜单模式:没有 RSS 数据")
  691. return None, None
  692. all_items_list = self._convert_rss_items_to_list(latest_data.items, latest_data.id_to_name)
  693. rss_stats, total = count_rss_frequency(
  694. rss_items=all_items_list,
  695. word_groups=word_groups,
  696. filter_words=filter_words,
  697. global_filters=global_filters,
  698. new_items=new_items_list, # 标记新增
  699. max_news_per_keyword=max_news_per_keyword,
  700. sort_by_position_first=sort_by_position_first,
  701. timezone=timezone,
  702. rank_threshold=self.rank_threshold,
  703. quiet=False,
  704. )
  705. if not rss_stats:
  706. print("[RSS] 当前榜单模式:关键词匹配后没有内容")
  707. return None, None
  708. # 生成新增统计
  709. if new_items_list:
  710. rss_new_stats, _ = count_rss_frequency(
  711. rss_items=new_items_list,
  712. word_groups=word_groups,
  713. filter_words=filter_words,
  714. global_filters=global_filters,
  715. new_items=new_items_list,
  716. max_news_per_keyword=max_news_per_keyword,
  717. sort_by_position_first=sort_by_position_first,
  718. timezone=timezone,
  719. rank_threshold=self.rank_threshold,
  720. quiet=True,
  721. )
  722. else:
  723. # daily 模式:统计=当天所有条目
  724. all_data = self.storage_manager.get_rss_data(rss_data.date)
  725. if not all_data:
  726. print("[RSS] 当日汇总模式:没有 RSS 数据")
  727. return None, None
  728. all_items_list = self._convert_rss_items_to_list(all_data.items, all_data.id_to_name)
  729. rss_stats, total = count_rss_frequency(
  730. rss_items=all_items_list,
  731. word_groups=word_groups,
  732. filter_words=filter_words,
  733. global_filters=global_filters,
  734. new_items=new_items_list, # 标记新增
  735. max_news_per_keyword=max_news_per_keyword,
  736. sort_by_position_first=sort_by_position_first,
  737. timezone=timezone,
  738. rank_threshold=self.rank_threshold,
  739. quiet=False,
  740. )
  741. if not rss_stats:
  742. print("[RSS] 当日汇总模式:关键词匹配后没有内容")
  743. return None, None
  744. # 生成新增统计
  745. if new_items_list:
  746. rss_new_stats, _ = count_rss_frequency(
  747. rss_items=new_items_list,
  748. word_groups=word_groups,
  749. filter_words=filter_words,
  750. global_filters=global_filters,
  751. new_items=new_items_list,
  752. max_news_per_keyword=max_news_per_keyword,
  753. sort_by_position_first=sort_by_position_first,
  754. timezone=timezone,
  755. rank_threshold=self.rank_threshold,
  756. quiet=True,
  757. )
  758. return rss_stats, rss_new_stats
  759. def _convert_rss_items_to_list(self, items_dict: Dict, id_to_name: Dict) -> List[Dict]:
  760. """将 RSS 条目字典转换为列表格式,并应用新鲜度过滤(用于推送)"""
  761. rss_items = []
  762. filtered_count = 0
  763. # 获取新鲜度过滤配置
  764. rss_config = self.ctx.rss_config
  765. freshness_config = rss_config.get("FRESHNESS_FILTER", {})
  766. freshness_enabled = freshness_config.get("ENABLED", True)
  767. default_max_age_days = freshness_config.get("MAX_AGE_DAYS", 3)
  768. timezone = self.ctx.config.get("TIMEZONE", "Asia/Shanghai")
  769. # 构建 feed_id -> max_age_days 的映射
  770. feed_max_age_map = {}
  771. for feed_cfg in self.ctx.rss_feeds:
  772. feed_id = feed_cfg.get("id", "")
  773. max_age = feed_cfg.get("max_age_days")
  774. if max_age is not None:
  775. try:
  776. feed_max_age_map[feed_id] = int(max_age)
  777. except (ValueError, TypeError):
  778. pass
  779. for feed_id, items in items_dict.items():
  780. # 确定此 feed 的 max_age_days
  781. max_days = feed_max_age_map.get(feed_id)
  782. if max_days is None:
  783. max_days = default_max_age_days
  784. for item in items:
  785. # 应用新鲜度过滤(仅在启用时)
  786. if freshness_enabled and max_days > 0:
  787. if item.published_at and not is_within_days(item.published_at, max_days, timezone):
  788. filtered_count += 1
  789. continue # 跳过超过指定天数的文章
  790. rss_items.append({
  791. "title": item.title,
  792. "feed_id": feed_id,
  793. "feed_name": id_to_name.get(feed_id, feed_id),
  794. "url": item.url,
  795. "published_at": item.published_at,
  796. "summary": item.summary,
  797. "author": item.author,
  798. })
  799. # 输出过滤统计
  800. if filtered_count > 0:
  801. print(f"[RSS] 新鲜度过滤:跳过 {filtered_count} 篇超过指定天数的旧文章(仍保留在数据库中)")
  802. return rss_items
  803. def _filter_rss_by_keywords(self, rss_items: List[Dict]) -> List[Dict]:
  804. """使用 frequency_words.txt 过滤 RSS 条目"""
  805. try:
  806. word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
  807. if word_groups or filter_words or global_filters:
  808. from trendradar.core.frequency import matches_word_groups
  809. filtered_items = []
  810. for item in rss_items:
  811. title = item.get("title", "")
  812. if matches_word_groups(title, word_groups, filter_words, global_filters):
  813. filtered_items.append(item)
  814. original_count = len(rss_items)
  815. rss_items = filtered_items
  816. print(f"[RSS] 关键词过滤后剩余 {len(rss_items)}/{original_count} 条")
  817. if not rss_items:
  818. print("[RSS] 关键词过滤后没有匹配内容")
  819. return []
  820. except FileNotFoundError:
  821. # frequency_words.txt 不存在时跳过过滤
  822. pass
  823. return rss_items
  824. def _process_rss_report_and_notification(self, rss_data) -> None:
  825. """处理 RSS 报告生成和通知发送(独立推送,已废弃)"""
  826. # 此方法保留用于向后兼容,但不再使用
  827. # RSS 现在与热榜合并推送
  828. pass
  829. def _generate_rss_html_report(self, rss_items: list, feeds_info: dict) -> str:
  830. """生成 RSS HTML 报告"""
  831. try:
  832. from trendradar.report.rss_html import render_rss_html_content
  833. from pathlib import Path
  834. html_content = render_rss_html_content(
  835. rss_items=rss_items,
  836. total_count=len(rss_items),
  837. feeds_info=feeds_info,
  838. get_time_func=self.ctx.get_time,
  839. )
  840. # 保存 HTML 文件
  841. date_folder = self.ctx.format_date()
  842. time_filename = self.ctx.format_time()
  843. output_dir = Path("output") / date_folder / "html"
  844. output_dir.mkdir(parents=True, exist_ok=True)
  845. file_path = output_dir / f"rss_{time_filename}.html"
  846. with open(file_path, "w", encoding="utf-8") as f:
  847. f.write(html_content)
  848. print(f"[RSS] HTML 报告已生成: {file_path}")
  849. return str(file_path)
  850. except Exception as e:
  851. print(f"[RSS] 生成 HTML 报告失败: {e}")
  852. return None
  853. def _execute_mode_strategy(
  854. self, mode_strategy: Dict, results: Dict, id_to_name: Dict, failed_ids: List,
  855. rss_items: Optional[List[Dict]] = None,
  856. rss_new_items: Optional[List[Dict]] = None,
  857. ) -> Optional[str]:
  858. """执行模式特定逻辑,支持热榜+RSS合并推送"""
  859. # 获取当前监控平台ID列表
  860. current_platform_ids = self.ctx.platform_ids
  861. new_titles = self.ctx.detect_new_titles(current_platform_ids)
  862. time_info = self.ctx.format_time()
  863. if self.ctx.config["STORAGE"]["FORMATS"]["TXT"]:
  864. self.ctx.save_titles(results, id_to_name, failed_ids)
  865. word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
  866. # current模式下,实时推送需要使用完整的历史数据来保证统计信息的完整性
  867. if self.report_mode == "current":
  868. # 加载完整的历史数据(已按当前平台过滤)
  869. analysis_data = self._load_analysis_data()
  870. if analysis_data:
  871. (
  872. all_results,
  873. historical_id_to_name,
  874. historical_title_info,
  875. historical_new_titles,
  876. _,
  877. _,
  878. _,
  879. ) = analysis_data
  880. print(
  881. f"current模式:使用过滤后的历史数据,包含平台:{list(all_results.keys())}"
  882. )
  883. stats, html_file = self._run_analysis_pipeline(
  884. all_results,
  885. self.report_mode,
  886. historical_title_info,
  887. historical_new_titles,
  888. word_groups,
  889. filter_words,
  890. historical_id_to_name,
  891. failed_ids=failed_ids,
  892. global_filters=global_filters,
  893. rss_items=rss_items,
  894. rss_new_items=rss_new_items,
  895. )
  896. combined_id_to_name = {**historical_id_to_name, **id_to_name}
  897. if html_file:
  898. print(f"HTML报告已生成: {html_file}")
  899. # 发送实时通知(使用完整历史数据的统计结果,合并RSS)
  900. summary_html = None
  901. if mode_strategy["should_send_realtime"]:
  902. self._send_notification_if_needed(
  903. stats,
  904. mode_strategy["realtime_report_type"],
  905. self.report_mode,
  906. failed_ids=failed_ids,
  907. new_titles=historical_new_titles,
  908. id_to_name=combined_id_to_name,
  909. html_file_path=html_file,
  910. rss_items=rss_items,
  911. rss_new_items=rss_new_items,
  912. )
  913. else:
  914. print("❌ 严重错误:无法读取刚保存的数据文件")
  915. raise RuntimeError("数据一致性检查失败:保存后立即读取失败")
  916. else:
  917. title_info = self._prepare_current_title_info(results, time_info)
  918. stats, html_file = self._run_analysis_pipeline(
  919. results,
  920. self.report_mode,
  921. title_info,
  922. new_titles,
  923. word_groups,
  924. filter_words,
  925. id_to_name,
  926. failed_ids=failed_ids,
  927. global_filters=global_filters,
  928. rss_items=rss_items,
  929. rss_new_items=rss_new_items,
  930. )
  931. if html_file:
  932. print(f"HTML报告已生成: {html_file}")
  933. # 发送实时通知(如果需要,合并RSS)
  934. summary_html = None
  935. if mode_strategy["should_send_realtime"]:
  936. self._send_notification_if_needed(
  937. stats,
  938. mode_strategy["realtime_report_type"],
  939. self.report_mode,
  940. failed_ids=failed_ids,
  941. new_titles=new_titles,
  942. id_to_name=id_to_name,
  943. html_file_path=html_file,
  944. rss_items=rss_items,
  945. rss_new_items=rss_new_items,
  946. )
  947. # 生成汇总报告(如果需要)
  948. summary_html = None
  949. if mode_strategy["should_generate_summary"]:
  950. if mode_strategy["should_send_realtime"]:
  951. # 如果已经发送了实时通知,汇总只生成HTML不发送通知
  952. summary_html = self._generate_summary_html(
  953. mode_strategy["summary_mode"],
  954. rss_items=rss_items,
  955. rss_new_items=rss_new_items,
  956. )
  957. else:
  958. # daily模式:直接生成汇总报告并发送通知(合并RSS)
  959. summary_html = self._generate_summary_report(
  960. mode_strategy, rss_items=rss_items, rss_new_items=rss_new_items
  961. )
  962. # 打开浏览器(仅在非容器环境)
  963. if self._should_open_browser() and html_file:
  964. if summary_html:
  965. summary_url = "file://" + str(Path(summary_html).resolve())
  966. print(f"正在打开汇总报告: {summary_url}")
  967. webbrowser.open(summary_url)
  968. else:
  969. file_url = "file://" + str(Path(html_file).resolve())
  970. print(f"正在打开HTML报告: {file_url}")
  971. webbrowser.open(file_url)
  972. elif self.is_docker_container and html_file:
  973. if summary_html:
  974. print(f"汇总报告已生成(Docker环境): {summary_html}")
  975. else:
  976. print(f"HTML报告已生成(Docker环境): {html_file}")
  977. return summary_html
  978. def run(self) -> None:
  979. """执行分析流程"""
  980. try:
  981. self._initialize_and_check_config()
  982. mode_strategy = self._get_mode_strategy()
  983. # 抓取热榜数据
  984. results, id_to_name, failed_ids = self._crawl_data()
  985. # 抓取 RSS 数据(如果启用),返回统计条目和新增条目用于合并推送
  986. rss_items, rss_new_items = self._crawl_rss_data()
  987. # 执行模式策略,传递 RSS 数据用于合并推送
  988. self._execute_mode_strategy(
  989. mode_strategy, results, id_to_name, failed_ids,
  990. rss_items=rss_items, rss_new_items=rss_new_items
  991. )
  992. except Exception as e:
  993. print(f"分析流程执行出错: {e}")
  994. raise
  995. finally:
  996. # 清理资源(包括过期数据清理和数据库连接关闭)
  997. self.ctx.cleanup()
  998. def main():
  999. """主程序入口"""
  1000. try:
  1001. analyzer = NewsAnalyzer()
  1002. analyzer.run()
  1003. except FileNotFoundError as e:
  1004. print(f"❌ 配置文件错误: {e}")
  1005. print("\n请确保以下文件存在:")
  1006. print(" • config/config.yaml")
  1007. print(" • config/frequency_words.txt")
  1008. print("\n参考项目文档进行正确配置")
  1009. except Exception as e:
  1010. print(f"❌ 程序运行错误: {e}")
  1011. raise
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()