
1. 项目概述一次跨模型工作流复刻的探索最近在尝试把Claude的“Dispatch”体验搬到Gemini和OpenCode上这活儿听起来有点意思本质上是在做一次跨模型工作流的复刻与适配。Claude的“Dispatch”功能简单来说就是一种智能的任务分发与上下文管理机制它能理解你的复杂指令自动将任务拆解、分配给合适的内部“专家”模块并整合结果提供连贯、精准的响应。这种体验的核心魅力在于“丝滑”——用户感觉是在和一个高度协同的团队对话而非一个单一的、可能在某些领域存在短板的模型。那么为什么要把这个体验带到Gemini和OpenCode呢原因很直接生态互补与能力拓展。Claude固然强大但每个模型都有其独特的优势领域和资源限制。Gemini在多模态理解和长上下文处理上表现突出而OpenCode则在代码生成与结构化输出方面有独到之处。如果能将“Dispatch”式的智能协调能力赋予它们就意味着我们可以根据任务特性动态调用最合适的模型或工具构建一个更强大、更灵活的AI应用后端。这适合任何希望提升复杂任务处理自动化水平、追求更高响应质量的开发者、产品经理或是AI应用构建者。这个项目的目标不是简单地复制一个功能而是理解“Dispatch”体验背后的设计哲学——智能路由、上下文保持与结果合成——并在一套新的技术栈上实现它。我们将深入拆解其核心组件并一步步构建一个能够协调Gemini和OpenCode的调度系统。2. 核心设计思路与架构拆解2.1 “Dispatch”体验的本质解构要复刻体验首先要解构它。经过分析Claude的“Dispatch”体验可以拆解为三个核心环节意图识别与任务分解系统需要准确理解用户的自然语言请求判断其复杂程度并将其分解为一系列原子化的子任务。例如用户请求“分析这份财报PDF总结其财务亮点并用Python画一个趋势图”这至少涉及文档解析、文本总结、数据提取和代码生成四个子任务。智能路由与执行器调度针对每个原子子任务系统需要决定由哪个“执行器”来处理最合适。在我们的场景中执行器就是Gemini、OpenCode或者未来可能接入的其他模型、API甚至本地函数。路由决策的依据包括任务类型文本、代码、多模态、对模型能力的先验知识、成本考量以及当前上下文。上下文管理与结果整合这是保证体验“连贯性”的关键。子任务之间可能存在依赖关系例如画图需要先用总结出的数据。系统必须妥善管理整个会话的上下文将上游任务的输出作为下游任务的输入并最终将所有子任务的结果整合成一个自然、统一的回复呈现给用户。2.2 我们的系统架构设计基于以上解构我们设计了一个轻量级但功能完整的调度系统架构。整个系统围绕一个中央调度器Dispatcher展开。用户请求 | v [中央调度器 (Dispatcher)] | |-- 1. 意图解析 任务分解 |-- 2. 为每个子任务选择最佳执行器 (Router) |-- 3. 编排执行顺序管理共享上下文 (Orchestrator) | |-- [执行器池] | | | |-- [Gemini 执行器] (处理文本分析、多模态理解、创意生成) | |-- [OpenCode 执行器] (处理代码生成、代码解释、结构化输出) | |-- [未来可扩展的其他执行器...] | v 结果整合与格式化输出为什么选择这样的架构松耦合调度器与执行器分离方便我们独立升级或替换任一模型。例如如果OpenCode发布了新版本我们只需更新对应的执行器模块无需改动核心调度逻辑。可扩展性新的模型或工具可以很容易地以“执行器”的形式接入系统只需实现统一的接口。可控性中央调度器让我们能够集中实现复杂的路由策略、限流、熔断和日志记录这是直接链式调用多个API难以做到的。2.3 关键技术选型与考量后端框架选择FastAPI。它异步性能好能高效处理多个并发的模型API调用自动生成API文档并且编写简洁。对于需要高并发的调度任务异步IO至关重要。模型API接入Google Gemini API使用官方google-generativeaiPython SDK。其优势在于对多模态输入图片、PDF的原生支持以及可配置的安全策略。OpenCode由于其可能指代不同的开源或特定代码模型我们假设通过其提供的API端点如 OpenAI 格式的兼容API进行调用。使用httpx库进行异步HTTP请求保持灵活性。上下文管理使用简单的内存字典或Redis来管理会话上下文。对于简单的演示或低频使用内存存储足够但如果需要持久化或支持多用户会话Redis是更可靠的选择它支持设置过期时间避免内存泄漏。任务队列可选进阶对于耗时较长的任务如处理超大文档可以引入Celery或RQ搭配 Redis 作为消息代理实现异步任务队列避免HTTP请求超时。注意在实际开发中直接、频繁地调用商业模型API会产生费用。务必在代码中实现成本监控和用量限制例如为每个用户或每个会话设置Token消耗上限。3. 核心模块实现详解3.1 中央调度器Dispatcher的实现调度器是整个系统的大脑。我们将其核心功能实现为一个Python类。from typing import List, Dict, Any, Optional from pydantic import BaseModel import asyncio class SubTask(BaseModel): 原子子任务数据模型 id: str description: str task_type: str # 如 “text_analysis”, “code_generation”, “multimodal” depends_on: List[str] [] # 依赖的其他子任务ID required_capabilities: List[str] [] # 所需能力如 “vision”, “long_context” class Dispatcher: def __init__(self, router, orchestrator): self.router router # 路由决策模块 self.orchestrator orchestrator # 编排执行模块 async def dispatch(self, user_query: str, session_id: str) - Dict[str, Any]: 核心调度流程 # 1. 意图解析与任务分解 subtasks await self._analyze_and_decompose(user_query, session_id) # 2. 为每个子任务路由决策 routed_tasks [] for task in subtasks: best_executor self.router.select_executor(task, session_id) routed_tasks.append({task: task, executor: best_executor}) # 3. 编排执行并获取结果 final_result await self.orchestrator.execute(routed_tasks, session_id) return final_result async def _analyze_and_decompose(self, query: str, session_id: str) - List[SubTask]: 利用一个LLM这里我们先用Gemini来分析用户查询并分解任务。 这是一个递归提示工程的过程。 # 构建一个专门用于任务分解的提示词 decomposition_prompt f 你是一个高级任务规划AI。请将以下用户请求分解为一系列可以独立或顺序执行的原子子任务。 每个子任务应该足够简单可以由一个专门的AI模型如文本专家、代码专家、视觉专家处理。 用户请求: {query} 请以JSON格式输出一个子任务列表每个子任务包含以下字段 - id: 唯一标识符 (如 “task_1”) - description: 清晰的任务描述 - task_type: 任务类型 (从以下选择: text_analysis, code_generation, data_extraction, multimodal_understanding, summary) - depends_on: 该任务所依赖的其他任务id列表 (如果没有则为空列表) - required_capabilities: 执行此任务所需的能力 (如 [“long_context”] [“vision”]) 只输出JSON不要有其他解释。 # 调用Gemini API获取分解结果 # 此处为伪代码实际调用需处理API密钥和错误 from .executors.gemini_executor import GeminiExecutor gemini GeminiExecutor() decomposition_result await gemini.generate_text(decomposition_prompt) # 解析返回的JSON转化为SubTask对象列表 import json try: task_dicts json.loads(decomposition_result) return [SubTask(**t) for t in task_dicts] except json.JSONDecodeError: # 如果模型没有返回标准JSON这里需要更健壮的解析或重试逻辑 # 一个备选方案是让模型以特定标记格式输出我们再解析 raise ValueError(Failed to parse task decomposition result.)关键点解析_analyze_and_decompose方法这是实现“智能”的起点。我们利用LLM此处是Gemini来理解复杂意图并拆解任务。提示词工程的质量直接决定了分解的准确性。在实际应用中可能需要多轮调试和优化提示词甚至准备一些示例Few-shot来引导模型。错误处理模型输出不一定总是规整的JSON必须有完善的异常处理机制。生产环境中可以考虑使用像instructor或Pydantic与OpenAI函数调用结合的方式来强制结构化输出。3.2 路由决策器Router的逻辑路由器的职责是给定一个子任务返回最适合处理它的执行器名称。class Router: def __init__(self): # 定义执行器能力矩阵。这是一个静态配置也可以设计成动态学习的。 self.executor_capabilities { gemini-pro: [text_analysis, summary, multimodal_understanding, long_context, creative_writing], gemini-pro-vision: [multimodal_understanding, text_analysis], opencode: [code_generation, code_explanation, structured_output, data_extraction], } # 执行器成本每千Token的假设成本单位任意用于比较 self.executor_cost { gemini-pro: 1.0, gemini-pro-vision: 1.5, opencode: 0.8, } def select_executor(self, task: SubTask, session_context: Dict) - str: 基于任务需求和策略选择执行器。 策略可以基于能力匹配、成本、负载、历史表现等。 candidates [] for executor_name, capabilities in self.executor_capabilities.items(): # 基础筛选执行器是否具备任务所需的所有能力 if all(req in capabilities for req in task.required_capabilities): candidates.append(executor_name) if not candidates: # 如果没有完全匹配的降级处理选择能满足核心需求的 # 这里简化处理直接返回一个默认的 return gemini-pro # 策略从候选者中选择成本最低的 # 更复杂的策略可以在这里实现如加权评分能力匹配度、成本、响应速度 selected min(candidates, keylambda x: self.executor_cost.get(x, float(inf))) # 可以在此处记录路由决策用于后续分析和优化 # log_route_decision(task.id, selected, session_context) return selected路由策略的考量能力匹配优先这是底线不能让一个不懂代码的模型去生成代码。成本优化在能力满足的前提下选择更经济的模型。这对于控制项目运营成本非常重要。进阶策略可以引入性能历史数据比如某个模型对某类任务的平均响应时间和准确率实现基于效用的路由。甚至可以引入简单的负载均衡避免所有请求涌向同一个模型端点。3.3 编排执行器Orchestrator与上下文管理编排器负责按照依赖关系执行任务并管理任务间的数据传递。class Orchestrator: def __init__(self, executor_registry): self.executors executor_registry # 一个包含所有执行器实例的字典 self.context_store {} # 简化版内存上下文存储 {session_id: {task_id: result, ...}} async def execute(self, routed_tasks: List[Dict], session_id: str) - str: 执行路由后的任务列表处理依赖整合结果。 # 初始化或获取本次会话的上下文 session_context self.context_store.setdefault(session_id, {}) # 构建任务依赖图简化版使用拓扑排序思想 # 这里我们假设依赖关系不构成复杂环简单按顺序执行依赖已满足的任务 task_results {} executed set() # 一个简单的执行循环直到所有任务完成 while len(executed) len(routed_tasks): progress False for item in routed_tasks: task item[task] executor_name item[executor] if task.id in executed: continue # 检查依赖是否都已满足 if all(dep in executed for dep in task.depends_on): progress True # 准备输入合并用户原始query和依赖任务的输出 input_context f原始用户请求的上下文。\n for dep_id in task.depends_on: input_context f\n[子任务 {dep_id} 的结果]:\n{task_results[dep_id]}\n full_prompt f{input_context}\n请执行以下任务{task.description} # 获取执行器并运行 executor self.executors.get(executor_name) if not executor: raise ValueError(fExecutor {executor_name} not found.) try: result await executor.execute(full_prompt, task_typetask.task_type) task_results[task.id] result executed.add(task.id) # 更新到会话上下文 session_context[task.id] result except Exception as e: # 任务执行失败记录错误可以尝试重试或降级 task_results[task.id] f任务执行失败: {str(e)} executed.add(task.id) # 标记为已执行失败 # 根据策略决定是否终止整个流程 if not progress: # 检测到循环依赖或死锁 raise RuntimeError(无法解决任务依赖关系可能存在循环依赖。) # 所有子任务完成进行最终整合 final_integration_prompt f 你是一个结果整合专家。以下是用户原始请求和一系列子任务的结果。 请将这些结果整合成一个连贯、完整、直接回答用户问题的最终回复。 回复应自然流畅就像由一个专家一次性完成的一样。 原始用户请求{routed_tasks[0][task].description if routed_tasks else N/A} (这是初始分解的源头) 子任务结果汇总 {self._format_subtask_results_for_integration(task_results)} 请开始你的最终整合回复 # 通常最终整合也由一个LLM执行器完成比如Gemini final_executor self.executors.get(gemini-pro) final_output await final_executor.execute(final_integration_prompt, task_typetext_analysis) return final_output def _format_subtask_results_for_integration(self, results: Dict) - str: formatted for task_id, result in results.items(): formatted f\n--- 子任务 {task_id} 结果 ---\n{result}\n return formatted编排器的核心挑战与解决方案依赖管理我们实现了一个简单的轮询检查依赖的算法。对于更复杂的DAG有向无环图依赖可以使用像networkx这样的库进行拓扑排序实现真正的并行执行独立任务。上下文构建如何将上游任务的结果有效地传递给下游任务是体验“连贯性”的关键。我们采用的方式是将所有依赖结果以清晰的结构化格式拼接到提示词中。对于非常长的上下文可能需要采用摘要、选择性注入等更高级的技术。错误处理与重试生产系统中必须为每个子任务执行配置超时、重试策略和降级方案如主执行器失败后切换到备用执行器。3.4 执行器Executor的抽象与实现执行器是对不同模型API的一层统一封装。from abc import ABC, abstractmethod import httpx import google.generativeai as genai class BaseExecutor(ABC): abstractmethod async def execute(self, prompt: str, **kwargs) - str: pass class GeminiExecutor(BaseExecutor): def __init__(self, model_namegemini-1.5-pro): genai.configure(api_keyos.getenv(GEMINI_API_KEY)) self.model genai.GenerativeModel(model_name) async def execute(self, prompt: str, task_type: str None, image_data: bytes None) - str: 执行任务。支持纯文本和多模态。 try: if image_data: # 处理多模态请求 image_part genai.upload_file(image_data, mime_typeimage/jpeg) # 需根据实际类型调整 contents [image_part, prompt] else: contents [prompt] # 根据任务类型调整生成配置如温度、max_tokens generation_config self._get_generation_config(task_type) response await self.model.generate_content_async( contents, generation_configgeneration_config ) return response.text except Exception as e: # 记录日志并抛出或返回一个友好的错误信息 return f[Gemini 处理错误]{str(e)} def _get_generation_config(self, task_type): 根据任务类型返回不同的生成配置 configs { code_generation: genai.GenerationConfig(temperature0.1, max_output_tokens2048), creative_writing: genai.GenerationConfig(temperature0.8, max_output_tokens1024), text_analysis: genai.GenerationConfig(temperature0.2, max_output_tokens1024), } return configs.get(task_type, genai.GenerationConfig(temperature0.3, max_output_tokens1024)) class OpenCodeExecutor(BaseExecutor): def __init__(self, api_base: str, api_key: str): self.api_base api_base self.api_key api_key self.client httpx.AsyncClient(timeout30.0) async def execute(self, prompt: str, task_type: str None) - str: 调用类OpenAI API格式的OpenCode端点。 headers { Authorization: fBearer {self.api_key}, Content-Type: application/json } payload { model: opencode-model, # 根据实际模型名调整 messages: [{role: user, content: prompt}], temperature: 0.1 if task_type code_generation else 0.3, max_tokens: 2048 } try: resp await self.client.post(f{self.api_base}/v1/chat/completions, jsonpayload, headersheaders) resp.raise_for_status() data resp.json() return data[choices][0][message][content] except httpx.HTTPStatusError as e: return f[OpenCode API 错误] HTTP {e.response.status_code} except Exception as e: return f[OpenCode 处理错误]{str(e)} finally: await self.client.aclose()执行器设计的要点统一接口所有执行器都继承自BaseExecutor并实现execute方法这使调度器可以用相同的方式调用它们。配置化在execute方法中接收task_type等参数允许根据任务动态调整调用参数如温度、最大Token数这是优化输出质量的重要手段。健壮性每个执行器内部都应有完善的错误处理和日志记录避免一个模型的故障导致整个调度流程崩溃。4. 系统集成与API暴露将上述模块组装起来并通过FastAPI暴露为一个服务。from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uuid app FastAPI(titleGemini OpenCode Dispatcher API) # 初始化全局组件 router Router() executor_registry { gemini-pro: GeminiExecutor(model_namegemini-1.5-pro), opencode: OpenCodeExecutor(api_baseos.getenv(OPENCODE_API_BASE), api_keyos.getenv(OPENCODE_API_KEY)), } orchestrator Orchestrator(executor_registry) dispatcher Dispatcher(router, orchestrator) class DispatchRequest(BaseModel): query: str session_id: Optional[str] None # 如果提供则继续之前的会话 app.post(/dispatch) async def handle_dispatch(request: DispatchRequest): 主调度接口。 session_id request.session_id or str(uuid.uuid4()) try: result await dispatcher.dispatch(request.query, session_id) return { session_id: session_id, response: result, status: success } except Exception as e: # 记录详细日志 logger.error(fDispatch failed for session {session_id}: {e}) raise HTTPException(status_code500, detailf调度处理失败: {str(e)}) app.get(/session/{session_id}/context) async def get_session_context(session_id: str): 获取某个会话的上下文用于调试或前端状态保持 if session_id in orchestrator.context_store: return {session_id: session_id, context: orchestrator.context_store[session_id]} else: raise HTTPException(status_code404, detailSession not found)这个API提供了一个/dispatch端点接收用户查询返回处理结果并维护一个会话ID以支持多轮对话的上下文延续。5. 实战测试、常见问题与优化策略5.1 端到端测试案例假设我们向部署好的服务发送如下请求curl -X POST http://localhost:8000/dispatch \ -H Content-Type: application/json \ -d { query: 请解析这张图片中的图表告诉我2023年Q4的趋势数据然后用Python的matplotlib生成一个类似的折线图代码。 }系统内部流转Dispatcher收到请求调用_analyze_and_decompose。Gemini作为任务分解器可能返回如下子任务[ { id: task_1, description: 识别并描述图片中图表的数据特别是2023年Q4的趋势。, task_type: multimodal_understanding, depends_on: [], required_capabilities: [vision] }, { id: task_2, description: 根据任务1提取的数据生成一段Python代码使用matplotlib绘制类似的折线图。, task_type: code_generation, depends_on: [task_1], required_capabilities: [] } ]Router为task_1选择gemini-pro-vision为task_2选择opencode。Orchestrator先执行task_1将图片和提示词发给Gemini Vision得到文本描述如“图表显示Q4销售额从10月的120万增长到12月的150万”。Orchestrator将task_1的结果作为上下文与task_2的描述合并发送给OpenCode执行器。OpenCode生成对应的Matplotlib代码。Orchestrator最后可能调用Gemini进行一次结果整合将数据描述和代码块组织成一个友好的回答。5.2 常见问题与排查技巧任务分解不准确或过于琐碎现象模型把简单请求拆成太多无意义的子任务或漏掉了关键步骤。排查检查任务分解的提示词。尝试提供更明确的指令和格式要求。在提示词中加入几个高质量的示例Few-shot learning通常能极大改善效果。技巧可以设置一个“复杂度阈值”。先让一个轻量级模型判断请求是否真的需要分解对于简单查询直接路由给单一模型处理避免不必要的开销和延迟。上下文过长导致后续任务失败现象上游任务输出内容很长导致下游任务的提示词超出模型上下文窗口。排查监控每个子任务调用时的Token消耗。在Orchestrator构建提示词时进行长度检查。技巧实现“上下文摘要”功能。对于需要传递给下游的长文本先调用一次模型进行摘要浓缩只传递核心信息。或者在设计任务链时让下游任务主动去“询问”所需的具体数据而非被动接收全部上下文。模型API调用失败或超时现象某个执行器频繁报错导致整个流程中断。排查检查网络、API密钥配额和模型服务状态。在执行器内部实现详细的错误日志记录。技巧为每个执行器实现重试机制如使用tenacity库和熔断器模式。当某个模型失败率超过阈值时暂时将其从路由候选池中移除并尝试降级到其他可用模型。最终整合结果生硬、不连贯现象最终回复读起来像是拼凑的有明显的段落割裂感。排查检查最终整合的提示词。是否提供了足够的指引让模型进行“创作性”的融合而不是简单罗列。技巧在最终整合提示词中明确要求模型以特定的口吻如“专业的分析师”和格式进行回复。可以将用户最初的问题再次强调让整合工作始终围绕核心问题展开。5.3 性能与成本优化策略异步并发执行利用asyncio.gather并行执行没有依赖关系的子任务可以显著降低总响应时间。Orchestrator中的执行循环可以优化为依赖图驱动的并行执行。缓存策略对于常见、耗时的子任务结果如“将这张图转成表格数据”可以考虑将输入图片哈希和输出表格文本缓存起来下次遇到相同请求直接返回节省成本和时间。成本监控与预算在每个执行器的调用前后记录Token使用量并在路由决策中引入成本因子。可以为每个用户或每个会话设置预算超出后自动切换到更经济的模型或拒绝服务。路由策略动态优化定期收集任务执行结果的质量反馈如通过人工评估或自动化评分用于动态调整Router中的能力矩阵和权重让系统越用越“聪明”。将Claude的“Dispatch”体验迁移到Gemini和OpenCode的生态中是一次富有挑战但极具价值的工程实践。它迫使我们去深入思考复杂AI工作流的设计模式而不仅仅是简单的API调用。这套系统的核心价值在于其可观测性和可控性——每一个决策、每一次调用都清晰可见并且可以通过策略进行调整。在实际部署中你会不断在分解的粒度、路由的准确性、上下文的有效性和系统的响应速度之间进行权衡。从我搭建类似系统的经验来看起步时不必追求全自动的完美分解可以从一些预设的、高频的复杂任务模板开始逐步增加系统的智能这样更容易获得稳定可靠的效果。