云原生 AI 平台:从模型仓库到弹性推理服务的全链路搭建

发布时间:2026/6/26 6:31:13

云原生 AI 平台:从模型仓库到弹性推理服务的全链路搭建 云原生 AI 平台从模型仓库到弹性推理服务的全链路搭建一、模型训练完之后的事很多团队把主要精力放在模型训练上训练完随便丢个 Flask 接口就上线了。结果在生产环境里问题不断GPU 利用率不到 30%流量高峰时推理超时模型回滚得手动拷贝文件多模型一起跑时资源争抢导致 OOM。这些问题的根本原因很简单——推理服务缺一个真正的平台级支撑。生产环境中的 AI 推理服务不能只追求跑起来就行它得解决四个实际问题模型版本怎么管、GPU 资源怎么弹性调度、多模型共存时资源怎么隔离、推理链路怎么监控。下面我会从零开始搭建一套基于 Kubernetes 的云原生 AI 推理平台覆盖模型仓库、推理服务编排、GPU 弹性伸缩和可观测性这几个关键环节。二、推理平台架构设计生产级的 AI 推理平台至少需要四层配合模型仓库层管版本和分发调度层分配 GPU 资源推理服务层负责模型加载和执行流量网关层处理路由和限流。graph TB subgraph 模型仓库层 MR[Model Registrybr/模型版本管理] MS[MinIO/S3br/模型制品存储] end subgraph 调度层 K8s[Kubernetes Scheduler] GPU[GPU Device Plugin] QM[Queue Managerbr/请求排队] end subgraph 推理服务层 TS1[Triton Serverbr/模型A v2.1] TS2[Triton Serverbr/模型B v1.3] TS3[vLLM Serverbr/LLM 模型] end subgraph 流量网关层 GW[API Gateway] LB[负载均衡] RL[限流/熔断] end MR --|模型拉取| TS1 MR --|模型拉取| TS2 MR --|模型拉取| TS3 MS --|制品下载| MR K8s --|Pod 调度| TS1 K8s --|Pod 调度| TS2 K8s --|Pod 调度| TS3 GPU --|GPU 分配| K8s GW -- LB -- RL RL --|路由| TS1 RL --|路由| TS2 RL --|路由| TS3 QM --|排队| RL为什么这么选模型仓库用了 MinIO 而不是 HuggingFace Hub主要是生产环境需要私有化部署和访问控制。推理引擎选了 Triton Inference Server因为它能同时支持 TensorRT、PyTorch 和 ONNX 这些框架还能做动态批处理。流量网关层加了 Queue Manager防止突发流量直接把推理服务打垮。三、核心代码实现3.1 模型仓库版本管理和热切换package modelregistry import ( context fmt path/filepath sync time github.com/minio/minio-go/v7 github.com/minio/minio-go/v7/pkg/credentials ) // ModelVersion 模型版本元信息 type ModelVersion struct { Name string json:name Version string json:version Framework string json:framework // pytorch, onnx, tensorrt SHA256 string json:sha256 Size int64 json:size CreatedAt time.Time json:created_at } // ModelRegistry 模型仓库管理模型版本的生命周期 type ModelRegistry struct { client *minio.Client bucket string localCache string // 本地缓存目录 mu sync.RWMutex versions map[string][]ModelVersion // key: modelName } func NewModelRegistry(endpoint, accessKey, secretKey, bucket, cacheDir string) (*ModelRegistry, error) { client, err : minio.New(endpoint, minio.Options{ Creds: credentials.NewStaticV4(accessKey, secretKey, ), Secure: false, }) if err ! nil { return nil, fmt.Errorf(创建 MinIO 客户端失败: %w, err) } ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 确保桶存在 exists, err : client.BucketExists(ctx, bucket) if err ! nil { return nil, fmt.Errorf(检查桶失败: %w, err) } if !exists { if err : client.MakeBucket(ctx, bucket, minio.MakeBucketOptions{}); err ! nil { return nil, fmt.Errorf(创建桶失败: %w, err) } } return ModelRegistry{ client: client, bucket: bucket, localCache: cacheDir, versions: make(map[string][]ModelVersion), }, nil } // RegisterModel 注册新模型版本返回版本信息 func (mr *ModelRegistry) RegisterModel(ctx context.Context, name, version, framework, localPath string) (*ModelVersion, error) { objectKey : fmt.Sprintf(%s/%s/model.tar.gz, name, version) // 上传模型到 MinIO info, err : mr.client.FPutObject(ctx, mr.bucket, objectKey, localPath, minio.PutObjectOptions{ ContentType: application/gzip, }) if err ! nil { return nil, fmt.Errorf(上传模型失败: %w, err) } mv : ModelVersion{ Name: name, Version: version, Framework: framework, Size: info.Size, CreatedAt: time.Now(), } // 线程安全地更新版本索引 mr.mu.Lock() mr.versions[name] append(mr.versions[name], *mv) mr.mu.Unlock() return mv, nil } // PullModel 拉取指定版本模型到本地支持并发安全 func (mr *ModelRegistry) PullModel(ctx context.Context, name, version string) (string, error) { objectKey : fmt.Sprintf(%s/%s/model.tar.gz, name, version) localPath : filepath.Join(mr.localCache, name, version, model.tar.gz) err : mr.client.FGetObject(ctx, mr.bucket, objectKey, localPath, minio.GetObjectOptions{}) if err ! nil { return , fmt.Errorf(拉取模型 %s:%s 失败: %w, name, version, err) } return localPath, nil } // GetLatestVersion 获取模型最新版本读锁保护并发读取 func (mr *ModelRegistry) GetLatestVersion(name string) (*ModelVersion, error) { mr.mu.RLock() defer mr.mu.RUnlock() versions, ok : mr.versions[name] if !ok || len(versions) 0 { return nil, fmt.Errorf(模型 %s 不存在, name) } latest : versions[len(versions)-1] return latest, nil }3.2 推理服务编排Kubernetes CRD 与 GPU 调度# inference-service.yaml — 推理服务自定义资源 apiVersion: ai.platform/v1 kind: InferenceService metadata: name: text-classifier namespace: ai-serving spec: model: name: bert-text-cls version: 2.1 framework: onnx runtime: engine: triton image: nvcr.io/nvidia/tritonserver:24.01-py3 resources: limits: nvidia.com/gpu: 1 memory: 8Gi requests: cpu: 2 memory: 4Gi autoscaling: minReplicas: 1 maxReplicas: 8 targetGPUUtilization: 70 scaleUpCooldown: 60s scaleDownCooldown: 300s healthCheck: readinessPath: /v2/health/ready livenessPath: /v2/health/live initialDelaySeconds: 30 periodSeconds: 103.3 GPU 弹性伸缩控制器package controller import ( context fmt math sync time metav1 k8s.io/apimachinery/pkg/apis/meta/v1 k8s.io/client-go/kubernetes k8s.io/client-go/rest k8s.io/metrics/pkg/client/clientset/versioned ) // GPUScaler GPU 弹性伸缩控制器 type GPUScaler struct { k8sClient *kubernetes.Clientset metricsClient *versioned.Clientset namespace string mu sync.Mutex cooldown map[string]time.Time // 上次伸缩时间防止抖动 } func NewGPUScaler(namespace string) (*GPUScaler, error) { config, err : rest.InClusterConfig() if err ! nil { return nil, fmt.Errorf(获取集群配置失败: %w, err) } k8sClient, err : kubernetes.NewForConfig(config) if err ! nil { return nil, fmt.Errorf(创建 K8s 客户端失败: %w, err) } metricsClient, err : versioned.NewForConfig(config) if err ! nil { return nil, fmt.Errorf(创建 Metrics 客户端失败: %w, err) } return GPUScaler{ k8sClient: k8sClient, metricsClient: metricsClient, namespace: namespace, cooldown: make(map[string]time.Time), }, nil } // ScaleDecision 伸缩决策结果 type ScaleDecision struct { DeploymentName string CurrentReplicas int32 TargetReplicas int32 Reason string } // Evaluate 根据GPU利用率计算目标副本数 func (gs *GPUScaler) Evaluate(ctx context.Context, deployName string, currentReplicas int32, targetUtil int32, minReplicas, maxReplicas int32) (*ScaleDecision, error) { gs.mu.Lock() defer gs.mu.Unlock() // 冷却期检查防止频繁伸缩导致服务抖动 if lastScale, ok : gs.cooldown[deployName]; ok { if time.Since(lastScale) 60*time.Second { return ScaleDecision{ DeploymentName: deployName, CurrentReplicas: currentReplicas, TargetReplicas: currentReplicas, Reason: 冷却期内跳过伸缩, }, nil } } // 获取 Pod 指标 podMetrics, err : gs.metricsClient.MetricsV1beta1().PodMetricses(gs.namespace).List(ctx, metav1.ListOptions{ LabelSelector: fmt.Sprintf(app%s, deployName), }) if err ! nil { return nil, fmt.Errorf(获取 Pod 指标失败: %w, err) } if len(podMetrics.Items) 0 { return nil, fmt.Errorf(未找到 %s 的 Pod 指标, deployName) } // 计算平均 GPU 利用率通过自定义指标 var totalUtil float64 podCount : float64(len(podMetrics.Items)) for _, pm : range podMetrics.Items { for _, container : range pm.Containers { if gpuVal, ok : container.Usage[nvidia.com/gpu-utilization]; ok { totalUtil float64(gpuVal.MilliValue()) / 1000.0 } } } avgUtil : totalUtil / podCount // 按比例计算目标副本数向上取整 ratio : avgUtil / float64(targetUtil) targetReplicas : int32(math.Ceil(float64(currentReplicas) * ratio)) // 边界保护 if targetReplicas minReplicas { targetReplicas minReplicas } if targetReplicas maxReplicas { targetReplicas maxReplicas } // 副本数没变化则不操作 if targetReplicas currentReplicas { return ScaleDecision{ DeploymentName: deployName, CurrentReplicas: currentReplicas, TargetReplicas: currentReplicas, Reason: fmt.Sprintf(当前利用率 %.1f%%无需调整, avgUtil), }, nil } gs.cooldown[deployName] time.Now() return ScaleDecision{ DeploymentName: deployName, CurrentReplicas: currentReplicas, TargetReplicas: targetReplicas, Reason: fmt.Sprintf(GPU利用率 %.1f%%目标 %d%%调整副本 %d→%d, avgUtil, targetUtil, currentReplicas, targetReplicas), }, nil }3.4 推理请求排队与背压控制package gateway import ( context fmt sync sync/atomic time ) // InferenceRequest 推理请求 type InferenceRequest struct { ID string ModelName string Payload []byte Timeout time.Duration Result chan *InferenceResponse } // InferenceResponse 推理响应 type InferenceResponse struct { Data []byte Error error } // QueueManager 请求排队管理器实现背压控制 type QueueManager struct { maxQueueSize int64 currentQueue atomic.Int64 queues map[string]chan *InferenceRequest // 每个模型一个队列 mu sync.RWMutex defaultTimeout time.Duration } func NewQueueManager(maxQueueSize int, defaultTimeout time.Duration) *QueueManager { return QueueManager{ maxQueueSize: int64(maxQueueSize), queues: make(map[string]chan *InferenceRequest), defaultTimeout: defaultTimeout, } } // getOrCreateQueue 获取或创建模型队列读写锁保护并发创建 func (qm *QueueManager) getOrCreateQueue(modelName string) chan *InferenceRequest { qm.mu.RLock() q, ok : qm.queues[modelName] qm.mu.RUnlock() if ok { return q } qm.mu.Lock() defer qm.mu.Unlock() // 双重检查避免并发创建 if q, ok qm.queues[modelName]; ok { return q } q make(chan *InferenceRequest, qm.maxQueueSize) qm.queues[modelName] q return q } // Enqueue 入队队列满时直接拒绝避免雪崩 func (qm *QueueManager) Enqueue(ctx context.Context, req *InferenceRequest) error { current : qm.currentQueue.Load() if current qm.maxQueueSize { return fmt.Errorf(队列已满(%d/%d)请求被拒绝请稍后重试, current, qm.maxQueueSize) } queue : qm.getOrCreateQueue(req.ModelName) select { case queue - req: qm.currentQueue.Add(1) return nil case -ctx.Done(): return fmt.Errorf(入队超时: %w, ctx.Err()) } } // Dequeue 出队供推理服务消费 func (qm *QueueManager) Dequeue(ctx context.Context, modelName string) (*InferenceRequest, error) { queue : qm.getOrCreateQueue(modelName) select { case req : -queue: qm.currentQueue.Add(-1) return req, nil case -ctx.Done(): return nil, fmt.Errorf(出队超时: %w, ctx.Err()) } }四、架构的局限性GPU 弹性伸缩有延迟。GPU 模型加载通常需要 10-60 秒取决于模型大小这意味着 HPA 的扩容响应速度远慢于 CPU 服务。在流量突增场景下新增 Pod 从调度到就绪可能需要 1-2 分钟这段时间内的请求只能靠排队缓冲。如果业务对延迟极度敏感建议保持一定的冗余副本而非完全依赖弹性伸缩。多模型共存导致资源碎片化。当一个节点上同时运行多个小模型时每个 Pod 占用 1 块 GPU 但利用率只有 20%导致 GPU 资源严重浪费。Triton 的多模型共享 GPU 方案可以缓解这个问题但要求模型框架一致比如都是 ONNX混合框架场景仍然受限。对于 GPU 利用率优化MPSMulti-Process Service是一个选项但它牺牲了故障隔离——一个模型崩溃可能影响同 GPU 上的其他模型。模型仓库的一致性挑战。MinIO 作为模型仓库在单集群内表现良好但多集群场景下模型同步依赖手动操作或自建同步脚本。如果需要多地域部署建议在 MinIO 上层增加一致性同步层或者直接使用支持跨区域复制的对象存储如 S3 的跨区域复制。哪些场景不适合模型推理延迟要求 10ms 的实时系统应考虑专用推理芯片和本地缓存模型体积超过单 GPU 显存的大模型需要模型并行方案如 TensorRT-LLM 的 TP以及没有 Kubernetes 基础设施的团队运维成本会超过收益。五、总结这套平台是从生产环境的实际痛点出发的。核心包括四块用 MinIO 做模型仓库实现版本管理和分发用 Triton 做推理服务编排支持多框架共存基于 GPU 利用率指标的弹性伸缩控制器实现资源调度还有基于队列的背压控制防止流量雪崩。架构上做了一些明确的妥协GPU 扩容延迟靠排队缓冲而不是即时响应多模型共存靠框架约束而不是完全自由组合模型同步靠单集群设计而不是多地域一致性。这些妥协不是偷懒而是在工程复杂度和实际收益之间找到的平衡点。基础设施的职责就是托底——在流量洪峰时不崩在模型迭代时不乱在资源紧张时不浪费。

相关新闻