LangGraph Durable Execution: An In-Depth Analysis

Published: 2026/5/20 23:46:52

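I. Problem Background and Design Goals

Complex AI workflows, especially LLM-driven systems, commonly run into the following issues:
- Long execution times (minutes or even hours)
- Dependence on external systems (LLMs, APIs, databases)
- Non-determinism (randomness, model output)
- Side effects (database writes, outgoing requests)
- A need for human intervention (human-in-the-loop)

A traditional synchronous execution model has clear shortcomings in these scenarios:
- After a failure, everything must be re-run from the beginning
- Side effects may be executed more than once, corrupting data
- The workflow cannot be safely paused and resumed

Durable Execution is designed to solve exactly these problems. Its core capability is to resume a workflow safely after an interruption at any point, while guaranteeing that side effects are not executed twice and that results stay consistent.

II. Core Abstractions and Execution Model

In LangGraph, Durable Execution is built on three core abstractions.

1. Checkpoint (state persistence)
The checkpoint is the foundation of Durable Execution:
- It saves the workflow's execution state (state)
- It saves the results of tasks
- It isolates execution instances by thread_id
In essence, a checkpoint is a "recoverable snapshot" of the workflow.

2. Task (side-effect isolation unit)
The task is the key design element of Durable Execution. It wraps:
- Non-deterministic operations (random numbers, LLM calls)
- Operations with side effects (API calls, file writes)
A task's result is persisted. On replay the task is not executed again, and its saved result is read instead.
In essence: Task = memoized execution unit.

3. Replay
Durable Execution is not "resume from the breakpoint". It is deterministic replay based on the checkpoint.
Key characteristics:
- Execution does not resume from the line of code where it was interrupted
- It re-executes from a semantically safe starting point
- Completed task → its cached result is read
- Incomplete task → it is executed again

III. Execution Semantics: Determinism and Replay

1. A non-breakpoint recovery model
LangGraph's recovery mechanism:
❌ does not restore the call stack
❌ does not restore the program counter (PC)
✅ re-executes the workflow (replay)
Recovery flow:
1) Load the checkpoint
2) Determine the starting point (node / entrypoint)
3) Re-execute from the starting point
4) "Skip" completed tasks by reading their saved results

2. Starting-point rules
The starting point depends on the API in use:

Scenario          Starting point
StateGraph        Beginning of the node where execution stopped
Subgraph call     The parent node that called the halted subgraph
Functional API    Beginning of the entrypoint

Inside the subgraph itself, execution resumes at the node where it stopped. These rules guarantee that replay always starts from a semantically complete boundary.

3. Consistency mechanisms
The consistency of Durable Execution relies on two mechanisms:
(1) Task result caching
First execution of a task → run it → save the result
Task reached again during recovery → return the cached result directly
(2) Idempotency requirement
If a task did not complete successfully (its result was never written to the checkpoint), it is executed again. For example, if the process crashes after an API call succeeds but before its result is saved, the call is made again on replay. Tasks must therefore be idempotent.

IV. Durability Modes (Consistency vs. Performance)

LangGraph offers three persistence strategies. In recent LangGraph versions the mode is chosen per run through the durability argument of the execution methods, e.g. graph.invoke(inputs, config, durability="sync").

1. exit mode
Writes the checkpoint only when graph execution exits (whether it completes, fails, or stops at an interrupt). Intermediate state is not saved.
Characteristics:
✅ Best performance
❌ Cannot recover from a crash in the middle of a run
✅ The final state is still recorded
Suitable for: tasks that can safely be re-run, non-critical workflows

2. async mode
Writes checkpoints asynchronously while the next step executes.
Characteristics:
⚖️ A balance between performance and durability
⚠️ A small risk that a checkpoint is lost if the process crashes

3. sync mode
Writes the checkpoint synchronously before the next step starts.
Characteristics:
✅ Strongest durability
❌ Highest performance overhead
Suitable for: critical business workflows, scenarios that require strong consistency

V. Why Tasks Are Necessary

1. Why checkpoints alone are not enough
With checkpoints only:
inside a node: call an API → write to the database
Problem: on replay the node is re-executed from its beginning, so the side effects run again, causing duplicate writes / duplicate requests.

2. What tasks provide
Tasks provide:
- Side-effect isolation
- Result caching
- Skipped execution on replay
Therefore: checkpoints are responsible for "state", while tasks are responsible for making "behavior" replayable.

VI. Pause and Resume (Human-in-the-loop)

Durable Execution supports human intervention:

1. interrupt
- Pauses the workflow from inside a node
- Saves the current state

2. Resuming with Command
- The caller passes a resume value with Command(resume=...) and can also apply a state update with Command(update=...)
- The interrupted node runs again from its beginning, and this time interrupt() returns the resume value
Typical uses: approval flows, manual verification, review of LLM output

VII. Failure Recovery

When a workflow fails:
Recovery: graph.invoke(None, config)
Conditions:
- The same thread_id is used
- A checkpoint exists
Behavior:
- Replay starts from the most recent checkpoint
- Completed tasks are not executed again

VIII. Engineering Design Principles

1. Split side effects
- Each side effect → its own task
- Avoid placing several side effects directly in one node

2. Stay deterministic
Avoid:
- Random numbers that are not wrapped in a task
- Dependence on the current time
- Dependence on external state

3. Task idempotency
Key strategies:
- Use an idempotency key
- Check for an existing result before acting
- Avoid duplicate writes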
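The idempotency strategies above can be sketched without LangGraph. This is a minimal, hypothetical example. FakeOrderStore, idempotency_key and create_order are illustrative names, not LangGraph APIs, and the store is a stand-in for a database table with a unique-key constraint. The key is built only from stable inputs (thread_id, step name, payload), so a task that is re-executed during replay computes the same key and does not write a second row.

```python
import hashlib


class FakeOrderStore:
    """Hypothetical stand-in for a database table with a unique idempotency-key column."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}
        self.write_count = 0

    def insert_if_absent(self, key: str, row: dict) -> bool:
        """Simulates a unique-key constraint: returns False instead of writing a duplicate."""
        if key in self.rows:
            return False
        self.rows[key] = row
        self.write_count += 1
        return True


def idempotency_key(thread_id: str, step: str, payload: str) -> str:
    """Derive the key from stable inputs only, so every replay computes the same key."""
    return hashlib.sha256(f"{thread_id}:{step}:{payload}".encode()).hexdigest()


def create_order(store: FakeOrderStore, thread_id: str, item: str) -> dict:
    """A side effect written so that executing it again is harmless."""
    key = idempotency_key(thread_id, "create_order", item)
    existing = store.rows.get(key)
    if existing is not None:  # strategy: check for an existing result first
        return existing
    # strategy: key-guarded write (a real database would enforce this atomically)
    store.insert_if_absent(key, {"item": item, "thread_id": thread_id})
    return store.rows[key]


store = FakeOrderStore()
first = create_order(store, "thread-1", "book")
# Simulate the task running again because a crash happened before its result was checkpointed.
second = create_order(store, "thread-1", "book")
print(first == second, store.write_count)  # True 1
```

A key that included a random value or a timestamp would differ on every replay and defeat the purpose. This is the same determinism rule the workflow itself has to follow.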
4. Choose an appropriate durability mode

Scenario                           Recommended mode
High performance, safe to re-run   exit
Regular tasks                      async
Critical workflows                 sync

IX. Design Trade-offs and Limitations

1. Cost
- Checkpoints add storage and I/O overhead
- Tasks add code complexity

2. Programming-model constraints
The following rules must be followed:
- Side effects must be wrapped in tasks
- The workflow must be deterministic

3. Not a true "resume from the breakpoint"
- Recovery requires replay
- Nodes may be executed more than once (completed tasks are not)

X. Summary

The essence of LangGraph Durable Execution can be summarized as a deterministic execution model built on checkpoints + task caching + replay.
Its core value lies in:
- Recoverability: resuming after interrupts and failures
- Consistency: side effects wrapped in tasks are not executed again once completed
- Scalability: support for long-running and complex workflows
- Interactivity: support for human-in-the-loop

Final takeaway: LangGraph's Durable Execution does not "continue from the point of interruption". Instead, it replays the execution path using checkpoints and cached task results, achieving semantically consistent recovery while ensuring that completed side effects are not repeated.

```python
"""
Durable Execution Workflow using LangGraph.

This module demonstrates durable execution with:
1. Tasks - for side effects and non-deterministic operations
2. Subgraphs - for modular, reusable workflow components
3. Checkpointing - for state persistence and recovery
4. Exception recovery - resume from last checkpoint on failure
5. Interrupt/Resume - pause for human-in-the-loop, then resume
"""

import random
import time
import uuid
from typing import Any, Callable, NotRequired, TypedDict  # NotRequired needs Python 3.11+

from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import task
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    """State schema for the durable execution workflow."""

    urls: list[str]
    results: NotRequired[list[str]]
    random_number: NotRequired[int]
    thread_id: NotRequired[str]
    error_url: NotRequired[str]


@task
def generate_random_number() -> int:
    """Non-deterministic operation, wrapped in a task so replay reuses the saved value."""
    print("generate_random_number starting...")
    return random.randint(1, 100)


@task
def simulate_request(url: str) -> str:
    """Simulate a network request (a side effect) with potential failure."""
    print(f"simulate_request starting for {url}...")
    time.sleep(0.3)
    if "fail" in url:
        raise RuntimeError(f"Simulated request failure for {url}")
    return f"response-{url[:5]}"


def create_subgraph() -> Any:
    """Create and compile the subgraph for parallel URL requests."""

    def subgraph_node(state: State) -> dict:
        """Execute requests for all URLs in parallel."""
        print("subgraph_node starting...")
        # Calling a task returns a future; start every request before waiting on any result
        futures = [simulate_request(url) for url in state["urls"]]
        return {"results": [f.result() for f in futures]}

    sub_builder = StateGraph(State)
    sub_builder.add_node("subgraph_api", subgraph_node)
    sub_builder.add_edge(START, "subgraph_api")
    sub_builder.add_edge("subgraph_api", END)
    # checkpointer=True: the subgraph keeps its own persistent checkpoints via the parent's checkpointer
    return sub_builder.compile(checkpointer=True)


def create_main_node() -> Callable:
    """Factory function that creates the main node."""

    def main_node(state: State) -> dict:
        """Generate a random number inside a task."""
        print("Main Node: Starting workflow execution")
        random_number = generate_random_number().result()
        print(f"Generated random number: {random_number}")
        # Return a partial state update instead of mutating the input state
        return {"random_number": random_number}

    return main_node


def create_pausing_node() -> Callable:
    """Factory function that creates a node that pauses for human-in-the-loop."""

    def pausing_node(state: State) -> dict:
        """Pause the workflow for human intervention."""
        print("Pausing Node: Workflow will pause for human approval")
        print("[INTERRUPT] Pausing workflow for human approval...")
        # First run: interrupt() pauses the graph. After Command(resume=...), this node
        # runs again from the top and interrupt() returns the resume value.
        result = interrupt("Waiting for human approval")
        print(f"[RESUMED] Received: {result}")
        return {"results": [result]}

    return pausing_node


def build_workflow(checkpointer: Any = None) -> Any:
    """Build and compile the main workflow graph (without the pause node)."""
    workflow = StateGraph(State)
    workflow.add_node("main", create_main_node())
    workflow.add_node("subgraph_node", create_subgraph())  # compiled subgraph used as a node
    workflow.add_edge(START, "main")
    workflow.add_edge("main", "subgraph_node")
    workflow.add_edge("subgraph_node", END)
    return workflow.compile(checkpointer=checkpointer)


def build_workflow_with_interrupt(checkpointer: Any = None) -> Any:
    """Build the workflow with an interrupt/pause step after the subgraph."""
    workflow = StateGraph(State)
    workflow.add_node("main", create_main_node())
    workflow.add_node("subgraph_node", create_subgraph())
    workflow.add_node("pause_node", create_pausing_node())
    workflow.add_edge(START, "main")
    workflow.add_edge("main", "subgraph_node")
    workflow.add_edge("subgraph_node", "pause_node")
    workflow.add_edge("pause_node", END)
    return workflow.compile(checkpointer=checkpointer)


def run_with_recovery(graph: Any, initial_input: dict, config: RunnableConfig) -> Any:
    """Execute the workflow and recover from a failure.

    Steps:
    1. First run with a failing URL - triggers an exception
    2. Inspect the checkpoint to see what was saved
    3. Update the subgraph state with fixed URLs
    4. Resume from the checkpoint using graph.invoke(None, config)
    """
    try:
        print("First run (will fail)")
        result = graph.invoke(initial_input, config)
        print(f"Result: {result}")
        return result
    except Exception as e:
        print(f"❌ Workflow failed: {e}")

    # Step 1: inspect the checkpoint; subgraphs=True also loads the pending subgraph's state
    print("\nChecking checkpoint")
    parent_state = graph.get_state(config, subgraphs=True)
    sub_state = parent_state.tasks[0].state  # state of the failed subgraph_node task
    print(f"Saved URLs: {sub_state.values.get('urls', [])}")
    print(f"Next node to execute: {sub_state.next}")

    # Step 2: patch the subgraph's state through the parent graph, using the subgraph's config
    print("\nUpdating state with fixed URLs")
    fixed_urls = ["site1_fixed", "site2"]
    graph.update_state(sub_state.config, {"urls": fixed_urls})
    print(f"Updated URLs to: {fixed_urls}")

    # Step 3: input=None resumes the same thread from its checkpoint instead of starting over
    print("\nResuming from checkpoint (input=None)")
    result = graph.invoke(None, config)
    print("✅ Recovery successful!")
    return result


def demonstrate_interrupt_resume(checkpointer: Any, thread_id: str) -> None:
    """Demonstrate interrupt and resume.

    When an interrupt occurs:
    1. The workflow pauses at the interrupt point (the checkpointer saves state first)
    2. The user can inspect the state and decide to approve/reject
    3. The workflow is resumed with a Command
    """
    graph_with_pause = build_workflow_with_interrupt(checkpointer)
    config: RunnableConfig = {"configurable": {"thread_id": thread_id}}

    print("\n" + "=" * 50)
    print("DEMONSTRATING INTERRUPT/RESUME")
    print("=" * 50)

    print("\nStep 1: Run workflow until interrupt")
    # interrupt() does not raise to the caller: invoke() returns normally and reports
    # the pending interrupts under the "__interrupt__" key.
    result = graph_with_pause.invoke({"urls": ["site3"], "thread_id": thread_id}, config)
    if "__interrupt__" in result:
        print(f"⏸️ Workflow interrupted: {result['__interrupt__']}")

    print("\nStep 2: Check checkpoint state")
    state = graph_with_pause.get_state(config, subgraphs=True)
    print(f"Saved state: {state.values}")
    print(f"Next node: {state.next}")

    print("\nStep 3: Resume workflow using Command")
    print('Using Command(resume="approved") to continue...')
    result = graph_with_pause.invoke(Command(resume="approved"), config=config)
    print(f"✅ Resumed result: {result}")


if __name__ == "__main__":
    checkpointer = InMemorySaver()

    print("=" * 50)
    print("DEMONSTRATING EXCEPTION RECOVERY")
    print("=" * 50)

    # Test 1: exception recovery
    thread_id_1 = str(uuid.uuid4())
    config_1: RunnableConfig = {"configurable": {"thread_id": thread_id_1}}
    graph = build_workflow(checkpointer=checkpointer)
    initial_input = {
        "urls": ["site1", "site_fail"],  # site_fail will cause an error
        "thread_id": thread_id_1,
    }

    print("\n--- Testing Exception Recovery ---")
    print("First run contains site_fail, which will raise an exception")
    print("Recovery will:")
    print("  1. Check the checkpoint (saved after random_number was generated)")
    print("  2. Update state with fixed URLs")
    print("  3. Resume from checkpoint\n")

    result = run_with_recovery(graph, initial_input, config_1)
    print("\nFinal result after recovery")
    print(result)

    # Test 2: interrupt/resume on a fresh thread
    thread_id_2 = str(uuid.uuid4())
    print()
    demonstrate_interrupt_resume(checkpointer, thread_id_2)

    print("\n" + "=" * 50)
    print("ALL TESTS COMPLETED")
    print("=" * 50)
```
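The program above depends on three replay rules: the node re-executes, tasks that already completed are served from the checkpoint, and the task that failed runs again. Those rules can be isolated in a toy, stdlib-only model. ToyCheckpointer and ToyRun are hypothetical, and keying results by call position is an assumption made for illustration. It is not a description of how LangGraph is implemented internally.

```python
import random
from collections.abc import Callable
from typing import Any


class ToyCheckpointer:
    """Hypothetical store for task results, keyed by (thread_id, node, call position)."""

    def __init__(self) -> None:
        self.task_results: dict[tuple[str, str, int], Any] = {}


class ToyRun:
    """One (re-)execution of a node; completed task calls are served from the store."""

    def __init__(self, saver: ToyCheckpointer, thread_id: str, node: str) -> None:
        self.saver = saver
        self.thread_id = thread_id
        self.node = node
        self.position = 0
        self.executed: list[str] = []  # tasks that actually ran during this execution

    def task(self, fn: Callable[..., Any], *args: Any) -> Any:
        key = (self.thread_id, self.node, self.position)
        self.position += 1
        if key in self.saver.task_results:
            return self.saver.task_results[key]  # completed earlier: reuse saved result
        result = fn(*args)  # not completed yet: run it (an exception leaves nothing saved)
        self.saver.task_results[key] = result
        self.executed.append(fn.__name__)
        return result


def fetch(url: str) -> str:
    """Side-effect stand-in that fails for URLs containing 'fail'."""
    if "fail" in url:
        raise RuntimeError(f"Simulated request failure for {url}")
    return f"response-{url[:5]}"


def node(run: ToyRun, urls: list[str]) -> dict:
    """Node body: the whole function re-runs on replay, but cached tasks are skipped."""
    number = run.task(random.randint, 1, 100)  # non-determinism wrapped in a task
    results = [run.task(fetch, url) for url in urls]
    return {"random_number": number, "results": results}


saver = ToyCheckpointer()
try:
    node(ToyRun(saver, "thread-1", "main"), ["site1", "site_fail"])
except RuntimeError as err:
    print("first run failed:", err)

replay = ToyRun(saver, "thread-1", "main")
output = node(replay, ["site1", "site2"])  # same thread, failing URL fixed
print(replay.executed)  # ['fetch']: only the previously failed request ran again
print(output["results"])  # ['response-site1', 'response-site2']
```

This toy matches cached results by call position. If an input changed at a position that had already completed (for example urls[0]), it would silently return the stale cached response. That is the toy's version of the determinism rule: a replayed node must issue the same task calls in the same order.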
