<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          調(diào)度器調(diào)度隊列之 activeQ 分析 | 視頻文字稿

          共 17242字,需瀏覽 35分鐘

           ·

          2021-03-15 20:10

          前面我們分析了 kube-scheduler 組件如何接收命令行參數(shù),用傳遞的參數(shù)構(gòu)造一個 Scheduler 對象,最終啟動了調(diào)度器。調(diào)度器啟動后就可以開始為未調(diào)度的 Pod 進行調(diào)度操作了,本文主要來分析調(diào)度器是如何對一個 Pod 進行調(diào)度操作過程中的活動隊列。

          調(diào)度隊列

          調(diào)度器啟動后最終是調(diào)用 Scheduler 下面的 Run() 函數(shù)來開始調(diào)度 Pod,如下所示代碼:

          // pkg/scheduler/scheduler.go

          // 等待 cache 同步完成,然后開始調(diào)度
          func (sched *Scheduler) Run(ctx context.Context) {
           if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
            return
           }
           sched.SchedulingQueue.Run()
           wait.UntilWithContext(ctx, sched.scheduleOne, 0)
           sched.SchedulingQueue.Close()
          }

          首先會等待所有的 cache 同步完成,然后開始執(zhí)行 SchedulingQueue 的 Run() 函數(shù),SchedulingQueue 是一個隊列接口,用于存儲待調(diào)度的 Pod,該接口遵循類似于 cache.FIFOcache.Heap 這樣的數(shù)據(jù)結(jié)構(gòu),要弄明白調(diào)度器是如何去調(diào)度 Pod 的,我們就首先需要弄清楚這個結(jié)構(gòu):

          // pkg/scheduler/internal/queue/scheduling_queue.go

          // 用于存儲帶調(diào)度 Pod 的隊列接口
          type SchedulingQueue interface {
           framework.PodNominator
           // AddUnschedulableIfNotPresent 將無法調(diào)度的 Pod 添加回調(diào)度隊列
            // podSchedulingCycle表示可以通過調(diào)用 SchedulingCycle() 返回的當(dāng)前調(diào)度周期號
           AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
            // SchedulingCycle 返回由調(diào)度隊列緩存的當(dāng)前調(diào)度周期數(shù)。 
            // 通常,只要彈出一個 Pod(例如調(diào)用 Pop() 函數(shù)),就增加此數(shù)字。
           SchedulingCycle() int64
            
            // 下面是通用隊列相關(guān)操作
            // Pop 刪除隊列的頭并返回它。 
            // 如果隊列為空,它將阻塞,并等待直到新元素添加到隊列中
           Pop() (*framework.QueuedPodInfo, error)
            // 往隊列中添加一個 Pod
           Add(pod *v1.Pod) error
           Update(oldPod, newPod *v1.Pod) error
           Delete(pod *v1.Pod) error

           MoveAllToActiveOrBackoffQueue(event string)
           AssignedPodAdded(pod *v1.Pod)
           AssignedPodUpdated(pod *v1.Pod)
           PendingPods() []*v1.Pod
            // 關(guān)閉 SchedulingQueue,以便等待 pop 元素的 goroutine 可以正常退出
           Close()
           // NumUnschedulablePods 返回 SchedulingQueue 中存在的不可調(diào)度 Pod 的數(shù)量
           NumUnschedulablePods() int
           // 啟動管理隊列的goroutine
           Run()
          }

          SchedulingQueue 是一個用于存儲帶調(diào)度 Pod 的隊列接口,在構(gòu)造 Scheduler 對象的時候我們可以了解到調(diào)度器中是如何實現(xiàn)這個隊列接口的:

          // pkg/scheduler/factory.go

          // Profiles are required to have equivalent queue sort plugins.
          lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
          podQueue := internalqueue.NewSchedulingQueue(
           lessFn,
           internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
           internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
           internalqueue.WithPodNominator(nominator),
          )
          ......
          return &Scheduler{
           ......
           NextPod:         internalqueue.MakeNextPodFunc(podQueue),
            ......
           SchedulingQueue: podQueue,
          }, nil

          可以看到上面的 internalqueue.NewSchedulingQueue 就是創(chuàng)建的一個 SchedulingQueue 對象,定義如下所示:

          // pkg/scheduler/internal/queue/scheduling_queue.go

          // 初始化一個優(yōu)先級隊列作為一個新的調(diào)度隊列
          func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
           return NewPriorityQueue(lessFn, opts...)
          }

          // 配置 PriorityQueue
          type Option func(*priorityQueueOptions)

          // 創(chuàng)建一個 PriorityQueue 對象
          func NewPriorityQueue(
           lessFn framework.LessFunc,
           opts ...Option,
          )
           *PriorityQueue
           {
            ......

            comp := func(podInfo1, podInfo2 interface{}) bool {
            pInfo1 := podInfo1.(*framework.QueuedPodInfo)
            pInfo2 := podInfo2.(*framework.QueuedPodInfo)
            return lessFn(pInfo1, pInfo2)
           }
            ......

            pq := &PriorityQueue{
            PodNominator:              options.podNominator,
            clock:                     options.clock,
            stop:                      make(chan struct{}),
            podInitialBackoffDuration: options.podInitialBackoffDuration,
            podMaxBackoffDuration:     options.podMaxBackoffDuration,
            activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
            unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
            moveRequestCycle:          -1,
           }
           pq.cond.L = &pq.lock
           pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

           return pq
          }

          從上面的初始化過程可以看出來 PriorityQueue 這個優(yōu)先級隊列實現(xiàn)了 SchedulingQueue 接口,所以真正的實現(xiàn)還需要去查看這個優(yōu)先級隊列:

          // pkg/scheduler/internal/queue/scheduling_queue.go

          // PriorityQueue 實現(xiàn)了調(diào)度隊列 SchedulingQueue
          // PriorityQueue 的頭部元素是優(yōu)先級最高的 pending Pod,該結(jié)構(gòu)有三個子隊列:
          // 一個子隊列包含正在考慮進行調(diào)度的 Pod,稱為 activeQ,是一個堆
          // 另一個隊列包含已嘗試并且確定為不可調(diào)度的 Pod,稱為 unschedulableQ
          // 第三個隊列包含從 unschedulableQ 隊列移出的 Pod,并在 backoff 完成后將其移到 activeQ 隊列
          type PriorityQueue struct {
           framework.PodNominator

           stop  chan struct{}
           clock util.Clock

           // pod 初始 backoff 的時間
           podInitialBackoffDuration time.Duration
           // pod 最大 backoff 的時間
           podMaxBackoffDuration time.Duration

           lock sync.RWMutex
           cond sync.Cond  // condition

            // activeQ 是調(diào)度程序主動查看以查找要調(diào)度 pod 的堆結(jié)構(gòu),堆頭部是優(yōu)先級最高的 Pod
           activeQ *heap.Heap
            // backoff 隊列
           podBackoffQ *heap.Heap
           // unschedulableQ 不可調(diào)度隊列
           unschedulableQ *UnschedulablePodsMap
            // 調(diào)度周期的遞增序號,當(dāng) pop 的時候會增加
           schedulingCycle int64
            // moveRequestCycle 會緩存 schedulingCycle 的值
            // 當(dāng)未調(diào)度的 Pod 重新被添加到 activeQ 中會保存 schedulingCycle 到 moveRequestCycle 中
           moveRequestCycle int64

           // 表明隊列已經(jīng)被關(guān)閉
           closed bool
          }

          這里使用的是一個 PriorityQueue 優(yōu)先級隊列來存儲帶調(diào)度的 Pod,這個也很好理解,普通隊列是一個 FIFO 數(shù)據(jù)結(jié)構(gòu),根據(jù)元素進入隊列的順序依次出隊,而對于調(diào)度的這個場景,優(yōu)先級隊列顯然更合適,可以根據(jù)某些優(yōu)先級策略,優(yōu)先對某個 Pod 進行調(diào)度。

          PriorityQueue 的頭部元素是優(yōu)先級最高的帶調(diào)度的 Pod,該結(jié)構(gòu)有三個子隊列:

          • 活動隊列(activeQ)
          • 不可調(diào)度隊列(unschedulableQ):當(dāng) Pod 不能滿足被調(diào)度的條件的時候就會被加入到這個不可調(diào)度的隊列中來,等待后續(xù)繼續(xù)進行嘗試調(diào)度
          • backoff 隊列:如果任務(wù)反復(fù)執(zhí)行還是失敗,則會按嘗試次數(shù)增加等待調(diào)度時間,降低重試效率,從而避免反復(fù)失敗浪費調(diào)度資源。對于調(diào)度失敗的 Pod 會優(yōu)先存儲在 backoff 隊列中,等待后續(xù)進行重試

          這里我們需要來弄清楚這幾個隊列是如何實現(xiàn)的。

          活動隊列

          活動隊列(activeQ)是存儲當(dāng)前系統(tǒng)中所有在等待調(diào)度的 Pod 隊列,在上面實例化優(yōu)先級隊列里面可以看到 activeQ 隊列的初始化是通過調(diào)用 heap.NewWithRecorder() 函數(shù)實現(xiàn)的。

          // pkg/scheduler/internal/heap/heap.go

          // NewWithRecorder 就是 Heap 基礎(chǔ)上包裝了 metrics 數(shù)據(jù)
          func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
           return &Heap{
            data: &data{
             items:    map[string]*heapItem{},
             queue:    []string{},
             keyFunc:  keyFn,
             lessFunc: lessFn,
            },
            metricRecorder: metricRecorder,
           }
          }

          // lessFunc 接收兩個元素,對列表進行排序時,將第一個元素放在第二個元素之前,則返回true。
          type lessFunc = func(item1, item2 interface{}) bool

          其中的 data 數(shù)據(jù)結(jié)構(gòu)是 Golang 中的一個標(biāo)準 heap 堆(只需要實現(xiàn) heap.Interface 接口即可),然后 Heap 是在 data 基礎(chǔ)上新增了一個用于記錄 metrics 數(shù)據(jù)的堆,這里最重要的就是用比較元素優(yōu)先級的 lessFunc 函數(shù)的實現(xiàn),在初始化優(yōu)先級隊列的時候我們傳入了一個 comp 的參數(shù),這個參數(shù)就是 activeQ 這個堆里面的 lessFunc 函數(shù)的實現(xiàn):

          comp := func(podInfo1, podInfo2 interface{}) bool {
            pInfo1 := podInfo1.(*framework.QueuedPodInfo)
            pInfo2 := podInfo2.(*framework.QueuedPodInfo)
            return lessFn(pInfo1, pInfo2)
           }

          最終是調(diào)用的創(chuàng)建 Scheduler 對象的時候傳入的 lessFn 參數(shù):

          lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()

          從這里可以看到比較元素優(yōu)先級是通過調(diào)度框架的 QueueSortFunc() 函數(shù)來實現(xiàn)的,對應(yīng)的實現(xiàn)如下所示:

          // pkg/scheduler/framework/runtime/framework.go

          // QueueSortFunc 返回用于對調(diào)度隊列中的 Pod 進行排序的函數(shù)
          func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
           if f == nil {
            // 如果 frameworkImpl 為nil,則只需保持其順序不變
            // NOTE: 主要用于測試
            return func(_, _ *framework.QueuedPodInfo) bool { return false }
           }
            // 如果沒有 queuesort 插件
           if len(f.queueSortPlugins) == 0 {
            panic("No QueueSort plugin is registered in the frameworkImpl.")
           }

           // 只有一個 QueueSort 插件有效
           return f.queueSortPlugins[0].Less
          }

          最終真正用于優(yōu)先級隊列元素優(yōu)先級比較的函數(shù)是通過 QueueSort 插件來實現(xiàn)的,默認啟用的 QueueSort 插件是 PrioritySort:

          // pkg/scheduler/algorithmprovider/registry.go

          func getDefaultConfig() *schedulerapi.Plugins {
           return &schedulerapi.Plugins{
            QueueSort: &schedulerapi.PluginSet{
             Enabled: []schedulerapi.Plugin{
              {Name: queuesort.Name},
             },
            },
              ......

          PrioritySort 這個插件的核心實現(xiàn)就是其 Less 函數(shù)了:

          // pkg/scheduler/framework/plugins/queuesort/priority_sort.go

          // Less 是 activeQ 隊列用于對 Pod 進行排序的函數(shù)。
          // 它根據(jù) Pod 的優(yōu)先級對 Pod 進行排序,
          // 當(dāng)優(yōu)先級相同時,它使用 PodQueueInfo.timestamp 進行比較
          func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
           p1 := pod.GetPodPriority(pInfo1.Pod)
           p2 := pod.GetPodPriority(pInfo2.Pod)
            // 先根據(jù)優(yōu)先級的高低進行比較,然后根據(jù) Pod 的創(chuàng)建時間
            // 越高優(yōu)先級的 Pod 越被優(yōu)先調(diào)度,越早創(chuàng)建的pod越優(yōu)先
           return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
          }

          // pkg/api/v1/pod/util.go

          // GetPodPriority 獲取指定 Pod 的優(yōu)先級
          func GetPodPriority(pod *v1.Pod) int32 {
           if pod.Spec.Priority != nil {
            return *pod.Spec.Priority
           }
           return 0
          }

          到這里就真相大白了,對于 activeQ 活動隊列中的 Pod 是依靠 PrioritySort 插件來進行優(yōu)先級比較的,每個 Pod 在被創(chuàng)建后都會有一個 priority 屬性來標(biāo)記 Pod 優(yōu)先級,然后在調(diào)度 Pod 的時候會先根據(jù) Pod 優(yōu)先級的高低進行比較,如果優(yōu)先級相同,則回根據(jù) Pod 的創(chuàng)建時間進行比較,越高優(yōu)先級的 Pod 越被優(yōu)先調(diào)度,越早創(chuàng)建的Pod 越優(yōu)先。

          那么 Pod 是在什么時候加入到 activeQ 活動隊列的呢?還記得前面我們在創(chuàng)建 Scheduler 對象的時候有一個 addAllEventHandlers 函數(shù)嗎?其中就有對未調(diào)度 Pod 的事件監(jiān)聽處理操作。

          // pkg/scheduler/eventhandlers.go

          // unscheduled pod queue
          podInformer.Informer().AddEventHandler(
           cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
             switch t := obj.(type) {
             case *v1.Pod:
              return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
             case cache.DeletedFinalStateUnknown:
              if pod, ok := t.Obj.(*v1.Pod); ok {
               return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
              }
              utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
              return false
             default:
              utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
              return false
             }
            },
            Handler: cache.ResourceEventHandlerFuncs{
             AddFunc:    sched.addPodToSchedulingQueue,
             UpdateFunc: sched.updatePodInSchedulingQueue,
             DeleteFunc: sched.deletePodFromSchedulingQueue,
            },
           },
          )

          當(dāng) Pod 有事件變化后,首先回通過 FilterFunc 函數(shù)進行過濾,如果 Pod 沒有綁定到節(jié)點(未調(diào)度)并且使用的是指定的調(diào)度器才進入下面的 Handler 進行處理,比如當(dāng)創(chuàng)建 Pod 以后就會有 onAdd 的添加事件,這里調(diào)用的就是 sched.addPodToSchedulingQueue 函數(shù):

          // pkg/scheduler/eventhandlers.go

          // 添加未調(diào)度的 Pod 到優(yōu)先級隊列
          func(sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
             pod := obj.(*v1.Pod)
             klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
             if err := sched.SchedulingQueue.Add(pod); err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
             }
          }

          可以看到這里當(dāng) Pod 被創(chuàng)建后,會將 Pod 通過調(diào)度隊列 SchedulingQueue 的 Add 函數(shù)添加到優(yōu)先級隊列中去:

          // pkg/scheduler/internal/queue/scheduling_queue.go

          // Add 添加 Pod 到 activeQ 活動隊列,僅當(dāng)添加了新的 Pod 時才應(yīng)調(diào)用它
          // 這樣 Pod 就不會已經(jīng)處于 active/unschedulable/backoff 隊列中了
          func (p *PriorityQueue) Add(pod *v1.Pod) error {
           p.lock.Lock()
           defer p.lock.Unlock()
           pInfo := p.newQueuedPodInfo(pod)
            // 添加到 activeQ 隊列
           if err := p.activeQ.Add(pInfo); err != nil {
            klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
            return err
           }
            // 如果在 unschedulableQ 隊列中,則從改隊列移除
           if p.unschedulableQ.get(pod) != nil {
            klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
            p.unschedulableQ.delete(pod)
           }
           // 從 backoff 隊列刪除
           if err := p.podBackoffQ.Delete(pInfo); err == nil {
            klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
           }
            // 記錄metrics
           metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
           p.PodNominator.AddNominatedPod(pod, "")
            // 通知其他地方進行處理
           p.cond.Broadcast()

           return nil
          }

          這就是 activeQ 活動隊列添加元素的過程。

          調(diào)度 Pod

          當(dāng)我們把新創(chuàng)建的 Pod 添加到 activeQ 活動隊列過后,就可以在另外的協(xié)程中從這個隊列中彈出堆頂?shù)脑貋磉M行具體的調(diào)度處理了。這里就要回頭本文開頭部分調(diào)度器啟動后執(zhí)行的一個調(diào)度操作了 sched.scheduleOne

          // pkg/scheduler/scheduler.go

          // scheduleOne 為單個 Pod 完成整個調(diào)度工作流程
          func (sched *Scheduler) scheduleOne(ctx context.Context) {
            // 從調(diào)度器中獲取下一個要調(diào)度的 Pod
           podInfo := sched.NextPod()
           ......
          }

          scheduleOne 函數(shù)在最開始調(diào)用 sched.NextPod() 函數(shù)來獲取現(xiàn)在需要調(diào)度的 Pod,其實就是上面 activeQ 活動隊列中 Pop 出來的元素,當(dāng)實例化 Scheduler 對象的時候就指定了 NextPod 函數(shù):internalqueue.MakeNextPodFunc(podQueue)

          // pkg/scheduler/internal/queue/scheduling_queue.go

          // MakeNextPodFunc 返回一個函數(shù),用于從指定的調(diào)度隊列中獲取下一個 Pod 進行調(diào)度
          func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
           return func() *framework.QueuedPodInfo {
            podInfo, err := queue.Pop()
            ......
            return nil
           }
          }

          很明顯這里就是調(diào)用的優(yōu)先級隊列的 Pop() 函數(shù)來彈出隊列中的 Pod 進行調(diào)度處理。

          // pkg/scheduler/internal/queue/scheduling_queue.go

          // Pop 刪除 activeQ 活動隊列的頭部元素并返回它。
          // 如果 activeQ 為空,它將阻塞,并等待直到新元素添加到隊列中。
          // 當(dāng) Pod 彈出后會增加調(diào)度周期參數(shù)的值。
          func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
           p.lock.Lock()
           defer p.lock.Unlock()
           for p.activeQ.Len() == 0 {
              // 當(dāng)隊列為空時,將阻塞Pop()的調(diào)用,直到新元素入隊。
              // 調(diào)用Close()時,將設(shè)置p.closed并廣播condition,這將導(dǎo)致此循環(huán)繼續(xù)并從Pop()返回。
            if p.closed {
             return nil, fmt.Errorf(queueClosed)
            }
            p.cond.Wait()
           }
            // 從 activeQ 隊列彈出堆頂元素
           obj, err := p.activeQ.Pop()
           if err != nil {
            return nil, err
           }
           pInfo := obj.(*framework.QueuedPodInfo)
           pInfo.Attempts++
            // 增加調(diào)度周期次數(shù)
           p.schedulingCycle++
           return pInfo, err
          }

          Pop() 函數(shù)很簡單,就是從 activeQ 隊列中彈出堆頂?shù)脑胤祷丶纯伞D玫搅艘{(diào)度的 Pod,接下來就是去真正執(zhí)行調(diào)度邏輯了。


           點擊屏末  | 學(xué)習(xí)k8s開發(fā)課程
          瀏覽 103
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品91久久久久久 | 91av成人在线播放 | 亚洲综合在线一区 | 超碰国产操逼 | 国产BBB |