AI Agent 多智能体编排生产实战

发布时间:2026/6/4 18:44:18

AI Agent 多智能体编排生产实战 引言从会用 GPT到生产级 Agent这条路比你想象的要颠簸得多。单个 LLM 调用像个优秀实习生能干但得手把手指挥真正的 Agent 系统更像一个自主工程团队——能规划、能分工、能纠错、能汇报。本文带你搭一套可在生产落地的 AI Agent 编排系统以LangGraph4j驱动状态机工作流以Spring AI封装工具调用配以背压控制、可观测性链路追踪和优雅降级把 Agent 从 Demo 推向真正的生产环境。关键数据指标数值企业 AI 项目因缺少编排层而失败Gartner 202573%多 Agent 系统相比单 Agent 平均任务完成率提升8×生产 Agent 需要可观测的延迟分位P99完整编排系统的降级防护层数3 层一、整体架构分层编排模型生产级 Agent 系统不是一个大 Prompt而是一套分层编排架构┌─────────────────────────────────────────┐ │ 外部输入层 │ │ HTTP API (REST/SSE) | MQ | 定时触发 │ └───────────────────┬─────────────────────┘ ↓ ┌─────────────────────────────────────────┐ │ 编排层LangGraph 状态机 │ │ Supervisor规划分派 │ │ Router条件分支 │ │ State StoreRedis 持久化 │ └───────────────────┬─────────────────────┘ ↓ ┌─────────────────────────────────────────┐ │ 执行层专家 Agent │ │ Search Agent | Code Agent │ │ Data Agent | Writer Agent │ └───────────────────┬─────────────────────┘ ↓ ┌─────────────────────────────────────────┐ │ 工具层Spring AI Tool Calling │ │ Tool Registry | 外部 API │ │ OpenTelemetry 可观测性 │ └─────────────────────────────────────────┘状态机节点流转START → Planner → Router → Executor → Critic │ ok ────────┤ retry ─────┘最多 3 次 超限 → 强制 END二、LangGraph4j 状态机工作流2.1 核心状态定义与图构建// ── AgentState贯穿整个工作流的共享状态 ──DataBuilderpublicclassAgentState{privateStringtaskId;privateStringuserQuery;privateListSubTaskplanSteps;// Planner 生成的子任务列表privateintcurrentStepIdx;privateMapString,ObjectstepResults;// 各步骤执行结果privateintretryCount;privateStringfinalAnswer;privateAgentStatusstatus;// RUNNING / SUCCESS / FAILED// 上下文传播确保 traceId 在子任务中可见privateStringtraceId;privateInstantstartedAt;}// ── 图构建注册节点 定义边 ──ConfigurationpublicclassAgentGraphConfig{BeanpublicCompiledGraphAgentStateagentGraph(PlannerNodeplanner,RouterNoderouter,ExecutorNodeexecutor,CriticNodecritic)throwsGraphStateException{returnStateGraph.builder(AgentState.schema())// 注册节点.addNode(planner,planner).addNode(router,router).addNode(executor,executor).addNode(critic,critic)// 定义边.addEdge(START,planner).addEdge(planner,router).addEdge(router,executor).addEdge(executor,critic)// 条件边Critic 决定结束还是重试.addConditionalEdges(critic,state-{if(state.getStatus()AgentStatus.SUCCESS)returnEND;if(state.getRetryCount()3)returnforce_end;// 超过重试上限强制终止returnplanner;// 重新规划},Map.of(END,END,force_end,END,planner,planner)).compile(CompileConfig.builder()// Redis 持久化检查点支持断点续跑.checkpointer(redisCheckpointer()).build());}}2.2 Planner 节点LLM 驱动的任务分解ComponentSlf4jpublicclassPlannerNodeimplementsNodeActionAgentState{privatefinalChatClientchatClient;privatefinalMeterRegistrymeterRegistry;privatestaticfinalStringPLANNER_PROMPT 你是一个任务规划专家。将用户任务分解为可执行的子步骤。 输出 JSON 格式{steps: [{id: s1, type: search|code|data|write, instruction: ...}]} 限制最多 5 步避免重复确保步骤间依赖关系明确。 当前任务{query} 重试原因如有{retryReason} ;OverridepublicMapString,Objectapply(AgentStatestate){Timer.SamplesampleTimer.start(meterRegistry);try(varignoredMDC.putCloseable(traceId,state.getTraceId())){log.info([Planner] 开始规划, retry{},state.getRetryCount());StringpromptPLANNER_PROMPT.replace({query},state.getUserQuery()).replace({retryReason},getRetryReason(state));// 调用 LLM带超时控制规划不应超过 30sStringplanJsonchatClient.prompt().user(prompt).options(ChatOptions.builder().temperature(0.1f)// 低温度确保确定性输出.maxTokens(1024).build()).call().content();ListSubTaskstepsparsePlan(planJson);// 埋点记录规划步骤数meterRegistry.gauge(agent.planner.steps,Tags.of(taskId,state.getTaskId()),steps.size());returnMap.of(planSteps,steps,currentStepIdx,0,status,AgentStatus.RUNNING);}finally{sample.stop(meterRegistry.timer(agent.planner.duration));}}}三、Spring AI 工具调用生产级工具注册3.1 工具定义带熔断与限流ComponentpublicclassWebSearchTool{privatefinalSearchApiClientsearchClient;privatefinalCircuitBreakercircuitBreaker;privatefinalRateLimiterrateLimiter;// Resilience4jprivatefinalMeterRegistrymeterRegistry;Tool(nameweb_search,description搜索互联网获取最新信息。输入搜索关键词输出搜索摘要列表。)publicListSearchResultsearch(ToolParam(description搜索关键词精简到核心词)Stringquery){CountercallCountermeterRegistry.counter(tool.web_search.calls,query_length,String.valueOf(query.length()/10*10));callCounter.increment();// 限流每秒最多 5 次搜索防止 LLM 滥用工具if(!rateLimiter.acquirePermission()){log.warn([WebSearch] 限流触发, query{},query);returnList.of(SearchResult.fallback(搜索频率过高请稍后重试));}// 熔断搜索 API 故障时快速失败returncircuitBreaker.executeSupplier(()-{try{returnsearchClient.search(SearchRequest.builder().query(query).maxResults(5).timeout(Duration.ofSeconds(10)).build());}catch(Exceptione){meterRegistry.counter(tool.web_search.errors).increment();throwe;}});}Tool(namecode_execution,description执行 Python 代码片段返回执行结果。仅用于数据计算和分析。)publicCodeResultexecuteCode(ToolParam(descriptionPython 代码禁止网络访问和文件系统操作)StringpythonCode){// 安全沙箱执行隔离容器 资源限制returnsandboxExecutor.run(SandboxRequest.builder().code(pythonCode).language(python).timeoutMs(5000L).memoryLimitMb(128).networkDisabled(true).build());}}// ── 将工具注入到 ChatClient ──ConfigurationpublicclassChatClientConfig{BeanpublicChatClientchatClient(ChatClient.Builderbuilder,WebSearchToolsearchTool,CodeExecutionToolcodeTool){returnbuilder.defaultTools(searchTool,codeTool).defaultAdvisors(newMessageChatMemoryAdvisor(redisChatMemory()),newTracingAdvisor()// OpenTelemetry 链路追踪).build();}}3.2 Executor 节点并发子任务执行 背压控制ComponentSlf4jpublicclassExecutorNodeimplementsNodeActionAgentState{privatestaticfinalintMAX_CONCURRENT_TASKS3;// 背压最大并发子任务privatestaticfinalintSEMAPHORE_TIMEOUT_S30;privatefinalSemaphoreconcurrencyGuardnewSemaphore(MAX_CONCURRENT_TASKS);OverridepublicMapString,Objectapply(AgentStatestate){SubTasktaskstate.getPlanSteps().get(state.getCurrentStepIdx());// 背压控制并发超限时等待而不是直接失败try{if(!concurrencyGuard.tryAcquire(SEMAPHORE_TIMEOUT_S,TimeUnit.SECONDS)){thrownewAgentBackpressureException(执行器并发饱和任务: task.getId());}}catch(InterruptedExceptione){Thread.currentThread().interrupt();thrownewAgentExecutionException(执行器被中断,e);}try(varignoredMDC.putCloseable(taskStep,task.getId())){meterRegistry.gauge(agent.executor.active,MAX_CONCURRENT_TASKS-concurrencyGuard.availablePermits());// 根据任务类型路由到专用 PromptStringsystemPromptbuildSystemPrompt(task.getType());StringcontextJsonbuildContext(state.getStepResults(),task);StringresultchatClient.prompt().system(systemPrompt).user(task.getInstruction()\n\n上下文\ncontextJson).options(ChatOptions.builder().temperature(0.3f).maxTokens(2048).build()).call().content();// 更新状态保存执行结果步骤推进MapString,ObjectupdatedResultsnewHashMap(state.getStepResults());updatedResults.put(task.getId(),result);intnextIdxstate.getCurrentStepIdx()1;booleanallDonenextIdxstate.getPlanSteps().size();returnMap.of(stepResults,updatedResults,currentStepIdx,nextIdx,status,allDone?AgentStatus.PENDING_REVIEW:AgentStatus.RUNNING);}finally{concurrencyGuard.release();}}}四、生产级可观测性三支柱全覆盖可观测性三支柱支柱技术栈存储Metrics指标MicrometerPrometheus → GrafanaTracing链路追踪OpenTelemetryJaegerLogging日志Logback JSONELK StackOpenTelemetry 全链路追踪// TracingAdvisor在每次 LLM 调用前后注入 SpanComponentpublicclassTracingAdvisorimplementsRequestResponseAdvisor{privatefinalTracertracer;OverridepublicAdvisedRequestadviseRequest(AdvisedRequestreq,MapString,Objectctx){Spanspantracer.nextSpan().name(llm.call).tag(llm.model,req.chatOptions().getModel()).tag(llm.tokens,input).start();ctx.put(llm.span,span);ctx.put(llm.start_ms,System.currentTimeMillis());returnreq;}OverridepublicChatResponseadviseResponse(ChatResponseresp,MapString,Objectctx){Spanspan(Span)ctx.get(llm.span);longdurationMsSystem.currentTimeMillis()-(Long)ctx.get(llm.start_ms);Usageusageresp.getMetadata().getUsage();span.tag(llm.prompt_tokens,usage.getPromptTokens()).tag(llm.completion_tokens,usage.getCompletionTokens()).tag(llm.duration_ms,durationMs).end();// Prometheus 指标Token 费用估算meterRegistry.counter(llm.tokens.total,type,prompt).increment(usage.getPromptTokens());returnresp;}}五、生产踩坑别让 Demo 毁了你的 KPI 高频踩坑清单1. 工具调用死循环LLM 反复调用同一工具但得不到满意结果导致无限循环消耗 Token。解法给每个工具维护callCount单轮超过 3 次后注入 “stop using this tool” 指令到下一轮 Prompt。2. 状态爆炸 —— stepResults 无限膨胀每步都把完整结果写入 AgentState传入 LLM 的 Context Window 超限。解法对 stepResults 做摘要压缩Summary Agent只保留关键信息而不是原始输出。3. Checkpoint 序列化失败AgentState 中包含不可序列化字段如 Span 对象导致 Redis 持久化崩溃。解法Span/MDC 等运行时上下文不要放进 AgentState用 ThreadLocal 传递仅在节点入口处恢复。4. 并发幻觉问题多 Agent 并发修改同一份 AgentState导致数据竞争。解法AgentState 设计为不可变快照每次节点返回的是 delta 字段更新由框架负责 merge绝不让两个节点写同一个字段。5. LLM 输出 JSON 解析失败LLM 输出的 JSON 有时含 Markdown 代码块包裹或末尾多余字符导致解析异常。解法统一用正则\{[\s\S]*\}提取 JSON 主体再用 Jackson 的readTree()包一层重试逻辑最多 2 次。6. Token 成本失控没有 Token 预算Critic 节点反复重规划一次任务消耗数万 Token。解法给每个 taskId 设置maxTokenBudget如 20000超限后触发降级直接用当前最佳结果输出记录告警。六、框架选型对比维度LangGraph4j Spring AI纯 LangChain4j自研状态机图编排能力✅ 原生 DAG 循环⚠️ 链式不支持循环✅ 完全自定义Spring 生态集成✅ 开箱即用✅ 成熟 需自行适配断点续跑✅ Checkpointer 机制❌ 不支持 需自行实现可观测性✅ 内置 OTel 支持⚠️ 需手动埋点 需完整实现学习曲线中需理解 StateGraph低高生产案例增长中多视团队而定七、生产部署要点Redis 持久化 AgentState支持任务断点续跑使用 LangGraph4j 的 RedisSaver 作为 Checkpointer任务中途宕机后可从最后一个成功的 Checkpoint 恢复避免从头重跑的 Token 浪费。异步流式输出SSE 推送中间进度用 Spring WebFlux 的 Flux 接收 LLM 流式输出通过 SSE 实时推送给前端避免用户等待黑屏Agent 任务普遍 10-60 秒。双层 Token 预算节点级 任务级每个节点设置maxTokens同时在 AgentState 记录累计消耗超过任务预算时触发 Early-Stop返回当前最优结果。工具执行隔离超时 沙箱 熔断所有外部工具调用必须有超时限制建议 10-30s代码执行类工具须在沙箱容器中运行熔断阈值设置为失败率 50% 触发开路。Grafana 告警P99 延迟 Token 消耗 失败率关键指标任务 P99 延迟、每任务平均 Token 消耗、Agent 失败率、工具调用成功率。失败率 5% 立即告警。总结把 AI Agent 推向生产需要的不仅是让 LLM 能调用工具而是一套完整的工程化体系️LangGraph 状态机有向图驱动的可控工作流支持反思重试Spring AI 工具链生产级工具注册、熔断、限流一体化三层容错限流 → 熔断 → Token 预算防止故障扩散全链路可观测Metrics Tracing Logging故障 5 分钟内定位Redis Checkpoint断点续跑避免长任务从头重来踩坑经验状态爆炸、工具死循环、JSON 解析失败……提前规避

相关新闻