""" 系统管理工具 实现系统状态查询和爬虫触发功能。 """ from pathlib import Path from typing import Dict, List, Optional from ..services.data_service import DataService from ..utils.validators import validate_platforms from ..utils.errors import MCPError, CrawlTaskError class SystemManagementTools: """系统管理工具类""" def __init__(self, project_root: str = None): """ 初始化系统管理工具 Args: project_root: 项目根目录 """ self.data_service = DataService(project_root) if project_root: self.project_root = Path(project_root) else: # 获取项目根目录 current_file = Path(__file__) self.project_root = current_file.parent.parent.parent def get_system_status(self) -> Dict: """ 获取系统运行状态和健康检查信息 Returns: 系统状态字典 Example: >>> tools = SystemManagementTools() >>> result = tools.get_system_status() >>> print(result['system']['version']) """ try: # 获取系统状态 status = self.data_service.get_system_status() return { "success": True, "summary": { "description": "系统运行状态和健康检查信息" }, "data": status } except MCPError as e: return { "success": False, "error": e.to_dict() } except Exception as e: return { "success": False, "error": { "code": "INTERNAL_ERROR", "message": str(e) } } def trigger_crawl(self, platforms: Optional[List[str]] = None, save_to_local: bool = False, include_url: bool = False) -> Dict: """ 手动触发一次临时爬取任务(可选持久化) Args: platforms: 指定平台列表,为空则爬取所有平台 save_to_local: 是否保存到本地 output 目录,默认 False include_url: 是否包含URL链接,默认False(节省token) Returns: 爬取结果字典,包含新闻数据和保存路径(如果保存) Example: >>> tools = SystemManagementTools() >>> # 临时爬取,不保存 >>> result = tools.trigger_crawl(platforms=['zhihu', 'weibo']) >>> print(result['data']) >>> # 爬取并保存到本地 >>> result = tools.trigger_crawl(platforms=['zhihu'], save_to_local=True) >>> print(result['saved_files']) """ try: import time import yaml from trendradar.crawler.fetcher import DataFetcher from trendradar.storage.local import LocalStorageBackend from trendradar.storage.base import convert_crawl_results_to_news_data from trendradar.utils.time import get_configured_time, format_date_folder, format_time_filename from ..services.cache_service import get_cache # 参数验证 platforms = validate_platforms(platforms) # 加载配置文件 config_path = self.project_root / "config" / "config.yaml" if not config_path.exists(): raise CrawlTaskError( "配置文件不存在", suggestion=f"请确保配置文件存在: {config_path}" ) # 读取配置 with open(config_path, "r", encoding="utf-8") as f: config_data = yaml.safe_load(f) # 获取平台配置(嵌套结构:{enabled: bool, sources: [...]}) platforms_config = config_data.get("platforms", {}) if not platforms_config.get("enabled", True): raise CrawlTaskError( "热榜平台已禁用", suggestion="请检查 config/config.yaml 中的 platforms.enabled 配置" ) all_platforms = platforms_config.get("sources", []) if not all_platforms: raise CrawlTaskError( "配置文件中没有平台配置", suggestion="请检查 config/config.yaml 中的 platforms.sources 配置" ) # 过滤平台 if platforms: target_platforms = [p for p in all_platforms if p["id"] in platforms] if not target_platforms: raise CrawlTaskError( f"指定的平台不存在: {platforms}", suggestion=f"可用平台: {[p['id'] for p in all_platforms]}" ) else: target_platforms = all_platforms # 构建平台ID列表 ids = [] for platform in target_platforms: if "name" in platform: ids.append((platform["id"], platform["name"])) else: ids.append(platform["id"]) print(f"开始临时爬取,平台: {[p.get('name', p['id']) for p in target_platforms]}") # 初始化数据获取器 advanced = config_data.get("advanced", {}) crawler_config = advanced.get("crawler", {}) proxy_url = None if crawler_config.get("use_proxy"): proxy_url = crawler_config.get("default_proxy") fetcher = DataFetcher(proxy_url=proxy_url) request_interval = crawler_config.get("request_interval", 100) # 执行爬取 results, id_to_name, failed_ids = fetcher.crawl_websites( ids_list=ids, request_interval=request_interval ) # 获取当前时间(统一使用 trendradar 的时间工具) # 从配置中读取时区,默认为 Asia/Shanghai timezone = config_data.get("app", {}).get("timezone", "Asia/Shanghai") current_time = get_configured_time(timezone) crawl_date = format_date_folder(None, timezone) crawl_time_str = format_time_filename(timezone) # 转换为标准数据模型 news_data = convert_crawl_results_to_news_data( results=results, id_to_name=id_to_name, failed_ids=failed_ids, crawl_time=crawl_time_str, crawl_date=crawl_date ) # 初始化存储后端 storage = LocalStorageBackend( data_dir=str(self.project_root / "output"), enable_txt=True, enable_html=True, timezone=timezone ) # 尝试持久化数据 save_success = False save_error_msg = "" saved_files = {} try: # 1. 保存到 SQLite (核心持久化) if storage.save_news_data(news_data): save_success = True # 2. 如果请求保存到本地,生成 TXT/HTML 快照 if save_to_local: # 保存 TXT txt_path = storage.save_txt_snapshot(news_data) if txt_path: saved_files["txt"] = txt_path # 保存 HTML (使用简化版生成器) html_content = self._generate_simple_html(results, id_to_name, failed_ids, current_time) html_filename = f"{crawl_time_str}.html" html_path = storage.save_html_report(html_content, html_filename) if html_path: saved_files["html"] = html_path except Exception as e: # 捕获所有保存错误(特别是 Docker 只读卷导致的 PermissionError) print(f"[System] 数据保存失败: {e}") save_success = False save_error_msg = str(e) # 3. 清除缓存,确保下次查询获取最新数据 # 即使保存失败,内存中的数据可能已经通过其他方式更新,或者是临时的 get_cache().clear() print("[System] 缓存已清除") # 构建返回结果 news_response_data = [] for platform_id, titles_data in results.items(): platform_name = id_to_name.get(platform_id, platform_id) for title, info in titles_data.items(): news_item = { "platform_id": platform_id, "platform_name": platform_name, "title": title, "ranks": info.get("ranks", []) } if include_url: news_item["url"] = info.get("url", "") news_item["mobile_url"] = info.get("mobileUrl", "") news_response_data.append(news_item) result = { "success": True, "summary": { "description": "爬取任务执行结果", "task_id": f"crawl_{int(time.time())}", "status": "completed", "crawl_time": current_time.strftime("%Y-%m-%d %H:%M:%S"), "total_news": len(news_response_data), "platforms": list(results.keys()), "failed_platforms": failed_ids, "saved_to_local": save_success and save_to_local }, "data": news_response_data } if save_success: if save_to_local: result["saved_files"] = saved_files result["note"] = "数据已保存到 SQLite 数据库及 output 文件夹" else: result["note"] = "数据已保存到 SQLite 数据库 (仅内存中返回结果,未生成TXT快照)" else: # 明确告知用户保存失败 result["saved_to_local"] = False result["save_error"] = save_error_msg if "Read-only file system" in save_error_msg or "Permission denied" in save_error_msg: result["note"] = "爬取成功,但无法写入数据库(Docker只读模式)。数据仅在本次返回中有效。" else: result["note"] = f"爬取成功但保存失败: {save_error_msg}" # 清理资源 storage.cleanup() return result except MCPError as e: return { "success": False, "error": e.to_dict() } except Exception as e: import traceback return { "success": False, "error": { "code": "INTERNAL_ERROR", "message": str(e), "traceback": traceback.format_exc() } } def _generate_simple_html(self, results: Dict, id_to_name: Dict, failed_ids: List, now) -> str: """生成简化的 HTML 报告""" html = """ MCP 爬取结果

