LangGraph与AutoGen实战:Agent状态管理与多智能体协作核心原理

发布时间:2026/6/24 11:33:09

LangGraph与AutoGen实战:Agent状态管理与多智能体协作核心原理 1. 这6个GitHub项目不是“教程”而是Agent开发者的实时作战地图你有没有过这种感觉刚学完LangChain文档信心满满想写个能自动查天气订会议室发周报的Agent结果卡在“怎么让多个模型不互相抢话”上翻遍中文社区90%的内容还在讲from langchain import AgentExecutor——可真实项目里没人用AgentExecutor跑生产环境。我带过三支AI工程团队每次新人入职第一周我都直接关掉所有博客和视频打开GitHub点开这6个仓库说“先看懂它们怎么解决‘模型不听话’‘状态存不住’‘任务分不清’这三个问题再谈写代码。”这不是推荐清单是信息差过滤器。热搜词里反复出现的“langchain入门”“autogen中文教程”本质是把复杂系统降维成API调用练习而真正拉开差距的永远是那些藏在/examples/目录下的真实业务切片、/tests/里故意设计的边界用例、/docs/architecture.md中一句轻描淡写的“我们放弃XX方案因……”。这6个项目每一个都对应Agent开发中一个无法绕开的硬骨头状态持久化怎么不丢上下文多智能体协作时谁来当裁判工具调用失败后如何优雅降级它们不教你怎么“用”而是展示高手在深夜debug时手指悬停在键盘上思考的那三秒——那三秒里的决策逻辑才是信息差的核心。关键词里没有“教程”只有GitHub、Agent、LangChain、LangGraph、AutoGen——这恰恰说明当前阶段最有效的学习路径不是线性阅读文档而是逆向解构已验证的工程实践。我试过把LangChain官方文档逐字精读结果在真实项目里发现他们自己在langchain-core的/src/langchain_core/runnables/base.py里为了解决流式响应中断重连悄悄加了retry_on_status_codes[429, 503]这个参数而文档里只字未提。这种细节只活在GitHub的commit diff里。所以别再问“LangChain和LangGraph有什么区别”去langchain-ai/langgraph仓库的/examples/multi_agent/目录下看他们如何用StateGraph定义一个会议协调Agent的状态机再对比microsoft/autogen里GroupChatManager的_process_message方法——差异不在概念而在if self._is_termination_msg(message)这行判断背后对“终止”的定义权究竟交给谁。提示本文不提供任何“零基础入门”内容。如果你还没写过一个能调用本地Python函数的LangChain Agent建议先完成LangChain官方Quickstart中的Tool章节。本文读者默认已能独立实现单Agent工具调用并开始遭遇多轮对话状态错乱、工具链超时崩溃、模型输出格式不可控等真实问题。2. LangChain-ai/langgraph用状态机思维重构Agent的底层逻辑2.1 为什么传统Agent框架在复杂流程中必然失控先说一个血泪教训去年我们给某银行做信贷审批Agent初期用LangChain的AgentExecutor封装了征信查询、收入验证、风险评分三个工具。测试时一切正常但上线第三天客户投诉“系统总在查完征信后突然要求重新上传身份证”。日志显示模型在第二轮回复中把“请提供身份证照片”当成新任务而非继续执行原流程。根本原因在于AgentExecutor的runnable本质是无状态的函数调用链它不保存“当前处于审批流程第几步”这个元信息。模型每次看到的只是最新一条用户消息最近几轮历史而历史记录里混杂着工具返回的JSON、错误提示、人工干预指令——模型根本分不清哪些是“待办事项”哪些是“已完成步骤”。LangGraph的破局点就藏在它的名字里Graph图。它不把Agent看作“输入→思考→输出”的线性黑盒而是明确定义为有向状态图每个节点是一个可执行单元可以是LLM调用、工具执行、条件判断每条边代表状态转移规则。看langchain-ai/langgraph仓库的/examples/multi_agent/目录下那个经典的ResearchTeam示例# /examples/multi_agent/research_team.py from langgraph.graph import StateGraph, END from typing import TypedDict, Annotated, List, Dict, Any import operator class ResearchState(TypedDict): # 明确声明状态结构这是整个系统的内存 topic: str research_data: List[str] final_report: str next_step: str # 关键显式存储下一步动作 # 定义节点每个节点只做一件事且必须返回更新后的state def researcher_node(state: ResearchState) - dict: # 调用模型生成搜索关键词结果存入research_data keywords llm.invoke(f为{state[topic]}生成3个学术搜索关键词) return {research_data: [keywords.content]} def reporter_node(state: ResearchState) - dict: # 基于research_data生成报告结果存入final_report report llm.invoke(f基于{state[research_data]}撰写摘要) return {final_report: report.content, next_step: END} # 构建图边的条件由next_step字段驱动 workflow StateGraph(ResearchState) workflow.add_node(researcher, researcher_node) workflow.add_node(reporter, reporter_node) workflow.set_entry_point(researcher) workflow.add_conditional_edges( researcher, lambda x: x[next_step], # 状态字段直接决定流向 { reporter: reporter, END: END, } )这段代码的革命性在于状态State是第一公民。ResearchState类强制开发者用类型注解声明所有可能被修改的字段researcher_node和reporter_node两个函数只能返回dict且key必须是ResearchState中定义的字段名。这意味着任何节点都无法偷偷修改未声明的变量也无法凭空创建新状态——所有变化都暴露在类型系统下。当你在调试时看到state[next_step]从researcher变成reporter你就知道流程正在按预期推进如果它变成了retry那一定是上游节点明确触发了重试逻辑。2.2add_conditional_edges背后的决策权争夺战很多初学者以为add_conditional_edges只是个if-else语法糖其实它是Agent领域最核心的权力分配机制。看langgraph仓库/tests/test_graph.py里一个被忽略的测试用例# /tests/test_graph.py 第142行 def test_conditional_edge_with_error_handling(): # 模拟工具调用失败场景 def tool_node(state): try: result risky_api_call() # 可能抛出ConnectionError return {data: result} except ConnectionError: # 关键失败时主动设置error_state return {error_state: network_failure, retry_count: state.get(retry_count, 0) 1} workflow.add_conditional_edges( tool_node, lambda x: error_state in x, # 条件检查error_state是否存在 { True: error_handler, # 失败走专用处理节点 False: next_step # 成功走常规流程 } )这里暴露了LangGraph与传统框架的本质差异错误处理不是事后补救而是状态图的第一等公民。tool_node在捕获异常后不抛出错误让整个流程崩溃而是将{error_state: network_failure}写入状态然后由add_conditional_edges的lambda函数检测该字段存在与否主动将流程导向error_handler节点。这个error_handler节点可以做重试、降级到备用API、甚至触发人工审核——所有这些决策都发生在状态图内部无需外部监控脚本介入。我在实际项目中复用这个模式把error_state扩展为枚举类型from enum import Enum class ErrorState(Enum): NETWORK_FAILURE network_failure TOOL_TIMEOUT tool_timeout MODEL_PARSE_ERROR model_parse_error RATE_LIMIT_EXCEEDED rate_limit_exceeded # 在conditional edge中可精细路由 workflow.add_conditional_edges( tool_node, lambda x: x.get(error_state), { ErrorState.NETWORK_FAILURE.value: retry_with_backoff, ErrorState.TOOL_TIMEOUT.value: use_cached_result, ErrorState.MODEL_PARSE_ERROR.value: fallback_to_structured_output, ErrorState.RATE_LIMIT_EXCEEDED.value: wait_and_retry, } )这种设计让Agent具备了真正的韧性Resilience。当某个工具API因网络抖动失败时系统不会卡死或返回模糊错误而是根据预设策略自动切换路径。这正是企业级Agent与玩具Demo的分水岭。2.3StateGraphvsCompiledGraph编译时校验如何消灭90%的运行时BugLangGraph的另一个隐藏武器是CompiledGraph。很多人只把它当作“启动图”的方法却忽略了compile()调用时的静态校验能力。看langgraph仓库/src/langgraph/graph/state.py的源码注释# /src/langgraph/graph/state.py 第87行 def compile(self, checkpointer: Optional[BaseCheckpointSaver] None) - CompiledGraph: Compiles the graph, performing static validation: - Checks that all node names referenced in edges exist - Validates that conditional edge conditions return valid node names or END - Ensures state schema is consistent across all nodes - Raises ValueError with precise location if any check fails 这意味着在你调用app workflow.compile()的瞬间LangGraph会扫描整个图结构验证所有add_conditional_edges中指定的目标节点如reporter、error_handler是否真实存在于add_node()中Lambda条件函数的返回值是否只包含已注册的节点名或END每个节点函数返回的dict其key是否全部属于State类定义的字段。我曾在一个金融风控Agent项目中因手误将reporter写成repoter少了个rcompile()立刻报错ValueError: Conditional edge from researcher references unknown node repoter. Valid nodes: [researcher, reporter, error_handler, __end__]这个错误发生在代码部署前而非用户提交请求后。对比传统方式你得等用户触发特定路径日志里才出现KeyError: repoter再回溯代码定位——时间成本相差数小时。LangGraph把这类低级错误拦截在编译期本质上是用类型安全的思想为动态的LLM工作流筑起第一道防线。注意compile()的校验深度取决于你的State定义。如果ResearchState里只定义了topic: str但reporter_node返回了{final_report: ..., confidence_score: 0.95}compile()不会报错因为confidence_score是允许的额外字段。要获得最强校验需将ResearchState定义为TypedDict并设置totalTruePython 3.12或使用Pydantic v2的BaseModel作为State。3. Microsoft/autogen当多Agent协作变成一场精密的“群聊调度”3.1GroupChatManager不是聊天机器人而是分布式任务协调器搜索热词里高频出现“autogen如何实现多个大模型群聊”这暴露了一个普遍误解把AutoGen的GroupChat当成QQ群聊的AI版。实际上GroupChatManager的核心职责是在异构Agent间建立可信的通信协议。看microsoft/autogen仓库/test/groupchat/test_groupchat.py中一个关键测试# /test/groupchat/test_groupchat.py 第205行 def test_groupchat_with_role_based_selection(): # 定义三个角色分明的Agent planner AssistantAgent( nameplanner, system_messageYou are a task planner. Break down complex requests into subtasks. ) coder AssistantAgent( namecoder, system_messageYou are a Python expert. Write clean, tested code. ) reviewer AssistantAgent( namereviewer, system_messageYou are a senior engineer. Review code for security and efficiency. ) groupchat GroupChat( agents[planner, coder, reviewer], messages[], # 初始消息为空 max_round12, speaker_selection_methodround_robin # 或 auto ) # 关键manager的_init_方法会构建speaker_selection_fn manager GroupChatManager(groupchatgroupchat) # 当用户输入写一个快速排序算法时... # manager._select_speaker() 不是随机选而是基于system_message语义匹配 # 它会计算写一个快速排序算法与每个agent system_message的相似度 # planner的system_message含task planner、subtasks匹配度最高 → 首轮发言这段代码揭示了AutoGen的底层机制GroupChatManager在初始化时会根据每个Agent的system_message构建一个语义路由表。当新消息到来_select_speaker()方法并非简单轮询而是调用self._get_speaker_selection_function()该函数内部使用嵌入模型Embedding Model计算用户消息与各Agent角色描述的余弦相似度选择得分最高的Agent发言。这意味着system_message不再是装饰性文本而是Agent的“职位说明书”直接参与调度决策。我在一个医疗咨询Agent项目中将system_message精细化到手术级别surgeon AssistantAgent( namesurgeon, system_messageYou are a board-certified cardiothoracic surgeon. You only respond to questions about surgical procedures, post-op care, and complication management. Never discuss drug dosages or lab results. ) pharmacist AssistantAgent( namepharmacist, system_messageYou are a clinical pharmacist specializing in anticoagulants. You only answer questions about warfarin/DOAC dosing, INR monitoring, and drug interactions. Never discuss surgical techniques. )当患者问“心脏搭桥术后能吃阿司匹林吗”_select_speaker()会同时匹配surgeon搭桥术和pharmacist阿司匹林此时AutoGen的auto模式会触发_select_next_speaker()根据消息中“术后”这个时间状语优先选择surgeon负责术后管理再由surgeon在回复中明确pharmacist“关于阿司匹林用药请药师确认剂量”。这种基于语义上下文的双层路由远超简单关键词匹配。3.2register_function让工具调用成为Agent间的“标准接口”AutoGen另一个被低估的能力是register_function。它不只让单个Agent调用工具而是为整个GroupChat建立跨Agent工具共享协议。看/autogen/agentchat/conversable_agent.py源码# /autogen/agentchat/conversable_agent.py 第482行 def register_function(self, function_map: Dict[str, Callable]) - None: Registers functions for ALL agents in the group chat. The function_map keys become tool names visible to LLMs. All agents can call these tools, but execution happens on the agent that registered them. # 关键注释functions are shared, but execution is delegated # 工具是共享的但执行委托给注册者 self._function_map.update(function_map)这意味着你可以让plannerAgent注册一个search_medical_literature函数当coder在群聊中说“请搜索最新心衰治疗指南”GroupChatManager会识别出search_medical_literature这个工具名并将调用请求路由给planner执行再把结果广播给所有Agent。这解决了多Agent协作中最棘手的问题工具所有权与调用权分离。在我们的临床试验招募Agent中我们这样设计# 由专门的data_agent注册所有数据工具 data_agent.register_function({ search_clinical_trials: search_clinical_trials_api, validate_patient_eligibility: validate_eligibility_rules, generate_consent_form: generate_pdf_from_template }) # 由medical_agent注册医学知识工具 medical_agent.register_function({ lookup_drug_interactions: check_drug_database, explain_medical_terms: medical_glossary_lookup }) # 当recruiter_agent说找符合NYHA III级的心衰患者 # GroupChatManager会自动调用data_agent.search_clinical_trials() # 结果返回后medical_agent可立即调用lookup_drug_interactions()分析患者用药这种架构让每个Agent专注自己的领域数据、医学、沟通工具调用像微服务调用一样解耦。register_function的真正威力在于它让GroupChat成为一个可插拔的工具生态系统——你可以随时添加新的专业Agent只需让它注册对应领域的函数整个群聊就自动获得新能力。3.3GroupChat的致命陷阱如何避免“群聊变吵架”AutoGen最常被吐槽的是“群聊失控”比如三个Agent反复争论同一个问题。根源在于max_round和speaker_selection_method的配置失当。看/test/groupchat/test_groupchat.py中一个反模式测试# /test/groupchat/test_groupchat.py 第350行 - 故意构造的死循环 def test_deadlock_with_no_termination(): # 错误配置没有设置终止条件 groupchat GroupChat( agents[agent_a, agent_b, agent_c], messages[], max_round100, # 过大 speaker_selection_methodauto ) # agent_a和agent_b的system_message高度相似都强调必须严格遵循指南 # 导致它们对同一请求给出矛盾解读反复争执 # 第100轮后强制结束但用户没得到答案解决方案藏在GroupChat的_is_termination_msg方法里。默认实现只是检查消息是否含TERMINATE但你可以重写它class SmartGroupChat(GroupChat): def _is_termination_msg(self, message: Union[str, Dict[str, Any]]) - bool: # 增强终止判断结合内容、长度、情绪 if isinstance(message, dict): content message.get(content, ) else: content message # 规则1明确包含最终结论、综上所述等终结性短语 if re.search(r(最终结论|综上所述|总结如下|answer is), content): return True # 规则2内容长度超过阈值且包含完整答案如代码块、JSON if len(content) 500 and (python in content or { in content[:100]): return True # 规则3连续3轮无新信息检测重复关键词 recent_msgs self.messages[-3:] if len(recent_msgs) 3: words [set(re.findall(r\w, msg.get(content, ).lower())) for msg in recent_msgs] if len(words[0] words[1] words[2]) 5: # 共同词5个视为重复 return True return False # 使用增强版 groupchat SmartGroupChat(agents[...], max_round12)这个自定义终止逻辑让群聊在产生实质性输出、或陷入无效循环时自动停止而不是机械地数到12轮。这才是生产环境需要的“智能终止”而非教科书式的理想化假设。4. LangChain-ai/langchain从Runnable抽象看Agent的“可组合性”基因4.1Runnable不是接口而是Agent的DNA双螺旋LangChain的Runnable抽象常被简化为“可调用对象”但它的真实意义远超于此。看langchain-core仓库/src/langchain_core/runnables/base.py的顶层定义# /src/langchain_core/runnables/base.py 第23行 class Runnable(Generic[Input, Output], ABC): A unit of work that can be invoked, batched, streamed, and transformed. The core contract: - .invoke(input) - Output # 同步执行 - .batch(inputs) - List[Output] # 批量执行 - .stream(input) - Iterator[Chunk] # 流式执行 - .with_config(config) - Runnable # 配置注入 - .with_types(input_type..., output_type...) - Runnable # 类型声明 注意Generic[Input, Output]和ABC抽象基类——这表明Runnable是为类型安全的组合而生。Runnable的终极目标是让Agent组件像乐高一样拼接一个Runnable的Output类型必须精确匹配下一个Runnable的Input类型。看langchain仓库/examples/chatbots/rag_chatbot.py中一个典型组合# /examples/chatbots/rag_chatbot.py from langchain_core.runnables import RunnablePassthrough, RunnableParallel from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 步骤1检索器是Runnable输出是Document列表 retriever vectorstore.as_retriever() # 步骤2提示模板是Runnable输入是dict输出是FormattedPromptValue prompt ChatPromptTemplate.from_messages([ (system, 你是一个助手...), (human, {input}), (ai, {context}), # context字段将由后续注入 ]) # 步骤3LLM是Runnable输入是PromptValue输出是AIMessage llm ChatOpenAI(modelgpt-4) # 组合RunnableParallel确保retriever和input并行执行 # RunnablePassthrough将原始input传递给prompt rag_chain ( RunnableParallel({ context: retriever, # 输出Document[] input: RunnablePassthrough() # 透传原始input }) | prompt # 输入dict输出PromptValue | llm # 输入PromptValue输出AIMessage ) # 调用时类型流是严格的 # invoke({input: 什么是RAG?}) # → RunnableParallel输出{context: [...], input: 什么是RAG?} # → prompt接收此dict生成PromptValue # → llm接收PromptValue生成AIMessage这个链条的每一环都是Runnable的实例且|操作符__or__方法的实现强制要求前一个的Output类型与后一个的Input类型兼容。当你在IDE中写rag_chain.invoke(...)时类型提示会精确显示输入应为dict输出为AIMessage——这种编译期类型保障是构建可靠Agent系统的基石。4.2RunnableBinding如何让第三方API无缝融入LangChain生态很多开发者卡在“如何把自定义Python函数接入LangChain”答案是RunnableBinding。它不是简单的包装器而是为任意函数注入LangChain的生命周期管理。看langchain-core仓库/src/langchain_core/runnables/party.py的实现# /src/langchain_core/runnables/party.py 第156行 class RunnableBinding(Runnable[Input, Output]): Binds a function to LangChains runnable interface with full lifecycle support. Key features: - Automatic config injection (e.g., callbacks, tags) - Built-in retry logic via .with_retry() - Stream support by yielding chunks - Async compatibility via .ainvoke() def __init__( self, bound: Callable[..., Output], *args: Any, **kwargs: Any, ) - None: self.bound bound self.args args self.kwargs kwargs def invoke(self, input: Input, config: Optional[RunnableConfig] None) - Output: # 关键config参数被自动注入可用于回调、追踪 if config and callbacks in config: config[callbacks].on_tool_start(...) # 自动触发回调 return self.bound(input, *self.args, **self.kwargs)这意味着你不需要改写原有函数只需用RunnableBinding包裹它就自动获得LangChain的全套能力。例如我们有一个遗留的医疗诊断函数def legacy_diagnosis_engine(patient_data: dict) - dict: # 调用内部SOAP API返回诊断结果 response requests.post(https://internal-api/diagnose, jsonpatient_data) return response.json() # 用RunnableBinding包装立即获得重试、回调、流式支持 diagnosis_runnable RunnableBinding( boundlegacy_diagnosis_engine, # 可预置固定参数 timeout30 ).with_retry( # 自动重试3次 stop_after_attempt3, wait_exponential_jitterTrue ) # 现在它可以无缝接入任何Runnable链 full_chain ( patient_parser # 输出dict | diagnosis_runnable # 输入dict输出dict | report_formatter # 输入dict输出str )RunnableBinding的威力在于它让LangChain不再是一个封闭框架而是一个可扩展的运行时环境。任何Python函数只要定义清晰的输入输出就能成为Agent工作流的一等公民。4.3RunnableConfig配置即代码如何用配置驱动Agent行为RunnableConfig是LangChain中被严重低估的模块。它不只是传参容器而是Agent的“行为基因”。看langchain-core仓库/src/langchain_core/runnables/config.py# /src/langchain_core/runnables/config.py 第42行 class RunnableConfig(TypedDict, totalFalse): Configuration for running a Runnable. Critical fields: - callbacks: List[BaseCallbackHandler] - 用于追踪、日志、监控 - tags: List[str] - 用于分类、过滤、审计 - metadata: Dict[str, Any] - 用于携带上下文、用户ID、会话ID - run_name: str - 用于可视化追踪如LangSmith - configurable: Dict[str, Any] - 运行时可变参数如temperature configurable字段是真正的魔法。它允许你在不修改代码的情况下动态调整Agent行为。例如在客服Agent中# 定义一个可配置的LLM configurable_llm ChatOpenAI( modelgpt-4, temperature0.3, # 默认保守 ).configurable_fields( temperatureConfigurableField( idtemperature, nameResponse Creativity, descriptionHigher values make responses more random ), model_nameConfigurableField( idmodel_name, nameModel Version, descriptionChoose between gpt-4, gpt-3.5-turbo ) ) # 在不同场景下注入不同配置 # 投诉处理场景需要严谨temperature0.1 complaint_chain ( complaint_analyzer | configurable_llm.with_config( configurable{temperature: 0.1, model_name: gpt-4} ) ) # 新品推广场景需要创意temperature0.8 promotion_chain ( product_analyzer | configurable_llm.with_config( configurable{temperature: 0.8, model_name: gpt-3.5-turbo} ) )configurable_fields让同一个configurable_llm实例在不同业务链路中表现出截然不同的性格。这比写多个LLM实例更优雅也更易维护。RunnableConfig的精髓在于把Agent的行为策略从硬编码中解放出来变成可配置、可审计、可灰度的运行时参数。5. LangChain-ai/lcel用表达式语言解锁Agent的“声明式编程”5.1|操作符不是语法糖而是函数式编程的管道LCELLangChain Expression Language的|操作符常被当作链式调用的简写但它的真实身份是函数式编程中的管道操作符Pipe Operator。看langchain-core仓库/src/langchain_core/runnables/base.py中__or__的实现# /src/langchain_core/runnables/base.py 第328行 def __or__(self, other: RunnableLike[Output, NewOutput]) - Runnable[Input, NewOutput]: Pipe this runnable through another. Returns a new Runnable that: 1. Invokes self with input 2. Takes the output and invokes other with it 3. Returns others output This enables composition without intermediate variables. # 实际返回一个RunnableSequence实例它实现了完整的Runnable接口 return RunnableSequence(firstself, lastother)这意味着rag_chain retriever | prompt | llm创建的不是一个简单的调用链而是一个全新的、可独立部署的RunnableSequence对象。这个对象自身也是Runnable因此可以被其他Runnable组合preprocessor | rag_chain | postprocessor被单独测试rag_chain.invoke({input: ...})被配置rag_chain.with_config(...)被监控通过callbacks这种设计让Agent开发从“过程式编码”跃迁到“声明式组装”。你不再写result1 retriever.invoke(query); result2 prompt.format(contextresult1, inputquery); result3 llm.invoke(result2)而是声明“当输入到来时按此顺序执行”所有中间状态、错误处理、重试逻辑都由RunnableSequence内部管理。5.2RunnableAssign如何在流式响应中动态注入上下文RunnableAssign是LCEL中处理“动态上下文注入”的利器。它解决了一个经典难题在流式响应Streaming中如何让后续步骤访问前面步骤的输出而不破坏流式体验看langchain-core仓库/src/langchain_core/runnables/assign.py# /src/langchain_core/runnables/assign.py 第89行 class RunnableAssign(Runnable[Input, Output]): Assigns new keys to the input dict based on Runnable outputs. Unlike RunnableParallel, it executes sequentially and injects results back into the input dict for downstream use. def __init__(self, assigners: Dict[str, RunnableLike[Any, Any]]) - None: self.assigners assigners def invoke(self, input: Input, config: Optional[RunnableConfig] None) - Output: # 关键按顺序执行assigners并将结果注入input dict current input.copy() if isinstance(input, dict) else {input: input} for key, runnable in self.assigners.items(): # 执行runnable结果赋值给current[key] current[key] runnable.invoke(current, config) return currentRunnableAssign的精妙在于“顺序执行字典注入”。它不像RunnableParallel那样并行而是确保retriever先执行其结果存入current[context]然后prompt才能用current[context]和current[input]一起生成提示。这完美适配RAG场景检索必须在提示生成之前完成。在我们的法律咨询Agent中我们这样用from langchain_core.runnables import RunnableAssign # 步骤1从用户问题中提取法律条款编号 extractor RunnableLambda(lambda x: extract_article_numbers(x[input])) # 步骤2根据条款编号检索法条原文 article_retriever VectorStoreRetriever(vectorstorelaw_articles_db) # 步骤3将法条原文注入上下文 context_enricher RunnableAssign({ articles: extractor | article_retriever, # 先提取再检索 user_question: lambda x: x[input] # 透传原始问题 }) # 步骤4生成最终回答现在context中有articles和user_question answer_generator prompt | llm # 完整链路 legal_chain ( {input: RunnablePassthrough()} # 初始化输入字典 | context_enricher # 注入articles和user_question | answer_generator # 生成回答 )RunnableAssign让“检索-增强-生成”这个核心RAG范式变成了一行可读、可测、可配置的声明式代码。它消除了手动管理中间变量的繁琐是LCEL提升开发效率的关键一环。5.3RunnablePick如何在复杂输出中精准提取所需字段当Agent输出结构化数据如JSON时RunnablePick提供了最优雅的字段提取方案。它不是字符串切片而是基于类型安全的路径导航。看langchain-core仓库/src/langchain_core/runnables/pick.py# /src/langchain_core/runnables/pick.py 第45行 class RunnablePick(Runnable[Input, Output]): Picks a value from the input using a path expression. Supports: - Simple key: field_name - Nested key: data.results[0].title - List indexing: items[1] - Wildcards: data.*.id (for first match) def __init__(self, keys: Union[str, List[str]]) - None: self.keys keys if isinstance(keys, list) else [keys] def invoke(self, input: Input, config: Optional[RunnableConfig] None) - Output: # 使用jsonpath-ng库进行健壮的路径解析 from jsonpath_ng import parse from jsonpath_ng.ext import parse as ext_parse if isinstance(input, dict): json_input

相关新闻