"""
System management tools.

Implements system status queries, ad-hoc crawl triggering and version checks.
"""

import html
import sys
import time
import traceback
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlsplit

from ..services.data_service import DataService
from ..utils.validators import validate_platforms
from ..utils.errors import MCPError, CrawlTaskError


def _log(message: str) -> None:
    """Write a diagnostic line to stderr.

    When an MCP server runs over the stdio transport, stdout carries protocol
    messages, so free-form diagnostics must not be printed there.
    """
    print(message, file=sys.stderr, flush=True)


class SystemManagementTools:
    """System management tools: status queries, ad-hoc crawling and version checks."""

    def __init__(self, project_root: Optional[str] = None):
        """
        Initialise the tool set.

        Args:
            project_root: Project root directory. When omitted, it is derived
                from this file's location (three parent directories up).
        """
        self.data_service = DataService(project_root)

        if project_root:
            self.project_root = Path(project_root)
        else:
            self.project_root = Path(__file__).parent.parent.parent

    def get_system_status(self) -> Dict:
        """
        Return system status and health-check information.

        Returns:
            ``{"success": True, "summary": {...}, "data": status}`` on success,
            otherwise ``{"success": False, "error": {...}}``.

        Example:
            >>> tools = SystemManagementTools()
            >>> result = tools.get_system_status()
            >>> print(result['system']['version'])
        """
        try:
            status = self.data_service.get_system_status()
            return {
                "success": True,
                "summary": {
                    "description": "系统运行状态和健康检查信息"
                },
                "data": status
            }
        except MCPError as e:
            return {
                "success": False,
                "error": e.to_dict()
            }
        except Exception as e:
            return {
                "success": False,
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": str(e)
                }
            }

    def _load_crawl_config(self) -> Tuple[Dict, List[Dict]]:
        """
        Load ``config/config.yaml`` for a crawl.

        Returns:
            ``(config_data, enabled_platforms)`` where ``enabled_platforms`` are
            the ``platforms.sources`` entries whose ``enabled`` flag is not false.

        Raises:
            CrawlTaskError: the config file is missing, platforms are globally
                disabled, or no enabled platform source remains.
        """
        import yaml

        config_path = self.project_root / "config" / "config.yaml"
        if not config_path.exists():
            raise CrawlTaskError(
                "配置文件不存在",
                suggestion=f"请确保配置文件存在: {config_path}"
            )

        with open(config_path, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty file; treat that as an empty mapping.
            config_data = yaml.safe_load(f) or {}

        # A bare `platforms:` / `sources:` key parses as None, hence the `or` fallbacks.
        platforms_config = config_data.get("platforms") or {}
        if not platforms_config.get("enabled", True):
            raise CrawlTaskError(
                "热榜平台已禁用",
                suggestion="请检查 config/config.yaml 中的 platforms.enabled 配置"
            )

        all_platforms = [
            p for p in (platforms_config.get("sources") or [])
            if p.get("enabled", True)
        ]
        if not all_platforms:
            raise CrawlTaskError(
                "配置文件中没有平台配置",
                suggestion="请检查 config/config.yaml 中的 platforms.sources 配置"
            )

        return config_data, all_platforms

    def _resolve_target_platforms(self, all_platforms: list, platforms: Optional[List[str]]):
        """
        Filter the configured platforms by the user's selection.

        Args:
            all_platforms: Enabled platform entries (each has an ``"id"`` key).
            platforms: Platform ids requested by the caller; falsy means "all".

        Returns:
            ``(target_platforms, ids)`` where each ``ids`` element is either
            ``(id, name)`` (when the entry has a name) or the bare id.

        Raises:
            CrawlTaskError: none of the requested ids is configured.
        """
        if platforms:
            target_platforms = [p for p in all_platforms if p["id"] in platforms]
            if not target_platforms:
                raise CrawlTaskError(
                    f"指定的平台不存在: {platforms}",
                    suggestion=f"可用平台: {[p['id'] for p in all_platforms]}"
                )
        else:
            target_platforms = all_platforms

        ids = [
            (platform["id"], platform["name"]) if "name" in platform else platform["id"]
            for platform in target_platforms
        ]
        return target_platforms, ids

    def _persist_crawl_data(self, storage, news_data, save_to_local, results, id_to_name,
                            failed_ids, current_time, crawl_time_str):
        """
        Persist crawl data via ``storage``.

        The news data is always written through ``storage.save_news_data``.
        When that succeeds and ``save_to_local`` is set, a TXT snapshot and an
        HTML report are written as well.

        Returns:
            ``(save_success, save_error_msg, saved_files)``; ``saved_files`` maps
            ``"txt"`` / ``"html"`` to the paths returned by the storage backend.
            Any exception is caught and reported through ``save_error_msg``.
        """
        save_success = False
        save_error_msg = ""
        saved_files = {}

        try:
            if storage.save_news_data(news_data):
                save_success = True

                if save_to_local:
                    txt_path = storage.save_txt_snapshot(news_data)
                    if txt_path:
                        saved_files["txt"] = txt_path

                    html_content = self._generate_simple_html(results, id_to_name, failed_ids, current_time)
                    html_filename = f"{crawl_time_str}.html"
                    html_path = storage.save_html_report(html_content, html_filename)
                    if html_path:
                        saved_files["html"] = html_path
            else:
                # Without this, the response would report a save failure with an empty reason.
                save_error_msg = "存储后端未能写入数据库"
        except Exception as e:
            _log(f"[System] 数据保存失败: {e}")
            save_success = False
            save_error_msg = str(e)

        return save_success, save_error_msg, saved_files

    def _build_crawl_response(self, results, id_to_name, failed_ids, current_time, include_url,
                              save_success, save_to_local, save_error_msg, saved_files):
        """
        Build the response dictionary returned by ``trigger_crawl``.

        Args:
            results: Mapping ``platform_id -> {title: info}``; ``info`` may carry
                ``"ranks"``, ``"url"`` and ``"mobileUrl"``.
            id_to_name: Mapping platform id -> display name.
            failed_ids: Ids of platforms whose requests failed.
            current_time: Crawl timestamp (a ``datetime``-like object with ``strftime``).
            include_url: Whether to include ``url`` / ``mobile_url`` per item.
            save_success, save_to_local, save_error_msg, saved_files: Outcome of
                ``_persist_crawl_data`` and the caller's save preference.
        """
        news_response_data = []
        for platform_id, titles_data in results.items():
            platform_name = id_to_name.get(platform_id, platform_id)
            for title, info in titles_data.items():
                news_item = {
                    "platform_id": platform_id,
                    "platform_name": platform_name,
                    "title": title,
                    "ranks": info.get("ranks", [])
                }
                if include_url:
                    news_item["url"] = info.get("url", "")
                    news_item["mobile_url"] = info.get("mobileUrl", "")
                news_response_data.append(news_item)

        result = {
            "success": True,
            "summary": {
                "description": "爬取任务执行结果",
                "task_id": f"crawl_{int(time.time())}",
                "status": "completed",
                "crawl_time": current_time.strftime("%Y-%m-%d %H:%M:%S"),
                "total_news": len(news_response_data),
                "platforms": list(results.keys()),
                "failed_platforms": failed_ids,
                "saved_to_local": save_success and save_to_local
            },
            "data": news_response_data
        }

        if save_success:
            if save_to_local:
                result["saved_files"] = saved_files
                result["note"] = "数据已保存到 SQLite 数据库及 output 文件夹"
            else:
                result["note"] = "数据已保存到 SQLite 数据库 (仅内存中返回结果,未生成TXT快照)"
        else:
            result["saved_to_local"] = False
            result["save_error"] = save_error_msg
            # Read-only / permission errors are the typical signature of a read-only container mount.
            if "Read-only file system" in save_error_msg or "Permission denied" in save_error_msg:
                result["note"] = "爬取成功,但无法写入数据库(Docker只读模式)。数据仅在本次返回中有效。"
            else:
                result["note"] = f"爬取成功但保存失败: {save_error_msg}"

        return result

    def trigger_crawl(self, platforms: Optional[List[str]] = None, save_to_local: bool = False,
                      include_url: bool = False) -> Dict:
        """
        Manually trigger a one-off crawl (optionally persisting the results).

        Args:
            platforms: Platform ids to crawl; empty/None crawls all enabled platforms.
            save_to_local: Also write TXT/HTML files to the ``output`` directory (default False).
            include_url: Include URLs in the returned items (default False, saves tokens).

        Returns:
            Crawl result dict containing the news items and, if saved, file paths.
            Errors are returned as ``{"success": False, "error": {...}}``.
        """
        try:
            from trendradar.crawler.fetcher import DataFetcher
            from trendradar.storage.local import LocalStorageBackend
            from trendradar.storage.base import convert_crawl_results_to_news_data
            from trendradar.utils.time import get_configured_time, format_date_folder, format_time_filename
            from ..services.cache_service import get_cache

            platforms = validate_platforms(platforms)

            # 1. Load configuration and select target platforms.
            config_data, all_platforms = self._load_crawl_config()
            target_platforms, ids = self._resolve_target_platforms(all_platforms, platforms)
            _log(f"开始临时爬取,平台: {[p.get('name', p['id']) for p in target_platforms]}")

            # 2. Crawl.
            advanced = config_data.get("advanced") or {}
            crawler_config = advanced.get("crawler") or {}
            proxy_url = crawler_config.get("default_proxy") if crawler_config.get("use_proxy") else None
            fetcher = DataFetcher(proxy_url=proxy_url)
            results, id_to_name, failed_ids = fetcher.crawl_websites(
                ids_list=ids,
                request_interval=crawler_config.get("request_interval", 100)
            )

            # 3. Convert and persist.
            timezone = (config_data.get("app") or {}).get("timezone", "Asia/Shanghai")
            current_time = get_configured_time(timezone)
            crawl_date = format_date_folder(None, timezone)
            crawl_time_str = format_time_filename(timezone)

            news_data = convert_crawl_results_to_news_data(
                results=results,
                id_to_name=id_to_name,
                failed_ids=failed_ids,
                crawl_time=crawl_time_str,
                crawl_date=crawl_date
            )

            storage = LocalStorageBackend(
                data_dir=str(self.project_root / "output"),
                enable_txt=True,
                enable_html=True,
                timezone=timezone
            )

            try:
                save_success, save_error_msg, saved_files = self._persist_crawl_data(
                    storage, news_data, save_to_local, results, id_to_name,
                    failed_ids, current_time, crawl_time_str
                )
            finally:
                # Clear the service cache (presumably so later queries re-read fresh data)
                # and always release storage, even if clearing the cache raises.
                try:
                    get_cache().clear()
                    _log("[System] 缓存已清除")
                finally:
                    storage.cleanup()

            # 4. Build the response.
            return self._build_crawl_response(
                results, id_to_name, failed_ids, current_time, include_url,
                save_success, save_to_local, save_error_msg, saved_files
            )

        except MCPError as e:
            return {"success": False, "error": e.to_dict()}
        except Exception as e:
            return {
                "success": False,
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": str(e),
                    "traceback": traceback.format_exc()
                }
            }

    def _generate_simple_html(self, results: Dict, id_to_name: Dict, failed_ids: List, now) -> str:
        """
        Render a minimal standalone HTML report of the crawl.

        Args:
            results: Mapping ``platform_id -> {title: info}``; ``info`` may carry
                ``"ranks"`` (first entry used for ordering, 999 if absent),
                ``"url"`` and ``"mobileUrl"``.
            id_to_name: Mapping platform id -> display name.
            failed_ids: Platform ids whose requests failed (listed at the end).
            now: Crawl timestamp (object with ``strftime``).

        All crawled text is HTML-escaped; only http(s) URLs are rendered as links.
        """
        esc = self._html_escape
        parts = [
            "<!DOCTYPE html>\n"
            '<html lang="zh-CN">\n'
            "<head>\n"
            '    <meta charset="UTF-8">\n'
            '    <meta name="viewport" content="width=device-width, initial-scale=1.0">\n'
            "    <title>MCP 爬取结果</title>\n"
            "    <style>\n"
            "        body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; margin: 20px; background: #fafafa; }\n"
            "        .timestamp { color: #888; margin-bottom: 20px; }\n"
            "        .platform { background: #fff; border-radius: 8px; padding: 12px 16px; margin-bottom: 16px; }\n"
            "        .news-item { padding: 4px 0; }\n"
            "        .rank { color: #c00; font-weight: bold; margin-right: 6px; }\n"
            "        .link { margin-left: 8px; font-size: 0.9em; }\n"
            "        .failed { color: #c00; }\n"
            "    </style>\n"
            "</head>\n"
            "<body>\n"
            "    <h1>MCP 爬取结果</h1>\n",
            f'    <div class="timestamp">爬取时间: {now.strftime("%Y-%m-%d %H:%M:%S")}</div>\n\n',
        ]

        for platform_id, titles_data in results.items():
            platform_name = id_to_name.get(platform_id, platform_id)
            parts.append('    <div class="platform">\n')
            parts.append(f'        <h2>{esc(platform_name)}</h2>\n')

            # Order items by their first recorded rank; unranked items go last.
            sorted_items = []
            for title, info in titles_data.items():
                ranks = info.get("ranks", [])
                rank = ranks[0] if ranks else 999
                sorted_items.append((rank, title, info.get("url", ""), info.get("mobileUrl", "")))
            sorted_items.sort(key=lambda item: item[0])

            for rank, title, url, mobile_url in sorted_items:
                parts.append('        <div class="news-item">\n')
                parts.append(f'            <span class="rank">{esc(rank)}.</span>\n')
                parts.append(f'            <span class="title">{esc(title)}</span>\n')
                href = self._safe_href(url)
                if href:
                    parts.append(
                        f'            <a class="link" href="{href}" target="_blank" '
                        f'rel="noopener noreferrer">链接</a>\n'
                    )
                if mobile_url and mobile_url != url:
                    mobile_href = self._safe_href(mobile_url)
                    if mobile_href:
                        parts.append(
                            f'            <a class="link" href="{mobile_href}" target="_blank" '
                            f'rel="noopener noreferrer">移动版</a>\n'
                        )
                parts.append('        </div>\n')

            parts.append('    </div>\n\n')

        if failed_ids:
            parts.append('    <div class="platform failed">\n')
            parts.append('        <h3>请求失败的平台</h3>\n')
            parts.append('        <ul>\n')
            for failed_id in failed_ids:
                parts.append(f'            <li>{esc(failed_id)}</li>\n')
            parts.append('        </ul>\n')
            parts.append('    </div>\n')

        parts.append("</body>\n</html>\n")
        return "".join(parts)

    @staticmethod
    def _safe_href(url) -> str:
        """
        Return an attribute-safe href for an http(s) URL, or "" otherwise.

        Crawled URLs are untrusted: non-http(s) schemes such as ``javascript:``
        are rejected, and the result is escaped for use inside a quoted attribute.
        """
        if not isinstance(url, str):
            return ""
        candidate = url.strip()
        try:
            scheme = urlsplit(candidate).scheme.lower()
        except ValueError:
            # e.g. malformed IPv6 netloc
            return ""
        if scheme not in ("http", "https"):
            return ""
        return html.escape(candidate, quote=True)

    def _html_escape(self, text: str) -> str:
        """HTML-escape ``text`` (non-strings are converted with ``str`` first), including quotes."""
        if not isinstance(text, str):
            text = str(text)
        return html.escape(text, quote=True)

    def check_version(self, proxy_url: Optional[str] = None) -> Dict:
        """
        Check for updates of both TrendRadar and the MCP Server.

        Remote version URLs are read from ``config.yaml`` (``advanced`` section):
            - version_check_url: TrendRadar version
            - mcp_version_check_url: MCP Server version
        Built-in GitHub raw URLs are used when a key is absent.

        Args:
            proxy_url: Optional proxy used for both http and https requests.

        Returns:
            Dict with ``success`` and ``data`` containing:
                - trendradar: TrendRadar check result
                - mcp: MCP Server check result
                - any_update: whether any component has a newer remote version

        Example:
            >>> tools = SystemManagementTools()
            >>> result = tools.check_version()
            >>> print(result['data']['any_update'])
        """
        import yaml
        import requests

        def parse_version(version_str: str):
            """Parse ``MAJOR.MINOR.PATCH`` (optional leading ``v``) into an int tuple; (0, 0, 0) if invalid."""
            try:
                parts = version_str.strip().lstrip("vV").split(".")
                if len(parts) != 3:
                    raise ValueError("版本号格式不正确")
                return int(parts[0]), int(parts[1]), int(parts[2])
            except (ValueError, AttributeError, TypeError):
                return 0, 0, 0

        def check_single_version(
            name: str,
            local_version: str,
            remote_url: str,
            proxies: Optional[Dict],
            headers: Dict
        ) -> Dict:
            """Fetch ``remote_url`` (plain-text version) and compare it with ``local_version``."""
            try:
                response = requests.get(
                    remote_url, proxies=proxies, headers=headers, timeout=10
                )
                response.raise_for_status()
                remote_version = response.text.strip()

                local_tuple = parse_version(local_version)
                remote_tuple = parse_version(remote_version)
                need_update = local_tuple < remote_tuple

                if need_update:
                    message = f"发现新版本 {remote_version},当前版本 {local_version},建议更新"
                elif local_tuple > remote_tuple:
                    message = f"当前版本 {local_version} 高于远程版本 {remote_version}(可能是开发版本)"
                else:
                    message = f"当前版本 {local_version} 已是最新版本"

                return {
                    "success": True,
                    "name": name,
                    "current_version": local_version,
                    "remote_version": remote_version,
                    "need_update": need_update,
                    "current_parsed": list(local_tuple),
                    "remote_parsed": list(remote_tuple),
                    "message": message
                }
            except requests.exceptions.Timeout:
                return {
                    "success": False,
                    "name": name,
                    "current_version": local_version,
                    "error": "获取远程版本超时"
                }
            except requests.exceptions.RequestException as e:
                return {
                    "success": False,
                    "name": name,
                    "current_version": local_version,
                    "error": f"网络请求失败: {str(e)}"
                }
            except Exception as e:
                return {
                    "success": False,
                    "name": name,
                    "current_version": local_version,
                    "error": str(e)
                }

        try:
            # Local versions.
            from trendradar import __version__ as trendradar_version
            from mcp_server import __version__ as mcp_version

            # Remote version URLs come from the config file.
            config_path = self.project_root / "config" / "config.yaml"
            if not config_path.exists():
                return {
                    "success": False,
                    "error": {
                        "code": "CONFIG_NOT_FOUND",
                        "message": f"配置文件不存在: {config_path}"
                    }
                }

            with open(config_path, "r", encoding="utf-8") as f:
                # An empty file or a bare `advanced:` key parses as None.
                config_data = yaml.safe_load(f) or {}

            advanced_config = config_data.get("advanced") or {}
            trendradar_url = advanced_config.get(
                "version_check_url",
                "https://raw.githubusercontent.com/sansan0/TrendRadar/refs/heads/master/version"
            )
            mcp_url = advanced_config.get(
                "mcp_version_check_url",
                "https://raw.githubusercontent.com/sansan0/TrendRadar/refs/heads/master/version_mcp"
            )

            proxies = {"http": proxy_url, "https": proxy_url} if proxy_url else None

            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "Accept": "text/plain, */*",
                "Cache-Control": "no-cache",
            }

            trendradar_result = check_single_version(
                "TrendRadar", trendradar_version, trendradar_url, proxies, headers
            )
            mcp_result = check_single_version(
                "MCP Server", mcp_version, mcp_url, proxies, headers
            )

            # Only successful checks can signal an update.
            any_update = (
                (trendradar_result.get("success") and trendradar_result.get("need_update", False)) or
                (mcp_result.get("success") and mcp_result.get("need_update", False))
            )

            return {
                "success": True,
                "summary": {
                    "description": "版本检查结果(TrendRadar + MCP Server)",
                    "any_update": any_update
                },
                "data": {
                    "trendradar": trendradar_result,
                    "mcp": mcp_result,
                    "any_update": any_update
                }
            }

        except ImportError as e:
            return {
                "success": False,
                "error": {
                    "code": "IMPORT_ERROR",
                    "message": f"无法导入版本信息: {str(e)}"
                }
            }
        except Exception as e:
            return {
                "success": False,
                "error": {
                    "code": "INTERNAL_ERROR",
                    "message": str(e)
                }
            }