| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465 |
- """
- 系统管理工具
- 实现系统状态查询和爬虫触发功能。
- """
- from pathlib import Path
- from typing import Dict, List, Optional
- from ..services.data_service import DataService
- from ..utils.validators import validate_platforms
- from ..utils.errors import MCPError, CrawlTaskError
- class SystemManagementTools:
- """系统管理工具类"""
- def __init__(self, project_root: str = None):
- """
- 初始化系统管理工具
- Args:
- project_root: 项目根目录
- """
- self.data_service = DataService(project_root)
- if project_root:
- self.project_root = Path(project_root)
- else:
- # 获取项目根目录
- current_file = Path(__file__)
- self.project_root = current_file.parent.parent.parent
- def get_system_status(self) -> Dict:
- """
- 获取系统运行状态和健康检查信息
- Returns:
- 系统状态字典
- Example:
- >>> tools = SystemManagementTools()
- >>> result = tools.get_system_status()
- >>> print(result['system']['version'])
- """
- try:
- # 获取系统状态
- status = self.data_service.get_system_status()
- return {
- **status,
- "success": True
- }
- except MCPError as e:
- return {
- "success": False,
- "error": e.to_dict()
- }
- except Exception as e:
- return {
- "success": False,
- "error": {
- "code": "INTERNAL_ERROR",
- "message": str(e)
- }
- }
- def trigger_crawl(self, platforms: Optional[List[str]] = None, save_to_local: bool = False, include_url: bool = False) -> Dict:
- """
- 手动触发一次临时爬取任务(可选持久化)
- Args:
- platforms: 指定平台列表,为空则爬取所有平台
- save_to_local: 是否保存到本地 output 目录,默认 False
- include_url: 是否包含URL链接,默认False(节省token)
- Returns:
- 爬取结果字典,包含新闻数据和保存路径(如果保存)
- Example:
- >>> tools = SystemManagementTools()
- >>> # 临时爬取,不保存
- >>> result = tools.trigger_crawl(platforms=['zhihu', 'weibo'])
- >>> print(result['data'])
- >>> # 爬取并保存到本地
- >>> result = tools.trigger_crawl(platforms=['zhihu'], save_to_local=True)
- >>> print(result['saved_files'])
- """
- try:
- import json
- import time
- import random
- import requests
- from datetime import datetime
- import pytz
- import yaml
- # 参数验证
- platforms = validate_platforms(platforms)
- # 加载配置文件
- config_path = self.project_root / "config" / "config.yaml"
- if not config_path.exists():
- raise CrawlTaskError(
- "配置文件不存在",
- suggestion=f"请确保配置文件存在: {config_path}"
- )
- # 读取配置
- with open(config_path, "r", encoding="utf-8") as f:
- config_data = yaml.safe_load(f)
- # 获取平台配置
- all_platforms = config_data.get("platforms", [])
- if not all_platforms:
- raise CrawlTaskError(
- "配置文件中没有平台配置",
- suggestion="请检查 config/config.yaml 中的 platforms 配置"
- )
- # 过滤平台
- if platforms:
- target_platforms = [p for p in all_platforms if p["id"] in platforms]
- if not target_platforms:
- raise CrawlTaskError(
- f"指定的平台不存在: {platforms}",
- suggestion=f"可用平台: {[p['id'] for p in all_platforms]}"
- )
- else:
- target_platforms = all_platforms
- # 获取请求间隔
- request_interval = config_data.get("crawler", {}).get("request_interval", 100)
- # 构建平台ID列表
- ids = []
- for platform in target_platforms:
- if "name" in platform:
- ids.append((platform["id"], platform["name"]))
- else:
- ids.append(platform["id"])
- print(f"开始临时爬取,平台: {[p.get('name', p['id']) for p in target_platforms]}")
- # 爬取数据
- results = {}
- id_to_name = {}
- failed_ids = []
- for i, id_info in enumerate(ids):
- if isinstance(id_info, tuple):
- id_value, name = id_info
- else:
- id_value = id_info
- name = id_value
- id_to_name[id_value] = name
- # 构建请求URL
- url = f"https://newsnow.busiyi.world/api/s?id={id_value}&latest"
- headers = {
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
- "Accept": "application/json, text/plain, */*",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
- "Connection": "keep-alive",
- "Cache-Control": "no-cache",
- }
- # 重试机制
- max_retries = 2
- retries = 0
- success = False
- while retries <= max_retries and not success:
- try:
- response = requests.get(url, headers=headers, timeout=10)
- response.raise_for_status()
- data_text = response.text
- data_json = json.loads(data_text)
- status = data_json.get("status", "未知")
- if status not in ["success", "cache"]:
- raise ValueError(f"响应状态异常: {status}")
- status_info = "最新数据" if status == "success" else "缓存数据"
- print(f"获取 {id_value} 成功({status_info})")
- # 解析数据
- results[id_value] = {}
- for index, item in enumerate(data_json.get("items", []), 1):
- title = item["title"]
- url_link = item.get("url", "")
- mobile_url = item.get("mobileUrl", "")
- if title in results[id_value]:
- results[id_value][title]["ranks"].append(index)
- else:
- results[id_value][title] = {
- "ranks": [index],
- "url": url_link,
- "mobileUrl": mobile_url,
- }
- success = True
- except Exception as e:
- retries += 1
- if retries <= max_retries:
- wait_time = random.uniform(3, 5)
- print(f"请求 {id_value} 失败: {e}. {wait_time:.2f}秒后重试...")
- time.sleep(wait_time)
- else:
- print(f"请求 {id_value} 失败: {e}")
- failed_ids.append(id_value)
- # 请求间隔
- if i < len(ids) - 1:
- actual_interval = request_interval + random.randint(-10, 20)
- actual_interval = max(50, actual_interval)
- time.sleep(actual_interval / 1000)
- # 格式化返回数据
- news_data = []
- for platform_id, titles_data in results.items():
- platform_name = id_to_name.get(platform_id, platform_id)
- for title, info in titles_data.items():
- news_item = {
- "platform_id": platform_id,
- "platform_name": platform_name,
- "title": title,
- "ranks": info["ranks"]
- }
- # 条件性添加 URL 字段
- if include_url:
- news_item["url"] = info.get("url", "")
- news_item["mobile_url"] = info.get("mobileUrl", "")
- news_data.append(news_item)
- # 获取北京时间
- beijing_tz = pytz.timezone("Asia/Shanghai")
- now = datetime.now(beijing_tz)
- # 构建返回结果
- result = {
- "success": True,
- "task_id": f"crawl_{int(time.time())}",
- "status": "completed",
- "crawl_time": now.strftime("%Y-%m-%d %H:%M:%S"),
- "platforms": list(results.keys()),
- "total_news": len(news_data),
- "failed_platforms": failed_ids,
- "data": news_data,
- "saved_to_local": save_to_local
- }
- # 如果需要持久化,调用保存逻辑
- if save_to_local:
- try:
- import re
- # 辅助函数:清理标题
- def clean_title(title: str) -> str:
- """清理标题中的特殊字符"""
- if not isinstance(title, str):
- title = str(title)
- cleaned_title = title.replace("\n", " ").replace("\r", " ")
- cleaned_title = re.sub(r"\s+", " ", cleaned_title)
- cleaned_title = cleaned_title.strip()
- return cleaned_title
- # 辅助函数:创建目录
- def ensure_directory_exists(directory: str):
- """确保目录存在"""
- Path(directory).mkdir(parents=True, exist_ok=True)
- # 格式化日期和时间
- date_folder = now.strftime("%Y年%m月%d日")
- time_filename = now.strftime("%H时%M分")
- # 创建 txt 文件路径
- txt_dir = self.project_root / "output" / date_folder / "txt"
- ensure_directory_exists(str(txt_dir))
- txt_file_path = txt_dir / f"{time_filename}.txt"
- # 创建 html 文件路径
- html_dir = self.project_root / "output" / date_folder / "html"
- ensure_directory_exists(str(html_dir))
- html_file_path = html_dir / f"{time_filename}.html"
- # 保存 txt 文件(按照 main.py 的格式)
- with open(txt_file_path, "w", encoding="utf-8") as f:
- for id_value, title_data in results.items():
- # id | name 或 id
- name = id_to_name.get(id_value)
- if name and name != id_value:
- f.write(f"{id_value} | {name}\n")
- else:
- f.write(f"{id_value}\n")
- # 按排名排序标题
- sorted_titles = []
- for title, info in title_data.items():
- cleaned = clean_title(title)
- if isinstance(info, dict):
- ranks = info.get("ranks", [])
- url = info.get("url", "")
- mobile_url = info.get("mobileUrl", "")
- else:
- ranks = info if isinstance(info, list) else []
- url = ""
- mobile_url = ""
- rank = ranks[0] if ranks else 1
- sorted_titles.append((rank, cleaned, url, mobile_url))
- sorted_titles.sort(key=lambda x: x[0])
- for rank, cleaned, url, mobile_url in sorted_titles:
- line = f"{rank}. {cleaned}"
- if url:
- line += f" [URL:{url}]"
- if mobile_url:
- line += f" [MOBILE:{mobile_url}]"
- f.write(line + "\n")
- f.write("\n")
- if failed_ids:
- f.write("==== 以下ID请求失败 ====\n")
- for id_value in failed_ids:
- f.write(f"{id_value}\n")
- # 保存 html 文件(简化版)
- html_content = self._generate_simple_html(results, id_to_name, failed_ids, now)
- with open(html_file_path, "w", encoding="utf-8") as f:
- f.write(html_content)
- print(f"数据已保存到:")
- print(f" TXT: {txt_file_path}")
- print(f" HTML: {html_file_path}")
- result["saved_files"] = {
- "txt": str(txt_file_path),
- "html": str(html_file_path)
- }
- result["note"] = "数据已持久化到 output 文件夹"
- except Exception as e:
- print(f"保存文件失败: {e}")
- result["save_error"] = str(e)
- result["note"] = "爬取成功但保存失败,数据仅在内存中"
- else:
- result["note"] = "临时爬取结果,未持久化到output文件夹"
- return result
- except MCPError as e:
- return {
- "success": False,
- "error": e.to_dict()
- }
- except Exception as e:
- import traceback
- return {
- "success": False,
- "error": {
- "code": "INTERNAL_ERROR",
- "message": str(e),
- "traceback": traceback.format_exc()
- }
- }
- def _generate_simple_html(self, results: Dict, id_to_name: Dict, failed_ids: List, now) -> str:
- """生成简化的 HTML 报告"""
- html = """<!DOCTYPE html>
- <html>
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
- <title>MCP 爬取结果</title>
- <style>
- body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
- .container { max-width: 900px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; }
- h1 { color: #333; border-bottom: 2px solid #4CAF50; padding-bottom: 10px; }
- .platform { margin-bottom: 30px; }
- .platform-name { background: #4CAF50; color: white; padding: 10px; border-radius: 5px; margin-bottom: 10px; }
- .news-item { padding: 8px; border-bottom: 1px solid #eee; }
- .rank { color: #666; font-weight: bold; margin-right: 10px; }
- .title { color: #333; }
- .link { color: #1976D2; text-decoration: none; margin-left: 10px; font-size: 0.9em; }
- .link:hover { text-decoration: underline; }
- .failed { background: #ffebee; padding: 10px; border-radius: 5px; margin-top: 20px; }
- .failed h3 { color: #c62828; margin-top: 0; }
- .timestamp { color: #666; font-size: 0.9em; text-align: right; margin-top: 20px; }
- </style>
- </head>
- <body>
- <div class="container">
- <h1>MCP 爬取结果</h1>
- """
- # 添加时间戳
- html += f' <p class="timestamp">爬取时间: {now.strftime("%Y-%m-%d %H:%M:%S")}</p>\n\n'
- # 遍历每个平台
- for platform_id, titles_data in results.items():
- platform_name = id_to_name.get(platform_id, platform_id)
- html += f' <div class="platform">\n'
- html += f' <div class="platform-name">{platform_name}</div>\n'
- # 排序标题
- sorted_items = []
- for title, info in titles_data.items():
- ranks = info.get("ranks", [])
- url = info.get("url", "")
- mobile_url = info.get("mobileUrl", "")
- rank = ranks[0] if ranks else 999
- sorted_items.append((rank, title, url, mobile_url))
- sorted_items.sort(key=lambda x: x[0])
- # 显示新闻
- for rank, title, url, mobile_url in sorted_items:
- html += f' <div class="news-item">\n'
- html += f' <span class="rank">{rank}.</span>\n'
- html += f' <span class="title">{self._html_escape(title)}</span>\n'
- if url:
- html += f' <a class="link" href="{self._html_escape(url)}" target="_blank">链接</a>\n'
- if mobile_url and mobile_url != url:
- html += f' <a class="link" href="{self._html_escape(mobile_url)}" target="_blank">移动版</a>\n'
- html += ' </div>\n'
- html += ' </div>\n\n'
- # 失败的平台
- if failed_ids:
- html += ' <div class="failed">\n'
- html += ' <h3>请求失败的平台</h3>\n'
- html += ' <ul>\n'
- for platform_id in failed_ids:
- html += f' <li>{self._html_escape(platform_id)}</li>\n'
- html += ' </ul>\n'
- html += ' </div>\n'
- html += """ </div>
- </body>
- </html>"""
- return html
- def _html_escape(self, text: str) -> str:
- """HTML 转义"""
- if not isinstance(text, str):
- text = str(text)
- return (
- text.replace("&", "&")
- .replace("<", "<")
- .replace(">", ">")
- .replace('"', """)
- .replace("'", "'")
- )
|