errors.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. """
  2. 自定义错误类
  3. 定义MCP Server使用的所有自定义异常类型。
  4. """
  5. from typing import Optional, List, Callable
  6. # ==================== 延迟加载支持的平台列表 ====================
  7. _get_supported_platforms: Optional[Callable[[], List[str]]] = None
  8. def _load_supported_platforms() -> List[str]:
  9. """延迟加载支持的平台列表"""
  10. global _get_supported_platforms
  11. if _get_supported_platforms is None:
  12. try:
  13. from .validators import get_supported_platforms
  14. _get_supported_platforms = get_supported_platforms
  15. except ImportError:
  16. # 降级:返回空列表
  17. return []
  18. return _get_supported_platforms()
  19. class MCPError(Exception):
  20. """MCP工具错误基类"""
  21. def __init__(self, message: str, code: str = "MCP_ERROR", suggestion: Optional[str] = None):
  22. super().__init__(message)
  23. self.code = code
  24. self.message = message
  25. self.suggestion = suggestion
  26. def to_dict(self) -> dict:
  27. """转换为字典格式"""
  28. error_dict = {
  29. "code": self.code,
  30. "message": self.message
  31. }
  32. if self.suggestion:
  33. error_dict["suggestion"] = self.suggestion
  34. return error_dict
  35. class DataNotFoundError(MCPError):
  36. """数据不存在错误"""
  37. def __init__(self, message: str, suggestion: Optional[str] = None):
  38. super().__init__(
  39. message=message,
  40. code="DATA_NOT_FOUND",
  41. suggestion=suggestion or "请检查日期范围或等待爬取任务完成"
  42. )
  43. class InvalidParameterError(MCPError):
  44. """参数无效错误"""
  45. def __init__(self, message: str, suggestion: Optional[str] = None):
  46. super().__init__(
  47. message=message,
  48. code="INVALID_PARAMETER",
  49. suggestion=suggestion or "请检查参数格式是否正确"
  50. )
  51. class ConfigurationError(MCPError):
  52. """配置错误"""
  53. def __init__(self, message: str, suggestion: Optional[str] = None):
  54. super().__init__(
  55. message=message,
  56. code="CONFIGURATION_ERROR",
  57. suggestion=suggestion or "请检查配置文件是否正确"
  58. )
  59. class PlatformNotSupportedError(MCPError):
  60. """平台不支持错误"""
  61. def __init__(self, platform: str):
  62. supported = _load_supported_platforms()
  63. suggestion = f"支持的平台: {', '.join(supported)}" if supported else "请检查 config/config.yaml 中的平台配置"
  64. super().__init__(
  65. message=f"平台 '{platform}' 不受支持",
  66. code="PLATFORM_NOT_SUPPORTED",
  67. suggestion=suggestion
  68. )
  69. class CrawlTaskError(MCPError):
  70. """爬取任务错误"""
  71. def __init__(self, message: str, suggestion: Optional[str] = None):
  72. super().__init__(
  73. message=message,
  74. code="CRAWL_TASK_ERROR",
  75. suggestion=suggestion or "请稍后重试或查看日志"
  76. )
  77. class FileParseError(MCPError):
  78. """文件解析错误"""
  79. def __init__(self, file_path: str, reason: str):
  80. super().__init__(
  81. message=f"解析文件 {file_path} 失败: {reason}",
  82. code="FILE_PARSE_ERROR",
  83. suggestion="请检查文件格式是否正确"
  84. )