
错误处理机制构建健壮的 AI 应用前言错误处理是软件系统可靠性的重要保障。在 AI 应用中错误来源多样需要设计完善的错误处理机制。我在多个项目中处理过各种错误今天分享一些经验和实现。错误分类与定义from enum import Enum from typing import Optional, Dict, Any class ErrorType(Enum): 错误类型 VALIDATION validation NETWORK network TIMEOUT timeout RATE_LIMIT rate_limit INTERNAL internal AGENT_ERROR agent_error TOOL_ERROR tool_error UNKNOWN unknown class AppError(Exception): 应用错误基类 def __init__(self, error_type: ErrorType, message: str, details: Optional[Dict[str, Any]] None): self.error_type error_type self.message message self.details details or {} super().__init__(message) def to_dict(self) - Dict[str, Any]: 转换为字典 return { error_type: self.error_type.value, message: self.message, details: self.details } # 具体错误类 class ValidationError(AppError): 验证错误 def __init__(self, message: str, field: Optional[str] None): details {} if field: details[field] field super().__init__(ErrorType.VALIDATION, message, details) class NetworkError(AppError): 网络错误 def __init__(self, message: str, url: Optional[str] None): details {} if url: details[url] url super().__init__(ErrorType.NETWORK, message, details) class TimeoutError(AppError): 超时错误 def __init__(self, message: str, timeout: Optional[int] None): details {} if timeout: details[timeout] timeout super().__init__(ErrorType.TIMEOUT, message, details)错误处理装饰器import functools import traceback from typing import Callable, Any class ErrorHandler: 错误处理器 staticmethod def handle_exceptions(func: Callable) - Callable: 错误处理装饰器 functools.wraps(func) async def async_wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except AppError: # 已经是 AppError直接抛出 raise except Exception as e: # 包装为未知错误 error AppError( ErrorType.UNKNOWN, str(e), {traceback: traceback.format_exc()} ) raise error functools.wraps(func) def sync_wrapper(*args, **kwargs): try: return func(*args, **kwargs) except AppError: raise except Exception as e: error AppError( ErrorType.UNKNOWN, str(e), {traceback: traceback.format_exc()} ) raise error if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper重试机制import time import asyncio from functools import wraps from typing import Callable, Optional, List, Type class RetryStrategy: 重试策略 def __init__(self, max_attempts: int 3, delay: float 1.0, exponential_backoff: bool True, jitter: bool True): self.max_attempts max_attempts self.delay delay self.exponential_backoff exponential_backoff self.jitter jitter def get_wait_time(self, attempt: int) - float: 获取等待时间 if self.exponential_backoff: wait_time self.delay * (2 ** attempt) else: wait_time self.delay if self.jitter: wait_time * (0.5 random.random()) return wait_time def should_retry(self, attempt: int, error: Exception) - bool: 判断是否重试 if attempt self.max_attempts: return False # 可以根据错误类型判断是否重试 retryable_errors [ NetworkError, TimeoutError ] return any(isinstance(error, err_type) for err_type in retryable_errors) def with_retry(retry_strategy: RetryStrategy, retryable_errors: Optional[List[Type[Exception]]] None): 重试装饰器 def decorator(func: Callable) - Callable: wraps(func) async def async_wrapper(*args, **kwargs): last_error None for attempt in range(retry_strategy.max_attempts): try: return await func(*args, **kwargs) except Exception as e: last_error e if attempt retry_strategy.max_attempts - 1: if retry_strategy.should_retry(attempt, e): wait_time retry_strategy.get_wait_time(attempt) await asyncio.sleep(wait_time) else: raise raise last_error wraps(func) def sync_wrapper(*args, **kwargs): last_error None for attempt in range(retry_strategy.max_attempts): try: return func(*args, **kwargs) except Exception as e: last_error e if attempt retry_strategy.max_attempts - 1: if retry_strategy.should_retry(attempt, e): wait_time retry_strategy.get_wait_time(attempt) time.sleep(wait_time) else: raise raise last_error if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator回退与降级from typing import Callable, Any, Optional class FallbackStrategy: 回退策略 def __init__(self): self.fallbacks {} def register(self, operation_name: str, fallback_func: Callable): 注册回退函数 self.fallbacks[operation_name] fallback_func def execute_with_fallback(self, operation_name: str, main_func: Callable, *args, **kwargs) - Any: 带回退的执行 try: return main_func(*args, **kwargs) except Exception as e: if operation_name in self.fallbacks: fallback self.fallbacks[operation_name] return fallback(*args, **kwargs, errore) raise完整错误处理示例import random class ExampleService: 示例服务 def __init__(self): self.error_handler ErrorHandler() self.retry_strategy RetryStrategy(max_attempts3) self.fallback FallbackStrategy() # 注册回退函数 self.fallback.register(search, self.search_fallback) with_retry(RetryStrategy()) ErrorHandler.handle_exceptions def search(self, query: str) - Dict[str, Any]: 搜索 # 模拟错误 if random.random() 0.3: raise NetworkError(网络请求失败, urlhttps://example.com) return {results: [result1, result2]} def search_fallback(self, query: str, error: Exception) - Dict[str, Any]: 搜索回退 print(f使用回退方案执行搜索: {error}) return {results: [fallback_result], is_fallback: True} # 使用 service ExampleService() try: result service.search(test) print(result) except AppError as e: print(f发生错误: {e.to_dict()})总结错误处理机制要点错误分类定义明确的错误类型异常包装统一错误处理重试策略可配置的重试机制回退方案提供降级服务监控告警记录错误并告警实践建议尽早发现错误提供明确的错误信息设计合理的回退方案监控并持续改进