[Dissecting the LangChain Execution Engine] Differences Among the Three Persistence Modes

Published: 2026/5/19 22:31:54

1. Checkpoint Persistence

Next, a few simple examples will reinforce our understanding of the three persistence modes described above. To pin down when checkpoints are persisted, we define the following ExtendedInMemorySaver class, derived from InMemorySaver. Before returning the result of the base class's method of the same name, the overridden method simulates a one-second delay and prints a message.

    from langgraph.checkpoint.memory import InMemorySaver
    import time
    from langchain_core.runnables import RunnableConfig
    from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, ChannelVersions

    class ExtendedInMemorySaver(InMemorySaver):
        def put(
            self,
            config: RunnableConfig,
            checkpoint: Checkpoint,
            metadata: CheckpointMetadata,
            new_versions: ChannelVersions,
        ) -> RunnableConfig:
            time.sleep(1)  # Simulate some delay
            print(f"put called with checkpoint for step {metadata['step']}")
            return super().put(config, checkpoint, metadata, new_versions)

We then build the following Pregel made up of four Nodes, whose Checkpointer is the ExtendedInMemorySaver above. Writing to channel foo triggers nodes foo1 and foo2 to run in parallel. When they finish, foo1 and foo2 write to their respective Channels (bar1 and bar2), which in turn trigger nodes bar1 and bar2. To pin down when each Node runs, the processing function also prints a message.

    from langgraph.checkpoint.memory import InMemorySaver
    from langgraph.pregel import Pregel, NodeBuilder
    from langgraph.channels import LastValue, BinaryOperatorAggregate
    import operator
    import time
    from functools import partial
    from langchain_core.runnables import RunnableConfig
    from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, ChannelVersions

    # ExtendedInMemorySaver is the class defined above.

    def handle(node: str, arg: dict):
        print(f"node {node} is called.")
        return [node]  # written to the node's output channel

    foo1 = (NodeBuilder()
        .subscribe_to("foo", read=False)
        .do(partial(handle, "foo1"))
        .write_to("bar1")
    )
    foo2 = (NodeBuilder()
        .subscribe_to("foo", read=False)
        .do(partial(handle, "foo2"))
        .write_to("bar2")
    )
    bar1 = (NodeBuilder()
        .subscribe_to("bar1", read=False)
        .do(partial(handle, "bar1"))
        .write_to("output")
    )
    bar2 = (NodeBuilder()
        .subscribe_to("bar2", read=False)
        .do(partial(handle, "bar2"))
        .write_to("output")
    )

    app = Pregel(
        nodes={"foo1": foo1, "foo2": foo2, "bar1": bar1, "bar2": bar2},
        channels={
            "foo": LastValue(str),
            "bar1": LastValue(str),
            "bar2": LastValue(str),
            # Accumulates the lists returned by bar1 and bar2
            "output": BinaryOperatorAggregate(list, operator.add),
        },
        input_channels=["foo"],
        output_channels=["output"],
        checkpointer=ExtendedInMemorySaver(),
    )

    config = {"configurable": {"thread_id": "123"}}
    result = app.invoke(input={"foo": "start"}, config=config, durability="sync")
    assert result["output"] == ["bar1", "bar2"]

When calling the Pregel object's invoke method, we explicitly set the durability parameter to "sync". The output below shows that bar1 and bar2 in Superstep 1 start executing only after nodes foo1 and foo2 have finished and the checkpoint for Superstep 0 has been persisted. The output also reveals something else: even in sync mode, persisting Superstep -1 (the checkpoint for the original input) is not guaranteed to complete before Superstep 0 (the superstep in which foo1 and foo2, the first nodes triggered, run) begins.

    node foo1 is called.
    node foo2 is called.
    put called with checkpoint for step -1
    put called with checkpoint for step 0
    node bar1 is called.
    node bar2 is called.
    put called with checkpoint for step 1

Below is the output in async mode. Here the execution of bar1 and bar2 in Superstep 1 runs concurrently with the persistence of the Superstep 0 checkpoint. Because put simulates a one-second delay, the persistence calls are the last to finish.

    node foo1 is called.
    node foo2 is called.
    node bar1 is called.
    node bar2 is called.
    put called with checkpoint for step -1
    put called with checkpoint for step 0
    put called with checkpoint for step 1

Setting the persistence mode to exit produces the following output. In this mode the checkpoint is persisted only once, after the entire invocation has finished.

    node foo1 is called.
    node foo2 is called.
    node bar1 is called.
    node bar2 is called.
    put called with checkpoint for step 2
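The scheduling difference among the three modes can be condensed into a toy superstep loop. The sketch below is plain Python with no LangGraph dependency: `run_supersteps`, `slow_put` and the event strings are hypothetical names used only for illustration, the input checkpoint (step -1) is left out, and LangGraph's real loop and step numbering are more involved. It only mirrors the ordering observed above: sync blocks until a superstep's checkpoint is saved, async saves in the background and waits at the end, and exit saves once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


# Toy model only: NOT LangGraph's Pregel loop. All names are hypothetical.
def run_supersteps(steps, durability, delay=0.05):
    """Run the supersteps and return the ordered event log.

    steps: one list of node names per superstep.
    durability: "sync", "async" or "exit".
    """
    if durability not in ("sync", "async", "exit"):
        raise ValueError(f"unknown durability: {durability!r}")
    log = []
    lock = threading.Lock()

    def record(event):
        with lock:
            log.append(event)

    def slow_put(label):
        time.sleep(delay)  # simulate a slow checkpoint store
        record(f"put {label}")

    pending = []
    # A single worker keeps background saves in submission order.
    with ThreadPoolExecutor(max_workers=1) as executor:
        for step, nodes in enumerate(steps):
            for name in nodes:
                record(f"run {name}")
            if durability == "sync":
                slow_put(f"step {step}")  # block until this superstep is saved
            elif durability == "async":
                # Save in the background; the next superstep starts right away.
                pending.append(executor.submit(slow_put, f"step {step}"))
        if durability == "exit":
            slow_put("final")  # a single save once the run is over
        for future in pending:
            future.result()  # async still waits for every save before returning
    return log


steps = [["foo1", "foo2"], ["bar1", "bar2"]]
for mode in ("sync", "async", "exit"):
    print(mode, run_supersteps(steps, mode))
```

With a one-worker executor the background saves finish in the order they were submitted, so in async mode the node runs appear first and the saves trail behind, much like the async output above.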
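2. Pending Write Persistence

Persistence covers both the checkpoint persistence that happens after a Superstep completes and the Pending Write persistence that happens during a Superstep. The sync and async modes make no difference at all to the latter: whenever a Node finishes or hits an interrupt, the Pending Writes it has just produced are persisted promptly. To confirm this, we modify ExtendedInMemorySaver and override its put_writes method as follows.

    from collections.abc import Sequence
    from typing import Any
    import time

    from langchain_core.runnables import RunnableConfig
    from langgraph.checkpoint.memory import InMemorySaver

    class ExtendedInMemorySaver(InMemorySaver):
        def put_writes(
            self,
            config: RunnableConfig,
            writes: Sequence[tuple[str, Any]],
            task_id: str,
            task_path: str = "",
        ) -> None:
            time.sleep(1)  # Simulate some delay
            print(f"put_writes called with writes: {writes}")
            # Delegate to the base class so the writes are actually stored
            super().put_writes(config, writes, task_id, task_path)

Even in sync mode, the Pending Writes of the four Node tasks are all persisted asynchronously, which produces the following output.

    node foo1 is called.
    node foo2 is called.
    node bar1 is called.
    node bar2 is called.
    put_writes called with writes: deque([('bar2', ['foo2'])])
    put_writes called with writes: deque([('bar1', ['foo1'])])
    put_writes called with writes: deque([('output', ['bar1'])])
    put_writes called with writes: deque([('output', ['bar2'])])

If the persistence mode is set to exit, however, no Pending Write is persisted as long as no interrupt occurs, and the output is simply:

    node foo1 is called.
    node foo2 is called.
    node bar1 is called.
    node bar2 is called.

To simulate an interrupt in exit mode, we modify the handle function that all four Nodes ultimately call so that it raises a manual interrupt in node bar1.

    from langgraph.types import interrupt

    def handle(node: str, arg: dict):
        print(f"node {node} is called.")
        if node == "bar1":
            interrupt("manual interrupt")
        return [node]

Although bar1, the node that hits the interrupt, runs in Superstep 1, no checkpoint has been persisted at any point during the run, so in this situation the whole process has to be rolled back. At this point the four Pending Writes shown below are written: apart from the interrupt-type Pending Write for bar1, the other three are the Channel writes of tasks that completed successfully.

    node foo1 is called.
    node foo2 is called.
    node bar1 is called.
    node bar2 is called.
    put_writes called with writes: deque([('bar1', ['foo1'])])
    put_writes called with writes: deque([('bar2', ['foo2'])])
    put_writes called with writes: deque([('output', ['bar2'])])
    put_writes called with writes: [('__interrupt__', (Interrupt(value='manual interrupt', id='f10d2458e1d1ff38c6b55d008907af52'),))]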
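The Pending Write behaviour observed in these experiments can likewise be summarized in a toy model. This is a hypothetical plain-Python sketch, not LangGraph code: `ToyInterrupt`, `node`, `interrupting` and `run_tasks` are made-up names, the writes are collected in a returned list instead of being passed to put_writes, and tasks run one after another rather than concurrently. It encodes only the rules inferred above: sync and async persist each task's writes as soon as the task ends, while exit persists nothing on normal completion but flushes every buffered write together with the interrupt when a node interrupts.

```python
# Toy model only: NOT LangGraph code. All names are hypothetical.
class ToyInterrupt(Exception):
    """Stand-in for an interrupt raised inside a node."""


def node(name, target):
    """A task that writes [name] to channel `target`, like handle() above."""
    return (name, lambda: [(target, [name])])


def interrupting(name):
    """A task that interrupts instead of producing writes."""
    def fn():
        raise ToyInterrupt("manual interrupt")
    return (name, fn)


def run_tasks(supersteps, durability):
    """Return the pending writes a saver would receive, in order."""
    if durability not in ("sync", "async", "exit"):
        raise ValueError(f"unknown durability: {durability!r}")
    persisted = []  # what put_writes would have been called with
    buffered = []   # exit mode: writes held in memory only
    for tasks in supersteps:
        interrupt_message = None
        for name, fn in tasks:
            try:
                writes = fn()
            except ToyInterrupt as exc:
                interrupt_message = str(exc)
                continue  # the other tasks of this superstep still run
            if durability == "exit":
                buffered.extend(writes)
            else:
                persisted.extend(writes)  # sync/async: saved when the task ends
        if interrupt_message is not None:
            # Flush whatever is buffered (non-empty only in exit mode)
            # together with the interrupt itself.
            persisted.extend(buffered)
            persisted.append(("__interrupt__", interrupt_message))
            return persisted
    # Normal completion: exit mode never hands its buffer to the saver.
    return persisted


graph = [
    [node("foo1", "bar1"), node("foo2", "bar2")],
    [node("bar1", "output"), node("bar2", "output")],
]
interrupted_graph = [
    [node("foo1", "bar1"), node("foo2", "bar2")],
    [interrupting("bar1"), node("bar2", "output")],
]
print(run_tasks(graph, "sync"))
print(run_tasks(interrupted_graph, "exit"))
```

Running the tasks sequentially makes the order deterministic, whereas the real output above shows that the order of concurrent put_writes calls can vary.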
