Parallel + HITL in Practice)
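Parallel execution and HITL (human-in-the-loop) collaboration are the two most important advanced features of the Spring AI Graph module. This article follows a real production rollout of the Dream-SaaS code review agent, covering API usage, the pitfalls we hit, and data from 5 production runs. Parallelism takes two lines of code, and HITL works once you follow four steps. Three pitfalls, however, can each cost two days of debugging if you don't know about them in advance:

- nested objects in State blow up,
- a parallel branch can't be followed directly by a conditional edge,
- state leaks across requests.

**Conclusions first**

| Feature | API | Key points |
|---|---|---|
| Parallel execution | `addEdge(START, List.of(...))` | Automatic thread-pool scheduling; fan-in waits for all branches to finish |
| HITL | `interruptBefore` + checkpoint + resume | Four steps: updateState → getState → withNodeResumed → invoke |
| Checkpoint recovery | RedisSaver / MemorySaver | Measured resume time: 30-61 ms |

**3 pitfalls**

1. A parallel branch can't be followed directly by a conditional edge. Fan in to a regular node first.
2. Nested objects in State break checkpoint serialization. invoke runs, but Redis can't store the state.
3. State leaks across requests. The default threadId is reused, so reasoningTrace keeps growing.

## Overall architecture

Full flow of the CodeReview subgraph:

```
START
 ├─ codeCheckNode    (calls Feign → codeReviewAnalyzeResult JSON)
 └─ styleCheckNode   (local rules → styleCheckResult JSON)
        ↓ parallel fan-in
 → mergeCheckNode    (merges STYLE issues; deduction rule: -2 per issue, floor 0)
 → riskAssessNode    (writes riskLevel / needHumanReview)
 → [conditional edge] → humanReviewNode or reportGenNode
 → humanReviewNode   (optional HITL)
 → reportGenNode
 → END
```

The previous article built the Supervisor + RAG subgraphs and left the CodeReview subgraph as a placeholder. This one fills it in.

## 1. Parallel execution: `addEdge(START, List.of(...))`

### 1.1 API usage

In Spring AI Graph 1.0.0.3, parallel branches use the `List` overloads of `addEdge`:

```java
// Parallel fan-out: START → codeCheck ∥ styleCheck
stateGraph.addEdge(
        START,
        List.of(CodeReviewSubGraphNames.codeCheckNode, CodeReviewSubGraphNames.styleCheckNode));

// Parallel fan-in: mergeCheck runs only after both branches have completed
stateGraph.addEdge(
        List.of(CodeReviewSubGraphNames.codeCheckNode, CodeReviewSubGraphNames.styleCheckNode),
        CodeReviewSubGraphNames.mergeCheckNode);
```

That's two lines. You don't need a dedicated API such as `addParallelNode`:

- `addEdge(START, List.of(...))` creates the parallel branches.
- `addEdge(List.of(...), target)` waits for every source node to finish before fanning in.

### 1.2 The two parallel nodes

**codeCheckNode: calls a remote service through Feign (heavy)**

```java
@Slf4j                     // Lombok (assumed): provides `log`
@Component
@RequiredArgsConstructor   // Lombok (assumed): constructor injection for the final field
public class CodeCheckNode implements NodeAction {

    private final CodeReviewClient codeReviewClient;

    @Override
    public Map<String, Object> apply(OverAllState state) {
        CodeReviewGraphTiming timing = CodeReviewGraphTiming.begin();
        String input = CodeReviewInputSupport.resolveReviewCode(state);
        log.info("[CodeReviewGraph] node=codeCheckNode start atMs={} codeLength={}",
                timing.startEpochMs(), input.length());
        try {
            CodeReviewAnalyzeRequest request = CodeReviewAnalyzeRequest.fromReview(
                    input, CodeReviewInputSupport.resolveReviewInstruction(state));
            CodeReviewAnalyzeResult result = codeReviewClient.analyze(request);
            log.info("[CodeReviewGraph] node=codeCheckNode done atMs={} elapsedMs={} score={} issueCount={}",
                    timing.nowEpochMs(), timing.elapsedMs(), result.score(), result.issues().size());
            // Store the result as a JSON string, not as an object (see pitfall 2)
            return Map.of(CodeReviewSubGraphNames.codeReviewAnalyzeResult,
                    CodeReviewAnalyzeResults.toJson(result));
        } catch (Exception ex) {
            return Map.of(DreamSaaSOverAllState.errorMessage,
                    "Failed to call code-review-agent: " + ex.getMessage());
        }
    }
}
```

**styleCheckNode: local rule checks (light)**

```java
@Slf4j  // Lombok (assumed): provides `log`
@Component
public class StyleCheckNode implements NodeAction {

    @Override
    public Map<String, Object> apply(OverAllState state) {
        CodeReviewGraphTiming timing = CodeReviewGraphTiming.begin();
        String input = CodeReviewInputSupport.resolveReviewCode(state);
        log.info("[CodeReviewGraph] node=styleCheckNode start atMs={} inputLength={}",
                timing.startEpochMs(), input.length());
        List<StyleCheckViolation> violations = CodeStyleRules.scan(input);
        boolean compliant = violations.isEmpty();
        StyleCheckResultPayload payload = new StyleCheckResultPayload(
                compliant,
                violations,
                compliant ? "Style check passed" : "Style check found " + violations.size() + " issue(s)");
        log.info("[CodeReviewGraph] node=styleCheckNode done atMs={} elapsedMs={} compliant={} violationCount={}",
                timing.nowEpochMs(), timing.elapsedMs(), compliant, violations.size());
        return Map.of(CodeReviewSubGraphNames.styleCheckResult, StyleCheckResults.toJson(payload));
    }
}
```

Both nodes implement `NodeAction`. At registration, each one is wrapped with `AsyncNodeAction.node_async()`. The framework then runs them in parallel on its own thread pool.

### 1.3 Verifying parallel execution: production logs

After deployment, the thread names and timestamps in the log confirm that the two nodes run in parallel:

```
23:18:39.570 [parallel-node-action-thread-6] INFO CodeCheckNode - start atMs=1780499919570
23:18:39.573 [parallel-node-action-thread-5] INFO StyleCheckNode - start atMs=1780499919573
23:18:39.573 [parallel-node-action-thread-5] INFO StyleCheckNode - done elapsedMs=0
23:19:00.026 [parallel-node-action-thread-6] INFO CodeCheckNode - done elapsedMs=20455
```

Key evidence:

- codeCheckNode runs on thread-6 and styleCheckNode runs on thread-5, so they are on different threads.
- The two nodes start 3 ms apart, which confirms a parallel start.
- styleCheckNode finishes in about 1 ms (local rules). codeCheckNode finishes in 20455 ms (a Feign call to the LLM).
- Total parallel time = max(20455, 1) = 20455 ms.

Because styleCheckNode currently uses local rules, parallelism gains almost nothing here. Now suppose styleCheck also called an LLM, estimated at 5-10 s. Taking the 10 s upper bound:

- parallel: max(22 s, 10 s) ≈ 22 s
- serial: 22 s + 10 s = 32 s

That saves about 31% (10/32). The architecture is already in place, so adding the node is all it takes.

### 1.4 Pitfall 1: a parallel branch can't be followed by a conditional edge

After the fan-in, mergeCheckNode → riskAssessNode → [conditional edge] works fine. Adding a conditional edge directly on a parallel branch, however, fails. The parallel branches must first fan in to a regular node, and the conditional edge has to leave from that node.

```java
// ❌ Wrong: conditional edge directly on a parallel branch node
stateGraph.addConditionalEdges(CodeReviewSubGraphNames.codeCheckNode, ...);

// ✅ Right: fan in first → mergeCheck → riskAssess → conditional edge
// (node names statically imported from CodeReviewSubGraphNames)
stateGraph.addEdge(
        List.of(codeCheckNode, styleCheckNode),
        mergeCheckNode);
stateGraph.addEdge(mergeCheckNode, riskAssessNode);
stateGraph.addConditionalEdges(riskAssessNode, routeNode, Map.of(...));
```

## 2. HITL: interruptBefore + checkpoint + resume

In essence, HITL works like this: the graph pauses before a given node, waits for a human decision, and then continues from the breakpoint.

### 2.1 interruptBefore vs interruptAfter

| Configuration | Effect | Use case |
|---|---|---|
| `interruptBefore(node)` | Pauses before the node executes; the state does not yet contain that node's output | A human reviews first and decides whether the node should run |
| `interruptAfter(node)` | Pauses after the node executes; the state already contains its output | A human reviews the node's output and decides where to go next |

This project uses `interruptBefore(humanReviewNode)`. Risk assessment has already reached its conclusion, and a human steps in before the review node and can choose APPROVED or REJECTED directly.

If your scenario is "the AI proposes first, then a human decides", use `interruptAfter` instead. For example, an LLM generates code-change suggestions, and a human reviews them before deciding whether to adopt them.

### 2.2 Declaring the interrupt point in CompileConfig

```java
CompileConfig compileConfig = GraphCheckpointCompileSupport.withSharedCheckpointSaver(
                CompileConfig.builder()
                        .interruptBefore(CodeReviewSubGraphNames.humanReviewNode),
                graphSaverConfig)
        .build();
CompiledGraph compiled = stateGraph.compile(compileConfig);
```

A single `interruptBefore(humanReviewNode)` makes the graph stop right before humanReviewNode. The checkpoint mechanism persists the state to Redis and restores it on resume.

### 2.3 Pitfall 2: State storage type restrictions

Graph state only supports String / Number / Boolean / Map / List. Nested objects must be converted to a JSON string on write and deserialized on read.

```java
// Write: object → JSON string
return Map.of(CodeReviewSubGraphNames.codeReviewAnalyzeResult,
        CodeReviewAnalyzeResults.toJson(result));

// Read: JSON string → object
CodeReviewAnalyzeResult analyze = CodeReviewAnalyzeResults.parse(
        state.value(CodeReviewSubGraphNames.codeReviewAnalyzeResult)
                .orElse(null));
```

If you put an object in directly, invoke still runs, but checkpoint serialization blows up. Redis can't store it, and resume can't deserialize it either. This pitfall has the highest debugging cost: everything works locally, and the problem only surfaces once Redis is in the loop.

### 2.4 HITL in three stages: start → interrupt → resume

**Step 1: startHitl, which returns a threadId**

```java
public Map<String, Object> startHitl(String instruction, String code, boolean forceHumanReview) {
    // A fresh threadId per run, so checkpoints from other requests are never reused
    String threadId = UUID.randomUUID().toString();
    RunnableConfig runConfig = GraphInvokeSupport.threadConfig(threadId);
    Optional<OverAllState> finished = codeReviewSubGraph.invoke(
            GraphInvokeSupport.codeReviewInputs(instruction, code, forceHumanReview),
            runConfig);
    Map<String, Object> result = invokeAndPack(finished, threadId);
    enrichInterruptFlags(result, runConfig);
    return result;
}
```

When `forceHumanReview=true`, RiskAssessNode ignores the actual risk level and forces `needHumanReview=true`. This makes testing easier.

**Step 2: resumeHitl, which writes the human decision and resumes execution**

```java
public Map<String, Object> resumeHitl(String threadId, String decision, String reason) {
    RunnableConfig runConfig = GraphInvokeSupport.threadConfig(threadId);

    // 1. updateState: write the human decision into the checkpoint
    Map<String, Object> patch = new HashMap<>();
    patch.put(CodeReviewSubGraphNames.humanDecision, decision);
    patch.put(CodeReviewSubGraphNames.humanReviewReason, reason);
    runConfig = codeReviewSubGraph.updateState(runConfig, patch, null);

    // 2. getState: read the snapshot to get the next node ID
    StateSnapshot snapshot = codeReviewSubGraph.getState(runConfig);
    String nextNode = snapshot.next();

    // 3. withNodeResumed: mark the interrupted node as resumable
    runConfig.withNodeResumed(nextNode);

    // 4. invoke: continue from the checkpointed state
    OverAllState state = codeReviewSubGraph
            .invoke(snapshot.state(), runConfig)
            .orElseThrow();
    return invokeAndPack(Optional.of(state), threadId);
}
```

The four steps are updateState → getState → withNodeResumed → invoke. Skip any one of them and resume doesn't work.

### 2.5 Risk assessment rules

RiskAssessNode decides whether human intervention is needed:

```java
static boolean needHumanReview(int score, SeverityCounts counts) {
    if (counts.critical > 0) return true;   // at least one CRITICAL issue
    if (score < 60) return true;            // score below 60
    return counts.high >= 3;                // 3 or more HIGH issues
}
```

The risk level has two dimensions, and the higher of the two wins. The score dimension maps as follows:

| Score band | Risk level |
|---|---|
| below 60 | CRITICAL |
| 60-74 | HIGH |
| 75-84 | MEDIUM |
| 85 and above | LOW |

### 2.6 Measured: the full HITL flow

We tested with a snippet that contains a SQL injection:

```java
// Deliberately vulnerable test input for the reviewer
public class AuthController {
    @PostMapping("/login")
    public String login(String username, String password) {
        String sql = "SELECT * FROM users WHERE username = '" + username
                + "' AND password = '" + password + "'";
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(sql);
        return rs.next() ? "token-" + username : null;
    }
}
```

**HITL Start log:**

```
23:21:04.241 [parallel-node-action-thread-5] INFO CodeCheckNode - start codeLength=417
23:21:04.242 [parallel-node-action-thread-6] INFO StyleCheckNode - start inputLength=417
23:21:04.242 [parallel-node-action-thread-6] INFO StyleCheckNode - done elapsedMs=0
23:21:25.823 [parallel-node-action-thread-5] INFO CodeCheckNode - done elapsedMs=21581 score=70
23:21:25.832 [parallel-node-action-thread-5] INFO MergeCheckNode - done elapsedMs=0
23:21:25.838 [parallel-node-action-thread-5] INFO RiskAssessNode - done elapsedMs=0 riskLevel=CRITICAL needHumanReview=true
23:21:25.846 [http-nio-8098-exec-2] INFO Service - graph interrupted nextNode=humanReviewNode awaiting resume
```

The flow is: parallel execution → risk CRITICAL → HITL interrupt, waiting for a human.

**HITL Resume (APPROVED) log:**

```
23:21:40.872 [http-nio-8098-exec-7] INFO Controller - api hitl/resume threadId=9d3c8010 decision=APPROVED
23:21:40.907 [http-nio-8098-exec-7] INFO HumanReviewNode - approved elapsedMs=0 note=Human review approved: security risk confirmed acceptable, fix scheduled
23:21:40.916 [http-nio-8098-exec-7] INFO ReportGenNode - done elapsedMs=0 mode=success score=70
23:21:40.933 [http-nio-8098-exec-7] INFO Service - resumeHitl done elapsedMs=61
```

Recovering from the checkpoint took 61 ms.

**HITL Resume (REJECTED) log:**

```
23:22:23.940 [http-nio-8098-exec-8] INFO HumanReviewNode - rejected elapsedMs=0 note=Human review rejected: SQL injection is unacceptable; fix it and resubmit
23:22:23.946 [http-nio-8098-exec-8] INFO ReportGenNode - done elapsedMs=0 mode=failure reportLength=40
23:22:23.954 [http-nio-8098-exec-8] INFO Service - resumeHitl done elapsedMs=30
```

With REJECTED, the flow takes the failure branch and completes in 30 ms.

## 3. CheckpointSaver: where HITL state is stored

HITL can continue from a breakpoint because of the checkpoint mechanism. Each time the graph executes a node, the Saver persists the state. After an interrupt, resume restores the most recently saved snapshot.

### 3.1 The 8 built-in Savers

| Saver | Storage | Shared across instances | Survives restart | Use case |
|---|---|---|---|---|
| MemorySaver | In-process HashMap | ❌ | ❌ | Development/debugging, single instance |
| VersionedMemorySaver | In-process, versioned | ❌ | ❌ | Checkpoint history and rollback |
| FileSystemSaver | Local files | ❌ | ✅ | Single instance that must survive restarts |
| RedisSaver | Redis | ✅ | ✅ | First choice for production |
| MysqlSaver | MySQL | ✅ | ✅ | Existing MySQL infrastructure |
| PostgresSaver | PostgreSQL | ✅ | ✅ | Existing PostgreSQL infrastructure |
| MongoSaver | MongoDB | ✅ | ✅ | Existing MongoDB infrastructure |
| OracleSaver | Oracle | ✅ | ✅ | Enterprise databases |

Start with MemorySaver, which has zero dependencies. Use RedisSaver in production (this project's choice).

### 3.2 Dual-mode Redis / Memory configuration

One configuration covers both modes: Redis is preferred, with automatic fallback to memory.

```java
@Slf4j  // Lombok (assumed): provides `log`
@Configuration
@EnableConfigurationProperties(GraphCheckpointProperties.class)
@ConditionalOnClass(BaseCheckpointSaver.class)
public class GraphCheckpointConfiguration {

    @Bean(name = "graphCheckpointSaver")
    @ConditionalOnMissingBean(name = "graphCheckpointSaver")
    public BaseCheckpointSaver graphCheckpointSaver(
            GraphCheckpointProperties properties,
            @Qualifier("graphRedissonClient") ObjectProvider<RedissonClient> redissonProvider) {
        if (properties.isRedisEnabled()) {
            RedissonClient client = redissonProvider.getIfAvailable();
            if (client != null) {
                return RedisSaver.builder().redisson(client).build();
            }
            log.warn("[GraphCheckpoint] redis-enabled=true but RedissonClient missing; falling back to MemorySaver");
        }
        // Redis disabled or no client available: in-process storage only
        return MemorySaver.builder().build();
    }
}
```

There are only two properties:

```yaml
dream:
  ai:
    graph:
      checkpoint:
        redis-enabled: true              # false → MemorySaver
        redis-address-prefix: redis://   # use rediss:// for TLS connections
```

The Redis connection reuses `spring.data.redis.*`, so no extra data source needs to be configured.

With MemorySaver, all checkpoints are lost when the process restarts. They are also invisible to other instances. In either case, HITL resume can no longer find its state, so always use Redis or a database-backed Saver in production.

### 3.3 Sharing a SaverConfig across subgraphs

When several subgraphs in the project all need HITL, you don't have to configure a Saver for each one. `GraphCheckpointCompileSupport` attaches the shared `SaverConfig` to the `CompileConfig`:

```java
public final class GraphCheckpointCompileSupport {

    private GraphCheckpointCompileSupport() {
        // static helpers only
    }

    public static CompileConfig.Builder withSharedCheckpointSaver(
            CompileConfig.Builder builder, ObjectProvider<SaverConfig> graphSaverConfig) {
        return builder.saverConfig(resolveSaverConfig(graphSaverConfig));
    }

    public static SaverConfig resolveSaverConfig(ObjectProvider<SaverConfig> graphSaverConfig) {
        // Fall back to an in-memory saver when no shared SaverConfig bean exists
        return graphSaverConfig.getIfAvailable(
                () -> SaverConfig.builder()
                        .register(MemorySaver.builder().build())
                        .build());
    }
}
```

Each subgraph then wires it in with one call:

```java
CompileConfig compileConfig = GraphCheckpointCompileSupport.withSharedCheckpointSaver(
                CompileConfig.builder()
                        .interruptBefore(CodeReviewSubGraphNames.humanReviewNode),
                graphSaverConfig)
        .build();
```

If the dream-ai-graph module isn't wired in, it falls back to MemorySaver automatically, without raising errors or blocking startup.

## 4. Pitfall 3: preventing state pollution (data leaking across requests)

Multiple requests reuse the same CompiledGraph instance. If state isn't cleaned up, data leaks between requests.

### 4.1 Symptom

Under consecutive requests, the Chat subgraph picks up the reasoningTrace from the previous inference, and the trace keeps growing. The first request is fine, but from the second one on, the output gets mixed up.

### 4.2 Root cause

```java
// ❌ Wrong
graph.invoke(inputs); // no RunnableConfig passed
```

Without a `RunnableConfig`, three things compound:

- **The threadId is reused.** Spring AI Graph falls back to a fixed default threadId.
- **The checkpoint carries over.** The previous request's state is still there.
- **reasoningTrace accumulates.** The node's append logic keeps adding to it.

### 4.3 Fix

```java
public final class GraphInvokeStateDefaults {

    private GraphInvokeStateDefaults() {
        // static helpers only
    }

    public static Map<String, Object> codeReviewInputs(String instruction, String code, boolean forceHumanReview) {
        // Every key the subgraph reads or writes gets an explicit starting value
        Map<String, Object> inputs = new HashMap<>();
        inputs.put(DreamSaaSOverAllState.userInput, instruction + "\n" + code);
        inputs.put(CodeReviewSubGraphNames.reviewInstruction, StringUtils.hasText(instruction) ? instruction : "");
        inputs.put(CodeReviewSubGraphNames.reviewCode, StringUtils.hasText(code) ? code : "");
        inputs.put(CodeReviewSubGraphNames.codeReviewAnalyzeResult, "");
        inputs.put(CodeReviewSubGraphNames.styleCheckResult, "");
        inputs.put(CodeReviewSubGraphNames.riskLevel, "");
        inputs.put(CodeReviewSubGraphNames.needHumanReview, false);
        inputs.put(CodeReviewSubGraphNames.hitlTestForceHumanReview, forceHumanReview);
        inputs.put(DreamSaaSOverAllState.errorMessage, "");
        inputs.put(DreamSaaSOverAllState.finalAnswer, "");
        return inputs;
    }
}
```

Each request explicitly initializes every state key, so leftover values from the previous request can't pollute the current one. This matters most under parallel execution: if two branches write the same state key and its initial value isn't clean, the merge stage reads dirty data.

## 5. Latency data from 5 production runs

Node-level timings extracted from the `[CodeReviewGraph]` entries in the server logs:

| Run | codeCheckNode | styleCheckNode | merge | riskAssess | Total | Score | Risk | HITL |
|---|---|---|---|---|---|---|---|---|
| Run1 | 20455 ms | 1 ms | 1 ms | 1 ms | 20521 ms | 85 | HIGH | ❌ |
| Run2 | 16541 ms | 1 ms | 1 ms | 1 ms | 16577 ms | 85 | CRITICAL | ✅ |
| Run3 | 24706 ms | 1 ms | 1 ms | 1 ms | 24755 ms | 85 | HIGH | ❌ |
| HITL-1 | 21581 ms | 1 ms | 1 ms | 1 ms | 21615 ms | 70 | CRITICAL | ✅ |
| HITL-2 | 26938 ms | 1 ms | 1 ms | 1 ms | 26975 ms | 60 | CRITICAL | ✅ |

Key conclusions:

- **codeCheckNode is the only bottleneck.** Its Feign remote call accounts for 99.7% to 99.9% of total time in every run.
- **The local nodes are negligible.** styleCheckNode, mergeCheckNode and riskAssessNode each take about 1 ms, since they only run local rules and in-memory computation.
- **Parallel execution is confirmed.** codeCheckNode and styleCheckNode start within milliseconds of each other: 3 ms apart in the section 1.3 log and 1 ms apart in the HITL run.
- **HITL resume is fast.** It takes 30-61 ms, so recovering from the checkpoint adds almost no latency.

## 6. Code structure

```
src/main/java/com/zhu/dream/ai/graph/subgraph/codereview/
├── CodeReviewSubGraphConfiguration.java   # subgraph wiring (parallel + conditional edge + interruptBefore)
├── CodeCheckNode.java                     # Feign call to code-review-agent
├── StyleCheckNode.java                    # local style rules
├── CodeReviewMergeNode.java               # merges STYLE issues
├── RiskAssessNode.java                    # risk assessment + needHumanReview
├── CodeReviewRouteNode.java               # conditional edge
├── HumanReviewNode.java                   # HITL human review
├── ReportGenNode.java                     # generates the Markdown report
├── CodeReviewSubGraphNames.java           # constants
├── CodeReviewSubGraphTestService.java     # invoke / startHitl / resumeHitl
└── CodeReviewGraphTiming.java             # per-node timing

src/main/java/com/zhu/dream/ai/graph/support/
├── CodeReviewInputSupport.java            # input splitting
├── GraphInvokeStateDefaults.java          # registry of state-key defaults
└── GraphInvokeSupport.java                # invoke helpers (prevents state pollution)
```

**Summary of the 3 pitfalls**

| Pitfall | Symptom | Fix |
|---|---|---|
| Conditional edge right after a parallel branch | Graph compile error | Fan in to a regular node first, then add the conditional edge |
| Nested objects in State | invoke runs, but checkpoint serialization blows up | Store nested objects as JSON strings; deserialize on read |
| State pollution across requests | Dirty data when parallel branches merge | Explicitly initialize every state key before each invoke, and pass a RunnableConfig with a per-request threadId |

Versions: spring-ai-bom 1.1.6, spring-ai-alibaba 1.1.2.2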
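The fan-out/fan-in semantics from section 1.1 can be illustrated outside the framework with plain `CompletableFuture`. This is a minimal sketch, not how Spring AI Alibaba schedules its `parallel-node-action-thread-*` pool internally. It assumes that each branch returns a partial state update as a `Map`, and `FanOutFanIn` / `runParallel` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical, framework-free sketch of fan-out/fan-in; not Spring AI Alibaba's scheduler. */
final class FanOutFanIn {

    private FanOutFanIn() {}

    /**
     * Runs every branch in parallel against the same read-only snapshot (fan-out),
     * waits until all of them complete (fan-in), then merges their partial updates.
     */
    static Map<String, Object> runParallel(
            Map<String, Object> state,
            List<Function<Map<String, Object>, Map<String, Object>>> branches) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, branches.size()));
        try {
            Map<String, Object> snapshot = Collections.unmodifiableMap(new HashMap<>(state));
            List<CompletableFuture<Map<String, Object>>> futures = branches.stream()
                    .map(branch -> CompletableFuture.supplyAsync(() -> branch.apply(snapshot), pool))
                    .toList();
            // Fan-in: block until every branch has finished; a branch failure surfaces as CompletionException
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            Map<String, Object> merged = new HashMap<>(state);
            for (CompletableFuture<Map<String, Object>> future : futures) {
                merged.putAll(future.join()); // on a key clash, the later branch in the list wins
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

Every branch reads the same snapshot and returns only its own keys. As a result, codeCheckNode and styleCheckNode never see each other's partial output, which is also why they should write distinct state keys.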
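Pitfall 2 (section 2.3) is cheap to catch early if node outputs are validated before they reach the checkpoint saver. The sketch below is a hypothetical helper, not part of any library, called `StateValueGuard`. It walks a state update and reports the first value that is not a String/Number/Boolean/Map/List tree, using the type whitelist stated in section 2.3. Letting `null` through is an assumption of this sketch.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical guard that rejects state values outside the String/Number/Boolean/Map/List whitelist. */
final class StateValueGuard {

    private StateValueGuard() {}

    /** Returns the path of the first unsupported value, or null if the whole tree is allowed. */
    static String findUnsupported(String path, Object value) {
        // Assumption: null is tolerated here; tighten this if your saver rejects nulls
        if (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) {
            return null;
        }
        if (value instanceof Map<?, ?> map) {
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                String bad = findUnsupported(path + "." + entry.getKey(), entry.getValue());
                if (bad != null) {
                    return bad;
                }
            }
            return null;
        }
        if (value instanceof List<?> list) {
            for (int i = 0; i < list.size(); i++) {
                String bad = findUnsupported(path + "[" + i + "]", list.get(i));
                if (bad != null) {
                    return bad;
                }
            }
            return null;
        }
        // Anything else (a record, POJO, StringBuilder, ...) is what breaks checkpoint serialization
        return path + " (" + value.getClass().getSimpleName() + ")";
    }

    /** Throws if any value in a node's returned update falls outside the whitelist. */
    static void requireSerializable(Map<String, Object> update) {
        for (Map.Entry<String, Object> entry : update.entrySet()) {
            String bad = findUnsupported(entry.getKey(), entry.getValue());
            if (bad != null) {
                throw new IllegalArgumentException("unsupported state value at " + bad);
            }
        }
    }
}
```

Calling `StateValueGuard.requireSerializable(update)` in each node's unit test makes the failure show up with MemorySaver on a developer machine, instead of only after Redis is switched on.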
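The interrupt → updateState → resume cycle from section 2.4 can be modeled with a small in-memory runner. This is a framework-free sketch, not the Spring AI Alibaba API, and it rests on three simplifications:

- nodes run in a fixed linear order, with no conditional edges;
- one checkpoint per threadId is kept in a `ConcurrentHashMap` in place of a real Saver;
- `MiniInterruptibleRunner`, `Checkpoint` and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

/** Hypothetical, framework-free sketch of interruptBefore + checkpoint + resume on a linear pipeline. */
final class MiniInterruptibleRunner {

    /** Saved position: the state so far and the node to run next (null once the run has finished). */
    record Checkpoint(Map<String, Object> state, String next) {}

    private final LinkedHashMap<String, UnaryOperator<Map<String, Object>>> nodes; // insertion order = execution order
    private final Set<String> interruptBefore;
    private final Map<String, Checkpoint> saver = new ConcurrentHashMap<>(); // stand-in Saver: threadId -> checkpoint

    MiniInterruptibleRunner(LinkedHashMap<String, UnaryOperator<Map<String, Object>>> nodes,
                            Set<String> interruptBefore) {
        this.nodes = nodes;
        this.interruptBefore = interruptBefore;
    }

    /** Starts a fresh run for threadId and pauses before the first interruptBefore node. */
    Checkpoint start(String threadId, Map<String, Object> inputs) {
        String first = nodes.keySet().iterator().next();
        return runFrom(threadId, new HashMap<>(inputs), first, false);
    }

    /** Analogue of updateState: merge a patch (e.g. the human decision) into the saved state. */
    void updateState(String threadId, Map<String, Object> patch) {
        Checkpoint cp = saver.get(threadId);
        if (cp == null) {
            throw new IllegalStateException("no checkpoint for " + threadId);
        }
        Map<String, Object> merged = new HashMap<>(cp.state());
        merged.putAll(patch);
        saver.put(threadId, new Checkpoint(Collections.unmodifiableMap(merged), cp.next()));
    }

    /** Rough analogue of getState + withNodeResumed + invoke: continue from the saved node. */
    Checkpoint resume(String threadId) {
        Checkpoint cp = saver.get(threadId);
        if (cp == null || cp.next() == null) {
            throw new IllegalStateException("nothing to resume for " + threadId);
        }
        return runFrom(threadId, new HashMap<>(cp.state()), cp.next(), true);
    }

    private Checkpoint runFrom(String threadId, Map<String, Object> state, String startNode, boolean resumed) {
        List<String> order = new ArrayList<>(nodes.keySet());
        for (int i = order.indexOf(startNode); i < order.size(); i++) {
            String node = order.get(i);
            // On resume, the node we paused before is let through once
            boolean resumedNode = resumed && node.equals(startNode);
            if (interruptBefore.contains(node) && !resumedNode) {
                return save(threadId, state, node); // pause: persist state plus the pending node
            }
            state.putAll(nodes.get(node).apply(state)); // each node returns a partial update
        }
        return save(threadId, state, null);
    }

    private Checkpoint save(String threadId, Map<String, Object> state, String next) {
        Checkpoint cp = new Checkpoint(Collections.unmodifiableMap(new HashMap<>(state)), next);
        saver.put(threadId, cp);
        return cp;
    }
}
```

In this sketch, resuming without the `resumed` flag would hit the same `interruptBefore` check and pause again immediately. That appears to be the gap `withNodeResumed(nextNode)` closes in the real four-step flow.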
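Two of the article's scoring rules fit in a few lines:

- mergeCheckNode's deduction: -2 per STYLE issue, floor 0 (from the architecture overview);
- the score bands from section 2.5.

The thresholds come from the article, but the class and enum names are hypothetical. The article doesn't spell out how issue severities map to a level, so the sketch takes the issue-based level from the caller rather than guessing it.

```java
/** Risk levels in ascending severity, so that compareTo ranks them (hypothetical name). */
enum RiskLevel { LOW, MEDIUM, HIGH, CRITICAL }

/** Hypothetical helper; only the numeric thresholds come from the article. */
final class RiskScoringSketch {

    private RiskScoringSketch() {}

    /** mergeCheckNode rule: deduct 2 points per STYLE issue, never going below 0. */
    static int mergedScore(int codeReviewScore, int styleIssueCount) {
        return Math.max(0, codeReviewScore - 2 * styleIssueCount);
    }

    /** Score dimension: below 60 CRITICAL, 60-74 HIGH, 75-84 MEDIUM, 85 and above LOW. */
    static RiskLevel levelFromScore(int score) {
        if (score < 60) return RiskLevel.CRITICAL;
        if (score < 75) return RiskLevel.HIGH;
        if (score < 85) return RiskLevel.MEDIUM;
        return RiskLevel.LOW;
    }

    /** "The higher of the two dimensions wins"; the issue-based level is supplied by the caller. */
    static RiskLevel combine(RiskLevel byScore, RiskLevel byIssues) {
        return byScore.compareTo(byIssues) >= 0 ? byScore : byIssues;
    }
}
```

This also explains Run1 in section 5. A score of 85 maps to LOW on the score dimension, yet the logged risk was HIGH, so the issue dimension must have supplied the higher level.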
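The root cause in section 4.2 combines two things: a reused threadId and an append-style key. The effect can be reproduced without the framework. In this sketch, a `Map` stands in for the checkpoint saver and `"fixed-thread"` stands in for the default threadId. Both are hypothetical, as is the class name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, framework-free demo of an append-style key leaking through a reused threadId. */
final class ThreadIdPollutionDemo {

    /** Stand-in for a checkpoint saver: threadId -> saved reasoningTrace. */
    private final Map<String, List<String>> checkpoints = new ConcurrentHashMap<>();

    /** One "request": restore the trace saved under threadId, append this request's step, save it back. */
    List<String> handle(String threadId, String step) {
        List<String> trace = new ArrayList<>(checkpoints.getOrDefault(threadId, List.of()));
        trace.add(step);
        checkpoints.put(threadId, List.copyOf(trace));
        return trace;
    }

    /** A fresh threadId per request, the same idea as UUID.randomUUID() in startHitl. */
    static String newThreadId() {
        return UUID.randomUUID().toString();
    }
}
```

Reusing `"fixed-thread"` makes the second request see `[request-1, request-2]`, while a fresh UUID starts from an empty trace. Explicitly initializing every key, as `GraphInvokeStateDefaults` does, guards against the same leak for keys that are overwritten rather than appended.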