MCP 爬取结果

""" # 添加时间戳 html += f'

爬取时间: {now.strftime("%Y-%m-%d %H:%M:%S")}

\n\n' # 遍历每个平台 for platform_id, titles_data in results.items(): platform_name = id_to_name.get(platform_id, platform_id) html += f'
\n' html += f'
{platform_name}
\n' # 排序标题 sorted_items = [] for title, info in titles_data.items(): ranks = info.get("ranks", []) url = info.get("url", "") mobile_url = info.get("mobileUrl", "") rank = ranks[0] if ranks else 999 sorted_items.append((rank, title, url, mobile_url)) sorted_items.sort(key=lambda x: x[0]) # 显示新闻 for rank, title, url, mobile_url in sorted_items: html += f'
\n' html += f' {rank}.\n' html += f' {self._html_escape(title)}\n' if url: html += f' 链接\n' if mobile_url and mobile_url != url: html += f' 移动版\n' html += '
\n' html += '
\n\n' # 失败的平台 if failed_ids: html += '
\n' html += '

请求失败的平台

\n' html += ' \n' html += '
\n' html += """
""" return html def _html_escape(self, text: str) -> str: """HTML 转义""" if not isinstance(text, str): text = str(text) return ( text.replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) .replace("'", "'") ) def check_version(self, proxy_url: Optional[str] = None) -> Dict: """ 检查版本更新 同时检查 TrendRadar 和 MCP Server 两个组件的版本更新。 远程版本 URL 从 config.yaml 获取: - version_check_url: TrendRadar 版本 - mcp_version_check_url: MCP Server 版本 Args: proxy_url: 可选的代理URL,用于访问远程版本 Returns: 版本检查结果字典,包含: - success: 是否成功 - trendradar: TrendRadar 版本检查结果 - mcp: MCP Server 版本检查结果 - any_update: 是否有任何组件需要更新 Example: >>> tools = SystemManagementTools() >>> result = tools.check_version() >>> print(result['data']['any_update']) """ import yaml import requests def parse_version(version_str: str): """将版本号字符串解析为元组""" try: parts = version_str.strip().split(".") if len(parts) != 3: raise ValueError("版本号格式不正确") return int(parts[0]), int(parts[1]), int(parts[2]) except: return 0, 0, 0 def check_single_version( name: str, local_version: str, remote_url: str, proxies: Optional[Dict], headers: Dict ) -> Dict: """检查单个组件的版本""" try: response = requests.get( remote_url, proxies=proxies, headers=headers, timeout=10 ) response.raise_for_status() remote_version = response.text.strip() local_tuple = parse_version(local_version) remote_tuple = parse_version(remote_version) need_update = local_tuple < remote_tuple if need_update: message = f"发现新版本 {remote_version},当前版本 {local_version},建议更新" elif local_tuple > remote_tuple: message = f"当前版本 {local_version} 高于远程版本 {remote_version}(可能是开发版本)" else: message = f"当前版本 {local_version} 已是最新版本" return { "success": True, "name": name, "current_version": local_version, "remote_version": remote_version, "need_update": need_update, "current_parsed": list(local_tuple), "remote_parsed": list(remote_tuple), "message": message } except requests.exceptions.Timeout: return { "success": False, "name": name, "current_version": local_version, "error": "获取远程版本超时" } except requests.exceptions.RequestException as e: return { "success": False, "name": name, "current_version": local_version, "error": f"网络请求失败: {str(e)}" } except Exception as e: return { "success": False, "name": name, "current_version": local_version, "error": str(e) } try: # 导入本地版本 from trendradar import __version__ as trendradar_version from mcp_server import __version__ as mcp_version # 从配置文件获取远程版本 URL config_path = self.project_root / "config" / "config.yaml" if not config_path.exists(): return { "success": False, "error": { "code": "CONFIG_NOT_FOUND", "message": f"配置文件不存在: {config_path}" } } with open(config_path, "r", encoding="utf-8") as f: config_data = yaml.safe_load(f) advanced_config = config_data.get("advanced", {}) trendradar_url = advanced_config.get( "version_check_url", "https://raw.githubusercontent.com/sansan0/TrendRadar/refs/heads/master/version" ) mcp_url = advanced_config.get( "mcp_version_check_url", "https://raw.githubusercontent.com/sansan0/TrendRadar/refs/heads/master/version_mcp" ) # 配置代理 proxies = None if proxy_url: proxies = {"http": proxy_url, "https": proxy_url} # 请求头 headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "text/plain, */*", "Cache-Control": "no-cache", } # 检查两个版本 trendradar_result = check_single_version( "TrendRadar", trendradar_version, trendradar_url, proxies, headers ) mcp_result = check_single_version( "MCP Server", mcp_version, mcp_url, proxies, headers ) # 判断是否有任何更新 any_update = ( (trendradar_result.get("success") and trendradar_result.get("need_update", False)) or (mcp_result.get("success") and mcp_result.get("need_update", False)) ) return { "success": True, "summary": { "description": "版本检查结果(TrendRadar + MCP Server)", "any_update": any_update }, "data": { "trendradar": trendradar_result, "mcp": mcp_result, "any_update": any_update } } except ImportError as e: return { "success": False, "error": { "code": "IMPORT_ERROR", "message": f"无法导入版本信息: {str(e)}" } } except Exception as e: return { "success": False, "error": { "code": "INTERNAL_ERROR", "message": str(e) } }