
# AI 推理服务弹性调度与 GPU 资源管理实践

## 一、场景痛点：GPU 资源稀缺与弹性需求

随着大语言模型在各行业的广泛应用，GPU 资源的管理和调度成为影响 AI 应用性能和成本的核心问题。与传统 CPU 计算不同，GPU 有以下独特挑战：

- **资源稀缺**：高端 GPU（如 H100、A100）供应紧张，价格昂贵
- **异构资源**：不同型号 GPU 性能差异大，需要智能调度
- **弹性需求波动**：AI 推理请求量波动剧烈，需要快速扩缩容
- **多任务共享**：同一 GPU 上可能运行多个推理任务，需要合理分配

传统的资源管理方式（固定分配、手动扩缩容）已经无法满足 AI 时代的挑战，需要更智能的弹性调度方案。

## 二、底层机制与原理深度剖析

### 2.1 GPU 资源调度架构

```mermaid
flowchart TD
    subgraph SchedLayer [调度层]
        A[API Gateway] --> B[调度器]
        B --> C{调度决策}
    end
    subgraph GPULayer [GPU 资源层]
        D[GPU Pool Manager]
        E[节点1: A100 x4]
        F[节点2: A100 x4]
        G[节点3: H100 x2]
    end
    subgraph InferLayer [AI 推理服务]
        H[vLLM Instance 1]
        I[vLLM Instance 2]
        J[TensorRT-LLM Instance]
    end
    C -->|资源分配| D
    D --> E
    D --> F
    D --> G
    E --> H
    E --> I
    G --> J
    K[Prometheus] -->|监控指标| B
    K -->|监控指标| D
    style B fill:#b8d4ff
    style D fill:#FFE4B5
```

调度器的核心职责：

- **资源感知**：了解 GPU 的类型、数量、显存、温度等状态
- **请求路由**：将推理请求路由到合适的 GPU 实例
- **弹性扩缩**：根据负载自动调整实例数量
- **公平分配**：在多个租户之间公平分配 GPU 资源

### 2.2 GPU 调度算法分类

```mermaid
flowchart LR
    A[调度算法] --> B[基于规则]
    A --> C[基于队列]
    A --> D[基于预测]
    A --> E[基于强化学习]
    B --> B1[轮询]
    B --> B2[最少连接]
    B --> B3[亲和性]
    C --> C1[优先级队列]
    C --> C2[公平调度]
    C --> C3[资源预留]
    D --> D1[流量预测]
    D --> D2[容量规划]
    E --> E1[DeepRM]
    E --> E2[Decima]
```

## 三、生产级代码实现与最佳实践

### 3.1 GPU 资源管理器

