<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          還沒吃透 K8S 調(diào)度器?看這篇文章就夠了

          共 38631字,需瀏覽 78分鐘

           ·

          2022-08-25 12:48

          1. kube-scheduler 的設(shè)計(jì)

          Scheduler 在整個(gè)系統(tǒng)中承擔(dān)了“承上啟下”的重要功能。“承上”是指它負(fù)責(zé)接受 Controller Manager 創(chuàng)建的新 Pod,為其安排 Node;“啟下”是指安置工作完成后,目標(biāo) Node 上的 kubelet 服務(wù)進(jìn)程接管后續(xù)工作。Pod 是 Kubernetes 中最小的調(diào)度單元,Pod 被創(chuàng)建出來的工作流程如圖所示:

          在這張圖中

          • 第一步通過 apiserver REST API 創(chuàng)建一個(gè) Pod。
          • 然后 apiserver 接收到數(shù)據(jù)后將數(shù)據(jù)寫入到 etcd 中。
          • 由于 kube-scheduler 通過 apiserver watch API 一直在監(jiān)聽資源的變化,這個(gè)時(shí)候發(fā)現(xiàn)有一個(gè)新的 Pod,但是這個(gè)時(shí)候該 Pod 還沒和任何 Node 節(jié)點(diǎn)進(jìn)行綁定,所以 kube-scheduler 就進(jìn)行調(diào)度,選擇出一個(gè)合適的 Node 節(jié)點(diǎn),將該 Pod 和該目標(biāo) Node 進(jìn)行綁定。綁定之后再更新消息到 etcd 中。
          • 這個(gè)時(shí)候一樣的目標(biāo) Node 節(jié)點(diǎn)上的 kubelet 通過 apiserver watch API 檢測到有一個(gè)新的 Pod 被調(diào)度過來了,他就將該 Pod 的相關(guān)數(shù)據(jù)傳遞給后面的容器運(yùn)行時(shí)(container runtime),比如 Docker,讓他們?nèi)ミ\(yùn)行該 Pod。
          • 而且 kubelet 還會(huì)通過 container runtime 獲取 Pod 的狀態(tài),然后更新到 apiserver 中,當(dāng)然最后也是寫入到 etcd 中去的。

          通過這個(gè)流程我們可以看出整個(gè)過程中最重要的就是 apiserver watch API 和kube-scheduler的調(diào)度策略。

          總之,kube-scheduler 的功能是為還沒有和任何 Node 節(jié)點(diǎn)綁定的 Pods 逐個(gè)地挑選最合適 Pod 的 Node 節(jié)點(diǎn),并將綁定信息寫入 etcd 中。整個(gè)調(diào)度流程分為,預(yù)選(Predicates)和優(yōu)選(Priorities)兩個(gè)步驟。

          1. 預(yù)選(Predicates):kube-scheduler 根據(jù)預(yù)選策略(xxx Predicates)過濾掉不滿足策略的 Nodes。例如,官網(wǎng)中給的例子 node3 因?yàn)闆]有足夠的資源而被剔除。
          2. 優(yōu)選(Priorities):優(yōu)選會(huì)根據(jù)優(yōu)先策略(xxx Priority)為通過預(yù)選的 Nodes 進(jìn)行打分排名,選擇得分最高的 Node。例如,資源越富裕、負(fù)載越小的 Node 可能具有越高的排名。

          2. kube-scheduler 源碼分析

          kubernetes 版本: v1.21

          2.1 scheduler.New() 初始化 scheduler 結(jié)構(gòu)體

          在程序的入口,是通過一個(gè) runCommand 函數(shù)來喚醒 scheduler 的操作的。首先會(huì)進(jìn)入 Setup 函數(shù),它會(huì)根據(jù)命令參數(shù)和選項(xiàng)創(chuàng)建一個(gè)完整的 config 和 scheduler。創(chuàng)建 scheduler 的方式就是使用 New 函數(shù)。

          Scheduler 結(jié)構(gòu)體:

          ?
          • SchedulerCache:通過 SchedulerCache 做出的改變將被 NodeLister 和 Algorithm 觀察到。
          • NextPod :應(yīng)該是一個(gè)阻塞直到下一個(gè) Pod 存在的函數(shù)。之所以不使用 channel 結(jié)構(gòu),是因?yàn)檎{(diào)度 pod 可能需要一些時(shí)間,k8s 不希望 pod 位于通道中變得陳舊。
          • Error:在出現(xiàn)錯(cuò)誤的時(shí)候被調(diào)用。如果有錯(cuò)誤,它會(huì)傳遞有問題的 pod 信息,和錯(cuò)誤。
          • StopEverything:通過關(guān)閉它來停止 scheduler。
          • SchedulingQueue:保存著正在準(zhǔn)備被調(diào)度的 pod 列表。
          • Profiles:調(diào)度的策略。

          scheduler.New() 方法是初始化 scheduler 結(jié)構(gòu)體的,該方法主要的功能是初始化默認(rèn)的調(diào)度算法以及默認(rèn)的調(diào)度器 GenericScheduler。

          • 創(chuàng)建 scheduler 配置文件
          • 根據(jù)默認(rèn)的 DefaultProvider 初始化schedulerAlgorithmSource然后加載默認(rèn)的預(yù)選及優(yōu)選算法,然后初始化 GenericScheduler
          • 若啟動(dòng)參數(shù)提供了 policy config 則使用其覆蓋默認(rèn)的預(yù)選及優(yōu)選算法并初始化 GenericScheduler,不過該參數(shù)現(xiàn)已被棄用

          kubernetes/pkg/scheduler/scheduler.go:189

          // New函數(shù)創(chuàng)建一個(gè)新的scheduler
          func New(client clientset.Interface, informerFactory informers.SharedInformerFactory,recorderFactory profile.RecorderFactory, stopCh <-chan struct{},opts ...Option) (*Scheduler, error) {

            //查看并設(shè)置傳入的參數(shù)
                ……
            snapshot := internalcache.NewEmptySnapshot()
            // 創(chuàng)建scheduler的配置文件
            configurator := &Configurator{……}
            metrics.Register()

            var sched *Scheduler
            source := options.schedulerAlgorithmSource
            switch {
            case source.Provider != nil:
              // 根據(jù)Provider創(chuàng)建config
              sc, err := configurator.createFromProvider(*source.Provider)
              ……
            case source.Policy != nil:
              // 根據(jù)用戶指定的策略(policy source)創(chuàng)建config
              
              // 既然已經(jīng)設(shè)置了策略,在configuation內(nèi)設(shè)置extender為nil
              // 如果沒有,從Configuration的實(shí)例里設(shè)置extender
              configurator.extenders = policy.Extenders
              sc, err := configurator.createFromConfig(*policy)
              ……
            }
            // 對配置器生成的配置進(jìn)行額外的調(diào)整
            sched.StopEverything = stopEverything
            sched.client = client

            addAllEventHandlers(sched, informerFactory)
            return sched, nil
          }

          在 New 函數(shù)里提供了兩種初始化 scheduler 的方式,一種是 source.Provider,一種是 source.Policy,最后生成的 config 信息都會(huì)通過sched = sc創(chuàng)建新的調(diào)度器。Provider 方法對應(yīng)的是createFromProvider函數(shù),Policy 方法對應(yīng)的是createFromConfig函數(shù),最后它們都會(huì)調(diào)用 Create 函數(shù),實(shí)例化 podQueue,返回配置好的 Scheduler 結(jié)構(gòu)體。

          2.2 Run() 啟動(dòng)主邏輯

          kubernetes 中所有組件的啟動(dòng)流程都是類似的,首先會(huì)解析命令行參數(shù)、添加默認(rèn)值,kube-scheduler 的默認(rèn)參數(shù)在 k8s.io/kubernetes/pkg/scheduler/apis/config/v1alpha1/defaults.go 中定義的。然后會(huì)執(zhí)行 run 方法啟動(dòng)主邏輯,下面直接看 kube-scheduler 的主邏輯 run 方法執(zhí)行過程。

          Run() 方法主要做了以下工作:

          • 配置了 Configz 參數(shù)
          • 啟動(dòng)事件廣播器,健康檢測服務(wù),http server
          • 啟動(dòng)所有的 informer
          • 執(zhí)行 sched.Run() 方法,執(zhí)行主調(diào)度邏輯

          kubernetes/cmd/kube-scheduler/app/server.go:136

          // Run 函數(shù)根據(jù)指定的配置執(zhí)行調(diào)度程序。當(dāng)出現(xiàn)錯(cuò)誤或者上下文完成的時(shí)候才會(huì)返回。
          func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
            // 為了幫助debug,先記錄Kubernetes的版本號(hào)
            klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

            // 1、配置Configz
            if cz, err := configz.New("componentconfig"); err == nil {……}

            // 2、準(zhǔn)備事件廣播管理器,此處涉及到Events事件
          cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

            // 3、啟動(dòng) http server,進(jìn)行健康監(jiān)控服務(wù)器監(jiān)聽
            if cc.InsecureServing != nil {……}
            if cc.InsecureMetricsServing != nil {……}
            if cc.SecureServing != nil {……}

            // 4、啟動(dòng)所有 informer
            cc.InformerFactory.Start(ctx.Done())
            // 等待所有的緩存同步后再進(jìn)行調(diào)度。
            cc.InformerFactory.WaitForCacheSync(ctx.Done())

            // 5、因?yàn)镸aster節(jié)點(diǎn)可以存在多個(gè),選舉一個(gè)作為Leader。通過 LeaderElector 運(yùn)行命令直到完成并退出。
            if cc.LeaderElection != nil {
              cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
                OnStartedLeading: func(ctx context.Context) {
                  close(waitingForLeader)
                  // 6、執(zhí)行 sched.Run() 方法,執(zhí)行主調(diào)度邏輯
                  sched.Run(ctx)
                },
                // 鉤子函數(shù),開啟Leading時(shí)運(yùn)行調(diào)度,結(jié)束時(shí)打印報(bào)錯(cuò)
                OnStoppedLeading: func() {
                  klog.Fatalf("leaderelection lost")
                },
              }
              leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
              // 參加選舉的會(huì)持續(xù)通信
              leaderElector.Run(ctx)
              return fmt.Errorf("lost lease")
            }

            // 領(lǐng)導(dǎo)者選舉失敗,所以runCommand函數(shù)會(huì)一直運(yùn)行直到完成
            close(waitingForLeader)
            // 6、執(zhí)行 sched.Run() 方法,執(zhí)行主調(diào)度邏輯
            sched.Run(ctx)
            return fmt.Errorf("finished without leader elect")
          }

          • 這里相比 16 版本增加了一個(gè)waitingForLeader的 channel 用來監(jiān)聽信號(hào)
          • Setup 函數(shù)中提到了 Informer。k8s 中有各種類型的資源,包括自定義的。而 Informer 的實(shí)現(xiàn)就將調(diào)度和資源結(jié)合了起來。pod informer 的啟動(dòng)邏輯是,只監(jiān)聽 status.phase 不為 succeeded 以及 failed 狀態(tài)的 pod,即非 terminating 的 pod。

          2.3 sched.Run()開始監(jiān)聽和調(diào)度

          然后繼續(xù)看 Run() 方法中最后執(zhí)行的 sched.Run() 調(diào)度循環(huán)邏輯,若 informer 中的 cache 同步完成后會(huì)啟動(dòng)一個(gè)循環(huán)邏輯執(zhí)行 sched.scheduleOne 方法。

          kubernetes/pkg/scheduler/scheduler.go:313

          // Run函數(shù)開始監(jiān)視和調(diào)度。SchedulingQueue開始運(yùn)行。一直處于調(diào)度狀態(tài)直到Context完成一直阻塞。
          func (sched *Scheduler) Run(ctx context.Context) {
          sched.SchedulingQueue.Run()
          wait.UntilWithContext(ctx, sched.scheduleOne, 0)
          sched.SchedulingQueue.Close()
          }
          • sched.SchedulingQueue.Run():會(huì)將 backoffQ 中的 Pods 節(jié)點(diǎn)和 unschedulableQ 中的節(jié)點(diǎn)移至 activeQ 中。即將之前運(yùn)行失敗的節(jié)點(diǎn)和已經(jīng)等待了很長時(shí)間超過時(shí)間設(shè)定的節(jié)點(diǎn)重新進(jìn)入活躍節(jié)點(diǎn)隊(duì)列中。
            • backoffQ 是并發(fā)編程中常見的一種機(jī)制,就是如果一個(gè)任務(wù)重復(fù)執(zhí)行,但依舊失敗,則會(huì)按照失敗的次數(shù)提高重試等待時(shí)間,避免頻繁重試?yán)速M(fèi)資源。
          • sched.SchedulingQueue.Close(),關(guān)閉調(diào)度之后,對隊(duì)列也進(jìn)行關(guān)閉。SchedulingQueue 是一個(gè)優(yōu)先隊(duì)列。
            • 優(yōu)先作為實(shí)現(xiàn) SchedulingQueue 的實(shí)現(xiàn),其核心數(shù)據(jù)結(jié)構(gòu)主要包含三個(gè)隊(duì)列:activeQ、podBackoffQ、unschedulableQ 內(nèi)部通過 cond 來實(shí)現(xiàn) Pop 操作的阻塞與通知。當(dāng)前隊(duì)列中沒有可調(diào)度的 pod 的時(shí)候,則通過 cond.Wait 來進(jìn)行阻塞,然后在往 activeQ 中添加 pod 的時(shí)候通過 cond.Broadcast 來實(shí)現(xiàn)通知。
          • wait.UntilWithContext()中出現(xiàn)了 sched.scheduleOne 函數(shù),它負(fù)責(zé)了為單個(gè) Pod 執(zhí)行整個(gè)調(diào)度工作流程,也是本次研究的重點(diǎn),接下來將會(huì)詳細(xì)地進(jìn)行分析。

          2.4 scheduleOne() 分配 pod 的流程

          scheduleOne() 每次對一個(gè) pod 進(jìn)行調(diào)度,主要有以下步驟:

          • 從 scheduler 調(diào)度隊(duì)列中取出一個(gè) pod,如果該 pod 處于刪除狀態(tài)則跳過
          • 執(zhí)行調(diào)度邏輯 sched.schedule() 返回通過預(yù)算及優(yōu)選算法過濾后選出的最佳 node
          • 如果過濾算法沒有選出合適的 node,則返回 core.FitError
          • 若沒有合適的 node 會(huì)判斷是否啟用了搶占策略,若啟用了則執(zhí)行搶占機(jī)制
          • 執(zhí)行 reserve plugin
          • pod 對應(yīng)的 spec.NodeName 寫上 scheduler 最終選擇的 node,更新 scheduler cache
          • 執(zhí)行 permit plugin
          • 執(zhí)行 prebind plugin
          • 進(jìn)行綁定,請求 apiserver 異步處理最終的綁定操作,寫入到 etcd
          • 執(zhí)行 postbind plugin

          kubernetes/pkg/scheduler/scheduler.go:441

          1. 準(zhǔn)備工作
          // scheduleOne為單個(gè)pod做整個(gè)調(diào)度工作流程。它被序列化在調(diào)度算法的主機(jī)擬合上。
          func (sched *Scheduler) scheduleOne(ctx context.Context) {
             // podInfo就是從隊(duì)列中獲取到的Pod對象
             podInfo := sched.NextPod()
             // 檢查pod的有效性,當(dāng) schedulerQueue 關(guān)閉時(shí),pod 可能為nil
             if podInfo == nil || podInfo.Pod == nil {
                return
             }
             pod := podInfo.Pod
             //根據(jù)定義的pod.Spec.SchedulerName查到對應(yīng)的profile
             fwk, err := sched.frameworkForPod(pod)
             if err != nil {
                // 這不應(yīng)該發(fā)生,因?yàn)槲覀冎唤邮苷{(diào)度指定與配置文件之一匹配的調(diào)度程序名稱的pod。
                klog.ErrorS(err, "Error occurred")
                return
             }
             // 可以跳過調(diào)度的情況,一般pod進(jìn)不來
             if sched.skipPodSchedule(fwk, pod) {
                return
             }

             klog.V(3).InfoS("Attempting to schedule pod""pod", klog.KObj(pod))

          1. 調(diào)用調(diào)度算法,獲取結(jié)果
          // 執(zhí)行調(diào)度策略選擇node
            start := time.Now()
            state := framework.NewCycleState()
            state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
            schedulingCycleCtxcancel := context.WithCancel(ctx)
            defer cancel()
            scheduleResulterr := sched.Algorithm.Schedule(schedulingCycleCtxfwkstatepod)
            if err != nil {
            /*
              出現(xiàn)調(diào)度失敗的情況:
              這個(gè)時(shí)候可能會(huì)觸發(fā)搶占preempt,搶占是一套復(fù)雜的邏輯,這里略去
              目前假設(shè)各類資源充足,能正常調(diào)度
              */

            }


          1. assumedPod 是假設(shè)這個(gè) Pod 按照前面的調(diào)度算法分配后,進(jìn)行驗(yàn)證。告訴緩存假設(shè)一個(gè) pod 現(xiàn)在正在某個(gè)節(jié)點(diǎn)上運(yùn)行,即使它還沒有被綁定。這使得我們可以繼續(xù)調(diào)度,而不需要等待綁定的發(fā)生。
          metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
             assumedPodInfo := podInfo.DeepCopy()
             assumedPod := assumedPodInfo.Pod
             // 為pod設(shè)置NodeName字段,更新scheduler緩存
             err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
             if err != nil {……} // 如果出現(xiàn)錯(cuò)誤,重新開始調(diào)度

             // 運(yùn)行相關(guān)插件的代碼不作展示,這里省略運(yùn)行reserve插件的Reserve方法、運(yùn)行 "permit" 插件、 運(yùn)行 "prebind" 插件.

             // 真正做綁定的動(dòng)作
          err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
              if err != nil {
                // 錯(cuò)誤處理,清除狀態(tài)并重試
              } else {
                // 打印結(jié)果,調(diào)試時(shí)將log level調(diào)整到2以上
                if klog.V(2).Enabled() {
                  klog.InfoS("Successfully bound pod to node""pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
                }
                // metrics中記錄相關(guān)的監(jiān)控指標(biāo)
                metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
                metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
                metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

             // 運(yùn)行 "postbind" 插件

          Binder 負(fù)責(zé)將調(diào)度器的調(diào)度結(jié)果,傳遞給 apiserver,即將一個(gè) pod 綁定到選擇出來的 node 節(jié)點(diǎn)。

          2.5 sched.Algorithm.Schedule() 選出 node

          在上一節(jié)中scheduleOne() 通過調(diào)用 sched.Algorithm.Schedule() 來執(zhí)行預(yù)選與優(yōu)選算法處理:

          scheduleResulterr := sched.Algorithm.Schedule(schedulingCycleCtxfwkstatepod)

          Schedule()方法屬于 ScheduleAlgorithm 接口的一個(gè)方法實(shí)現(xiàn)。ScheduleAlgorithm 是一個(gè)知道如何將 pods 調(diào)度到機(jī)器上的事物實(shí)現(xiàn)的接口。在 1.16 版本中 ScheduleAlgorithm 有四個(gè)方法——Schedule()Preempt()Predicates()Prioritizers(),現(xiàn)在則是Schedule()Extenders() 在目前的代碼中進(jìn)行優(yōu)化,保證了程序的安全性。代碼中有一個(gè) todo,目前的

          名字已經(jīng)不太符合這個(gè)接口所做的工作。

          kubernetes/pkg/scheduler/core/generic_scheduler.go 61

          type ScheduleAlgorithm interface {
            Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
            // 擴(kuò)展器返回?cái)U(kuò)展器配置的一個(gè)片斷。這是為測試而暴露的。
            Extenders() []framework.Extender
          }

          點(diǎn)擊查看 Scheduler()的具體實(shí)現(xiàn),發(fā)現(xiàn)它是由 genericScheduler 來進(jìn)行實(shí)現(xiàn)的。

          kubernetes/pkg/scheduler/core/generic_scheduler.go 97

          func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
            trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
            defer trace.LogIfLong(100 * time.Millisecond)
            // 1.快照 node 信息,每次調(diào)度 pod 時(shí)都會(huì)獲取一次快照
            if err := g.snapshot(); err != nil {
              return result, err
            }
            trace.Step("Snapshotting scheduler cache and node infos done")

            if g.nodeInfoSnapshot.NumNodes() == 0 {
              return result, ErrNoNodesAvailable
            }
            // 2.Predict階段:找到所有滿足調(diào)度條件的節(jié)點(diǎn)feasibleNodes,不滿足的就直接過濾
            feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
            if err != nil {
              return result, err
            }
            trace.Step("Computing predicates done")
            // 3.預(yù)選后沒有合適的 node 直接返回
            if len(feasibleNodes) == 0 {
              return result, &framework.FitError{
                Pod:         pod,
                NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
                Diagnosis:   diagnosis,
              }
            }
            // 4.當(dāng)預(yù)選之后只剩下一個(gè)node,就使用它
            if len(feasibleNodes) == 1 {
              return ScheduleResult{
                SuggestedHost:  feasibleNodes[0].Name,
                EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
                FeasibleNodes:  1,
              }, nil
            }
            // 5.Priority階段:執(zhí)行優(yōu)選算法,獲取打分之后的node列表
            priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
            if err != nil {
              return result, err
            }
            // 6.根據(jù)打分選擇分?jǐn)?shù)最高的node
            host, err := g.selectHost(priorityList)
            trace.Step("Prioritizing done")

            return ScheduleResult{
              SuggestedHost:  host,
              EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
              FeasibleNodes:  len(feasibleNodes),
            }, err
          }

          流程圖如圖所示:

          • 在程序運(yùn)行的整個(gè)過程中會(huì)使用 trace 來記錄當(dāng)前的運(yùn)行狀態(tài),做安全處理。
          • 如果超過了 trace 預(yù)定的時(shí)間會(huì)進(jìn)行回滾

          至此整個(gè) Scheduler 分配 node 節(jié)點(diǎn)給 pod 的調(diào)度策略的基本流程介紹完畢。

          2.6 總結(jié)

          在本章節(jié)中,首先對 Kube-scheduler 進(jìn)行了介紹。它在整個(gè) k8s 的系統(tǒng)里,啟承上啟下的中藥作用,是核心組件之一。它的目的就是為每一個(gè) pod 選擇一個(gè)合適的 node,整體流程可以概括為五步:

          1. 首先是 scheduler 組件的初始化;
          2. 其次是客戶端發(fā)起 command,啟動(dòng)調(diào)度過程中用的服務(wù),比如事件廣播管理器,啟動(dòng)所有的 informer 組件等等;
          3. 再次是啟動(dòng)整個(gè)調(diào)度器的主流程,特別需要指出的是,整個(gè)流程都會(huì)堵塞在wait.UntilWithContext()這個(gè)函數(shù)中,一直調(diào)用ScheduleOne()進(jìn)行 pod 的調(diào)度分配策略。
          4. 然后客戶獲取未調(diào)度的 podList,通過執(zhí)行調(diào)度邏輯 sched.schedule() 為 pod 選擇一個(gè)合適的 node,如果沒有合適的 node,則觸發(fā)搶占的操作,最后提進(jìn)行綁定,請求 apiserver 異步處理最終的綁定操作,寫入到 etcd,其核心則是一系列調(diào)度算法的設(shè)計(jì)與執(zhí)行。
          5. 最后對一系列的調(diào)度算法進(jìn)行了解讀,調(diào)度過程主要為,對當(dāng)前的節(jié)點(diǎn)情況做快照,然后通過預(yù)選和優(yōu)選兩個(gè)主要步驟,為 pod 分配一個(gè)合適的 node。

          3. 預(yù)選與優(yōu)選算法源碼細(xì)節(jié)分析

          3.1 預(yù)選算法

          預(yù)選顧名思義就是從當(dāng)前集群中的所有的 node 中進(jìn)行過濾,選出符合當(dāng)前 pod 運(yùn)行的 nodes。預(yù)選的核心流程是通過findNodesThatFit來完成,其返回預(yù)選結(jié)果供優(yōu)選流程使用。預(yù)選算法的主要邏輯如圖所示:

          kubernetes/pkg/scheduler/core/generic_scheduler.go 223

          // 根據(jù)prefilter插件和extender過濾節(jié)點(diǎn)以找到適合 pod 的節(jié)點(diǎn)。
          func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
             // prefilter插件用于預(yù)處理 Pod 的相關(guān)信息,或者檢查集群或 Pod 必須滿足的某些條件。
             s := fwk.RunPreFilterPlugins(ctx, state, pod)
             ……
             // 查找能夠滿足filter過濾插件的節(jié)點(diǎn),返回結(jié)果有可能是0,1,N
             feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
             // 查找能夠滿足Extenders過濾插件的節(jié)點(diǎn),返回結(jié)果有可能是0,1,N
             feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
             return feasibleNodes, diagnosis, nil
          }

          • 這個(gè)方法首先會(huì)通過前置過濾器來校驗(yàn) pod 是否符合條件;
          • 然后調(diào)用findNodesThatPassFilters方法過濾掉不符合條件的 node。這樣就能設(shè)定最多需要檢查的節(jié)點(diǎn)數(shù),作為預(yù)選節(jié)點(diǎn)數(shù)組的容量,避免總結(jié)點(diǎn)過多影響效率。
          • 最后是findNodesThatPassExtenders函數(shù),它是 kubernets 留給用戶的外部擴(kuò)展方式,暫且不表。

          findNodesThatPassFilters查找適合過濾器插件的節(jié)點(diǎn),在這個(gè)方法中首先會(huì)根據(jù)numFeasibleNodesToFind方法選擇參與調(diào)度的節(jié)點(diǎn)的數(shù)量,調(diào)用Parallelizer().Until方法開啟 16 個(gè)線程來調(diào)用checkNode方法尋找合適的節(jié)點(diǎn)。判別節(jié)點(diǎn)合適的方式函數(shù)為checkNode(),函數(shù)中會(huì)對節(jié)點(diǎn)進(jìn)行兩次檢查,確保所有的節(jié)點(diǎn)都有相同的機(jī)會(huì)被選擇。

          kubernetes/pkg/scheduler/core/generic_scheduler.go 274

          func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context,fwk framework.Framework,state *framework.CycleState,pod *v1.Pod,diagnosis framework.Diagnosis,nodes []*framework.NodeInfo) ([]*v1.Node, error) {……}
            // 根據(jù)集群節(jié)點(diǎn)數(shù)量選擇參與調(diào)度的節(jié)點(diǎn)的數(shù)量
            numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
            // 初始化一個(gè)大小和numNodesToFind一樣的數(shù)組,用來存放node節(jié)點(diǎn)
            feasibleNodes := make([]*v1.Node, numNodesToFind)
            ……
            checkNode := func(i int) {
              // 我們從上一個(gè)調(diào)度周期中停止的地方開始檢查節(jié)點(diǎn),這是為了確保所有節(jié)點(diǎn)都有相同的機(jī)會(huì)在 pod 中被檢查
              nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
              status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
              if status.Code() == framework.Error {
                errCh.SendErrorWithCancel(status.AsError(), cancel)
                return
              }
              //如果該節(jié)點(diǎn)合適,那么放入到feasibleNodes列表中
              if status.IsSuccess() {……}
            }
            ……
            // 開啟N個(gè)線程并行尋找符合條件的node節(jié)點(diǎn),數(shù)量等于feasibleNodes。一旦找到配置的可行節(jié)點(diǎn)數(shù),就停止搜索更多節(jié)點(diǎn)。
            fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
            processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
            //設(shè)置下次開始尋找node的位置
            g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
            // 合并返回結(jié)果
            feasibleNodes = feasibleNodes[:feasibleNodesLen]
            return feasibleNodes, nil
          }

          在整個(gè)函數(shù)調(diào)用的過程中,有個(gè)很重要的函數(shù)——checkNode()會(huì)被傳入函數(shù),進(jìn)行每個(gè) node 節(jié)點(diǎn)的判斷。具體更深入的細(xì)節(jié)將會(huì)在 3.1.2 節(jié)進(jìn)行介紹。現(xiàn)在根據(jù)這個(gè)函數(shù)的定義可以看出,RunFilterPluginsWithNominatedPods會(huì)判斷當(dāng)前的 node 是否符合要求。如果當(dāng)前的 node 符合要求,就講當(dāng)前的 node 加入預(yù)選節(jié)點(diǎn)的數(shù)組中(feasibleNodes),如果不符合要求,那么就加入到失敗的數(shù)組中,并且記錄原因。

          3.1.1 確定參與調(diào)度的節(jié)點(diǎn)的數(shù)量

          numFeasibleNodesToFind 返回找到的可行節(jié)點(diǎn)的數(shù)量,調(diào)度程序停止搜索更多可行節(jié)點(diǎn)。算法的具體邏輯如下圖所示:

          • 找出能夠進(jìn)行調(diào)度的節(jié)點(diǎn),如果節(jié)點(diǎn)小于minFeasibleNodesToFind(默認(rèn)值為 100),那么全部節(jié)點(diǎn)參與調(diào)度。
          • percentageOfNodesToScore參數(shù)值是一個(gè)集群中所有節(jié)點(diǎn)的百分比,范圍是 1 和 100 之間,0 表示不啟用。如果集群節(jié)點(diǎn)數(shù)大于 100,那么就會(huì)根據(jù)這個(gè)值來計(jì)算讓合適的節(jié)點(diǎn)數(shù)參與調(diào)度。
            • 舉個(gè)例子,如果一個(gè) 5000 個(gè)節(jié)點(diǎn)的集群,percentageOfNodesToScore會(huì)默認(rèn)設(shè)置為 10%,也就是 500 個(gè)節(jié)點(diǎn)參與調(diào)度。因?yàn)槿绻粋€(gè) 5000 節(jié)點(diǎn)的集群來進(jìn)行調(diào)度的話,不進(jìn)行控制時(shí),每個(gè) pod 調(diào)度都需要嘗試 5000 次的節(jié)點(diǎn)預(yù)選過程時(shí)非常消耗資源的。
          • 如果百分比后的數(shù)目小于minFeasibleNodesToFind,那么還是要返回最小節(jié)點(diǎn)的數(shù)目。

          kubernetes/pkg/scheduler/core/generic_scheduler.go 179

          func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
            // 對于一個(gè)小于minFeasibleNodesToFind(100)的節(jié)點(diǎn),全部節(jié)點(diǎn)參與調(diào)度
            // percentageOfNodesToScore參數(shù)值是一個(gè)集群中所有節(jié)點(diǎn)的百分比,范圍是1和100之間,0表示不啟用,如果大于100,就是全量取樣
            // 這兩種情況都是直接便利整個(gè)集群中的所有節(jié)點(diǎn)
             if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
                return numAllNodes
             }
             adaptivePercentage := g.percentageOfNodesToScore
            //當(dāng)numAllNodes大于100時(shí),如果沒有設(shè)置percentageOfNodesToScore,那么這里需要計(jì)算出一個(gè)值
             if adaptivePercentage <= 0 {
                basePercentageOfNodesToScore := int32(50)
                adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
                if adaptivePercentage < minFeasibleNodesPercentageToFind {
                   adaptivePercentage = minFeasibleNodesPercentageToFind
                }
             }
             // 正常取樣計(jì)算,比如numAllNodes為5000,而adaptivePercentage為50%
              // 則numNodes=50000*0.5/100=250
             numNodes = numAllNodes * adaptivePercentage / 100
             // 也不能太小,不能低于minFeasibleNodesToFind的值
             if numNodes < minFeasibleNodesToFind {
                return minFeasibleNodesToFind
             }

             return numNodes
          }

          3.1.2 并行化二次篩選節(jié)點(diǎn)

          并行取樣主要通過調(diào)用工作隊(duì)列的ParallelizeUntil函數(shù)來啟動(dòng) N 個(gè) goroutine 來進(jìn)行并行取樣,并通過 ctx 來協(xié)調(diào)退出。選取節(jié)點(diǎn)的規(guī)則由函數(shù) checkNode 來定義,checkNode里面使用RunFilterPluginsWithNominatedPods篩選出合適的節(jié)點(diǎn)。

          在 k8s 中經(jīng)過調(diào)度器調(diào)度后的 pod 結(jié)果會(huì)放入到 SchedulingQueue 中進(jìn)行暫存,這些 pod 未來可能會(huì)經(jīng)過后續(xù)調(diào)度流程運(yùn)行在提議的 node 上,也可能因?yàn)槟承┰驅(qū)е伦罱K沒有運(yùn)行,而預(yù)選流程為了減少后續(xù)因?yàn)檎{(diào)度沖突,則會(huì)在進(jìn)行預(yù)選的時(shí)候,將這部分 pod 考慮進(jìn)去。如果在這些 pod 存在的情況下,node 可以滿足當(dāng)前 pod 的篩選條件,則可以去除被提議的 pod 再進(jìn)行篩選。

          在搶占的情況下我們會(huì)運(yùn)行兩次過濾器。如果節(jié)點(diǎn)有大于或等于優(yōu)先級(jí)的被提名的 pod,我們在這些 pod 被添加到 PreFilter 狀態(tài)和 nodeInfo 時(shí)運(yùn)行它們。如果所有的過濾器在這一次都成功了,我們在這些被提名的 pod 沒有被添加時(shí)再運(yùn)行它們。

          kubernetes/pkg/scheduler/framework/runtime/framework.go 650

          func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
             var status *framework.Status
             // podsAdded主要用于標(biāo)識(shí)當(dāng)前是否有提議的pod如果沒有提議的pod則就不需要再進(jìn)行一輪篩選了。
             podsAdded := false
            //待檢查的 Node 是一個(gè)即將被搶占的節(jié)點(diǎn),調(diào)度器就會(huì)對這個(gè)Node用同樣的 Predicates 算法運(yùn)行兩遍。
             for i := 0; i < 2; i++ {
                stateToUse := state
                nodeInfoToUse := info
                //處理優(yōu)先級(jí)pod的邏輯
                if i == 0 {
                   var err error
                //查找是否有優(yōu)先級(jí)大于或等于當(dāng)前pod的NominatedPods,然后加入到nodeInfoToUse中
                   podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
                  // 如果第一輪篩選出錯(cuò),則不會(huì)進(jìn)行第二輪篩選
                   if err != nil {
                      return framework.AsStatus(err)
                   }
                } else if !podsAdded || !status.IsSuccess() {
                   break
                }
                //運(yùn)行過濾器檢查該pod是否能運(yùn)行在該節(jié)點(diǎn)上
                statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
                status = statusMap.Merge()
                if !status.IsSuccess() && !status.IsUnschedulable() {
                   return status
                }
             }
             return status
          }

          這個(gè)方法用來檢測 node 是否能通過過濾器,此方法會(huì)在調(diào)度 Schedule 和搶占 Preempt 的時(shí)被調(diào)用,如果在 Schedule 時(shí)被調(diào)用,那么會(huì)測試 node,能否可以讓所有存在的 pod 以及更高優(yōu)先級(jí)的 pod 在該 node 上運(yùn)行。如果在搶占時(shí)被調(diào)用,那么我們首先要移除搶占失敗的 pod,添加將要搶占的 pod。

          RunFilterPlugins 會(huì)運(yùn)行過濾器,過濾器總共有這些:nodeunschedulable, noderesources, nodename, nodeports, nodeaffinity, volumerestrictions, tainttoleration, nodevolumelimits, nodevolumelimits, nodevolumelimits, nodevolumelimits, volumebinding, volumezone, podtopologyspread, interpodaffinity。這里就不詳細(xì)贅述。

          至此關(guān)于預(yù)選模式的調(diào)度算法的執(zhí)行過程已經(jīng)分析完畢。

          3.2 優(yōu)選算法

          優(yōu)選階段通過分離計(jì)算對象來實(shí)現(xiàn)多個(gè) node 和多種算法的并行計(jì)算,并且通過基于二級(jí)索引來設(shè)計(jì)最終的存儲(chǔ)結(jié)果,從而達(dá)到整個(gè)計(jì)算過程中的無鎖設(shè)計(jì),同時(shí)為了保證分配的隨機(jī)性,針對同等優(yōu)先級(jí)的采用了隨機(jī)的方式來進(jìn)行最終節(jié)點(diǎn)的分配。這個(gè)思路很值得借鑒。

          在上文中,我們提到在優(yōu)化過程是先通過 prioritizeNodes 獲得 priorityList,然后再通過 selectHost 函數(shù)獲得得分最高的 Node,返回結(jié)果。

          3.2.1 prioritizeNodes

          在 prioritizeNodes 函數(shù)中會(huì)將需要調(diào)度的 Pod 列表和 Node 列表傳入各種優(yōu)選算法進(jìn)行打分排序,最終整合成結(jié)果集 priorityList。priorityList 是一個(gè) framework.NodeScoreList 的結(jié)構(gòu)體,結(jié)構(gòu)如下面的代碼所示:

          // NodeScoreList 聲明一個(gè)節(jié)點(diǎn)列表及節(jié)點(diǎn)分?jǐn)?shù)
          type NodeScoreList []NodeScore

          // NodeScore 節(jié)點(diǎn)和節(jié)點(diǎn)分?jǐn)?shù)的結(jié)構(gòu)體
          type NodeScore struct {
            Name  string
            Score int64
          }

          prioritizeNodes 通過運(yùn)行評(píng)分插件對節(jié)點(diǎn)進(jìn)行優(yōu)先排序,這些插件從 RunScorePlugins()的調(diào)用中為每個(gè)節(jié)點(diǎn)返回一個(gè)分?jǐn)?shù)。每個(gè)插件的分?jǐn)?shù)和 Extender 的分?jǐn)?shù)加在一起,成為該節(jié)點(diǎn)的分?jǐn)?shù)。整個(gè)流程如圖所示:

          由于 prioritizeNodes 的邏輯太長,這里將他們分四個(gè)部分,如下所示:

          1. 準(zhǔn)備階段
          func (g *genericScheduler) prioritizeNodes(ctx context.Context, fwk framework.Framework,state *framework.CycleState, pod *v1.Pod,nodes []*v1.Node,) (framework.NodeScoreList, error) {
              // 如果沒有提供優(yōu)先級(jí)配置(即沒有Extender也沒有ScorePlugins),則所有節(jié)點(diǎn)的得分為 1。這是生成所需格式的優(yōu)先級(jí)列表所必需的
             if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
                result := make(framework.NodeScoreList, 0len(nodes))
                for i := range nodes {
                   result = append(result, framework.NodeScore{
                      Name:  nodes[i].Name,
                      Score: 1,
                   })
                }
                return result, nil
             }
             // 運(yùn)行PreScore插件,準(zhǔn)備評(píng)分?jǐn)?shù)據(jù)
             preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
             if !preScoreStatus.IsSuccess() {
                return nil, preScoreStatus.AsError()
             }


          1. 運(yùn)行 Score 插件進(jìn)行評(píng)分
           // 運(yùn)行Score插件對Node進(jìn)行評(píng)分,此處需要知道的是scoresMap的類型是map[string][]NodeScore。scoresMap的key是插件名字,value是該插件對所有Node的評(píng)分
             scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
             if !scoreStatus.IsSuccess() {
                return nil, scoreStatus.AsError()
             }
             // result用于匯總所有分?jǐn)?shù)
             result := make(framework.NodeScoreList, 0len(nodes))
             // 將分?jǐn)?shù)按照node的維度進(jìn)行匯總,循環(huán)執(zhí)行l(wèi)en(nodes)次
             for i := range nodes {
                // 先在result中塞滿所有node的Name,Score初始化為0;
                result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
               // 執(zhí)行了多少個(gè)scoresMap就有多少個(gè)Score,所以這里遍歷len(scoresMap)次;
                for j := range scoresMap {
                   // 每個(gè)算法對應(yīng)第i個(gè)node的結(jié)果分值加權(quán)后累加;
                   result[i].Score += scoresMap[j][i].Score
                }
             }

          Score 插件中獲取的分?jǐn)?shù)會(huì)直接記錄在 result[i].Score,result 就是最終返回結(jié)果的 priorityList。

          RunScorePlugins里面分別調(diào)用 parallelize.Until 方法跑三次來進(jìn)行打分:

          第一次會(huì)調(diào)用runScorePlugin方法,里面會(huì)調(diào)用 getDefaultConfig 里面設(shè)置的 score 的 Plugin 來進(jìn)行打分;

          第二次會(huì)調(diào)用runScoreExtension方法,里面會(huì)調(diào)用 Plugin 的NormalizeScore方法,用來保證分?jǐn)?shù)必須是 0 到 100 之間,不是每一個(gè) plugin 都會(huì)實(shí)現(xiàn) NormalizeScore 方法。

          第三次會(huì)調(diào)用遍歷所有的scorePlugins,并對對應(yīng)的算出的來的分?jǐn)?shù)乘以一個(gè)權(quán)重。

          打分的 plugin 共有:noderesources, imagelocality, interpodaffinity, noderesources, nodeaffinity, nodepreferavoidpods, podtopologyspread, tainttoleration

          1. 配置的 Extender 的評(píng)分獲取
            // 如果配置了Extender,還要調(diào)用Extender對Node評(píng)分并累加到result中
             if len(g.extenders) != 0 && nodes != nil {
                // 因?yàn)橐鄥f(xié)程并發(fā)調(diào)用Extender并統(tǒng)計(jì)分?jǐn)?shù),所以需要鎖來互斥寫入Node分?jǐn)?shù)
                var mu sync.Mutex
                var wg sync.WaitGroup
                // combinedScores的key是Node名字,value是Node評(píng)分
                combinedScores := make(map[string]int64len(nodes))
                for i := range g.extenders {
                   // 如果Extender不管理Pod申請的資源則跳過
                   if !g.extenders[i].IsInterested(pod) {
                      continue
                   }
                   // 啟動(dòng)協(xié)程調(diào)用Extender對所有Node評(píng)分。
                   wg.Add(1)
                   go func(extIndex int) {
                      defer func() {
                         wg.Done()
                      }()
                     // 調(diào)用Extender對Node進(jìn)行評(píng)分
                      prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
                      if err != nil {
                         //擴(kuò)展器的優(yōu)先級(jí)錯(cuò)誤可以忽略,讓k8s/其他擴(kuò)展器確定優(yōu)先級(jí)。
                         return
                      }
                      mu.Lock()
                      for i := range *prioritizedList {
                         host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                        // Extender的權(quán)重是通過Prioritize()返回的,其實(shí)該權(quán)重是人工配置的,只是通過Prioritize()返回使用上更方便。
                        // 合并后的評(píng)分是每個(gè)Extender對Node評(píng)分乘以權(quán)重的累加和
                         combinedScores[host] += score * weight
                      }
                      mu.Unlock()
                   }(i)
                }
                // 等待所有的go routines結(jié)束,調(diào)用時(shí)間取決于最慢的Extender。
                wg.Wait()

          Extender 這里有幾個(gè)很有趣的設(shè)置

          • 首先是擴(kuò)展器中如果出現(xiàn)了評(píng)分的錯(cuò)誤,可以忽略,而不是想預(yù)選階段那樣直接返回報(bào)錯(cuò)。
            • 能這樣做的原因是,因?yàn)樵u(píng)分不同于過濾,對錯(cuò)誤不敏感。過濾如果失敗是要返回錯(cuò)誤的(如果不能忽略),因?yàn)?Node 可能無法滿足 Pod 需求;而評(píng)分無非是選擇最優(yōu)的節(jié)點(diǎn),評(píng)分錯(cuò)誤只會(huì)對選擇最優(yōu)有一點(diǎn)影響,但是不會(huì)造成故障。
          • 其次是使用了 combinedScores 來記錄分?jǐn)?shù),考慮到 Extender 和 Score 插件返回的評(píng)分的體系會(huì)存在出入,所以這邊并沒有直接累加。而是后續(xù)再進(jìn)行一次遍歷麻將 Extender 的評(píng)分標(biāo)準(zhǔn)化之后才與原先的 Score 插件評(píng)分進(jìn)行累加。
          • 最后是關(guān)于鎖的使用
            • 在評(píng)分的設(shè)置里面,使用了多協(xié)程來并發(fā)進(jìn)行評(píng)分。在最后分?jǐn)?shù)進(jìn)行匯總的時(shí)候會(huì)出現(xiàn)并發(fā)寫的問題,為了避免這種現(xiàn)象的出現(xiàn),k8s 的程序中對從 prioritizedList 里面讀取節(jié)點(diǎn)名稱和分?jǐn)?shù),然后寫入combinedScores的過程中上了互斥鎖。
            • 為了記錄所有并發(fā)讀取 Extender 的協(xié)程,這里使用了 wait Group 這樣的數(shù)據(jù)結(jié)構(gòu)來保證,所有的 go routines 結(jié)束再進(jìn)行最后的分?jǐn)?shù)累加。這里存在一個(gè)程序性能的問題,所有的線程只要有一個(gè)沒有運(yùn)行完畢,程序就會(huì)卡在這一步。即便是多協(xié)程并發(fā)調(diào)用 Extender,也會(huì)存在木桶效應(yīng),即調(diào)用時(shí)間取決于最慢的 Extender。雖然 Extender 可能都很快,但是網(wǎng)絡(luò)延時(shí)是一個(gè)比較常見的事情,更嚴(yán)重的是如果 Extender 異常造成調(diào)度超時(shí),那么就拖累了整個(gè) kube-scheduler 的調(diào)度效率。這是一個(gè)后續(xù)需要解決的問題
          1. 分?jǐn)?shù)的累加,返回結(jié)果集 priorityList
                for i := range result {
                  // 最終Node的評(píng)分是所有ScorePlugin分?jǐn)?shù)總和+所有Extender分?jǐn)?shù)總和
                // 此處標(biāo)準(zhǔn)化了Extender的評(píng)分,使其范圍與ScorePlugin一致,否則二者沒法累加在一起。
                   result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
                }
             }
             return result, nil
          }

          優(yōu)選算法由一系列的 PriorityConfig(也就是 PriorityConfig 數(shù)組)組成,每個(gè) Config 代表了一個(gè)算法,Config 描述了權(quán)重 Weight、Function(一種優(yōu)選算法函數(shù)類型)。需要調(diào)度的 Pod 分別對每個(gè)合適的 Node(N)執(zhí)行每個(gè)優(yōu)選算法(A)進(jìn)行打分,最后得到一個(gè)二維數(shù)組,元素分別為 A1N1,A1N2,A1N3… ,行代表一個(gè)算法對應(yīng)不同的 Node 計(jì)算得到的分值,列代表同一個(gè) Node 對應(yīng)不同算法的分值:


          N1N2N3
          A1{ Name:“node1”,Score:5,PriorityConfig:{…weight:1}}{ Name:“node2”,Score:3,PriorityConfig:{…weight:1}}{ Name:“node3”,Score:1,PriorityConfig:{…weight:1}}
          A2{ Name:“node1”,Score:6,PriorityConfig:{…weight:1}}{ Name:“node2”,Score:2,PriorityConfig:{…weight:1}}{ Name:“node3”,Score:3,PriorityConfig:{…weight:1}}
          A3{ Name:“node1”,Score:4,PriorityConfig:{…weight:1}}{ Name:“node2”,Score:7,PriorityConfig:{…weight:1.}}{ Name:“node3”,Score:2,PriorityConfig:{…weight:1}}

          最后將結(jié)果合并(Combine)成一維數(shù)組 HostPriorityList :HostPriorityList =[{ Name:"node1",Score:15},{ Name:"node2",Score:12},{ Name:"node3",Score:6}]這樣就完成了對每個(gè) Node 進(jìn)行優(yōu)選算法打分的流程。

          Combine 的過程非常簡單,只需要將 Node 名字相同的分?jǐn)?shù)進(jìn)行加權(quán)求和統(tǒng)計(jì)即可。

          最終得到一維數(shù)組 HostPriorityList,也就是前面提到的 HostPriority 結(jié)構(gòu)體的集合。就這樣實(shí)現(xiàn)了為每個(gè) Node 的打分 Priority 優(yōu)選過程。

          3.2.2 selectHost選出得分最高的 Node

          priorityList 數(shù)組保存了每個(gè) Node 的名字和它對應(yīng)的分?jǐn)?shù),最后通過selectHost函數(shù)選出分?jǐn)?shù)最高的 Node 對 Pod 進(jìn)行綁定和調(diào)度。selectHost通過傳入的 priorityList,然后以隨機(jī)篩選的的方式從得分最高的節(jié)點(diǎn)們中挑選一個(gè)。

          這里的隨機(jī)篩選是指的當(dāng)多個(gè) host 優(yōu)先級(jí)相同的時(shí)候,會(huì)有一定的概率用當(dāng)前的 node 替換之前的優(yōu)先級(jí)相等的 node(到目前為止的優(yōu)先級(jí)最高的 node), 其主要通過`cntOfMaxScorerand.Intn(cntOfMaxScore)來進(jìn)行實(shí)現(xiàn)。

          // selectHost()根據(jù)所有可行Node的評(píng)分找到最優(yōu)的Node
          func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
             // 沒有可行Node的評(píng)分,返回錯(cuò)誤
             if len(nodeScoreList) == 0 {
                return "", fmt.Errorf("empty priorityList")
             }
             // 在nodeScoreList中找到分?jǐn)?shù)最高的Node,初始化第0個(gè)Node分?jǐn)?shù)最高
             maxScore := nodeScoreList[0].Score
             selected := nodeScoreList[0].Name
            // 如果最高分?jǐn)?shù)相同,先統(tǒng)計(jì)數(shù)量(cntOfMaxScore)
             cntOfMaxScore := 1
             for _, ns := range nodeScoreList[1:] {
                if ns.Score > maxScore {
                   maxScore = ns.Score
                   selected = ns.Name
                   cntOfMaxScore = 1
                } else if ns.Score == maxScore {
                   // 分?jǐn)?shù)相同就累計(jì)數(shù)量
                   cntOfMaxScore++
                   if rand.Intn(cntOfMaxScore) == 0 {
                      //以1/cntOfMaxScore的概率成為最優(yōu)Node
                      selected = ns.Name
                   }
                }
             }
             return selected, nil
          }

          只有同時(shí)滿足 FilterPlugin 和 Extender 的過濾條件的 Node 才是可行 Node,調(diào)度算法優(yōu)先用 FilterPlugin 過濾,然后在用 Extender 過濾,這樣可以盡量減少傳給 Extender 的 Node 數(shù)量;調(diào)度算法為待調(diào)度的 Pod 對每個(gè)可行 Node(過濾通過)進(jìn)行評(píng)分,評(píng)分方法是:

          其中 f(x)和 g(x)是標(biāo)準(zhǔn)化分?jǐn)?shù)函數(shù),w 為權(quán)重;分?jǐn)?shù)最高的 Node 為最優(yōu)候選 Node,當(dāng)有多個(gè) Node 都為最高分?jǐn)?shù)時(shí),每個(gè) Node 有 1/n 的概率成最優(yōu) Node;調(diào)度算法并不是對調(diào)度框架和調(diào)度插件再抽象和封裝,只是對調(diào)度周期從 PreFilter 到 Score 的過程的一種抽象,其中 PostFilter 不在調(diào)度算法抽象范圍內(nèi)。因?yàn)?PostFilter 與過濾無關(guān),是用來實(shí)現(xiàn)搶占的擴(kuò)展點(diǎn);

          3.3 總結(jié)

          Scheduler 調(diào)度器,在 k8s 的整個(gè)代碼中處于一個(gè)承上啟下的作用。了解 Scheduler 在哪個(gè)過程中發(fā)揮作用,更能夠理解它的重要性。

          本文第二章,主要對于 kube-scheduler v1.21 的調(diào)度流程進(jìn)行了分析,但由于選擇的議題實(shí)在是太大,這里這對正常流程中的調(diào)度進(jìn)行源碼的解析,其中有大量的細(xì)節(jié)都暫未提及,包括搶占調(diào)度、framework、extender 等實(shí)現(xiàn)。通過源碼閱讀可以發(fā)現(xiàn),Pod 的調(diào)度是通過一個(gè)隊(duì)列 SchedulingQueue 異步工作的,隊(duì)列對 pod 時(shí)間進(jìn)行監(jiān)聽,并且進(jìn)行調(diào)度流程。單個(gè) pod 的調(diào)度主要分為 3 個(gè)步驟:

          • 1)根據(jù) Predict 和 Priority 兩個(gè)階段選擇最優(yōu)的 Node;
          • 2)為了提升效率,假設(shè) Pod 已經(jīng)被調(diào)度到對應(yīng)的 Node,保存到 cache 中;
          • 3)通過 extender 和各種插件進(jìn)行驗(yàn)證,如果通過就進(jìn)行綁定。

          在接受到命令之后,程序會(huì)現(xiàn)在scheduler.New() 初始化 scheduler 結(jié)構(gòu)體,然后通過 Run() 函數(shù)啟動(dòng)調(diào)度的主邏輯,喚醒 sched.Run()。在 sched.Run()中會(huì)一直監(jiān)聽和調(diào)度,通過隊(duì)列的方式給 pod 分配合適的 node。scheduleOne() 里面是整個(gè)分配 pod 調(diào)度過程的主要邏輯,因?yàn)槠邢蓿@里只對 sched.Algorithm.Schedule() 進(jìn)行了深入的了解。bind 和后續(xù)的操作就停留在scheduleOne()這里沒有再進(jìn)行深入。

          因篇幅有限,以及個(gè)人的興趣導(dǎo)向,在正常流程介紹完畢之后第三章對正常調(diào)度過程中的優(yōu)選和預(yù)選策略再次進(jìn)行深入的代碼閱讀。以期能夠?qū)φU{(diào)度的細(xì)節(jié)有更好的把握。如果時(shí)間可以再多些,可以更細(xì)致到對具體的調(diào)度算法進(jìn)行分析,這里因?yàn)槠邢蓿A(yù)選的部分就只介紹了根據(jù) predict 過程中的 NameNode 函數(shù)。

          - END -




          鏈接:https://juejin.cn/post/7133192540215312420

          (版權(quán)歸原作者所有,侵刪)

          瀏覽 32
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲日韩国产精品一区无码AV | 成人猫咪av| 婷婷综合网站 | 亚洲国产成人久久综合区色欲 | 欧美成人国产精品一区二区 |