| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171 |
- # coding=utf-8
- """
- TrendRadar 主程序
- 热点新闻聚合与分析工具
- 支持: python -m trendradar
- """
- import os
- import webbrowser
- from pathlib import Path
- from typing import Dict, List, Tuple, Optional
- import requests
- from trendradar.context import AppContext
- from trendradar import __version__
- from trendradar.core import load_config
- from trendradar.core.analyzer import convert_keyword_stats_to_platform_stats
- from trendradar.crawler import DataFetcher
- from trendradar.storage import convert_crawl_results_to_news_data
- from trendradar.utils.time import is_within_days
- def check_version_update(
- current_version: str, version_url: str, proxy_url: Optional[str] = None
- ) -> Tuple[bool, Optional[str]]:
- """检查版本更新"""
- try:
- proxies = None
- if proxy_url:
- proxies = {"http": proxy_url, "https": proxy_url}
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
- "Accept": "text/plain, */*",
- "Cache-Control": "no-cache",
- }
- response = requests.get(
- version_url, proxies=proxies, headers=headers, timeout=10
- )
- response.raise_for_status()
- remote_version = response.text.strip()
- print(f"当前版本: {current_version}, 远程版本: {remote_version}")
- # 比较版本
- def parse_version(version_str):
- try:
- parts = version_str.strip().split(".")
- if len(parts) != 3:
- raise ValueError("版本号格式不正确")
- return int(parts[0]), int(parts[1]), int(parts[2])
- except:
- return 0, 0, 0
- current_tuple = parse_version(current_version)
- remote_tuple = parse_version(remote_version)
- need_update = current_tuple < remote_tuple
- return need_update, remote_version if need_update else None
- except Exception as e:
- print(f"版本检查失败: {e}")
- return False, None
- # === 主分析器 ===
- class NewsAnalyzer:
- """新闻分析器"""
- # 模式策略定义
- MODE_STRATEGIES = {
- "incremental": {
- "mode_name": "增量模式",
- "description": "增量模式(只关注新增新闻,无新增时不推送)",
- "realtime_report_type": "实时增量",
- "summary_report_type": "当日汇总",
- "should_send_realtime": True,
- "should_generate_summary": True,
- "summary_mode": "daily",
- },
- "current": {
- "mode_name": "当前榜单模式",
- "description": "当前榜单模式(当前榜单匹配新闻 + 新增新闻区域 + 按时推送)",
- "realtime_report_type": "实时当前榜单",
- "summary_report_type": "当前榜单汇总",
- "should_send_realtime": True,
- "should_generate_summary": True,
- "summary_mode": "current",
- },
- "daily": {
- "mode_name": "当日汇总模式",
- "description": "当日汇总模式(所有匹配新闻 + 新增新闻区域 + 按时推送)",
- "realtime_report_type": "",
- "summary_report_type": "当日汇总",
- "should_send_realtime": False,
- "should_generate_summary": True,
- "summary_mode": "daily",
- },
- }
- def __init__(self):
- # 加载配置
- print("正在加载配置...")
- config = load_config()
- print(f"TrendRadar v{__version__} 配置加载完成")
- print(f"监控平台数量: {len(config['PLATFORMS'])}")
- print(f"时区: {config.get('TIMEZONE', 'Asia/Shanghai')}")
- # 创建应用上下文
- self.ctx = AppContext(config)
- self.request_interval = self.ctx.config["REQUEST_INTERVAL"]
- self.report_mode = self.ctx.config["REPORT_MODE"]
- self.rank_threshold = self.ctx.rank_threshold
- self.is_github_actions = os.environ.get("GITHUB_ACTIONS") == "true"
- self.is_docker_container = self._detect_docker_environment()
- self.update_info = None
- self.proxy_url = None
- self._setup_proxy()
- self.data_fetcher = DataFetcher(self.proxy_url)
- # 初始化存储管理器(使用 AppContext)
- self._init_storage_manager()
- if self.is_github_actions:
- self._check_version_update()
- def _init_storage_manager(self) -> None:
- """初始化存储管理器(使用 AppContext)"""
- # 获取数据保留天数(支持环境变量覆盖)
- env_retention = os.environ.get("STORAGE_RETENTION_DAYS", "").strip()
- if env_retention:
- # 环境变量覆盖配置
- self.ctx.config["STORAGE"]["RETENTION_DAYS"] = int(env_retention)
- self.storage_manager = self.ctx.get_storage_manager()
- print(f"存储后端: {self.storage_manager.backend_name}")
- retention_days = self.ctx.config.get("STORAGE", {}).get("RETENTION_DAYS", 0)
- if retention_days > 0:
- print(f"数据保留天数: {retention_days} 天")
- def _detect_docker_environment(self) -> bool:
- """检测是否运行在 Docker 容器中"""
- try:
- if os.environ.get("DOCKER_CONTAINER") == "true":
- return True
- if os.path.exists("/.dockerenv"):
- return True
- return False
- except Exception:
- return False
- def _should_open_browser(self) -> bool:
- """判断是否应该打开浏览器"""
- return not self.is_github_actions and not self.is_docker_container
- def _setup_proxy(self) -> None:
- """设置代理配置"""
- if not self.is_github_actions and self.ctx.config["USE_PROXY"]:
- self.proxy_url = self.ctx.config["DEFAULT_PROXY"]
- print("本地环境,使用代理")
- elif not self.is_github_actions and not self.ctx.config["USE_PROXY"]:
- print("本地环境,未启用代理")
- else:
- print("GitHub Actions环境,不使用代理")
- def _check_version_update(self) -> None:
- """检查版本更新"""
- try:
- need_update, remote_version = check_version_update(
- __version__, self.ctx.config["VERSION_CHECK_URL"], self.proxy_url
- )
- if need_update and remote_version:
- self.update_info = {
- "current_version": __version__,
- "remote_version": remote_version,
- }
- print(f"发现新版本: {remote_version} (当前: {__version__})")
- else:
- print("版本检查完成,当前为最新版本")
- except Exception as e:
- print(f"版本检查出错: {e}")
- def _get_mode_strategy(self) -> Dict:
- """获取当前模式的策略配置"""
- return self.MODE_STRATEGIES.get(self.report_mode, self.MODE_STRATEGIES["daily"])
- def _has_notification_configured(self) -> bool:
- """检查是否配置了任何通知渠道"""
- cfg = self.ctx.config
- return any(
- [
- cfg["FEISHU_WEBHOOK_URL"],
- cfg["DINGTALK_WEBHOOK_URL"],
- cfg["WEWORK_WEBHOOK_URL"],
- (cfg["TELEGRAM_BOT_TOKEN"] and cfg["TELEGRAM_CHAT_ID"]),
- (
- cfg["EMAIL_FROM"]
- and cfg["EMAIL_PASSWORD"]
- and cfg["EMAIL_TO"]
- ),
- (cfg["NTFY_SERVER_URL"] and cfg["NTFY_TOPIC"]),
- cfg["BARK_URL"],
- cfg["SLACK_WEBHOOK_URL"],
- ]
- )
- def _has_valid_content(
- self, stats: List[Dict], new_titles: Optional[Dict] = None
- ) -> bool:
- """检查是否有有效的新闻内容"""
- if self.report_mode == "incremental":
- # 增量模式:必须有新增标题且匹配了关键词才推送
- has_new_titles = bool(
- new_titles and any(len(titles) > 0 for titles in new_titles.values())
- )
- has_matched_news = any(stat["count"] > 0 for stat in stats)
- return has_new_titles and has_matched_news
- elif self.report_mode == "current":
- # current模式:只要stats有内容就说明有匹配的新闻
- return any(stat["count"] > 0 for stat in stats)
- else:
- # 当日汇总模式下,检查是否有匹配的频率词新闻或新增新闻
- has_matched_news = any(stat["count"] > 0 for stat in stats)
- has_new_news = bool(
- new_titles and any(len(titles) > 0 for titles in new_titles.values())
- )
- return has_matched_news or has_new_news
- def _load_analysis_data(
- self,
- quiet: bool = False,
- ) -> Optional[Tuple[Dict, Dict, Dict, Dict, List, List]]:
- """统一的数据加载和预处理,使用当前监控平台列表过滤历史数据"""
- try:
- # 获取当前配置的监控平台ID列表
- current_platform_ids = self.ctx.platform_ids
- if not quiet:
- print(f"当前监控平台: {current_platform_ids}")
- all_results, id_to_name, title_info = self.ctx.read_today_titles(
- current_platform_ids, quiet=quiet
- )
- if not all_results:
- print("没有找到当天的数据")
- return None
- total_titles = sum(len(titles) for titles in all_results.values())
- if not quiet:
- print(f"读取到 {total_titles} 个标题(已按当前监控平台过滤)")
- new_titles = self.ctx.detect_new_titles(current_platform_ids, quiet=quiet)
- word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
- return (
- all_results,
- id_to_name,
- title_info,
- new_titles,
- word_groups,
- filter_words,
- global_filters,
- )
- except Exception as e:
- print(f"数据加载失败: {e}")
- return None
- def _prepare_current_title_info(self, results: Dict, time_info: str) -> Dict:
- """从当前抓取结果构建标题信息"""
- title_info = {}
- for source_id, titles_data in results.items():
- title_info[source_id] = {}
- for title, title_data in titles_data.items():
- ranks = title_data.get("ranks", [])
- url = title_data.get("url", "")
- mobile_url = title_data.get("mobileUrl", "")
- title_info[source_id][title] = {
- "first_time": time_info,
- "last_time": time_info,
- "count": 1,
- "ranks": ranks,
- "url": url,
- "mobileUrl": mobile_url,
- }
- return title_info
- def _run_analysis_pipeline(
- self,
- data_source: Dict,
- mode: str,
- title_info: Dict,
- new_titles: Dict,
- word_groups: List[Dict],
- filter_words: List[str],
- id_to_name: Dict,
- failed_ids: Optional[List] = None,
- is_daily_summary: bool = False,
- global_filters: Optional[List[str]] = None,
- quiet: bool = False,
- rss_items: Optional[List[Dict]] = None,
- rss_new_items: Optional[List[Dict]] = None,
- ) -> Tuple[List[Dict], Optional[str]]:
- """统一的分析流水线:数据处理 → 统计计算 → HTML生成"""
- # 统计计算(使用 AppContext)
- stats, total_titles = self.ctx.count_frequency(
- data_source,
- word_groups,
- filter_words,
- id_to_name,
- title_info,
- new_titles,
- mode=mode,
- global_filters=global_filters,
- quiet=quiet,
- )
- # 如果是 platform 模式,转换数据结构
- if self.ctx.display_mode == "platform" and stats:
- stats = convert_keyword_stats_to_platform_stats(
- stats,
- self.ctx.weight_config,
- self.ctx.rank_threshold,
- )
- # HTML生成(如果启用)
- html_file = None
- if self.ctx.config["STORAGE"]["FORMATS"]["HTML"]:
- html_file = self.ctx.generate_html(
- stats,
- total_titles,
- failed_ids=failed_ids,
- new_titles=new_titles,
- id_to_name=id_to_name,
- mode=mode,
- is_daily_summary=is_daily_summary,
- update_info=self.update_info if self.ctx.config["SHOW_VERSION_UPDATE"] else None,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- return stats, html_file
- def _send_notification_if_needed(
- self,
- stats: List[Dict],
- report_type: str,
- mode: str,
- failed_ids: Optional[List] = None,
- new_titles: Optional[Dict] = None,
- id_to_name: Optional[Dict] = None,
- html_file_path: Optional[str] = None,
- rss_items: Optional[List[Dict]] = None,
- rss_new_items: Optional[List[Dict]] = None,
- ) -> bool:
- """统一的通知发送逻辑,包含所有判断条件,支持热榜+RSS合并推送"""
- has_notification = self._has_notification_configured()
- cfg = self.ctx.config
- # 检查是否有有效内容(热榜或RSS)
- has_news_content = self._has_valid_content(stats, new_titles)
- has_rss_content = bool(rss_items and len(rss_items) > 0)
- has_any_content = has_news_content or has_rss_content
- # 计算热榜匹配条数
- news_count = sum(len(stat.get("titles", [])) for stat in stats) if stats else 0
- rss_count = len(rss_items) if rss_items else 0
- if (
- cfg["ENABLE_NOTIFICATION"]
- and has_notification
- and has_any_content
- ):
- # 输出推送内容统计
- content_parts = []
- if news_count > 0:
- content_parts.append(f"热榜 {news_count} 条")
- if rss_count > 0:
- content_parts.append(f"RSS {rss_count} 条")
- total_count = news_count + rss_count
- print(f"[推送] 准备发送:{' + '.join(content_parts)},合计 {total_count} 条")
- # 推送窗口控制
- if cfg["PUSH_WINDOW"]["ENABLED"]:
- push_manager = self.ctx.create_push_manager()
- time_range_start = cfg["PUSH_WINDOW"]["TIME_RANGE"]["START"]
- time_range_end = cfg["PUSH_WINDOW"]["TIME_RANGE"]["END"]
- if not push_manager.is_in_time_range(time_range_start, time_range_end):
- now = self.ctx.get_time()
- print(
- f"推送窗口控制:当前时间 {now.strftime('%H:%M')} 不在推送时间窗口 {time_range_start}-{time_range_end} 内,跳过推送"
- )
- return False
- if cfg["PUSH_WINDOW"]["ONCE_PER_DAY"]:
- if push_manager.has_pushed_today():
- print(f"推送窗口控制:今天已推送过,跳过本次推送")
- return False
- else:
- print(f"推送窗口控制:今天首次推送")
- # 准备报告数据
- report_data = self.ctx.prepare_report(stats, failed_ids, new_titles, id_to_name, mode)
- # 是否发送版本更新信息
- update_info_to_send = self.update_info if cfg["SHOW_VERSION_UPDATE"] else None
- # 使用 NotificationDispatcher 发送到所有渠道(合并热榜+RSS)
- dispatcher = self.ctx.create_notification_dispatcher()
- results = dispatcher.dispatch_all(
- report_data=report_data,
- report_type=report_type,
- update_info=update_info_to_send,
- proxy_url=self.proxy_url,
- mode=mode,
- html_file_path=html_file_path,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- if not results:
- print("未配置任何通知渠道,跳过通知发送")
- return False
- # 如果成功发送了任何通知,且启用了每天只推一次,则记录推送
- if (
- cfg["PUSH_WINDOW"]["ENABLED"]
- and cfg["PUSH_WINDOW"]["ONCE_PER_DAY"]
- and any(results.values())
- ):
- push_manager = self.ctx.create_push_manager()
- push_manager.record_push(report_type)
- return True
- elif cfg["ENABLE_NOTIFICATION"] and not has_notification:
- print("⚠️ 警告:通知功能已启用但未配置任何通知渠道,将跳过通知发送")
- elif not cfg["ENABLE_NOTIFICATION"]:
- print(f"跳过{report_type}通知:通知功能已禁用")
- elif (
- cfg["ENABLE_NOTIFICATION"]
- and has_notification
- and not has_any_content
- ):
- mode_strategy = self._get_mode_strategy()
- if "实时" in report_type:
- if self.report_mode == "incremental":
- has_new = bool(
- new_titles and any(len(titles) > 0 for titles in new_titles.values())
- )
- if not has_new and not has_rss_content:
- print("跳过实时推送通知:增量模式下未检测到新增的新闻和RSS")
- elif not has_new:
- print("跳过实时推送通知:增量模式下新增新闻未匹配到关键词")
- else:
- print(
- f"跳过实时推送通知:{mode_strategy['mode_name']}下未检测到匹配的新闻"
- )
- else:
- print(
- f"跳过{mode_strategy['summary_report_type']}通知:未匹配到有效的新闻内容"
- )
- return False
- def _generate_summary_report(
- self,
- mode_strategy: Dict,
- rss_items: Optional[List[Dict]] = None,
- rss_new_items: Optional[List[Dict]] = None,
- ) -> Optional[str]:
- """生成汇总报告(带通知,支持RSS合并)"""
- summary_type = (
- "当前榜单汇总" if mode_strategy["summary_mode"] == "current" else "当日汇总"
- )
- print(f"生成{summary_type}报告...")
- # 加载分析数据
- analysis_data = self._load_analysis_data()
- if not analysis_data:
- return None
- all_results, id_to_name, title_info, new_titles, word_groups, filter_words, global_filters = (
- analysis_data
- )
- # 运行分析流水线
- stats, html_file = self._run_analysis_pipeline(
- all_results,
- mode_strategy["summary_mode"],
- title_info,
- new_titles,
- word_groups,
- filter_words,
- id_to_name,
- is_daily_summary=True,
- global_filters=global_filters,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- if html_file:
- print(f"{summary_type}报告已生成: {html_file}")
- # 发送通知(合并RSS)
- self._send_notification_if_needed(
- stats,
- mode_strategy["summary_report_type"],
- mode_strategy["summary_mode"],
- failed_ids=[],
- new_titles=new_titles,
- id_to_name=id_to_name,
- html_file_path=html_file,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- return html_file
- def _generate_summary_html(
- self,
- mode: str = "daily",
- rss_items: Optional[List[Dict]] = None,
- rss_new_items: Optional[List[Dict]] = None,
- ) -> Optional[str]:
- """生成汇总HTML"""
- summary_type = "当前榜单汇总" if mode == "current" else "当日汇总"
- print(f"生成{summary_type}HTML...")
- # 加载分析数据(静默模式,避免重复输出日志)
- analysis_data = self._load_analysis_data(quiet=True)
- if not analysis_data:
- return None
- all_results, id_to_name, title_info, new_titles, word_groups, filter_words, global_filters = (
- analysis_data
- )
- # 运行分析流水线(静默模式,避免重复输出日志)
- _, html_file = self._run_analysis_pipeline(
- all_results,
- mode,
- title_info,
- new_titles,
- word_groups,
- filter_words,
- id_to_name,
- is_daily_summary=True,
- global_filters=global_filters,
- quiet=True,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- if html_file:
- print(f"{summary_type}HTML已生成: {html_file}")
- return html_file
- def _initialize_and_check_config(self) -> None:
- """通用初始化和配置检查"""
- now = self.ctx.get_time()
- print(f"当前北京时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
- if not self.ctx.config["ENABLE_CRAWLER"]:
- print("爬虫功能已禁用(ENABLE_CRAWLER=False),程序退出")
- return
- has_notification = self._has_notification_configured()
- if not self.ctx.config["ENABLE_NOTIFICATION"]:
- print("通知功能已禁用(ENABLE_NOTIFICATION=False),将只进行数据抓取")
- elif not has_notification:
- print("未配置任何通知渠道,将只进行数据抓取,不发送通知")
- else:
- print("通知功能已启用,将发送通知")
- mode_strategy = self._get_mode_strategy()
- print(f"报告模式: {self.report_mode}")
- print(f"运行模式: {mode_strategy['description']}")
- def _crawl_data(self) -> Tuple[Dict, Dict, List]:
- """执行数据爬取"""
- ids = []
- for platform in self.ctx.platforms:
- if "name" in platform:
- ids.append((platform["id"], platform["name"]))
- else:
- ids.append(platform["id"])
- print(
- f"配置的监控平台: {[p.get('name', p['id']) for p in self.ctx.platforms]}"
- )
- print(f"开始爬取数据,请求间隔 {self.request_interval} 毫秒")
- Path("output").mkdir(parents=True, exist_ok=True)
- results, id_to_name, failed_ids = self.data_fetcher.crawl_websites(
- ids, self.request_interval
- )
- # 转换为 NewsData 格式并保存到存储后端
- crawl_time = self.ctx.format_time()
- crawl_date = self.ctx.format_date()
- news_data = convert_crawl_results_to_news_data(
- results, id_to_name, failed_ids, crawl_time, crawl_date
- )
- # 保存到存储后端(SQLite)
- if self.storage_manager.save_news_data(news_data):
- print(f"数据已保存到存储后端: {self.storage_manager.backend_name}")
- # 保存 TXT 快照(如果启用)
- txt_file = self.storage_manager.save_txt_snapshot(news_data)
- if txt_file:
- print(f"TXT 快照已保存: {txt_file}")
- # 兼容:同时保存到原有 TXT 格式(确保向后兼容)
- if self.ctx.config["STORAGE"]["FORMATS"]["TXT"]:
- title_file = self.ctx.save_titles(results, id_to_name, failed_ids)
- print(f"标题已保存到: {title_file}")
- return results, id_to_name, failed_ids
- def _crawl_rss_data(self) -> Tuple[Optional[List[Dict]], Optional[List[Dict]]]:
- """
- 执行 RSS 数据抓取
- Returns:
- (rss_items, rss_new_items) 元组:
- - rss_items: 统计条目列表(按模式处理,用于统计区块)
- - rss_new_items: 新增条目列表(用于新增区块)
- 如果未启用或失败返回 (None, None)
- """
- if not self.ctx.rss_enabled:
- return None, None
- rss_feeds = self.ctx.rss_feeds
- if not rss_feeds:
- print("[RSS] 未配置任何 RSS 源")
- return None, None
- try:
- from trendradar.crawler.rss import RSSFetcher, RSSFeedConfig
- # 构建 RSS 源配置
- feeds = []
- for feed_config in rss_feeds:
- # 读取并验证单个 feed 的 max_age_days(可选)
- max_age_days_raw = feed_config.get("max_age_days")
- max_age_days = None
- if max_age_days_raw is not None:
- try:
- max_age_days = int(max_age_days_raw)
- if max_age_days < 0:
- feed_id = feed_config.get("id", "unknown")
- print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 为负数,将使用全局默认值")
- max_age_days = None
- except (ValueError, TypeError):
- feed_id = feed_config.get("id", "unknown")
- print(f"[警告] RSS feed '{feed_id}' 的 max_age_days 格式错误:{max_age_days_raw}")
- max_age_days = None
- feed = RSSFeedConfig(
- id=feed_config.get("id", ""),
- name=feed_config.get("name", ""),
- url=feed_config.get("url", ""),
- max_items=feed_config.get("max_items", 50),
- enabled=feed_config.get("enabled", True),
- max_age_days=max_age_days, # None=使用全局,0=禁用,>0=覆盖
- )
- if feed.id and feed.url and feed.enabled:
- feeds.append(feed)
- if not feeds:
- print("[RSS] 没有启用的 RSS 源")
- return None, None
- # 创建抓取器
- rss_config = self.ctx.rss_config
- # RSS 代理:优先使用 RSS 专属代理,否则使用爬虫默认代理
- rss_proxy_url = rss_config.get("PROXY_URL", "") or self.proxy_url or ""
- # 获取配置的时区
- timezone = self.ctx.config.get("TIMEZONE", "Asia/Shanghai")
- # 获取新鲜度过滤配置
- freshness_config = rss_config.get("FRESHNESS_FILTER", {})
- freshness_enabled = freshness_config.get("ENABLED", True)
- default_max_age_days = freshness_config.get("MAX_AGE_DAYS", 3)
- fetcher = RSSFetcher(
- feeds=feeds,
- request_interval=rss_config.get("REQUEST_INTERVAL", 2000),
- timeout=rss_config.get("TIMEOUT", 15),
- use_proxy=rss_config.get("USE_PROXY", False),
- proxy_url=rss_proxy_url,
- timezone=timezone,
- freshness_enabled=freshness_enabled,
- default_max_age_days=default_max_age_days,
- )
- # 抓取数据
- rss_data = fetcher.fetch_all()
- # 保存到存储后端
- if self.storage_manager.save_rss_data(rss_data):
- print(f"[RSS] 数据已保存到存储后端")
- # 处理 RSS 数据(按模式过滤)并返回用于合并推送
- return self._process_rss_data_by_mode(rss_data)
- else:
- print(f"[RSS] 数据保存失败")
- return None, None
- except ImportError as e:
- print(f"[RSS] 缺少依赖: {e}")
- print("[RSS] 请安装 feedparser: pip install feedparser")
- return None, None
- except Exception as e:
- print(f"[RSS] 抓取失败: {e}")
- return None, None
- def _process_rss_data_by_mode(self, rss_data) -> Tuple[Optional[List[Dict]], Optional[List[Dict]]]:
- """
- 按报告模式处理 RSS 数据,返回与热榜相同格式的统计结构
- 三种模式:
- - daily: 当日汇总,统计=当天所有条目,新增=本次新增条目
- - current: 当前榜单,统计=当前榜单条目,新增=本次新增条目
- - incremental: 增量模式,统计=新增条目,新增=无
- Args:
- rss_data: 当前抓取的 RSSData 对象
- Returns:
- (rss_stats, rss_new_stats) 元组:
- - rss_stats: RSS 关键词统计列表(与热榜 stats 格式一致)
- - rss_new_stats: RSS 新增关键词统计列表(与热榜 stats 格式一致)
- """
- from trendradar.core.analyzer import count_rss_frequency
- rss_config = self.ctx.rss_config
- # 检查是否启用 RSS 通知
- if not rss_config.get("NOTIFICATION", {}).get("ENABLED", False):
- return None, None
- # 加载关键词配置
- try:
- word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
- except FileNotFoundError:
- word_groups, filter_words, global_filters = [], [], []
- timezone = self.ctx.timezone
- max_news_per_keyword = self.ctx.config.get("MAX_NEWS_PER_KEYWORD", 0)
- sort_by_position_first = self.ctx.config.get("SORT_BY_POSITION_FIRST", False)
- rss_stats = None
- rss_new_stats = None
- # 1. 首先获取新增条目(所有模式都需要)
- new_items_dict = self.storage_manager.detect_new_rss_items(rss_data)
- new_items_list = None
- if new_items_dict:
- new_items_list = self._convert_rss_items_to_list(new_items_dict, rss_data.id_to_name)
- if new_items_list:
- print(f"[RSS] 检测到 {len(new_items_list)} 条新增")
- # 2. 根据模式获取统计条目
- if self.report_mode == "incremental":
- # 增量模式:统计条目就是新增条目
- if not new_items_list:
- print("[RSS] 增量模式:没有新增 RSS 条目")
- return None, None
- rss_stats, total = count_rss_frequency(
- rss_items=new_items_list,
- word_groups=word_groups,
- filter_words=filter_words,
- global_filters=global_filters,
- new_items=new_items_list, # 增量模式所有都是新增
- max_news_per_keyword=max_news_per_keyword,
- sort_by_position_first=sort_by_position_first,
- timezone=timezone,
- rank_threshold=self.rank_threshold,
- quiet=False,
- )
- if not rss_stats:
- print("[RSS] 增量模式:关键词匹配后没有内容")
- return None, None
- elif self.report_mode == "current":
- # 当前榜单模式:统计=当前榜单所有条目
- latest_data = self.storage_manager.get_latest_rss_data(rss_data.date)
- if not latest_data:
- print("[RSS] 当前榜单模式:没有 RSS 数据")
- return None, None
- all_items_list = self._convert_rss_items_to_list(latest_data.items, latest_data.id_to_name)
- rss_stats, total = count_rss_frequency(
- rss_items=all_items_list,
- word_groups=word_groups,
- filter_words=filter_words,
- global_filters=global_filters,
- new_items=new_items_list, # 标记新增
- max_news_per_keyword=max_news_per_keyword,
- sort_by_position_first=sort_by_position_first,
- timezone=timezone,
- rank_threshold=self.rank_threshold,
- quiet=False,
- )
- if not rss_stats:
- print("[RSS] 当前榜单模式:关键词匹配后没有内容")
- return None, None
- # 生成新增统计
- if new_items_list:
- rss_new_stats, _ = count_rss_frequency(
- rss_items=new_items_list,
- word_groups=word_groups,
- filter_words=filter_words,
- global_filters=global_filters,
- new_items=new_items_list,
- max_news_per_keyword=max_news_per_keyword,
- sort_by_position_first=sort_by_position_first,
- timezone=timezone,
- rank_threshold=self.rank_threshold,
- quiet=True,
- )
- else:
- # daily 模式:统计=当天所有条目
- all_data = self.storage_manager.get_rss_data(rss_data.date)
- if not all_data:
- print("[RSS] 当日汇总模式:没有 RSS 数据")
- return None, None
- all_items_list = self._convert_rss_items_to_list(all_data.items, all_data.id_to_name)
- rss_stats, total = count_rss_frequency(
- rss_items=all_items_list,
- word_groups=word_groups,
- filter_words=filter_words,
- global_filters=global_filters,
- new_items=new_items_list, # 标记新增
- max_news_per_keyword=max_news_per_keyword,
- sort_by_position_first=sort_by_position_first,
- timezone=timezone,
- rank_threshold=self.rank_threshold,
- quiet=False,
- )
- if not rss_stats:
- print("[RSS] 当日汇总模式:关键词匹配后没有内容")
- return None, None
- # 生成新增统计
- if new_items_list:
- rss_new_stats, _ = count_rss_frequency(
- rss_items=new_items_list,
- word_groups=word_groups,
- filter_words=filter_words,
- global_filters=global_filters,
- new_items=new_items_list,
- max_news_per_keyword=max_news_per_keyword,
- sort_by_position_first=sort_by_position_first,
- timezone=timezone,
- rank_threshold=self.rank_threshold,
- quiet=True,
- )
- return rss_stats, rss_new_stats
- def _convert_rss_items_to_list(self, items_dict: Dict, id_to_name: Dict) -> List[Dict]:
- """将 RSS 条目字典转换为列表格式,并应用新鲜度过滤(用于推送)"""
- rss_items = []
- filtered_count = 0
- # 获取新鲜度过滤配置
- rss_config = self.ctx.rss_config
- freshness_config = rss_config.get("FRESHNESS_FILTER", {})
- freshness_enabled = freshness_config.get("ENABLED", True)
- default_max_age_days = freshness_config.get("MAX_AGE_DAYS", 3)
- timezone = self.ctx.config.get("TIMEZONE", "Asia/Shanghai")
- # 构建 feed_id -> max_age_days 的映射
- feed_max_age_map = {}
- for feed_cfg in self.ctx.rss_feeds:
- feed_id = feed_cfg.get("id", "")
- max_age = feed_cfg.get("max_age_days")
- if max_age is not None:
- try:
- feed_max_age_map[feed_id] = int(max_age)
- except (ValueError, TypeError):
- pass
- for feed_id, items in items_dict.items():
- # 确定此 feed 的 max_age_days
- max_days = feed_max_age_map.get(feed_id)
- if max_days is None:
- max_days = default_max_age_days
- for item in items:
- # 应用新鲜度过滤(仅在启用时)
- if freshness_enabled and max_days > 0:
- if item.published_at and not is_within_days(item.published_at, max_days, timezone):
- filtered_count += 1
- continue # 跳过超过指定天数的文章
- rss_items.append({
- "title": item.title,
- "feed_id": feed_id,
- "feed_name": id_to_name.get(feed_id, feed_id),
- "url": item.url,
- "published_at": item.published_at,
- "summary": item.summary,
- "author": item.author,
- })
- # 输出过滤统计
- if filtered_count > 0:
- print(f"[RSS] 新鲜度过滤:跳过 {filtered_count} 篇超过指定天数的旧文章(仍保留在数据库中)")
- return rss_items
- def _filter_rss_by_keywords(self, rss_items: List[Dict]) -> List[Dict]:
- """使用 frequency_words.txt 过滤 RSS 条目"""
- try:
- word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
- if word_groups or filter_words or global_filters:
- from trendradar.core.frequency import matches_word_groups
- filtered_items = []
- for item in rss_items:
- title = item.get("title", "")
- if matches_word_groups(title, word_groups, filter_words, global_filters):
- filtered_items.append(item)
- original_count = len(rss_items)
- rss_items = filtered_items
- print(f"[RSS] 关键词过滤后剩余 {len(rss_items)}/{original_count} 条")
- if not rss_items:
- print("[RSS] 关键词过滤后没有匹配内容")
- return []
- except FileNotFoundError:
- # frequency_words.txt 不存在时跳过过滤
- pass
- return rss_items
- def _process_rss_report_and_notification(self, rss_data) -> None:
- """处理 RSS 报告生成和通知发送(独立推送,已废弃)"""
- # 此方法保留用于向后兼容,但不再使用
- # RSS 现在与热榜合并推送
- pass
- def _generate_rss_html_report(self, rss_items: list, feeds_info: dict) -> str:
- """生成 RSS HTML 报告"""
- try:
- from trendradar.report.rss_html import render_rss_html_content
- from pathlib import Path
- html_content = render_rss_html_content(
- rss_items=rss_items,
- total_count=len(rss_items),
- feeds_info=feeds_info,
- get_time_func=self.ctx.get_time,
- )
- # 保存 HTML 文件
- date_folder = self.ctx.format_date()
- time_filename = self.ctx.format_time()
- output_dir = Path("output") / date_folder / "html"
- output_dir.mkdir(parents=True, exist_ok=True)
- file_path = output_dir / f"rss_{time_filename}.html"
- with open(file_path, "w", encoding="utf-8") as f:
- f.write(html_content)
- print(f"[RSS] HTML 报告已生成: {file_path}")
- return str(file_path)
- except Exception as e:
- print(f"[RSS] 生成 HTML 报告失败: {e}")
- return None
- def _execute_mode_strategy(
- self, mode_strategy: Dict, results: Dict, id_to_name: Dict, failed_ids: List,
- rss_items: Optional[List[Dict]] = None,
- rss_new_items: Optional[List[Dict]] = None,
- ) -> Optional[str]:
- """执行模式特定逻辑,支持热榜+RSS合并推送"""
- # 获取当前监控平台ID列表
- current_platform_ids = self.ctx.platform_ids
- new_titles = self.ctx.detect_new_titles(current_platform_ids)
- time_info = self.ctx.format_time()
- if self.ctx.config["STORAGE"]["FORMATS"]["TXT"]:
- self.ctx.save_titles(results, id_to_name, failed_ids)
- word_groups, filter_words, global_filters = self.ctx.load_frequency_words()
- # current模式下,实时推送需要使用完整的历史数据来保证统计信息的完整性
- if self.report_mode == "current":
- # 加载完整的历史数据(已按当前平台过滤)
- analysis_data = self._load_analysis_data()
- if analysis_data:
- (
- all_results,
- historical_id_to_name,
- historical_title_info,
- historical_new_titles,
- _,
- _,
- _,
- ) = analysis_data
- print(
- f"current模式:使用过滤后的历史数据,包含平台:{list(all_results.keys())}"
- )
- stats, html_file = self._run_analysis_pipeline(
- all_results,
- self.report_mode,
- historical_title_info,
- historical_new_titles,
- word_groups,
- filter_words,
- historical_id_to_name,
- failed_ids=failed_ids,
- global_filters=global_filters,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- combined_id_to_name = {**historical_id_to_name, **id_to_name}
- if html_file:
- print(f"HTML报告已生成: {html_file}")
- # 发送实时通知(使用完整历史数据的统计结果,合并RSS)
- summary_html = None
- if mode_strategy["should_send_realtime"]:
- self._send_notification_if_needed(
- stats,
- mode_strategy["realtime_report_type"],
- self.report_mode,
- failed_ids=failed_ids,
- new_titles=historical_new_titles,
- id_to_name=combined_id_to_name,
- html_file_path=html_file,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- else:
- print("❌ 严重错误:无法读取刚保存的数据文件")
- raise RuntimeError("数据一致性检查失败:保存后立即读取失败")
- else:
- title_info = self._prepare_current_title_info(results, time_info)
- stats, html_file = self._run_analysis_pipeline(
- results,
- self.report_mode,
- title_info,
- new_titles,
- word_groups,
- filter_words,
- id_to_name,
- failed_ids=failed_ids,
- global_filters=global_filters,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- if html_file:
- print(f"HTML报告已生成: {html_file}")
- # 发送实时通知(如果需要,合并RSS)
- summary_html = None
- if mode_strategy["should_send_realtime"]:
- self._send_notification_if_needed(
- stats,
- mode_strategy["realtime_report_type"],
- self.report_mode,
- failed_ids=failed_ids,
- new_titles=new_titles,
- id_to_name=id_to_name,
- html_file_path=html_file,
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- # 生成汇总报告(如果需要)
- summary_html = None
- if mode_strategy["should_generate_summary"]:
- if mode_strategy["should_send_realtime"]:
- # 如果已经发送了实时通知,汇总只生成HTML不发送通知
- summary_html = self._generate_summary_html(
- mode_strategy["summary_mode"],
- rss_items=rss_items,
- rss_new_items=rss_new_items,
- )
- else:
- # daily模式:直接生成汇总报告并发送通知(合并RSS)
- summary_html = self._generate_summary_report(
- mode_strategy, rss_items=rss_items, rss_new_items=rss_new_items
- )
- # 打开浏览器(仅在非容器环境)
- if self._should_open_browser() and html_file:
- if summary_html:
- summary_url = "file://" + str(Path(summary_html).resolve())
- print(f"正在打开汇总报告: {summary_url}")
- webbrowser.open(summary_url)
- else:
- file_url = "file://" + str(Path(html_file).resolve())
- print(f"正在打开HTML报告: {file_url}")
- webbrowser.open(file_url)
- elif self.is_docker_container and html_file:
- if summary_html:
- print(f"汇总报告已生成(Docker环境): {summary_html}")
- else:
- print(f"HTML报告已生成(Docker环境): {html_file}")
- return summary_html
- def run(self) -> None:
- """执行分析流程"""
- try:
- self._initialize_and_check_config()
- mode_strategy = self._get_mode_strategy()
- # 抓取热榜数据
- results, id_to_name, failed_ids = self._crawl_data()
- # 抓取 RSS 数据(如果启用),返回统计条目和新增条目用于合并推送
- rss_items, rss_new_items = self._crawl_rss_data()
- # 执行模式策略,传递 RSS 数据用于合并推送
- self._execute_mode_strategy(
- mode_strategy, results, id_to_name, failed_ids,
- rss_items=rss_items, rss_new_items=rss_new_items
- )
- except Exception as e:
- print(f"分析流程执行出错: {e}")
- raise
- finally:
- # 清理资源(包括过期数据清理和数据库连接关闭)
- self.ctx.cleanup()
- def main():
- """主程序入口"""
- try:
- analyzer = NewsAnalyzer()
- analyzer.run()
- except FileNotFoundError as e:
- print(f"❌ 配置文件错误: {e}")
- print("\n请确保以下文件存在:")
- print(" • config/config.yaml")
- print(" • config/frequency_words.txt")
- print("\n参考项目文档进行正确配置")
- except Exception as e:
- print(f"❌ 程序运行错误: {e}")
- raise
- if __name__ == "__main__":
- main()
|