[对比学习LangChain和MAF-16]基于Checkpoint的持久化

发布时间:2026/7/1 2:23:26

[对比学习LangChain和MAF-16]基于Checkpoint的持久化 LangGraph和MAF的Workflow真的很像它们的核心目标非常一致都是解决大模型逻辑链脆弱、非确定性带来的失控问题将Agent转化为可控、稳定、可扩展的业务流水线。但是它们只是看上去很像实际上在底层的实现又完全不一样。这篇文章我们就来讨论两者在基于Checkpoint的持久化方面的异同。1. LangGraph为了应对生产环境中的网络中断或长周期任务两者都内置了基于Checkpointing的持久化机制保障工作流在异常中断后可恢复、可重试。为了让用户更好地理解Checkpointing的概念我们先来演示一个利用Checkpoint恢复执行的例子。1.1 从Checkpoint所在的地方开始执行在如下这段程序中我们基于指定的状态类型State创建了一个StateGraph并为它添加了四个节点foo、bar、baz和qux。每个节点在执行时都会将自己的名称写入一个名为nodes的状态成员中。我们将foo设置为入口节点qux设置为出口节点并采用Sequential的方式将四个节点串联起来。为了跟踪四个节点的执行我们在每个节点的执行函数中将节点名称写入一个全局列表log中。fromtypingimportAnnotated,Callable,Any,Required,TypedDictfromdotenvimportload_dotenvfromlanggraph.graphimportStateGraphfromlanggraph.checkpoint.memoryimportInMemorySaverfromlangchain_core.runnablesimportRunnableConfigimportasyncio,operator load_dotenv()log[]classState(TypedDict):nodes:Required[Annotated[list[str],operator.add]]defbuild_node(node_id:str)-Callable[[State],dict[str,Any]]:defhandle(state:State)-dict[str,Any]:log.append(node_id)return{nodes:[node_id]}returnhandle checkpointerInMemorySaver()nodes{node_id:build_node(node_id)fornode_idin[foo,bar,baz,qux]}agent(StateGraph(State).add_node(foo,nodes[foo])#type: ignore.add_node(bar,nodes[bar])#type: ignore.add_node(baz,nodes[baz])#type: ignore.add_node(qux,nodes[qux])#type: ignore.set_entry_point(foo).set_finish_point(qux).add_edge(foo,bar).add_edge(bar,baz).add_edge(baz,qux).compile(checkpointercheckpointer))asyncdefmain():config:RunnableConfig{configurable:{thread_id:thread_001}}input:State{nodes:[]}awaitagent.ainvoke(inputinput,configconfig)historylist(agent.get_state_history(configconfig))print(Printing the history:)forstateinhistory:print(f{state.metadata.get(step)}:{state.values})#type: ignoreprint(\nReplaying the history:)forstateinhistory:log.clear()awaitagent.ainvoke(inputNone,configstate.config)print(fReplayed{state.metadata.get(step)}:{log})#type: ignoreasyncio.run(main())为了支持基于Checkpointing的持久化我们在compile方法对StateGraph进行编译的时候指定一个InMemorySaver对象作为Checkpointer它会在每个Superstep完成的时候创建针对当前状态创建对应的Checkpoint并存储在内存中。由于Checkpointing是基于Thread ID进行持久化的所以我们在调用编译生成的Agent对象时在作为参数的RunnableConfig中指定了一个Thread ID。在Agent调用结束后我们调用它的get_state_history方法将表示历史状态的StateSnapshot列表收集起来。每个StateSnapshot都对应一个创建的Checkpoint并提供额外的元数据。我们将Checkpoint对应的Superstep编号和状态值所有通道值打印出来。接下来我们遍历这个StateSnapshot列表从StateSnapshot的config字段中提取出RunnableConfig并将其作为参数调用Agent对象其目的是从Checkpoint所在的地方开始执行。从如下的输出可以看出整个过程涉及6个Superstep序号从-1到4。foo节点在Superstep 1执行之后将自身的名称写入状态中bar、baz和qux节点依次执行最终在Superstep 4完成整个工作流的执行。与之对应如果我们从最初的两个Superstep编号分别为-1和0处恢复执行四个节点会依次执行。但是若从Superstep 1开始支持此时Checkpoint记录的是节点foo执行后的状态所以会从节点bar开始执行以此类推。The state history: 4:{nodes: [foo, bar, baz, qux]} 3:{nodes: [foo, bar, baz]} 2:{nodes: [foo, bar]} 1:{nodes: [foo]} 0:{nodes: []} -1:{nodes: []} Replaying the history: Replayed from checkpoints[4]:[] Replayed from checkpoints[3]:[qux] Replayed from checkpoints[2]:[baz, qux] Replayed from checkpoints[1]:[bar, baz, qux] Replayed from checkpoints[0]:[foo, bar, baz, qux] Replayed from checkpoints[-1]:[foo, bar, baz, qux]1.2 基于通道的CheckpointingLangGraph与MAF Workflow的最大不同之处在于状态图并不直接用于执行而是先将其编译成一个Actor模型并将整个状态拆分为具有不同类型的通道。整个Actor模型由无状态的节点和存储状态的通道组成状态图中节点之间的边转换成节点和通道之间的订阅关系。Actor模型的执行并非基于状态图定义的消息路由而是基于节点针对通道变更的订阅。由于Agent的状态完全集中在通道中所以成功完成的Superstep来说Checkpointing过程变得异常的简单只需要持久化通道的状态转换成Checkpoint就可以了。但是对因异常或者中断尚未完成的Superstep来说需要成功执行的节点针对通道写入意图存储起来这样才能既保证成功执行的节点不再重复执行又能保证它们针对通道的写入在当前Superstep中被正确的执行。我们将这种更新称为PendingWrite除了描述节点针对通道写入之外PendingWrite还可以描述如下的未决状态节点成功执行它们它的处理方法根本不涉及通道的写入节点在执行过程中抛出异常应该将异常信息记录下来节点在执行过程中被中断应该将中断信息记录下来从某个中断点处恢复执行应该将提供的ResumeValue记录下来。对于LangGraph基于Checkpointing机制来说被持久化的不仅仅是存储通道最终状态的Checkpoint还包括PendingWrite、元数据以及一些调用时采用的配置。持久化的数据基本上可以表示为如下这个名为CheckpointTuple的元组。classCheckpointTuple(NamedTuple):config:RunnableConfig checkpoint:Checkpoint metadata:CheckpointMetadata parent_config:RunnableConfig|NoneNonepending_writes:list[PendingWrite]|NoneNonePendingWritetuple[str,str,Any]LangGraph的Checkpointing基本上可以视为针对上面这个CheckpointTuple持久化以及如何从持久化的CheckpointTuple恢复现场。当前具体的实现远不止我们说的这么简单具体的机制可以参考我如下这几篇文章基于Checkpoint的持久化持久化状态的提取非常规Pending Write的持久化三种持久化模式的差异梳理Agent的执行流程回到过去开启平行世界-上篇回到过去开启平行世界-下篇2. MAF Worflow虽然MAF Workflow的Checkpointing持久化机制也是基于Superstep进行但是由于底层的执行引擎的差异导致持久化的实现方式与LangGraph有很大的不同。不过在具体介绍之前我们先来演示一个利用Checkpoint恢复执行的例子。2.1 从Checkpoint所在的地方开始执行前面我们利用LangGraph演示了如何从Checkpoint所在的地方开始执行下面我们利用MAF Workflow演示同样的功能。我们定义了辅助方法CreateExecutor它会根据提供的Executor的ID创建一个FunctionExecutorstringstring类型的Executor对象。该对象在执行的时候会将当前ID写入log中以利于跟踪每个节点的执行。我们调用此方法创建了foo、bar、baz和qux四个Executor并以Sequential模式将它们编排成按序执行的Workflow。usingMicrosoft.Agents.AI.Workflows;usingSystem.Diagnostics;Liststringlog[];varrandomnewRandom();varworkflowBuildWorkflow();varcheckpointManagerCheckpointManager.CreateInMemory();varrunawaitInProcessExecution.Default.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow,start);awaitrun.RunToCompletionAsync();varcheckpointsrun.Checkpoints;Debug.Assert(checkpoints.Count4);for(varindex0;index4;index){log.Clear();awaitrun.RestoreCheckpointAsync(checkpoints[index]);awaitrun.RunToCompletionAsync();Console.WriteLine($Restore from Checkpoints[{index}]: [{string.Join(,,log)}]);}WorkflowBuildWorkflow(){varfooCreateExecutor(Foo);varbarCreateExecutor(Bar);varbazCreateExecutor(Baz);varquxCreateExecutor(Qux);returnnewWorkflowBuilder(foo).AddEdge(source:foo,target:bar).AddEdge(source:bar,target:baz).AddEdge(source:baz,target:qux).Build();}ExecutorBindingCreateExecutor(stringid)newFuncstring,ValueTaskstring(asyncinput{log.Add(id);awaitTask.Delay(random.Next(100,500));returnid;}).BindAsExecutor(id);在以流的形式执行Workflow之前我们调用WithCheckpointing指定了一个通过调用CheckpointManager.CreateInMemory创建的CheckpointManager对象它会帮助我们创建Checkpoint并将其存储在内存中。Workflow执行完成后我们将StreamingRun对象的Checkpoints属性存储的CheckpointInfo收集起来。通过断言我们知道这里只有4个Checkpoint被创。按照一个Superstep一个Checkpoint的原则意味着这里只涉及4个SuperstepLangGraph涉及6个Superstep。我们遍历这个CheckpointInfo列表调用StreamingRun对象的RestoreCheckpointAsync方法将Workflow恢复到指定的Checkpoint所在的地方然后调用RunToCompletionAsync方法继续执行Workflow。通过打印log我们可以看到每个Checkpoint对应的Superstep编号和节点执行顺序。Restore from Checkpoints[0]: [Bar,Baz,Qux] Restore from Checkpoints[1]: [Baz,Qux] Restore from Checkpoints[2]: [Qux] Restore from Checkpoints[3]: []同样是第一个执行的节点foo在LangGraph中式在第三个Superstep中执行的编号为1而在MAF Workflow中是在第一个Superstep中执行的编号为0。当我们从第一个Checkpoint开始恢复执行时foo已经成功执行所以它不会再被执行bar、baz和qux依次执行以此类推。2.2 基于消息路由的Checkpointing由于LangGraph创建的Agent是以Actor模型的形式运行的并且所有的状态都集中在通道中所以它的Checkpointing机制主要围绕通道进行整个设计变得很简单。而MAF Workflow的采用消息路由的方法执行并将状态控制在IWorkflowContext上下文中所以它不仅需要持久化未处理的消息还需要持久化上下文中的状态。对于跨越多个Superstep的FanInEdge,它还将针对某个Superstep的中间状态记录下来。除此之外两者对Checkpoint这个对象的定义也不一样。LangGraph的Checkpoint对象主要是针对通道的状态进行持久化并利用PendingWrite来记录未决状态。而Checkpoint在MAF Workflow中的表示的时整个持久化的状态相当于我们CheckpointTuple元组。MAF将Checkpointing的细节全部隐藏了起来所以很多核心的类型都是internal类型其中就包括承载所有持久化信息的如下这个Checkpoint类型。internalsealedclassCheckpoint{publicboolIsInitialStepNumber-1;publicintStepNumber{get;}publicWorkflowInfoWorkflow{get;}publicRunnerStateDataRunnerData{get;}publicDictionaryScopeKey,PortableValueStateData{get;}newDictionaryScopeKey,PortableValue();publicDictionaryEdgeId,PortableValueEdgeStateData{get;}newDictionaryEdgeId,PortableValue();publicCheckpointInfo?Parent{get;}}属性成员说明如下StepNumberCheckpoint对应的Superstep编号Workflow描述Workflow的WorkflowInfo对象RunnerData描述Workflow执行器状态的RunnerStateData对象StateData一个字典Key是ScopeKey对象Value是PortableValue对象用于存储Workflow中不同Scope维度的状态数据EdgeStateData一个字典Key是EdgeId对象Value是PortableValue对象用于存储Workflow中不同Edge维度的状态数据Parent一个可选的CheckpointInfo对象指向上一个Checkpoint。关于Checkpoint对象以及MAF Workflow具体的Checkpointing机制的更多细节可以参考我如下这几篇文章Workflow基于Checkpointing的持久化关于Checkpointing的一个严重Bug

相关新闻