```python
# GPU resource manager
"""Production-style GPU resource manager.

Discovers the GPUs of the local node through ``nvidia-smi``, keeps their
health/usage readings up to date, and allocates / releases GPUs for
inference instances. A single GPU may be shared by several instances.
"""
import asyncio
import contextlib
import functools
import logging
import socket
import subprocess
import threading
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple

logger = logging.getLogger(__name__)

_MIB = 1024 * 1024


class GPUState(Enum):
    IDLE = "idle"            # healthy, no instance bound
    ALLOCATED = "allocated"  # at least one instance bound
    RESERVED = "reserved"    # no instance bound, but too little free memory
    FAULTY = "faulty"        # failed the health check (over-temperature)


@dataclass
class GPUInfo:
    """Last known readings and scheduling bookkeeping for one GPU."""

    index: int
    name: str
    memory_total: int  # bytes
    memory_free: int  # bytes, as last reported by nvidia-smi
    utilization: float  # 0-1
    temperature: float  # celsius
    state: GPUState = GPUState.IDLE
    allocated_instances: Set[str] = field(default_factory=set)
    memory_committed: int = 0  # bytes promised to the allocated instances

    @property
    def memory_used(self) -> int:
        return self.memory_total - self.memory_free

    @property
    def memory_utilization(self) -> float:
        return self.memory_used / self.memory_total if self.memory_total > 0 else 0.0

    @property
    def memory_available(self) -> int:
        """Bytes that may still be promised to a new instance.

        This is the smaller of the measured free memory and the not-yet-committed
        memory. Right after an allocation, nvidia-smi still reports the new
        instance's memory as free. Relying on ``memory_free`` alone would
        therefore let back-to-back allocations oversubscribe the GPU.
        """
        return max(0, min(self.memory_free, self.memory_total - self.memory_committed))

    @property
    def can_allocate(self) -> bool:
        return self.state == GPUState.IDLE and not self.allocated_instances


@dataclass
class GPUManagerConfig:
    """Tunables for GPUResourceManager.

    Defined before the manager because ``GPUResourceManager.__init__``
    annotates its parameter with this class. Annotations are evaluated when
    the ``def`` statement runs.
    """

    monitoring_interval: int = 5  # seconds between polls
    max_temperature: float = 85.0  # celsius; hotter GPUs are marked FAULTY
    min_free_memory: int = 2 * 1024 * 1024 * 1024  # 2 GiB; unbound GPUs below this are RESERVED
    command_timeout: float = 10.0  # seconds; a hung nvidia-smi must not stall monitoring


class GPUResourceManager:
    """GPU resource manager.

    Responsibilities:
      1. GPU state monitoring (periodic nvidia-smi polling)
      2. Allocation and release of GPUs for inference instances
      3. Load balancing: a new instance goes to the GPU with the most
         available memory, across all known nodes
      4. Fault detection: over-temperature -> FAULTY, low free memory on an
         unbound GPU -> RESERVED; both states clear once readings recover

    All bookkeeping is guarded by an ``RLock``, so ``allocate_gpu`` and
    ``release_gpu`` may also be called from worker threads.
    """

    _DISCOVERY_FIELDS = "index,name,memory.total,memory.free,utilization.gpu,temperature.gpu"
    _STATUS_FIELDS = "index,memory.free,utilization.gpu,temperature.gpu"

    def __init__(self, config: GPUManagerConfig):
        self.config = config
        self.nodes: Dict[str, List[GPUInfo]] = {}
        # instance_id -> (node_id, gpu_index)
        self.instance_to_gpu: Dict[str, Tuple[str, int]] = {}
        # (node_id, gpu_index) -> ids of every instance sharing that GPU
        self.gpu_to_instance: Dict[Tuple[str, int], Set[str]] = {}
        # instance_id -> bytes committed on its GPU, returned on release
        self._instance_memory: Dict[str, int] = {}
        self._lock = threading.RLock()
        self._monitoring = False
        self._monitor_task: Optional[asyncio.Task] = None
        self._node_id = socket.gethostname()

    async def start(self):
        """Discover local GPUs, then start the background monitoring loop."""
        await self._discover_gpus()
        self._monitoring = True
        self._monitor_task = asyncio.create_task(self._monitor_loop())
        logger.info("GPU Resource Manager started")

    async def stop(self):
        """Stop monitoring and wait until the monitoring task has exited."""
        self._monitoring = False
        if self._monitor_task:
            self._monitor_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._monitor_task
            self._monitor_task = None
        logger.info("GPU Resource Manager stopped")

    async def _query_nvidia_smi(self, fields: str) -> List[List[str]]:
        """Run an ``nvidia-smi --query-gpu`` CSV query and return its rows.

        ``subprocess.run`` blocks, so it runs in the default executor to keep
        the event loop responsive.

        Raises:
            OSError: nvidia-smi is missing or cannot be executed.
            subprocess.SubprocessError: non-zero exit or timeout.
        """
        cmd = ["nvidia-smi", f"--query-gpu={fields}", "--format=csv,noheader,nounits"]
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(
                subprocess.run,
                cmd,
                capture_output=True,
                text=True,
                check=True,
                timeout=self.config.command_timeout,
            ),
        )
        return [
            [part.strip() for part in line.split(",")]
            for line in result.stdout.splitlines()
            if line.strip()
        ]

    async def _discover_gpus(self):
        """Build this node's GPU inventory, replacing any previous one."""
        try:
            rows = await self._query_nvidia_smi(self._DISCOVERY_FIELDS)
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning(f"Failed to discover GPUs: {e}")
            return

        gpus: List[GPUInfo] = []
        for parts in rows:
            if len(parts) != 6:
                continue
            try:
                gpus.append(
                    GPUInfo(
                        index=int(parts[0]),
                        name=parts[1],
                        memory_total=int(parts[2]) * _MIB,  # MiB -> bytes
                        memory_free=int(parts[3]) * _MIB,
                        utilization=float(parts[4]) / 100,
                        temperature=float(parts[5]),
                    )
                )
            except ValueError:
                # A non-numeric value such as "[N/A]" for an unsupported field.
                logger.warning(f"Skipping unparsable nvidia-smi row: {parts}")

        with self._lock:
            self.nodes[self._node_id] = gpus
        logger.info(f"Discovered {len(gpus)} GPUs on node {self._node_id}")

    def _get_node_id(self) -> str:
        """Return this node's ID (its hostname)."""
        return self._node_id

    async def _monitor_loop(self):
        """Poll GPU status and re-check health until ``stop()`` is called."""
        while self._monitoring:
            try:
                await self._update_gpu_status()
                await self._check_gpu_health()
            except Exception:
                logger.exception("Monitoring error")
            # Sleep on both paths so a persistent error cannot turn this into a busy loop.
            await asyncio.sleep(self.config.monitoring_interval)

    async def _update_gpu_status(self):
        """Refresh free memory, utilization and temperature of known GPUs.

        Rows are matched to GPUs by their ``index`` column, not by position.
        A missing or skipped row therefore cannot shift readings onto the
        wrong GPU.
        """
        try:
            rows = await self._query_nvidia_smi(self._STATUS_FIELDS)
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning(f"Failed to update GPU status: {e}")
            return

        with self._lock:
            by_index = {gpu.index: gpu for gpu in self.nodes.get(self._node_id, [])}
            for parts in rows:
                if len(parts) != 4:
                    continue
                try:
                    index = int(parts[0])
                    memory_free = int(parts[1]) * _MIB
                    utilization = float(parts[2]) / 100
                    temperature = float(parts[3])
                except ValueError:
                    continue
                gpu = by_index.get(index)
                if gpu is None:
                    continue
                gpu.memory_free = memory_free
                gpu.utilization = utilization
                gpu.temperature = temperature

    def _derive_state(self, gpu: GPUInfo) -> GPUState:
        """Return the state implied by the GPU's readings and bindings.

        Precedence: FAULTY > ALLOCATED > RESERVED > IDLE. A GPU that already
        runs instances is not RESERVED merely because those instances use
        its memory.
        """
        if gpu.temperature > self.config.max_temperature:
            return GPUState.FAULTY
        if gpu.allocated_instances:
            return GPUState.ALLOCATED
        if gpu.memory_free < self.config.min_free_memory:
            return GPUState.RESERVED
        return GPUState.IDLE

    async def _check_gpu_health(self):
        """Re-derive every local GPU's state from its latest readings.

        The state is recomputed from scratch each cycle. This lets a GPU
        recover once it cools down or memory is freed. It also stops the
        low-memory check from overwriting a FAULTY state.
        """
        with self._lock:
            for gpu in self.nodes.get(self._node_id, []):
                new_state = self._derive_state(gpu)
                if new_state == gpu.state:
                    continue
                if new_state == GPUState.FAULTY:
                    logger.warning(f"GPU {gpu.index} temperature too high: {gpu.temperature}°C")
                elif new_state == GPUState.RESERVED:
                    logger.warning(f"GPU {gpu.index} low memory: {gpu.memory_free / 1024**2:.0f}MB free")
                else:
                    logger.info(f"GPU {gpu.index} state {gpu.state.value} -> {new_state.value}")
                gpu.state = new_state

    def allocate_gpu(
        self,
        instance_id: str,
        memory_required: int,
        preference: Optional[Dict] = None,
    ) -> Optional[Tuple[str, int]]:
        """Bind *instance_id* to a GPU with at least *memory_required* bytes available.

        FAULTY GPUs are never used. RESERVED GPUs are used only when
        ``preference["allow_reserved"]`` is truthy. ALLOCATED GPUs may be
        shared. Among the eligible GPUs on all nodes, the one with the most
        available memory wins.

        Returns:
            ``(node_id, gpu_index)``, or None if no GPU fits. If the instance
            is already allocated, its existing placement is returned.
        """
        allow_reserved = bool(preference and preference.get("allow_reserved", False))
        with self._lock:
            existing = self.instance_to_gpu.get(instance_id)
            if existing is not None:
                logger.warning(f"Instance {instance_id} already allocated on {existing[0]}:{existing[1]}")
                return existing

            candidates = [
                (node_id, gpu)
                for node_id, gpus in self.nodes.items()
                for gpu in gpus
                if gpu.state != GPUState.FAULTY
                and (gpu.state != GPUState.RESERVED or allow_reserved)
                and gpu.memory_available >= memory_required
            ]
            if not candidates:
                return None

            node_id, gpu = max(candidates, key=lambda c: c[1].memory_available)
            key = (node_id, gpu.index)
            gpu.state = GPUState.ALLOCATED
            gpu.allocated_instances.add(instance_id)
            gpu.memory_committed += memory_required
            self.instance_to_gpu[instance_id] = key
            self.gpu_to_instance.setdefault(key, set()).add(instance_id)
            self._instance_memory[instance_id] = memory_required
            logger.info(f"Allocated GPU {node_id}:{gpu.index} to instance {instance_id}")
            return key

    def release_gpu(self, instance_id: str) -> bool:
        """Unbind *instance_id* from its GPU.

        Returns False if the instance had no allocation.
        """
        with self._lock:
            key = self.instance_to_gpu.pop(instance_id, None)
            if key is None:
                logger.warning(f"Instance {instance_id} not allocated")
                return False

            memory = self._instance_memory.pop(instance_id, 0)
            holders = self.gpu_to_instance.get(key)
            if holders is not None:
                holders.discard(instance_id)
                if not holders:
                    del self.gpu_to_instance[key]

            node_id, gpu_index = key
            for gpu in self.nodes.get(node_id, []):
                if gpu.index != gpu_index:
                    continue
                gpu.allocated_instances.discard(instance_id)
                gpu.memory_committed = max(0, gpu.memory_committed - memory)
                # Only an ALLOCATED GPU returns to IDLE here. A FAULTY or
                # RESERVED GPU keeps that state until the health check clears it.
                if not gpu.allocated_instances and gpu.state == GPUState.ALLOCATED:
                    gpu.state = GPUState.IDLE
                logger.info(f"Released GPU {node_id}:{gpu.index} from instance {instance_id}")
                break
            return True

    def get_allocation_summary(self) -> Dict:
        """Return GPU counts per state, cluster-wide and per node."""
        with self._lock:
            all_gpus = [gpu for gpus in self.nodes.values() for gpu in gpus]
            total_gpus = len(all_gpus)
            by_state = Counter(gpu.state for gpu in all_gpus)
            allocated_gpus = by_state[GPUState.ALLOCATED]
            return {
                "total_gpus": total_gpus,
                "allocated_gpus": allocated_gpus,
                # Only truly idle GPUs; reserved and faulty ones are reported separately.
                "idle_gpus": by_state[GPUState.IDLE],
                "reserved_gpus": by_state[GPUState.RESERVED],
                "faulty_gpus": by_state[GPUState.FAULTY],
                "utilization": allocated_gpus / total_gpus if total_gpus > 0 else 0,
                "by_node": {
                    node_id: {
                        "total": len(gpus),
                        "allocated": sum(1 for g in gpus if g.state == GPUState.ALLOCATED),
                        "idle": sum(1 for g in gpus if g.state == GPUState.IDLE),
                    }
                    for node_id, gpus in self.nodes.items()
                },
            }
```

