LangChain 中间件(Middleware)

发布时间:2026/5/26 16:07:12

LangChain 中间件(Middleware) 一、简介LangChain 1.0 引入了正式的中间件Middleware 概念提供了一套声明式、可组合的 API用于在 Runnable 执行流程中插入横切逻辑。与早期版本中依赖 RunnableLambda、回调或手动包装不同1.0 的中间件是一等公民通过 middleware 装饰器和 Middleware 类能够以更简洁、类型安全的方式实现日志、重试、限流、缓存、输入验证等功能。1.1 为什么需要中间件Agent 具备自主推理、多步工具调用能力但也带来三大核心问题不可控性无法干预模型何时调用工具、如何生成提示词安全风险敏感操作无审核、隐私数据泄露、幻觉输出资源浪费Token 超限、重复调用、死循环、成本失控中间件的核心价值流程拦截与修改输入 / 输出 / 状态动态提示词管理、上下文压缩工具权限控制、敏感操作审核日志、监控、成本统计重试、降级、限流、缓存安全护栏PII 脱敏、内容审核直接在每个链或 Agent 中重复实现这些功能会导致代码冗余。中间件模式允许将这些横切逻辑独立封装并以声明方式应用到多个组件上。1.2 核心概念在 LangChain 1.0 中所有可执行组件模型、提示模板、解析器、Agent、链等都实现了 Runnable 接口。中间件本质上是一个 Runnable 包装器它接受一个 Runnable 并返回一个新的 Runnable在调用前后插入自定义行为。与旧版的主要区别方面1.0 之前的通用模式1.0 正式中间件 API定义方式RunnableLambda、继承 RunnableBindingmiddleware 装饰器或 Middleware 类组合手动管道 |MiddlewareChain 或自动组合配置传递通过 RunnableConfig 手动处理内置支持 config 参数和元数据错误处理自定义 try/except中间件可统一捕获异常并重试/降级官方支持无标准模式LangChain Core 原生支持二、核心 APImiddleware 装饰器LangChain 1.0 提供了 langchain_core.runnables.middleware 模块其中核心是 middleware 装饰器。2.1 基础用法middleware 装饰器既可以用于异步函数也可以用于同步函数。fromlangchain_core.runnables.middlewareimportmiddlewarefromlangchain_core.runnablesimportRunnableConfigfromlangchain_openaiimportChatOpenAImiddlewareasyncdeflog_middleware(request,call_next):记录输入和输出print(fInput:{request[input]})responseawaitcall_next(request)# 调用下层 Runnableprint(fOutput:{response})returnresponse modelChatOpenAI()wrapped_modellog_middleware(model)# 直接包装# 调用resultawaitwrapped_model.ainvoke(Hello)request包含 input用户输入、configRunnableConfig、kwargs 等字段的字典。call_next异步函数调用下一个中间件或最终的 Runnable。2.2 带参数的中间件工厂如果需要配置中间件如重试次数可以编写返回中间件装饰器的函数defretry_middleware(max_retries:int3):middlewareasyncdef_retry(request,call_next):last_exceptionNoneforattemptinrange(max_retries):try:returnawaitcall_next(request)exceptExceptionase:last_exceptioneprint(fAttempt{attempt1}failed:{e})raiselast_exceptionreturn_retry modelChatOpenAI()wrappedretry_middleware(5)(model)2.3 使用 Middleware 类面向对象方式除了装饰器还可以继承 Middleware 类实现更复杂的状态管理。fromlangchain_core.runnables.middlewareimportMiddlewareclassLoggingMiddleware(Middleware):asyncdef__call__(self,request,call_next):print(Before)responseawaitcall_next(request)print(After)returnresponse modelChatOpenAI()wrappedLoggingMiddleware()(model)2.4 组合多个中间件MiddlewareChain使用 MiddlewareChain 可以将多个中间件按顺序应用到同一个 Runnable 上。fromlangchain_core.runnables.middlewareimportMiddlewareChain# 假设已有 log_middleware, retry_middleware, cache_middlewaremiddlewares[log_middleware,retry_middleware(2),cache_middleware]modelChatOpenAI()wrappedMiddlewareChain(middlewaresmiddlewares,runnablemodel)三、核心执行流程与钩子3.1 执行流程中间件的执行基于洋葱模型类似 Express.js 或 Starlette遵循“后进先出”的顺序。当你添加多个中间件时请求会依次穿过它们到达核心的 LLM 调用然后响应再反向穿过这些中间件。请求 → Middleware A(pre)→ Middleware B(pre)→ 原始 LLM 调用 → Middleware B(post)→ Middleware A(post)→ 响应前处理before修改输入、校验、过滤、增强 Prompt后处理after解析输出、脱敏、日志、格式化包裹wrap完全接管执行可重试、短路、缓存、替换模型3.2 核心钩子Hooks节点式钩子顺序执行钩子执行时机入参返回值典型用途before_agentAgent 启动前单次inputs: dictdict初始化状态、验证输入、加载记忆before_model每次模型调用前messages: List[BaseMessage]List[BaseMessage]裁剪消息、动态提示、注入上下文after_model模型返回后response: BaseMessageBaseMessage审查输出、统计 Token、修改响应after_agentAgent 结束后单次output: dictdict清理资源、生成报告、格式化结果包装式钩子洋葱嵌套钩子作用对象入参返回值典型用途wrap_model_call模型调用model_call: Callable, messagesBaseMessage重试、缓存、限流、模型降级wrap_tool_call工具执行tool_call: Callable, tool_inputAny权限校验、超时、审计、脱敏3.3 更细粒度的钩子示例fromlangchain.middlewareimportBaseMiddlewarefromlangchain_core.messagesimportHumanMessageclassCustomMiddleware(BaseMiddleware):asyncdefpre_process(self,request):# 修改输入统一转为字符串ifisinstance(request.input,dict):request.inputstr(request.input)returnrequestasyncdefwrap_llm_call(self,handler):# 对 LLM 调用增加重试逻辑forattemptinrange(3):try:returnawaithandler()# 实际调用 LLMexceptExceptionase:ifattempt2:raisereturnNoneasyncdefpost_process(self,request,response):# 过滤敏感词response.outputresponse.output.replace(badword,***)returnresponseasyncdefon_error(self,request,error):print(f调用失败:{error})return默认回复# 可以返回备用响应3.4 执行顺序详解当调用 agent.invoke(“hello”) 时LangChain 内部构建一个中间件栈最先添加的中间件位于最外层最后执行 pre最先执行 post执行流程进入 Middleware1.pre_process → 进入 Middleware2.pre_process → … → 到达 LLM 调用LLM 返回 → 退出 MiddlewareN.post_process → … → 退出 Middleware1.post_process示例验证classA(BaseMiddleware):asyncdefpre_process(self,request):print(A pre)returnrequestasyncdefpost_process(self,request,response):print(A post)returnresponseclassB(BaseMiddleware):asyncdefpre_process(self,request):print(B pre)returnrequestasyncdefpost_process(self,request,response):print(B post)returnresponse agentcreate_agent(...)agent.add_middleware(A())agent.add_middleware(B())awaitagent.invoke(test)# 输出# A pre# B pre# (实际 LLM 调用)# B post# A post四、内置中间件LangChain 1.0 提供官方内置中间件覆盖安全、上下文、可靠性三大场景直接导入即可使用。4.1 RetryMiddleware自动重试失败调用支持指数退避。fromlangchain_core.runnables.middlewareimportRetryMiddleware retryRetryMiddleware(max_retries3,backoff_factor1.0)modelChatOpenAI()robust_modelretry(model)4.2 RateLimiterMiddleware基于令牌桶算法的速率限制。fromlangchain_core.runnables.middlewareimportRateLimiterMiddleware limiterRateLimiterMiddleware(requests_per_second10)modellimiter(ChatOpenAI())4.3 CacheMiddleware简单缓存中间件支持内存和 Redis 后端。fromlangchain_core.runnables.middlewareimportCacheMiddlewarefromlangchain_core.cachesimportInMemoryCache cacheCacheMiddleware(cacheInMemoryCache())modelcache(ChatOpenAI())4.4 TracingMiddleware自动集成 LangSmith 或自定义追踪器。fromlangchain_core.runnables.middlewareimportTracingMiddleware tracerTracingMiddleware(project_namemy_project)modeltracer(ChatOpenAI())4.5 PIIRedactionMiddleware自动检测并屏蔽手机号、邮箱、身份证、银行卡等 PII 数据支持自定义正则模式。fromlangchain.agents.middlewareimportPIIRedactionMiddleware middleware[PIIRedactionMiddleware(patterns[phone,email,ssn],# 内置模式custom_patterns{order_id:r\d{10}}# 自定义模式)]4.6 HumanInTheLoopMiddleware敏感工具调用前需人工确认如发送邮件、删除数据支持指定工具白名单。fromlangchain.agents.middlewareimportHumanInTheLoopMiddleware middleware[HumanInTheLoopMiddleware(tool_names[send_email,delete_data],# 需审核的工具approve_message确认执行此操作# 自定义提示)]4.7 SummarizationMiddleware对话过长时自动摘要历史避免 Token 超限支持自定义摘要模型与阈值。fromlangchain.agents.middlewareimportSummarizationMiddleware middleware[SummarizationMiddleware(llmChatOpenAI(modelgpt-3.5-turbo),# 摘要用轻量模型max_tokens_before_summary1500,# 触发摘要的 Token 阈值summary_prompt请用100字内摘要对话历史# 自定义提示)]4.8 MessageLimitMiddleware限制历史消息数量防止无限循环。fromlangchain.agents.middlewareimportMessageLimitMiddleware middleware[MessageLimitMiddleware(max_messages30)# 最多保留30条消息]4.9 ToolRetryMiddleware工具调用失败自动指数退避重试。fromlangchain.agents.middlewareimportToolRetryMiddleware middleware[ToolRetryMiddleware(max_retries2,# 最大重试次数retry_delay1.0# 初始延迟秒)]五、自定义中间件5.1 基础自定义节点式钩子继承 BaseAgentMiddleware重写需要的钩子方法。示例日志中间件classLoggingMiddleware(BaseAgentMiddleware):defbefore_agent(self,inputs:dict,**kwargs)-dict:print(f[Agent 启动] 输入{inputs})returninputs# 可修改输入defbefore_model(self,messages:List[BaseMessage],**kwargs)-List[BaseMessage]:print(f[模型调用前] 消息数{len(messages)})returnmessages# 可修改消息defafter_model(self,response:BaseMessage,**kwargs)-BaseMessage:print(f[模型返回] 内容{response.content[:50]}...)returnresponse# 可修改响应defafter_agent(self,output:dict,**kwargs)-dict:print(f[Agent 结束] 输出{output})returnoutput# 可修改输出5.2 高级自定义包装式钩子重写 wrap_model_call/wrap_tool_call实现重试、缓存等逻辑。示例模型重试中间件classModelRetryMiddleware(BaseAgentMiddleware):def__init__(self,max_retries:int2):self.max_retriesmax_retriesdefwrap_model_call(self,model_call:Callable[[List[BaseMessage]],BaseMessage],messages:List[BaseMessage],**kwargs)-BaseMessage:forattemptinrange(self.max_retries1):try:print(f[重试] 第{attempt1}次调用模型)returnmodel_call(messages)exceptExceptionase:ifattemptself.max_retries:raiseeprint(f[重试失败]{e}1秒后重试...)importtime time.sleep(1)returnmodel_call(messages)5.3 状态管理跨中间件通信通过 kwargs[“state”] 访问 / 修改全局状态实现数据共享。defbefore_model(self,messages:List[BaseMessage],**kwargs)-List[BaseMessage]:statekwargs[state]# 记录模型调用次数state[model_calls]state.get(model_calls,0)1# 注入状态到消息messages.append({role:system,content:f已调用模型{state[model_calls]}次})returnmessages5.4 提前终止Early Exit在 before_model 中返回 {“jumpTo”: “end”} 直接终止 Agent。defbefore_model(self,messages:List[BaseMessage],**kwargs):iflen(messages)50:return{messages:[{role:assistant,content:对话过长已终止}],jumpTo:end# 强制结束}returnmessages六、高级中间件模式6.1 中间件与 LCEL中间件可以直接应用于 LCEL 链因为整个链本身也是一个 Runnable。promptChatPromptTemplate.from_template(Tell me a joke about {topic})chainprompt|model|StrOutputParser()wrapped_chainlog_middleware(chain)resultwrapped_chain.invoke({topic:chickens})中间件会拦截整个链的输入和输出。如果需要针对链中的特定步骤如仅模型调用应用中间件只需包装那个子 Runnable 即可model_with_middlewareretry_middleware()(model)chainprompt|model_with_middleware|parser6.2 访问和修改 RunnableConfig中间件可以读取或修改请求中的 config用于传递上下文如 request ID、用户身份。middlewareasyncdefadd_request_id(request,call_next):configrequest.get(config)orRunnableConfig()ifmetadatanotinconfig:config[metadata]{}config[metadata][request_id]str(uuid.uuid4())request[config]configreturnawaitcall_next(request)6.3 条件性跳过中间件根据输入动态决定是否执行后续逻辑middlewareasyncdefconditional_cache(request,call_next):input_textrequest[input]iflen(input_text)100:# 长文本不缓存直接调用returnawaitcall_next(request)else:# 短文本走缓存逻辑returnawaitcache_middleware(request,call_next)6.4 异常处理与降级中间件可以捕获异常并返回降级结果middlewareasyncdeffallback_middleware(request,call_next):try:returnawaitcall_next(request)exceptExceptionase:print(fPrimary failed:{e}, using fallback)fallback_modelChatOpenAI(modelgpt-3.5-turbo)returnawaitfallback_model.ainvoke(request[input])七、最佳实践优先使用内置中间件RetryMiddleware、RateLimiterMiddleware 等已经经过充分测试。保持中间件幂等重试时不应产生副作用。异步优先尽管中间件支持同步但为了性能建议使用 async 定义。避免修改输入原对象如果需要修改输入应拷贝一份防止影响其他中间件。中间件顺序注意中间件列表的顺序例如日志应该在最外层重试应靠近内部。1. 安全类脱敏、审核→ 第一道防线2. 上下文类摘要、裁剪→ 优化输入3. 可靠性类重试、限流→ 保障稳定4. 监控类日志、统计→ 最后观测测试中间件使用模拟的 call_next 单元测试中间件逻辑。性能优化摘要用轻量模型如 gpt-3.5-turbo减少成本避免在 before_model/after_model 中执行耗时操作用 wrap_model_call 实现缓存减少重复调用企业标配中间件组合middleware[# 1. 安全PIIRedactionMiddleware(),ContentSafetyMiddleware(),# 2. 上下文SummarizationMiddleware(llmChatOpenAI(modelgpt-3.5-turbo)),MessageLimitMiddleware(max_messages30),# 3. 可靠ModelRetryMiddleware(max_retries2),ToolRetryMiddleware(max_retries2),HumanInTheLoopMiddleware(tool_names[send_email,delete_data]),# 4. 监控LoggingMiddleware(),TokenCounterMiddleware()]

相关新闻