
从零构建轻量级 DAG 编排引擎处理大模型复杂工作流的实战一、为什么简单的链式调用不够用在真实业务里单靠一个 Prompt 很难处理复杂的流程。开发者通常会把多个 LLM 调用、API 请求和数据清洗步骤串在一起。但一旦逻辑变复杂这种线性调用就会出问题。代码里开始出现大量的if-else嵌套处理异步等待和节点依赖变得非常麻烦。这时候有向无环图DAG是一个更稳妥的选择。它不仅能理清任务顺序还能让没有依赖关系的节点并行执行减少等待时间。二、DAG 调度逻辑与拓扑排序工作流里的每个步骤就是一个“节点”节点间的依赖关系就是“边”。要执行这个图核心是先做环路检测再通过拓扑排序确定执行顺序。下面的图展示了一个简单的流程先清洗输入然后并行做情感分类和关键词提取最后汇总生成报告。graph LR Start([启动]) -- NodeA[清洗输入] NodeA -- NodeB[情感分类] NodeA -- NodeC[关键词提取] NodeB -- NodeD[生成报告] NodeC -- NodeD NodeD -- End([结束]) style NodeB fill:#bbf,stroke:#333,stroke-width:2px style NodeD fill:#bfb,stroke:#333,stroke-width:2px节点 B 和 C 都依赖 A但它们之间没关系所以引擎会让它们同时跑。三、Node.js 轻量级实现这是一个基于 JavaScript 的简单 DAG 引擎。它实现了拓扑排序来检查依赖并支持异步并发执行。class WorkflowNode { constructor(id, taskFunction) { this.id id; this.taskFunction taskFunction; this.dependencies []; this.status PENDING; this.result null; } addDependency(nodeId) { this.dependencies.push(nodeId); } } class DagEngine { constructor() { this.nodes new Map(); } registerNode(node) { this.nodes.set(node.id, node); } // 拓扑排序检查环路并决定顺序 resolveExecutionOrder() { const inDegree new Map(); const adjList new Map(); const order []; for (const [id, node] of this.nodes) { inDegree.set(id, 0); adjList.set(id, []); } for (const [id, node] of this.nodes) { for (const depId of node.dependencies) { if (!this.nodes.has(depId)) { throw new Error(节点 ${id} 依赖的 ${depId} 未注册); } adjList.get(depId).push(id); inDegree.set(id, inDegree.get(id) 1); } } const queue []; for (const [id, degree] of inDegree) { if (degree 0) queue.push(id); } while (queue.length 0) { const currId queue.shift(); order.push(currId); for (const nextId of adjList.get(currId)) { inDegree.set(nextId, inDegree.get(nextId) - 1); if (inDegree.get(nextId) 0) { queue.push(nextId); } } } if (order.length ! this.nodes.size) { throw new Error(检测到循环依赖无法执行); } return order; } // 并发执行 async executeWorkflow(inputContext) { const completedResults { ...inputContext }; const runningPromises new Map(); while (true) { let hasPending false; let progressed false; for (const [id, node] of this.nodes) { if (node.status COMPLETED || node.status FAILED) continue; hasPending true; if (node.status RUNNING) continue; // 检查依赖是否都完成了 const allDepsMet node.dependencies.every(depId { const depNode this.nodes.get(depId); return depNode depNode.status COMPLETED; }); if (allDepsMet) { node.status RUNNING; progressed true; const promise (async () { try { const depData {}; node.dependencies.forEach(depId { depData[depId] this.nodes.get(depId).result; }); node.result await node.taskFunction(completedResults, depData); node.status COMPLETED; } catch (error) { node.status FAILED; throw error; } })(); runningPromises.set(id, promise); } } if (!hasPending) break; if (!progressed runningPromises.size 0) { throw new Error(死锁没有节点能继续执行); } await Promise.race(runningPromises.values()); for (const [id, promise] of runningPromises) { const node this.nodes.get(id); if (node.status COMPLETED || node.status FAILED) { runningPromises.delete(id); } } } const finalOutput {}; for (const [id, node] of this.nodes) { finalOutput[id] node.result; } return finalOutput; } } // 测试运行 (async () { const engine new DagEngine(); const nodeA new WorkflowNode(CleanInput, async (context) { return context.rawText.trim().replace(/[]/g, ); }); const nodeB new WorkflowNode(LlmClassify, async (context, depData) { const text depData.CleanInput; await new Promise(resolve setTimeout(resolve, 500)); // 模拟 API 延迟 return text.includes(好) ? POSITIVE : NEGATIVE; }); nodeB.addDependency(CleanInput); const nodeC new WorkflowNode(ExtractKeywords, async (context, depData) { const text depData.CleanInput; return text.split( ).filter(word word.length 1); }); nodeC.addDependency(CleanInput); const nodeD new WorkflowNode(GenerateReport, async (context, depData) { const sentiment depData.LlmClassify; const keywords depData.ExtractKeywords; return 情感: ${sentiment}, 关键词: [${keywords.join(, )}]; }); nodeD.addDependency(LlmClassify); nodeD.addDependency(ExtractKeywords); engine.registerNode(nodeA); engine.registerNode(nodeB); engine.registerNode(nodeC); engine.registerNode(nodeD); const order engine.resolveExecutionOrder(); console.log(执行顺序:, order.join( - )); const result await engine.executeWorkflow({ rawText: 这个产品设计得非常 好解决了我的痛点。 }); console.log(结果:, result); })();四、生产环境需要考虑的几个问题上面的代码适合本地或简单场景如果要上生产环境还得考虑下面几点1. 内存 vs 持久化内存里的调度很快但服务器一挂中间结果就没了。如果工作流跑了几分钟才失败重头再来很浪费。生产环境通常要用 Redis 或像 Temporal 这样的状态机来存状态但这会增加网络延迟。2. 重试策略与成本大模型 API 经常超时或限流加重试机制是必须的。但要注意如果上游节点因为超时一直重试可能会在短时间内消耗大量 Token。给每个节点设置重试上限和超时时间是必要的。3. 静态图 vs 动态分支DAG 在运行前就定好了结构容易校验。但 LLM 的输出是动态的有时候需要根据结果决定下一步走哪条路。如果要支持这种动态分支图的拓扑结构得在运行时变这会大大增加调试难度。五、小结做智能工作流核心是把杂乱的调用拆成清晰的节点和依赖。用拓扑排序处理并发不需要复杂的框架也能让多个模型任务协同工作。对于小团队来说这种轻量级的方案既能控制成本也能保证流程跑得通。