
极简智能工作流条件分支与异常恢复的编排引擎设计一、线性之困工作流引擎的单行道局限早期的工作流引擎大多采用线性执行模型——步骤 A 完成后执行步骤 B步骤 B 完成后执行步骤 C。这种模型在简单场景下工作良好但面对真实业务时迅速暴露出不足审批被驳回后需要回到修改步骤、支付超时后需要切换到备用渠道、数据校验失败后需要根据错误类型走不同的修复路径。这些场景的核心需求是条件分支和异常恢复。条件分支允许工作流根据运行时数据选择不同的执行路径异常恢复允许工作流在步骤失败后自动重试、降级或回滚而非直接终止。对于 AI Agent 工作流而言这两个需求更加迫切。LLM 的输出具有不确定性工具调用可能失败外部 API 可能超时。一个健壮的 Agent 工作流必须能够处理这些异常情况优雅地降级而非崩溃。本文将从 DAG 编排、条件分支和异常恢复三个维度展示如何设计一个生产级的智能工作流引擎。二、引擎架构DAG 编排与状态机驱动的执行模型2.1 核心架构flowchart TD A[工作流定义br/YAML/JSON DSL] -- B[解析器br/构建 DAG 图] B -- C[验证器br/循环检测/类型检查] C -- D[执行引擎br/状态机驱动] D -- E[步骤调度器br/并行/串行执行] E -- F{步骤执行结果} F --|成功| G[条件评估器br/评估分支条件] F --|失败| H[异常处理器br/重试/降级/回滚] G -- I[选择下一组步骤] H -- I I -- J{是否还有待执行步骤} J --|是| E J --|否| K[工作流完成] subgraph 状态持久化 L[执行快照br/每步执行后保存] M[事件日志br/不可变事件流] end E -.- L E -.- M subgraph 异常恢复策略 N[自动重试br/指数退避] O[降级执行br/备用步骤] P[补偿回滚br/逆向执行] Q[人工介入br/挂起等待] end H -- N H -- O H -- P H -- Q2.2 工作流 DSL 设计工作流定义采用声明式 DSL将做什么与怎么做分离。开发者只需描述步骤、条件和异常策略执行引擎负责调度和容错。三、工程实现条件分支与异常恢复的核心模块3.1 工作流定义与解析# workflow_dsl.py — 工作流 DSL 定义 from pydantic import BaseModel, Field from typing import Literal, Optional from enum import Enum class RetryPolicy(BaseModel): 重试策略 max_retries: int Field(default3, description最大重试次数) initial_delay_ms: int Field(default1000, description初始延迟毫秒) max_delay_ms: int Field(default30000, description最大延迟毫秒) backoff_multiplier: float Field(default2.0, description退避倍数) retryable_errors: list[str] Field( default[timeout, rate_limit, connection_error], description可重试的错误类型 ) class ErrorStrategy(str, Enum): RETRY retry FALLBACK fallback COMPENSATE compensate SUSPEND suspend FAIL fail class StepDefinition(BaseModel): 步骤定义 step_id: str Field(description步骤唯一标识) name: str Field(description步骤名称) type: Literal[tool_call, llm_call, sub_workflow, human_approval] Field( description步骤类型 ) config: dict Field(description步骤配置参数) depends_on: list[str] Field(default[], description依赖的步骤ID列表) condition: Optional[str] Field( defaultNone, description执行条件表达式如 steps.extract.result.confidence 0.8 ) retry_policy: RetryPolicy Field(default_factoryRetryPolicy) error_strategy: ErrorStrategy Field( defaultErrorStrategy.RETRY, description异常处理策略 ) fallback_step: Optional[str] Field( defaultNone, description降级步骤ID当 error_strategyfallback 时使用 ) compensation_step: Optional[str] Field( defaultNone, description补偿步骤ID当 error_strategycompensate 时使用 ) timeout_ms: int Field(default30000, description步骤超时时间) class WorkflowDefinition(BaseModel): 工作流定义 workflow_id: str name: str version: str 1.0 steps: list[StepDefinition] # 全局异常策略 default_error_strategy: ErrorStrategy ErrorStrategy.SUSPEND # 工作流级别的超时 global_timeout_ms: int 300000 # 5 分钟3.2 条件分支执行引擎# workflow_engine.py — 工作流执行引擎 import asyncio import logging from datetime import datetime logger logging.getLogger(workflow-engine) class WorkflowEngine: 工作流执行引擎基于状态机的 DAG 调度 设计考量每步执行后持久化快照支持断点续跑 def __init__(self, state_store, event_bus): self.state_store state_store self.event_bus event_bus async def execute(self, workflow: WorkflowDefinition, initial_context: dict) - dict: 执行工作流 # 初始化执行上下文 context { workflow_id: workflow.workflow_id, status: running, steps: {}, # 步骤执行结果 started_at: datetime.utcnow().isoformat(), } context.update(initial_context) # 保存初始快照 await self.state_store.save_snapshot(context) # 构建步骤依赖图 pending {s.step_id: s for s in workflow.steps} completed set() try: while pending: # 找到当前可执行的步骤依赖已满足 条件为真 ready_steps self._find_ready_steps( pending, completed, context ) if not ready_steps: # 没有可执行步骤但还有待执行步骤 → 可能是循环依赖 remaining [s.step_id for s in pending.values()] logger.error(f死锁检测步骤 {remaining} 无法执行) context[status] deadlock break # 并行执行所有就绪步骤 tasks [ self._execute_step(step, context, workflow) for step in ready_steps ] results await asyncio.gather(*tasks, return_exceptionsTrue) # 处理执行结果 for step, result in zip(ready_steps, results): if isinstance(result, Exception): # 步骤执行失败触发异常处理 handled await self._handle_error( step, result, context, workflow ) if not handled: context[status] failed break else: # 步骤执行成功 context[steps][step.step_id] { status: completed, result: result, completed_at: datetime.utcnow().isoformat(), } completed.add(step.step_id) del pending[step.step_id] # 保存快照 await self.state_store.save_snapshot(context) if context[status] ! running: break if context[status] running: context[status] completed except Exception as e: context[status] failed context[error] str(e) context[completed_at] datetime.utcnow().isoformat() await self.state_store.save_snapshot(context) return context def _find_ready_steps(self, pending: dict, completed: set, context: dict) - list: 找到依赖已满足且条件为真的步骤 ready [] for step_id, step in pending.items(): # 检查依赖是否全部完成 if not all(dep in completed for dep in step.depends_on): continue # 检查条件表达式 if step.condition: if not self._evaluate_condition(step.condition, context): # 条件不满足跳过此步骤 continue ready.append(step) return ready def _evaluate_condition(self, condition: str, context: dict) - bool: 安全地评估条件表达式 设计考量使用受限的表达式评估器禁止任意代码执行 # 简化实现使用安全的表达式解析器 # 生产环境应使用 ast.literal_eval 或自定义 DSL 解释器 try: # 将上下文变量注入命名空间 namespace {steps: context.get(steps, {})} return bool(eval(condition, {__builtins__: {}}, namespace)) except Exception: logger.warning(f条件评估失败: {condition}) return False async def _execute_step(self, step: StepDefinition, context: dict, workflow: WorkflowDefinition) - dict: 执行单个步骤包含超时控制 try: result await asyncio.wait_for( self._do_execute(step, context), timeoutstep.timeout_ms / 1000 ) return result except asyncio.TimeoutError: raise TimeoutError(f步骤 {step.step_id} 执行超时) async def _do_execute(self, step: StepDefinition, context: dict) - dict: 实际执行步骤逻辑 if step.type tool_call: return await self._call_tool(step.config, context) elif step.type llm_call: return await self._call_llm(step.config, context) elif step.type human_approval: return await self._wait_for_approval(step.config, context) else: raise ValueError(fUnknown step type: {step.type}) async def _handle_error(self, step: StepDefinition, error: Exception, context: dict, workflow: WorkflowDefinition) - bool: 异常处理根据策略执行恢复动作 strategy step.error_strategy if strategy ErrorStrategy.RETRY: return await self._retry_step(step, error, context, workflow) elif strategy ErrorStrategy.FALLBACK: return await self._execute_fallback(step, context, workflow) elif strategy ErrorStrategy.COMPENSATE: await self._execute_compensation(step, context, workflow) return False elif strategy ErrorStrategy.SUSPEND: context[status] suspended context[suspended_at_step] step.step_id return False else: return False async def _retry_step(self, step: StepDefinition, error: Exception, context: dict, workflow: WorkflowDefinition, attempt: int 1) - bool: 指数退避重试 policy step.retry_policy if attempt policy.max_retries: logger.error(f步骤 {step.step_id} 重试 {policy.max_retries} 次后仍失败) return False # 计算退避延迟 delay_ms min( policy.initial_delay_ms * (policy.backoff_multiplier ** (attempt - 1)), policy.max_delay_ms ) logger.info(f步骤 {step.step_id} 第 {attempt} 次重试延迟 {delay_ms}ms) await asyncio.sleep(delay_ms / 1000) try: result await self._execute_step(step, context, workflow) context[steps][step.step_id] { status: completed, result: result, retry_count: attempt, } return True except Exception as e: return await self._retry_step(step, e, context, workflow, attempt 1)四、灵活性的代价工作流引擎的架构权衡4.1 DSL 的表达力边界声明式 DSL 在表达复杂逻辑时存在局限。例如根据前三个步骤的结果动态决定后续步骤这类运行时决策在 DSL 中难以优雅表达。当业务逻辑复杂到一定程度DSL 的抽象反而成为负担。4.2 状态持久化的性能开销每步执行后保存快照引入了 I/O 开销。在高频步骤场景下每秒 100 步骤快照写入可能成为瓶颈。优化策略包括异步写入、增量快照和合并写入。4.3 条件评估的安全风险条件表达式的评估存在代码注入风险。使用eval()是不安全的必须使用受限的表达式解析器。但受限解析器的表达力又不足以覆盖所有业务场景。4.4 适用边界条件分支工作流引擎最适合审批流程、数据处理管道、AI Agent 编排等步骤间存在复杂依赖和条件判断的场景。不适合纯线性流程直接编码更简单、实时性要求极高的场景状态持久化引入延迟。五、总结条件分支和异常恢复是工作流引擎从玩具到生产级的关键跨越。通过 DAG 编排实现步骤间的灵活依赖通过条件表达式实现运行时路径选择通过多层异常策略重试/降级/补偿/挂起实现故障容错。工程实践中的核心挑战是 DSL 的表达力与安全性的平衡——表达力不足无法覆盖复杂业务安全性不足会引入注入风险。一个好的工作流引擎不是追求能表达一切而是在足够表达常见业务和保持安全可控之间找到平衡点。