Agent与工具调用 - 问题与解决方案

发布时间:2026/6/10 1:41:46

Agent与工具调用 - 问题与解决方案 问题1Agent和普通AI对话有什么区别核心区别能力普通AIAgent回答方式被动响应主动规划工具调用不能可以记忆仅上下文长期短期执行动作不能可以什么时候用Agent简单问答天气查询、常识回答 → 普通AI即可复杂任务多步骤、需要操作外部系统 → Agent自主执行自动化流程、监控系统 → AgentQ什么场景下必须用Agent不能用普通AIA任务需要调用外部API获取实时信息读写数据库、操作文件系统多步骤串行执行中间结果影响后续长期记忆不受上下文窗口限制问题2Function Calling的工作流程是什么标准流程用户输入 → 模型判断 → 调用工具 → 获取结果 → 模型生成回答详细步骤步骤1定义工具{ name: get_weather, description: 查询城市天气, parameters: { type: object, properties: {city: {type: string}}, required: [city] } }步骤2模型判断模型根据用户输入决定是否调用工具、调用哪个工具、传什么参数步骤3执行工具result get_weather(city北京)步骤4返回结果{temperature: 25, weather: 晴天, humidity: 60}步骤5生成回答模型根据工具返回结果生成最终回答QFunction Calling和普通API调用的区别A普通API代码显式调用固定逻辑Function CallingAI决定何时调用、调用什么、传什么参数问题3如何设计工具描述description描述的三大要素做什么工具的功能什么时候用触发条件返回什么输出格式好描述 vs 差描述差描述description: 天气工具好描述description: 查询指定城市和日期的天气预报返回温度、天气状况、湿度、风力等信息。用于用户询问天气、规划出行、穿衣建议等场景。Q工具描述写错了会有什么后果A描述过于模糊 → 模型不知道什么时候该调用描述过于具体 → 可能错过适用场景描述有歧义 → 模型可能调用错误问题4多工具协同串行和并行怎么选串行调用定义上一个工具的输出是下一个工具的输入场景1. 查订单 → 2. 根据订单查物流 → 3. 根据物流生成通知代码order get_order(user_id) tracking get_tracking(order[tracking_id]) notify_user(tracking)并行调用定义多个工具同时执行互不依赖场景同时查北京天气、上海天气、广州天气代码import asyncio ​ results await asyncio.gather( get_weather(北京), get_weather(上海), get_weather(广州) )何时用哪种场景方式A的输出是B的输入串行多个查询互不依赖并行主任务分支任务先并行查主数据再串行处理Q为什么并行调用能提升性能AIO等待时间可以叠加减少总等待时间。问题5工具调用失败怎么处理5.1 失败类型详解类型特点原因处理临时性失败再试一次可能成功网络抖动、服务器繁忙重试永久性失败怎么重试都没用参数错误、权限不足修复代码限流失败等一会儿就能恢复请求超过配额等待后重试5.2 重试策略简单重试不推荐for i in range(3): try: return call_api() except: continue问题可能让服务器更忙。指数退避推荐import time import random ​ for attempt in range(5): try: return call_api() except (TimeoutError, ConnectionError): wait_time (2 ** attempt) random.uniform(0, 1) time.sleep(wait_time)等待时间指数增长1s → 2s → 4s → 8s加随机抖动避免多进程同时重试5.3 熔断器模式什么是熔断器保险丝断了 → 停电 → 防止电线着火 熔断器跳闸 → 快速失败 → 防止服务崩溃三种状态CLOSED闭合 OPEN打开 HALF_OPEN半开 ↓ ↓ ↓ 正常运行 快速失败 尝试恢复代码实现class CircuitBreaker: def __init__(self, failure_threshold3, timeout60): self.failure_threshold failure_threshold self.timeout timeout self.failure_count 0 self.last_failure_time None self.state CLOSED def call(self, func): if self.state OPEN: # 检查是否超时 if time.time() - self.last_failure_time self.timeout: self.state HALF_OPEN else: raise Exception(服务暂时不可用熔断中) try: result func() if self.state HALF_OPEN: self.state CLOSED self.failure_count 0 return result except Exception as e: self.failure_count 1 self.last_failure_time time.time() if self.failure_count self.failure_threshold: self.state OPEN raise5.4 降级策略缓存降级cache {北京: {weather: 晴, temp: 25}} ​ def get_weather_with_cache(city): try: result call_weather_api(city) cache[city] result # 更新缓存 return result except: if city in cache: return {**cache[city], note: 缓存数据} raise功能降级def smart_reply(user_message): try: return full_feature_reply(user_message) # 完整功能 except AIOverloadError: return basic_reply(user_message) # 降级到基础功能 except: return 服务繁忙请稍后重试5.5 完整容错代码class RobustToolCaller: def __init__(self): self.circuit_breaker CircuitBreaker(failure_threshold3) self.cache {} def call(self, tool_name, func, params): # 1. 检查熔断器 if self.circuit_breaker.state OPEN: return self._fallback(tool_name, params) # 2. 带重试的调用 for attempt in range(3): try: return func(**params) except (TimeoutError, ConnectionError): if attempt 2: self.circuit_breaker.call(lambda: None) return self._fallback(tool_name, params) except Exception as e: raise return self._fallback(tool_name, params) def _fallback(self, tool_name, params): # 降级方案 if tool_name weather: return self.cache.get(params.get(city), {weather: 未知}) raise Exception(f工具{tool_name}暂时不可用)Q工具返回错误后Agent应该告诉用户什么A如果有兜底方案 → 自动切换如果无法完成 → 诚实地告诉用户哪里出了问题不要编造结果幻觉给出替代建议如稍后重试问题6如何防止Agent陷入死循环6.1 死循环的本质Agent的核心逻辑while True: 思考下一步 执行工具 检查结果 如果完成就返回否则继续为什么会死循环模型无法判断什么时候够了反思产生新任务无限循环工具返回不满意无限重试6.2 最大调用次数限制class Agent: MAX_TOOL_CALLS 10 def run(self, task): for call_num in range(self.MAX_TOOL_CALLS): result self.execute() if self.is_complete(result): return result return self.force_complete() # 达到上限强制返回6.3 熔断器防止死循环class CircuitBreaker: 连续失败N次后暂时停止调用 def __init__(self, failure_threshold5, timeout30): self.failure_threshold failure_threshold self.timeout timeout self.failure_count 0 self.last_failure_time None self.state CLOSED def call(self, func): # OPEN状态直接拒绝 if self.state OPEN: if time.time() - self.last_failure_time self.timeout: self.state HALF_OPEN else: raise Exception(超过调用限制) try: return func() except: self.failure_count 1 self.last_failure_time time.time() if self.failure_count self.failure_threshold: self.state OPEN raise6.4 资源追踪class ResourceTracker: def __init__(self): self.tool_calls 0 self.total_tokens 0 self.execution_time 0 def should_stop(self): 多维度判断是否该停止 if self.tool_calls 10: return True, 超过最大调用次数 if self.total_tokens 50000: return True, 超过Token限制 if self.execution_time 120: return True, 执行超时 return False, None6.5 阶段式Agent设计class StagedAgent: STAGES [ {name: understand, max_calls: 2}, {name: plan, max_calls: 3}, {name: execute, max_calls: 5}, {name: verify, max_calls: 2}, ] def run(self, task): for stage in self.STAGES: result self.run_stage(stage, task) if self.is_complete(result): return result return self.best_effort_result(task)6.6 完整防死循环系统class RobustAgent: def __init__(self): self.max_tool_calls 10 self.circuit_breaker CircuitBreaker(failure_threshold5) self.tracker ResourceTracker() def run(self, task): self.tracker.start() for call_num in range(self.max_tool_calls): # 1. 检查是否该停止 should_stop, reason self.tracker.should_stop() if should_stop: return self.force_complete(reason) # 2. 检查熔断器 if self.circuit_breaker.state OPEN: return self.force_complete(熔断器开启) try: result self.execute(task) self.tracker.track(tool_callTrue) if self.is_complete(result): return result except Exception as e: self.circuit_breaker.call(lambda: None) continue return self.force_complete(达到调用上限) def force_complete(self, reason): return {status: completed_with_limit, reason: reason}QAutoGPT为什么会陷入死循环A反思机制没有限制 → 每次反思都产生新任务缺乏有效的终止条件 → 模型总觉得还能更好没有强制停止机制 → 不会强制返回改进方法加入最大调用次数限制设置明确的终止条件熔断器防止雪崩阶段式设计分阶段执行

相关新闻