### 3.2 弹性调度器

```python
# AI inference elastic scheduler
"""Elastic scheduler for AI inference instances.

Supports:
  1. Utilization-driven scale-up / scale-down
  2. Trend-based load prediction
  3. Blue-green deployment (not implemented in this listing)
  4. Canary release (not implemented in this listing)
"""
from __future__ import annotations

import asyncio
import contextlib
import logging
import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from typing import Deque, Dict, Optional, Set

# GPUResourceManager is the class from the GPU resource manager listing (3.1);
# import it from wherever that module lives in your project.

logger = logging.getLogger(__name__)


@dataclass
class ScalingConfig:
    """Scaling policy."""

    min_instances: int = 1
    max_instances: int = 10
    scale_up_threshold: float = 0.8  # scale up above 80% utilization
    scale_down_threshold: float = 0.3  # scale down below 30% utilization
    scale_up_cooldown: int = 60  # seconds between scale-ups
    scale_down_cooldown: int = 300  # seconds between scale-downs
    target_utilization: float = 0.7  # target utilization (not used by the decision logic below)
    instance_memory: int = 8 * 1024**3  # GPU memory requested per instance (8 GiB)
    default_model: str = "default"  # model served by auto-created instances
    evaluation_interval: float = 10.0  # seconds between scheduler iterations
    drain_timeout: float = 10.0  # seconds an instance drains before its GPU is released
    requests_per_interval_capacity: int = 100  # requests per interval that count as 100% load


@dataclass
class InstanceInfo:
    """One inference instance."""

    instance_id: str
    gpu_node: str
    gpu_index: int
    model_name: str
    memory_required: int
    current_load: float = 0.0
    request_count: int = 0  # total requests routed to this instance
    created_at: datetime = field(default_factory=datetime.now)
    status: str = "starting"  # starting, ready, draining, stopped
    window_requests: int = 0  # requests routed since the last metrics sample


class ElasticScheduler:
    """Elastic scheduler.

    Responsibilities:
      1. Automatic scaling based on observed utilization
      2. Predictive scale-up based on the utilization time series
      3. Request routing (round robin across ready instances)
    """

    _PREDICTION_MIN_SAMPLES = 10
    _PREDICTION_WINDOW = 30

    def __init__(
        self,
        resource_manager: GPUResourceManager,
        scaling_config: ScalingConfig,
    ):
        self.resource_manager = resource_manager
        self.scaling_config = scaling_config

        self.instances: Dict[str, InstanceInfo] = {}
        self.instances_by_model: Dict[str, Set[str]] = {}

        # One entry per routed request.
        self.request_history: Deque[dict] = deque(maxlen=1000)
        # One utilization sample per scheduler iteration; input to prediction.
        self.metrics_history: Deque[dict] = deque(maxlen=100)

        self.last_scale_up_time: datetime = datetime.min
        self.last_scale_down_time: datetime = datetime.min

        self._scheduler_task: Optional[asyncio.Task] = None
        self._running = False
        # Strong references to fire-and-forget tasks; the event loop only keeps
        # weak ones, so an unreferenced task can be garbage-collected mid-run.
        self._background_tasks: Set[asyncio.Task] = set()
        # Per-model round-robin position.
        self._rr_counters: Dict[str, int] = {}

    async def start(self):
        """Start the scheduling loop."""
        self._running = True
        self._scheduler_task = asyncio.create_task(self._scheduler_loop())
        logger.info("Elastic Scheduler started")

    async def stop(self):
        """Stop the scheduling loop and wait for it to exit."""
        self._running = False
        if self._scheduler_task:
            self._scheduler_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._scheduler_task
            self._scheduler_task = None
        logger.info("Elastic Scheduler stopped")

    def _spawn(self, coro) -> asyncio.Task:
        """Run *coro* in the background, holding a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _scheduler_loop(self):
        """Collect metrics, predict, scale and rebalance every evaluation interval."""
        while self._running:
            try:
                await self._collect_metrics()
                predicted_load = self._predict_load()
                await self._execute_scaling_decision(predicted_load)
                await self._rebalance_load()
            except Exception:
                logger.exception("Scheduler error")
            # Sleep on both paths so a persistent error cannot turn this into a busy loop.
            await asyncio.sleep(self.scaling_config.evaluation_interval)

    async def _collect_metrics(self):
        """Sample per-instance load and append the average to ``metrics_history``.

        Simulated metric: the share of ``requests_per_interval_capacity``
        consumed by the requests routed since the previous sample. A real
        deployment would scrape the instances instead. No sample is recorded
        while no instance is ready.
        """
        capacity = max(1, self.scaling_config.requests_per_interval_capacity)
        loads = []
        for instance in self.instances.values():
            if instance.status != "ready":
                continue
            instance.current_load = min(1.0, instance.window_requests / capacity)
            instance.window_requests = 0
            loads.append(instance.current_load)

        if loads:
            self.metrics_history.append({
                "timestamp": datetime.now(),
                "utilization": sum(loads) / len(loads),
                "instances": len(loads),
            })

    def _predict_load(self) -> float:
        """Predict the near-future average utilization.

        Uses the measured utilization series in ``metrics_history``: the mean
        of the latest window, plus half of its change versus the preceding
        window once two full windows exist. Returns 0.5 while fewer than
        ``_PREDICTION_MIN_SAMPLES`` samples exist. The result is clamped
        to [0, 1].
        """
        samples = [m["utilization"] for m in self.metrics_history]
        if len(samples) < self._PREDICTION_MIN_SAMPLES:
            return 0.5

        window = self._PREDICTION_WINDOW
        recent = samples[-window:]
        newer = sum(recent) / len(recent)

        trend = 0.0
        if len(samples) >= 2 * window:
            older = sum(samples[-2 * window:-window]) / window
            # Absolute rather than relative change: a relative trend explodes
            # when the older window was close to idle.
            trend = newer - older

        return max(0.0, min(1.0, newer + 0.5 * trend))

    async def _execute_scaling_decision(self, predicted_load: float):
        """Add or remove at most one instance, or top the pool up to the minimum.

        Rules, checked in order:
          * Fewer than ``min_instances`` starting/ready instances: top up
            immediately. This also bootstraps an empty pool.
          * Observed or predicted utilization above ``scale_up_threshold``:
            add one instance, subject to the cooldown and ``max_instances``.
          * Observed and predicted utilization both below
            ``scale_down_threshold``: drain one instance, subject to the
            cooldown and ``min_instances``.

        Starting instances count as capacity, so a slow start-up does not
        trigger extra scale-ups.
        """
        cfg = self.scaling_config
        now = datetime.now()
        ready = sum(1 for i in self.instances.values() if i.status == "ready")
        starting = sum(1 for i in self.instances.values() if i.status == "starting")
        active = ready + starting

        if active < cfg.min_instances:
            logger.info(f"Scaling up: {active} -> {cfg.min_instances}")
            self.last_scale_up_time = now
            await self._scale_up(cfg.min_instances - active)
            return

        current_utilization = (
            self.metrics_history[-1]["utilization"] if self.metrics_history else 0.0
        )
        load = max(current_utilization, predicted_load)

        if load > cfg.scale_up_threshold:
            cooled = (now - self.last_scale_up_time).total_seconds() > cfg.scale_up_cooldown
            if cooled and active < cfg.max_instances:
                self.last_scale_up_time = now
                logger.info(f"Scaling up: {active} -> {active + 1}")
                await self._scale_up(1)
        elif load < cfg.scale_down_threshold:
            cooled = (now - self.last_scale_down_time).total_seconds() > cfg.scale_down_cooldown
            if cooled and active > cfg.min_instances and ready > 0:
                self.last_scale_down_time = now
                logger.info(f"Scaling down: {active} -> {active - 1}")
                await self._scale_down(1)

    async def _scale_up(self, count: int):
        """Allocate GPUs for up to *count* new instances and start them.

        Stops at the first failed allocation.
        """
        cfg = self.scaling_config
        for _ in range(count):
            # A random suffix avoids the collisions of timestamp-based IDs
            # when several instances are created within the same millisecond.
            instance_id = f"inst_{uuid.uuid4().hex[:12]}"

            gpu_allocation = self.resource_manager.allocate_gpu(
                instance_id=instance_id,
                memory_required=cfg.instance_memory,
            )
            if gpu_allocation is None:
                logger.warning("Failed to allocate GPU for new instance")
                break

            node_id, gpu_index = gpu_allocation
            instance = InstanceInfo(
                instance_id=instance_id,
                gpu_node=node_id,
                gpu_index=gpu_index,
                model_name=cfg.default_model,
                memory_required=cfg.instance_memory,
            )
            self.instances[instance_id] = instance
            self.instances_by_model.setdefault(instance.model_name, set()).add(instance_id)

            self._spawn(self._start_instance(instance))

    async def _scale_down(self, count: int):
        """Drain the *count* least-loaded ready instances in the background."""
        candidates = sorted(
            (i for i in self.instances.values() if i.status == "ready"),
            key=lambda x: x.current_load,
        )
        for instance in candidates[:count]:
            # Mark as draining right away so routing stops immediately. The
            # drain wait then runs in the background instead of blocking the loop.
            instance.status = "draining"
            self._spawn(self._stop_instance(instance))

    async def _start_instance(self, instance: InstanceInfo):
        """Start an inference instance (simulated; a real one would launch vLLM etc.)."""
        instance.status = "starting"
        try:
            await asyncio.sleep(5)
        except Exception:
            logger.exception(f"Instance {instance.instance_id} failed to start")
            self._remove_instance(instance)
            return
        instance.status = "ready"
        logger.info(f"Instance {instance.instance_id} started on {instance.gpu_node}:{instance.gpu_index}")

    async def _stop_instance(self, instance: InstanceInfo):
        """Drain an instance, then release its GPU and forget it."""
        instance.status = "draining"
        # Give in-flight requests time to finish.
        await asyncio.sleep(self.scaling_config.drain_timeout)
        self._remove_instance(instance)
        logger.info(f"Instance {instance.instance_id} stopped")

    def _remove_instance(self, instance: InstanceInfo):
        """Release the instance's GPU and drop it from all indexes (idempotent)."""
        self.resource_manager.release_gpu(instance.instance_id)
        self.instances_by_model.get(instance.model_name, set()).discard(instance.instance_id)
        self.instances.pop(instance.instance_id, None)
        instance.status = "stopped"

    async def _rebalance_load(self):
        """Placeholder for load rebalancing.

        Only logs which ready instance of the most recently requested model
        is least loaded; no traffic is moved.
        """
        if not self.request_history:
            return

        model_name = self.request_history[-1].get("model", self.scaling_config.default_model)
        ready_instances = [
            i for i in self.instances.values()
            if i.status == "ready" and i.model_name == model_name
        ]
        if not ready_instances:
            return

        selected = min(ready_instances, key=lambda x: x.current_load)
        logger.debug(f"Selected instance {selected.instance_id} with load {selected.current_load}")

    async def route_request(
        self,
        model_name: str,
        request_data: dict,
    ) -> Optional[str]:
        """Route a request to a ready instance of *model_name* (round robin).

        ``request_data`` is currently unused. Every call is recorded in
        ``request_history``. The chosen instance's request counters feed
        ``_collect_metrics``.

        Returns:
            The chosen instance id, or None if no ready instance serves the model.
        """
        ready_instances = [
            i for i in self.instances.values()
            if i.status == "ready" and i.model_name == model_name
        ]

        selected: Optional[InstanceInfo] = None
        if ready_instances:
            turn = self._rr_counters.get(model_name, 0)
            selected = ready_instances[turn % len(ready_instances)]
            self._rr_counters[model_name] = turn + 1
            selected.request_count += 1
            selected.window_requests += 1

        self.request_history.append({
            "timestamp": datetime.now(),
            "model": model_name,
            "load": selected.current_load if selected else 0.0,
        })
        return selected.instance_id if selected else None

    def get_status(self) -> dict:
        """Return a snapshot of instance counts, latest metrics and scaling times."""
        return {
            "total_instances": len(self.instances),
            "ready_instances": sum(1 for i in self.instances.values() if i.status == "ready"),
            "metrics": {
                "avg_utilization": (
                    self.metrics_history[-1]["utilization"] if self.metrics_history else 0
                ),
                "request_count": len(self.request_history),
            },
            "scaling": {
                "last_scale_up": self.last_scale_up_time.isoformat(),
                "last_scale_down": self.last_scale_down_time.isoformat(),
            },
        }
```

