HAMi 源码阅读笔记 05:从 PredicateRoute 看 HAMi /filter 的请求处理流程

发布时间:2026/5/21 22:14:10

HAMi 源码阅读笔记 05:从 PredicateRoute 看 HAMi /filter 的请求处理流程 一、/filter 在 HAMi 调度流程中的位置本文分析 HAMi scheduler extender 的/filter入口方法func PredicateRoute(s *scheduler.Scheduler) httprouter.Handle该方法位于pkg/scheduler/routes/route.go在 Kubernetes 调度流程中一个 Pod 的调度大体可以分为两个阶段Scheduling Cycle和Binding Cycle。Scheduling Cycle 负责为 Pod 选择一个合适的节点Binding Cycle 负责把这个调度结果真正应用到集群中。HAMi 是一个面向 Kubernetes 集群的异构 AI 计算设备管理平台支持 GPU、NPU 等异构设备共享并可以在容器之间共享设备资源。HAMi 使用 scheduler extender 机制参与 kube-scheduler 的调度。简单来说kube-scheduler 负责 Kubernetes 原生调度主流程而 HAMi scheduler extender 通过 HTTP 接口参与部分调度阶段例如/filter和/bind。HAMi scheduler 启动时会创建一个httprouter.Router并注册/filter、/bind、/webhook等 HTTP 路由router : httprouter.New() router.POST(/filter, routes.PredicateRoute(sher)) router.POST(/bind, routes.Bind(sher)) router.POST(/webhook, routes.WebHookRoute()) router.GET(/healthz, routes.HealthzRoute()) router.GET(/readyz, routes.ReadyzRoute(sher))也就是说/filter对应的入口函数就是routes.PredicateRoute(sher)从 HAMi 当前源码可以看到PredicateRoute()定义在pkg/scheduler/routes/route.go中HAMi 在 scheduler 进程启动时通过router.POST(/filter, routes.PredicateRoute(sher))注册该路由。二、什么情况下 kube-scheduler 会调用 HAMi /filter要进入 HAMi/filter一般需要满足几个条件。第一kube-scheduler 配置了 HAMi extender并且配置了filterVerb。在 kube-scheduler 配置中filterVerb表示 extender 的过滤接口路径如果这个字段为空表示该 extender 不提供 filter 能力。Kubernetes 官方 kube-scheduler 配置文档对Extender的说明是如果某个 verb 未指定或为空就认为该 extender 不提供对应扩展能力其中filterVerb会被追加到urlPrefix后面用于发起 filter 调用。第二如果 extender 配置了managedResources那么只有当 Pod 请求了其中至少一种扩展资源时kube-scheduler 才会把这个 Pod 发送给 extender。如果managedResources为空或未指定则所有 Pod 都可能发送给这个 extender。Kubernetes 官方配置文档中对managedResources的说明是Pod 请求了该列表中至少一种扩展资源时才会在 Filter、Prioritize、Bind 阶段发送给 extender如果为空或未指定则所有 Pod 都会发送给 extender。第三Pod 必须进入 kube-scheduler 的调度流程。这里要注意不是 kube-apiserver 主动把 Pod “发送给 scheduler”而是 kube-scheduler 通过 watch kube-apiserver 中未绑定节点的 Pod然后根据 Pod 的spec.schedulerName进行处理。Kubernetes kube-scheduler 配置文档中也说明schedulerName是调度 profile 关联的调度器名称如果它和 Pod 的spec.schedulerName匹配那么这个 Pod 会使用对应 profile 进行调度。对于请求 HAMi 管理资源的 PodHAMi mutating webhook 通常会根据 Pod 的资源请求修改schedulerName让它进入 HAMi 对应的调度链路。所以可以这样理解用户创建 Pod ↓ kube-apiserver 接收 Pod 创建请求 ↓ HAMi mutating webhook 根据资源请求修改 schedulerName ↓ kube-scheduler watch 到这个待调度 Pod ↓ kube-scheduler 根据 schedulerName 处理该 Pod ↓ Pod 请求了 HAMi extender managedResources 中声明的资源 ↓ kube-scheduler 在过滤阶段调用 HAMi /filter三、PredicateRoute 只是 /filter 的 HTTP 入口PredicateRoute()是 HAMi/filter的 HTTP 入口方法但它本身不负责真正的 GPU/NPU/vGPU 资源计算。它主要负责1. 接收 kube-scheduler 发来的 POST /filter 请求 2. 限制请求体大小 3. 解码 kube-scheduler 传来的 ExtenderArgs 4. 等待 HAMi 本地缓存同步 5. 调用 s.Filter(extenderArgs) 6. 把 ExtenderFilterResult 序列化成 JSON 返回给 kube-scheduler真正的设备资源判断、节点过滤、设备分配、节点评分和 Pod annotation 写入是从下面这一行继续展开的extenderFilterResult, err s.Filter(extenderArgs)因此这篇文章只分析/filter的 HTTP 入口PredicateRoute()。至于s.Filter()内部的getNodesUsage()、calcScore()、fitInDevices()等核心调度逻辑放到下一篇继续分析。四、PredicateRoute 源码逐段解析4.1 PredicateRoute 的整体源码结构HAMi 当前PredicateRoute()的源码结构如下func PredicateRoute(s *scheduler.Scheduler) httprouter.Handle { klog.Infoln(Initializing Predicate Route) return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { klog.V(5).Infoln(Entering Predicate Route handler) checkBody(w, r) var buf bytes.Buffer // Limit the body size to prevent deep nesting/resource exhaustion attacks limitedReader : io.LimitReader(r.Body, maxRequestSize) body : io.TeeReader(limitedReader, buf) var extenderArgs extenderv1.ExtenderArgs var extenderFilterResult *extenderv1.ExtenderFilterResult if err : json.NewDecoder(body).Decode(extenderArgs); err ! nil { klog.ErrorS(err, Failed to decode extender arguments) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: err.Error(), } } else { synced : s.WaitForCacheSync(r.Context()) if !synced { // Poll may return false when context is cancelled err : fmt.Errorf(context cancelled) klog.ErrorS(err, Cache not synced, cannot proceed with filtering) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: err.Error(), } } else { extenderFilterResult, err s.Filter(extenderArgs) if err ! nil { klog.ErrorS(err, Filter error for pod, pod, extenderArgs.Pod.Name) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: err.Error(), } } } } if resultBody, err : json.Marshal(extenderFilterResult); err ! nil { klog.ErrorS(err, Failed to marshal extender filter result, result, extenderFilterResult) w.Header().Set(Content-Type, application/json) w.WriteHeader(http.StatusInternalServerError) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: fmt.Sprintf(Failed to marshal extender filter result: %s, err.Error()), } resultBody, _ json.Marshal(extenderFilterResult) w.Write(resultBody) } else { w.Header().Set(Content-Type, application/json) w.WriteHeader(http.StatusOK) w.Write(resultBody) } } }从整体上看PredicateRoute()做的事情可以分成几步接收/filter请求、检查请求体、限制请求体大小、解码ExtenderArgs、等待 HAMi 本地缓存同步、调用s.Filter(extenderArgs)、最后把ExtenderFilterResult序列化成 JSON 返回给 kube-scheduler。4.2 方法签名PredicateRoute 接收什么、返回什么方法签名如下func PredicateRoute(s *scheduler.Scheduler) httprouter.Handle第一个重点是入参s *scheduler.Scheduler这个s是 HAMi 自己的 scheduler 对象。PredicateRoute()本身不做 GPU 显存计算、不做节点打分、不做设备分配它只是 HTTP 入口。真正的调度逻辑会继续交给s.Filter(extenderArgs)第二个重点是返回值httprouter.Handlehttprouter.Handle本质上是一个 HTTP handler 函数签名是func(http.ResponseWriter, *http.Request, httprouter.Params)所以PredicateRoute()外层在注册路由时执行一次真正每次 kube-scheduler 请求/filter时执行的是它返回的匿名函数。源码中对应的是return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { ... }这里的_ httprouter.Params表示忽略路径参数。因为/filter这个路径没有动态参数所以 HAMi 当前并不需要使用这个参数。4.3 kube-scheduler 传进来的数据ExtenderArgskube-scheduler 调用 extender/filter时请求体是一个 JSON反序列化之后对应 Kubernetes 的ExtenderArgstype ExtenderArgs struct { Pod *v1.Pod Nodes *v1.NodeList NodeNames *[]string }官方类型定义中是这样描述的type ExtenderArgs struct { // Pod being scheduled Pod *v1.Pod // List of candidate nodes where the pod can be scheduled; // to be populated only if Extender.NodeCacheCapable false Nodes *v1.NodeList // List of candidate node names where the pod can be scheduled; // to be populated only if Extender.NodeCacheCapable true NodeNames *[]string }其中Pod表示当前正在调度的 Pod。Nodes表示候选节点对象列表。只有当 extender 不具备本地 Node cache 能力也就是nodeCacheCapablefalse时kube-scheduler 才需要把完整 Node 对象传给 extender。NodeNames表示候选节点名称列表。只有当 extender 具备本地 Node cache 能力也就是nodeCacheCapabletrue时kube-scheduler 才只传节点名称。因为 extender 自己已经缓存了完整节点信息不需要 kube-scheduler 每次都传完整 Node 对象。Kubernetes 官方ExtenderArgs类型说明里也明确写了Nodes在Extender.NodeCacheCapable false时填充NodeNames在Extender.NodeCacheCapable true时填充。4.4 进入 /filter handler当 kube-scheduler 对 HAMi scheduler 发起POST /filter请求时真正执行的是PredicateRoute()返回的匿名函数return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {三个参数分别是w http.ResponseWriter用于写 HTTP 响应最终响应给 kube-scheduler。r *http.Request表示 kube-scheduler 发来的 HTTP 请求请求体里是ExtenderArgsJSON。_ httprouter.Params表示 httprouter 的路径参数。/filter这个路径没有动态参数所以这里用_忽略。这里可以这样理解PredicateRoute()外层函数只是注册路由时执行一次真正每一次/filter请求到来时执行的是它返回的内部匿名函数。4.5 检查请求体是否为空源码中先执行checkBody(w, r)checkBody()的代码是func checkBody(w http.ResponseWriter, r *http.Request) { if r.Body nil { http.Error(w, Please send a request body, 400) return } }它的意图很简单如果 HTTP 请求没有 body就写一个 400 响应。正常情况下kube-scheduler 调用/filter一定会带ExtenderArgs请求体所以这个问题一般不会触发。4.6 限制请求体大小接着源码创建了一个bytes.Buffervar buf bytes.Buffer然后构造一个限制读取大小的 readerlimitedReader : io.LimitReader(r.Body, maxRequestSize)maxRequestSize定义如下const maxRequestSize 1024 * 1024 // 1MB limit也就是说HAMi/filter最多从 HTTP body 中读取 1MB 数据。当前 HAMi 源码中maxRequestSize定义为1024 * 1024并且PredicateRoute()里通过io.LimitReader(r.Body, maxRequestSize)限制请求体读取大小。这一步的作用是给/filter入口加一层保护避免异常请求体过大导致 HAMi scheduler 消耗过多内存或 CPU。需要注意的是LimitReader本身不是“检测到超过 1MB 就报错”。它只是最多让后续逻辑读到 1MB。4.7 TeeReader 边读边复制请求体接着源码执行body : io.TeeReader(limitedReader, buf)io.TeeReader的作用可以理解成 Linux 里的tee后续从body读到什么它就同步往buf写一份。数据流可以画成这样r.Body ↓ io.LimitReader最多读取 1MB ↓ io.TeeReader一边给 JSON decoder 读一边复制到 buf ↓ json.NewDecoder(body).Decode(extenderArgs)不过在当前 HAMiPredicateRoute()中buf后续没有被读取或打印。因此当前这段代码里buf没有参与实际调度决策更像是预留给调试、日志或审计的缓冲。也就是说当前真正被 JSON decoder 使用的是body而buf只是同步保存了一份读取过的请求体内容。4.8 定义请求对象和响应对象源码中定义了两个变量var extenderArgs extenderv1.ExtenderArgs var extenderFilterResult *extenderv1.ExtenderFilterResult第一个变量extenderArgs用于保存 kube-scheduler 发来的请求参数也就是当前 Pod 和候选节点。第二个变量extenderFilterResult用于保存 HAMi 返回给 kube-scheduler 的过滤结果。ExtenderFilterResult定义如下type ExtenderFilterResult struct { Nodes *v1.NodeList NodeNames *[]string FailedNodes FailedNodesMap FailedAndUnresolvableNodes FailedNodesMap Error string }其中Nodes / NodeNames 表示过滤后仍然可以调度的节点。 FailedNodes 表示被过滤掉的节点以及失败原因。 FailedAndUnresolvableNodes 表示即使抢占也无法解决的失败节点。 Error 表示 extender 自身处理失败的信息。Kubernetes 官方ExtenderFilterResult类型说明中也明确写了Nodes和NodeNames表示过滤后的可调度节点集合FailedNodes表示被过滤掉的节点及原因FailedAndUnresolvableNodes表示抢占也无法解决的失败节点Error表示错误信息。4.9 解码 kube-scheduler 发来的 ExtenderArgsHAMi 使用 JSON decoder 把 HTTP body 解码成ExtenderArgsif err : json.NewDecoder(body).Decode(extenderArgs); err ! nil { klog.ErrorS(err, Failed to decode extender arguments) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: err.Error(), } } else { ... }如果请求体不是合法 JSON或者 JSON 结构无法正确解码成ExtenderArgsHAMi 不会继续进入s.Filter()而是构造一个带Error字段的ExtenderFilterResult。注意这里不是直接把 HTTP 状态码写成 500而是把错误放进ExtenderFilterResult.Error后面统一序列化成 JSON 返回给 kube-scheduler。当前 HAMi 源码中解码失败后只是设置extenderFilterResult extenderv1.ExtenderFilterResult{ Error: err.Error(), }后续仍然会进入统一的json.Marshal(extenderFilterResult)返回流程。4.10 等待 HAMi 本地缓存同步如果 JSON 解码成功HAMi 会先执行synced : s.WaitForCacheSync(r.Context())为什么要等 cache 同步因为 HAMi 的调度判断依赖本地缓存例如Node 信息 Pod 已分配设备信息 设备使用情况 ResourceQuota 信息 HAMi 自己维护的 Pod / Node cache如果本地缓存还没有同步完成就进入s.Filter()可能会基于不完整的数据做资源判断导致调度结果不准确。当前WaitForCacheSync()的核心逻辑如下func (s *Scheduler) WaitForCacheSync(ctx context.Context) bool { err : wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(context.Context) (done bool, err error) { s.lock.RLock() defer s.lock.RUnlock() return s.synced, nil }) if err ! nil { klog.ErrorS(err, failed to poll until context cancel) return false } return true }也就是说它会周期性检查s.synced直到同步完成或者请求上下文被取消。HAMi 当前源码中WaitForCacheSync()通过wait.PollUntilContextCancel()轮询s.synced如果轮询被取消或出错则返回 false。如果没有同步成功当前 HAMi 源码会构造一个错误结果err : fmt.Errorf(context cancelled) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: err.Error(), }然后继续走后面的统一 JSON 返回逻辑。这里也要注意当前代码不是在 cache 未同步时直接写 HTTP 500 并return而是把错误写到ExtenderFilterResult.Error里最后统一 marshal 返回。4.11 调用真正的过滤逻辑 s.Filter()当请求体解码成功并且 HAMi 本地缓存已经同步完成后才会进入真正的过滤逻辑extenderFilterResult, err s.Filter(extenderArgs)PredicateRoute()只负责 HTTP 入口层面的事情接收 HTTP 请求 限制 body 大小 解码 ExtenderArgs 等待 cache 同步 调用 s.Filter() 序列化 ExtenderFilterResult 返回给 kube-scheduler真正的异构设备调度逻辑从s.Filter(extenderArgs)开始。s.Filter()的主流程包括1. 解析 Pod 请求的设备资源device.Resourcereqs(args.Pod) 2. 如果 Pod 没有请求设备资源直接返回原始 NodeNames 3. 删除当前 Pod 的旧分配缓存s.podManager.DelPod(args.Pod) 4. 统计候选节点设备使用情况s.getNodesUsage(args.NodeNames, args.Pod) 5. 计算节点分数s.calcScore(...) 6. 如果没有可用节点返回 FailedNodes 7. 对节点分数排序 8. 选择分数最高的节点 9. 生成设备分配 annotation 10. 更新 HAMi 内部 Pod / quota 使用缓存 11. Patch Pod annotations 12. 返回只包含目标节点的 ExtenderFilterResult这里要特别注意PredicateRoute() 本身不写 Pod annotation但它调用的 s.Filter() 会写 Pod annotation。从代码职责边界来看PredicateRoute()是 HTTP 门面层s.Filter()才是 HAMi 资源过滤、节点评分和设备分配的核心入口。4.12 处理 s.Filter() 返回的错误如果s.Filter(extenderArgs)返回错误PredicateRoute()会执行if err ! nil { klog.ErrorS(err, Filter error for pod, pod, extenderArgs.Pod.Name) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: err.Error(), } }也就是说PredicateRoute()不会自己修复调度错误也不会自己重新计算节点它只是把错误包装进ExtenderFilterResult.Error然后交给 kube-scheduler 处理。从 kube-scheduler 的角度看如果 extender 返回的ExtenderFilterResult.Error非空kube-scheduler 会把它视为 extender 调用失败。4.13 序列化 ExtenderFilterResult 并返回最后PredicateRoute()会统一把extenderFilterResult序列化成 JSONif resultBody, err : json.Marshal(extenderFilterResult); err ! nil { klog.ErrorS(err, Failed to marshal extender filter result, result, extenderFilterResult) w.Header().Set(Content-Type, application/json) w.WriteHeader(http.StatusInternalServerError) extenderFilterResult extenderv1.ExtenderFilterResult{ Error: fmt.Sprintf(Failed to marshal extender filter result: %s, err.Error()), } resultBody, _ json.Marshal(extenderFilterResult) w.Write(resultBody) } else { w.Header().Set(Content-Type, application/json) w.WriteHeader(http.StatusOK) w.Write(resultBody) }这里分两种情况。第一种情况JSON 序列化失败。这时 HAMi 会返回 HTTP 500并且构造一个新的ExtenderFilterResult.Error返回。第二种情况JSON 序列化成功。这时 HAMi 返回HTTP 200 Content-Type: application/json ExtenderFilterResult JSON当前 HAMi/filter中不管前面是 JSON 解码失败、cache 未同步还是s.Filter()返回错误只要最终json.Marshal(extenderFilterResult)成功都会走 HTTP 200 返回只是响应体里的Error字段可能非空。五、PredicateRoute 的职责边界PredicateRoute()是 HAMi scheduler extender/filter的 HTTP 入口。它主要负责入口层面的请求处理不负责具体 GPU/NPU/vGPU 调度算法。它负责1. 接收 kube-scheduler 发来的 POST /filter 请求 2. 检查请求体是否存在 3. 限制请求体最大读取 1MB 4. 把 HTTP body 解码成 ExtenderArgs 5. 等待 HAMi scheduler 本地 cache 同步 6. 调用 s.Filter(extenderArgs) 7. 把 ExtenderFilterResult 序列化成 JSON 返回给 kube-scheduler它不直接负责1. 判断 GPU 显存是否足够 2. 判断 GPU core 是否足够 3. 统计每个节点设备使用情况 4. 给节点打分 5. 选择具体设备 6. 写 Pod 设备分配 annotation 7. 绑定 Pod 到 Node真正的调度逻辑从下面这一行开始extenderFilterResult, err s.Filter(extenderArgs)而s.Filter()会继续进入device.Resourcereqs() s.getNodesUsage() s.calcScore() sort.Sort(nodeScores) val.PatchAnnotations() util.PatchPodAnnotations()最后返回ExtenderFilterResult{NodeNames: []string{m.NodeID}}所以PredicateRoute()可以理解为kube-scheduler extender filter 请求的 HTTP 门面层而s.Filter()才是HAMi 设备资源过滤、节点评分、节点选择和 annotation 写入的核心入口这就是/filter入口方法PredicateRoute()在 HAMi 调度链路中的真正作用。本人运维小白欢迎各位大佬批评指正。

相关新闻