## 四、边界分析与架构权衡

### 4.1 GPU 调度策略对比

| 策略 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- |
| FIFO | 简单 | 可能导致长等待 | 批处理 |
| 公平调度 | 公平性好 | 可能导致资源浪费 | 多租户 |
| 负载均衡 | 资源利用率高 | 可能导致请求延迟 | 在线推理 |
| 预测调度 | 提前扩容 | 预测不准确时浪费 | 流量可预测 |

### 4.2 弹性调度注意事项

| 风险 | 缓解措施 |
| --- | --- |
| 扩容不及时 | 预测性扩容 + 资源预留 |
| 缩容过快 | 设置最小实例数 + 冷却时间 |
| GPU 碎片化 | 资源池化 + 动态绑定 |
| 故障传播 | 熔断 + 自动恢复 |

## 五、总结

AI 推理服务的弹性调度是 AI 基础设施的核心能力。通过智能的资源管理和调度算法，可以实现：

- **资源高效利用**：最大化 GPU 利用率，降低单位推理成本
- **弹性伸缩**：快速响应流量变化，保证服务质量
- **成本优化**：预测性扩容，避免资源浪费
- **高可用**：故障自动检测和恢复，保证服务稳定性

关键成功因素：

- **完善的监控**：实时了解 GPU 状态和负载情况
- **智能调度算法**：结合预测和实时状态做决策
- **资源预留**：为突发流量预留缓冲资源
- **渐进式实施**：从简单策略开始，逐步引入 AI

GPU 资源管理的智能化是 AI 时代运维的核心挑战，需要持续投入和优化。