<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kubernetes 源碼學(xué)習(xí)之延時隊列

          共 2481字,需瀏覽 5分鐘

           ·

          2020-09-18 14:46

          client-go 中的 workqueue,類似于 golang 語言中的 channel,主要用于并發(fā)程序之間的數(shù)據(jù)同步。Kubernetes 的控制器模型通過 client-go 的 informer watch 資源變化,當(dāng)資源發(fā)生變化時會通過回調(diào)函數(shù)將資源寫入隊列,由控制器中的消費(fèi)者完成業(yè)務(wù)處理。

          延時隊列

          client-go 中實現(xiàn)了多種隊列,包括通用隊列、延時隊列、限速隊列,本文首先介紹延時隊列的實現(xiàn)。延時隊列是在通用隊列基礎(chǔ)上進(jìn)行擴(kuò)展的,因為它本質(zhì)還是一個隊列,只是加了一個新的函數(shù)來進(jìn)行延遲,對應(yīng)的接口定義如下所示:

          //?k8s.io/client-go/util/workqueue/queue.go

          //?通用隊列接口定義
          type?Interface?interface?{
          ?Add(item?interface{})??//?向隊列中添加一個元素
          ?Len()?int??//?獲取隊列長度
          ?Get()?(item?interface{},?shutdown?bool)??//?獲取隊列頭部的元素,第二個返回值表示隊列是否已經(jīng)關(guān)閉
          ?Done(item?interface{})??//?標(biāo)記隊列中元素已經(jīng)處理完
          ?ShutDown()??//?關(guān)閉隊列
          ?ShuttingDown()?bool??//?查詢隊列是否正在關(guān)閉
          }

          //?k8s.io/client-go/util/workqueue/delaying_queue.go

          //?DelayingInterface?是一個延時隊列,可以在以后的時間來添加元素的接口
          //?這使得它更容易在處理失敗后重新入隊列,而不至于陷入?hot-loop
          type?DelayingInterface?interface?{
          ??//?擴(kuò)展通用隊列
          ?Interface
          ?//?在指定的時間后將元素添加到工作隊列中
          ?AddAfter(item?interface{},?duration?time.Duration)
          }

          延時隊列的定義很簡單,就是增加了一個函數(shù)來實現(xiàn)元素的延遲添加而已,接下來我們繼續(xù)來查看該接口的具體實現(xiàn)方式:

          //?k8s.io/client-go/util/workqueue/delaying_queue.go

          //?delayingType?包裝了?Interface?通用接口,并提供了延遲重新入隊列
          type?delayingType?struct?{
          ?Interface??//?一個通用隊列

          ?//?時鐘用于跟蹤延遲觸發(fā)的時間
          ?clock?clock.Clock

          ?//?關(guān)閉信號
          ?stopCh?chan?struct{}
          ?//?用來保證只發(fā)出一次關(guān)閉信號
          ?stopOnce?sync.Once

          ?//?在觸發(fā)之前確保我們等待的時間不超過?maxWait
          ?heartbeat?clock.Ticker

          ?//?waitingForAddCh?是一個?buffered?channel,提供了一個緩沖通道
          ??//?延遲添加的元素封裝成?waitFor?放到?channel?中
          ?waitingForAddCh?chan?*waitFor

          ?//?記錄重試的次數(shù)
          ?metrics?retryMetrics
          }

          //?waitFor?持有要添加的數(shù)據(jù)和應(yīng)該添加的時間
          type?waitFor?struct?{
          ?data????t??//?添加的元素數(shù)據(jù)
          ?readyAt?time.Time??//?在什么時間添加到隊列中
          ?index?int??//?優(yōu)先級隊列(heap)中的索引
          }

          在延時隊列的實現(xiàn) delayingType 結(jié)構(gòu)體中包含一個通用隊列 Interface 的實現(xiàn),然后最重要的一個屬性就是?waitingForAddCh,這是一個 buffered channel,將延遲添加的元素封裝成?waitFor?放到通道中,意思就是當(dāng)?shù)搅酥付ǖ臅r間后就將元素添加到通用隊列中去進(jìn)行處理,還沒有到時間的話就放到這個緩沖通道中。要了解是如何實現(xiàn)延時隊列的我們還需要了解另外一個數(shù)據(jù)結(jié)構(gòu),那就是?waitForPriorityQueue

          //?k8s.io/client-go/util/workqueue/delaying_queue.go

          //?waitForPriorityQueue?為?waitFor?的元素集合實現(xiàn)了一個優(yōu)先級隊列
          //?把需要延遲的元素放到一個隊列中,然后在隊列中按照元素的延時添加時間(readyAt)從小到大排序
          //?其實這個優(yōu)先級隊列就是實現(xiàn)的?golang?中內(nèi)置的?container/heap/heap.go?中的?Interface?接口
          //?最終實現(xiàn)的隊列就是?waitForPriorityQueue?這個集合是有序的,按照時間從小到大進(jìn)行排列
          type?waitForPriorityQueue?[]*waitFor

          waitForPriorityQueue?是一個有序的 waitFor 的集合,按照添加的時間從小到大進(jìn)行排列,這就形成了一個優(yōu)先隊列

          優(yōu)先隊列

          其實這個優(yōu)先隊列是 golang 中內(nèi)置的?container/heap/heap.go?文件中的 Interface 接口(常說的數(shù)據(jù)結(jié)構(gòu)堆)的一個實現(xiàn),我們要想實現(xiàn)自己的隊列也完全可以去實現(xiàn)這個接口即可:

          //?$GOROOT/src/container/heap/heap.go

          //?堆接口定義
          //?注意:這個接口中的 Push 和 Pop 函數(shù)是給 heap 包的實現(xiàn)調(diào)用的
          //?任何實現(xiàn)了本接口的類型都可以用于構(gòu)建小頂堆
          //?小頂堆可以通過?heap.Init?建立,數(shù)據(jù)是遞增順序或者空的話也是最小堆
          //?小頂堆的約束條件是:
          //?!h.Less(j,?i)?for?0?<=?i?

          //?要從堆中添加和刪除元素,可以直接使用 heap.Push 和 heap.Pop 函數(shù)。
          type?Interface?interface?{
          ?sort.Interface??//?擴(kuò)展排序接口
          ?Push(x?interface{})?
          ?Pop()?interface{}???
          }

          //?Init?初始化一個堆
          //?使用任何堆操作之前應(yīng)先初始化,Init?函數(shù)對于堆的約束性是冪等的,并可能在任何時候堆的約束性被破壞時被調(diào)用
          //?本函數(shù)復(fù)雜度為O(n),其中n等于h.Len()。
          func?Init(h?Interface)?{
          ?//?構(gòu)建堆
          ?n?:=?h.Len()
          ?for?i?:=?n/2?-?1;?i?>=?0;?i--?{
          ??down(h,?i,?n)
          ?}
          }

          //?Push?將元素?x?添加到堆上
          //?復(fù)雜度為?O(log?n),其中?n?=?h.Len()
          func?Push(h?Interface,?x?interface{})?{
          ?h.Push(x)
          ??//?要保證堆的結(jié)構(gòu),所以添加進(jìn)來的元素要重新調(diào)整
          ??//?元素添加到最后,然后不斷上浮,因為要滿足任一節(jié)點(diǎn)的值要小于左右子樹的值
          ?up(h,?h.Len()-1)
          }

          //?將元素?j?重新排到正確的位置
          func?up(h?Interface,?j?int)?{
          ?for?{
          ??i?:=?(j?-?1)?/?2?//?父節(jié)點(diǎn)的索引
          ????//?如果?j?就是父節(jié)點(diǎn)或者?j?>=?i(父節(jié)點(diǎn)元素?
          ??if?i?==?j?||?!h.Less(j,?i)?{
          ???break
          ??}
          ????//?父節(jié)點(diǎn)元素?>?j?的元素,就交換二者
          ??h.Swap(i,?j)
          ??j?=?i
          ?}
          }

          //?Pop?從堆中移除并返回最小元素(根據(jù)?Less)
          //?復(fù)雜度為?O(log?n),其中?n?=?h.Len()
          //?Pop?相當(dāng)于?Remove(h,?0)
          func?Pop(h?Interface)?interface{}?{
          ?n?:=?h.Len()?-?1
          ??//?將最后一個元素填充到堆頂,Pop?是彈出堆頂?shù)脑?/span>
          ?h.Swap(0,?n)
          ??//?然后不斷的下沉這個元素
          ?down(h,?0,?n)
          ??//?調(diào)用外部的實現(xiàn)者,h.Pop()?實現(xiàn)中會刪除最后一個元素
          ?return?h.Pop()
          }

          func?down(h?Interface,?i0,?n?int)?bool?{
          ?i?:=?i0??//?父節(jié)點(diǎn)索引
          ?for?{
          ??j1?:=?2*i?+?1??//?左子節(jié)點(diǎn)的索引
          ??if?j1?>=?n?||?j1?0
          ?{?//?下沉到最后一個節(jié)點(diǎn)就不處理了
          ???break
          ??}
          ??j?:=?j1??//?左子節(jié)點(diǎn)索引
          ??if?j2?:=?j1?+?1;?j2????j?=?j2?//?= 2*i + 2 ?//?右子節(jié)點(diǎn)索引
          ??}
          ????//?子節(jié)點(diǎn)?>=?父節(jié)點(diǎn)索引,那就不用處理了
          ??if?!h.Less(j,?i)?{
          ???break
          ??}
          ????//?子節(jié)點(diǎn)?
          ??h.Swap(i,?j)
          ??i?=?j
          ?}
          ?return?i?>?i0
          }

          //?重新調(diào)整結(jié)構(gòu)
          func?Fix(h?Interface,?i?int)?{
          ?if?!down(h,?i,?h.Len())?{
          ??up(h,?i)
          ?}
          }

          //?$GOROOT/src/sort/sort.go

          //?排序的接口定義
          type?Interface?interface?{
          ?//?集合元素大小
          ?Len()?int
          ?//?比較索引?i?和?j?位置的元素大小
          ?Less(i,?j?int)?bool
          ?//?交換索引?i?和?j?位置的元素
          ?Swap(i,?j?int)
          }

          堆是一種經(jīng)過排序的完全二叉樹,其中任一非終端節(jié)點(diǎn)的數(shù)據(jù)值均不大于(或不小于)其左孩子和右孩子節(jié)點(diǎn)的值。golang 中內(nèi)置的堆是小頂堆(最小堆),任一節(jié)點(diǎn)的值是其子樹所有結(jié)點(diǎn)的最小值:

          堆又被稱為優(yōu)先隊列,盡管名為優(yōu)先隊列,但堆并不是隊列。因為隊列中允許的操作是先進(jìn)先出(FIFO),在隊尾插入元素,在隊頭取出元素。而堆雖然在堆底插入元素,在堆頂取出元素,但是堆中元素的排列不是按照到來的先后順序,而是按照一定的優(yōu)先順序排列的。

          下圖是插入一個元素的示意圖:

          下圖是從堆中刪除一個元素的示意圖:

          延時隊列實現(xiàn)

          接下來我們來看下?waitForPriorityQueue?是如何實現(xiàn)這個優(yōu)先隊列的:

          //?k8s.io/client-go/util/workqueue/delaying_queue.go

          //?獲取隊列長度,pq?就是一個?waitFor?集合,直接返回長度即可
          func?(pq?waitForPriorityQueue)?Len()?int?{
          ?return?len(pq)
          }

          //?判斷索引?i?和?j?上的元素大小
          func?(pq?waitForPriorityQueue)?Less(i,?j?int)?bool?{
          ??//?根據(jù)時間先后順序來決定先后順序
          ??//?i?位置的元素時間在?j?之前,則證明索引?i?的元素小于索引?j?的元素
          ?return?pq[i].readyAt.Before(pq[j].readyAt)
          }

          //?交換索引?i?和?j?的元素
          func?(pq?waitForPriorityQueue)?Swap(i,?j?int)?{
          ??//?交換元素
          ?pq[i],?pq[j]?=?pq[j],?pq[i]
          ??//?更新元素里面的索引信息
          ?pq[i].index?=?i
          ?pq[j].index?=?j
          }

          //?添加元素到隊列中
          //?要注意不應(yīng)該直接調(diào)用?Push?函數(shù),而應(yīng)該使用?`heap.Push`
          func?(pq?*waitForPriorityQueue)?Push(x?interface{})?{
          ?n?:=?len(*pq)
          ?item?:=?x.(*waitFor)
          ?item.index?=?n
          ?*pq?=?append(*pq,?item)
          }

          //?從隊列中彈出最后一個元素
          //?要注意不應(yīng)該直接調(diào)用?Pop?函數(shù),而應(yīng)該使用?`heap.Pop`
          func?(pq?*waitForPriorityQueue)?Pop()?interface{}?{
          ?n?:=?len(*pq)
          ?item?:=?(*pq)[n-1]
          ?item.index?=?-1
          ?*pq?=?(*pq)[0:(n?-?1)]
          ?return?item
          }

          //?直接獲取隊列開頭的元素,不會刪除元素或改變隊列
          func?(pq?waitForPriorityQueue)?Peek()?interface{}?{
          ?return?pq[0]
          }

          上面就是 waitForPriorityQueue 這個優(yōu)先隊列的實現(xiàn),接下來我們就來分析延時隊列的具體實現(xiàn)了,因為延時隊列集成通用隊列,所以這里只對新增的函數(shù)做說明:

          //?k8s.io/client-go/util/workqueue/delaying_queue.go

          //?在指定的延遲時間之后將元素?item?添加到隊列中
          func?(q?*delayingType)?AddAfter(item?interface{},?duration?time.Duration)?{
          ?//?如果隊列關(guān)閉了則直接退出
          ?if?q.ShuttingDown()?{
          ??return
          ?}

          ?q.metrics.retry()

          ?//?如果延遲的時間<=0,則相當(dāng)于通用隊列一樣添加元素
          ?if?duration?<=?0?{
          ??q.Add(item)
          ??return
          ?}
          ?
          ??//?select?沒有?default?case,所以可能會被阻塞
          ?select?{
          ??//?如果調(diào)用了?ShutDown()?則解除阻塞
          ?case?<-q.stopCh:
          ??//?把元素封裝成?waitFor?傳給?waitingForAddCh
          ?case?q.waitingForAddCh?<-?&waitFor{data:?item,?readyAt:?q.clock.Now().Add(duration)}:
          ?}
          }

          AddAfter 的函數(shù)實現(xiàn)比較簡單,就是把元素和添加的時間封裝成一個 waitFor 對象,然后發(fā)送給 ?waitingForAddCh?通道,所以具體怎么添加的元素需要查看如何從這個通道消費(fèi)數(shù)據(jù)的地方,也就是 waitingLoop 函數(shù),這個函數(shù)在實例化?DelayingInterface?后就用一個單獨(dú)的協(xié)程啟動了:

          //?k8s.io/client-go/util/workqueue/delaying_queue.go

          //?waitingLoop?一直運(yùn)行直到工作隊列關(guān)閉為止
          //?并對要添加的元素列表進(jìn)行檢查
          func?(q?*delayingType)?waitingLoop()?{
          ?defer?utilruntime.HandleCrash()

          ?//?創(chuàng)建一個占位符通道,當(dāng)列表中沒有元素的時候利用這個變量實現(xiàn)長時間等待
          ?never?:=?make(<-chan?time.Time)

          ?//?構(gòu)造一個定時器,當(dāng)?shù)却犃蓄^部的元素準(zhǔn)備好時,該定時器就會失效
          ?var?nextReadyAtTimer?clock.Timer
          ?
          ??//?構(gòu)造一個優(yōu)先級隊列
          ?waitingForQueue?:=?&waitForPriorityQueue{}
          ??//?構(gòu)造小頂堆結(jié)構(gòu)
          ?heap.Init(waitingForQueue)
          ?
          ??//?用來避免元素重復(fù)添加,如果重復(fù)添加了就只更新時間
          ?waitingEntryByData?:=?map[t]*waitFor{}
          ??
          ??//?死循環(huán)
          ?for?{
          ????//?隊列如果關(guān)閉了,則直接退出
          ??if?q.Interface.ShuttingDown()?{
          ???return
          ??}
          ????
          ????//?獲取當(dāng)前時間
          ??now?:=?q.clock.Now()

          ??//?如果優(yōu)先隊列中有元素的話
          ??for?waitingForQueue.Len()?>?0?{
          ??????//?獲取第一個元素
          ???entry?:=?waitingForQueue.Peek().(*waitFor)
          ??????//?如果第一個元素指定的時間還沒到時間,則跳出循環(huán)
          ??????//?因為第一個元素是時間最小的
          ???if?entry.readyAt.After(now)?{
          ????break
          ???}

          ??????//?時間已經(jīng)過了,那就把它從優(yōu)先隊列中拿出來放入通用隊列中
          ??????//?同時要把元素從上面提到的?map?中刪除,因為不用再判斷重復(fù)添加了
          ???entry?=?heap.Pop(waitingForQueue).(*waitFor)
          ???q.Add(entry.data)
          ???delete(waitingEntryByData,?entry.data)
          ??}

          ??nextReadyAt?:=?never
          ????//?如果優(yōu)先隊列中還有元素,那就用第一個元素指定的時間減去當(dāng)前時間作為等待時間
          ????//?因為優(yōu)先隊列是用時間排序的,后面的元素需要等待的時間更長,所以先處理排序靠前面的元素
          ??if?waitingForQueue.Len()?>?0?{
          ???if?nextReadyAtTimer?!=?nil?{
          ????nextReadyAtTimer.Stop()
          ???}
          ??????//?獲取第一個元素
          ???entry?:=?waitingForQueue.Peek().(*waitFor)
          ??????//?第一個元素的時間減去當(dāng)前時間作為等待時間
          ???nextReadyAtTimer?=?q.clock.NewTimer(entry.readyAt.Sub(now))
          ???nextReadyAt?=?nextReadyAtTimer.C()
          ??}

          ??select?{
          ????//?退出信號
          ??case?<-q.stopCh:
          ???return

          ????//?定時器,每過一段時間沒有任何數(shù)據(jù),那就再執(zhí)行一次大循環(huán)
          ??case?<-q.heartbeat.C():

          ????//?上面的等待時間信號,時間到了就有信號
          ????//?激活這個case,然后繼續(xù)循環(huán),添加準(zhǔn)備好了的元素
          ??case?<-nextReadyAt:
          ????
          ????//?AddAfter?函數(shù)中放入到通道中的元素,這里從通道中獲取數(shù)據(jù)
          ??case?waitEntry?:=?<-q.waitingForAddCh:
          ??????//?如果時間已經(jīng)過了就直接放入通用隊列,沒過就插入到有序隊列
          ???if?waitEntry.readyAt.After(q.clock.Now())?{
          ????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
          ???}?else?{
          ????????//?放入通用隊列
          ????q.Add(waitEntry.data)
          ???}
          ??????//?下面就是把channel里面的元素全部取出來
          ??????//?如果沒有數(shù)據(jù)了就直接退出
          ???drained?:=?false
          ???for?!drained?{
          ????select?{
          ????case?waitEntry?:=?<-q.waitingForAddCh:
          ?????if?waitEntry.readyAt.After(q.clock.Now())?{
          ??????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
          ?????}?else?{
          ??????q.Add(waitEntry.data)
          ?????}
          ????default:
          ?????drained?=?true
          ????}
          ???}
          ??}
          ?}
          }

          //?插入元素到有序隊列,如果已經(jīng)存在了則更新時間
          func?insert(q?*waitForPriorityQueue,?knownEntries?map[t]*waitFor,?entry?*waitFor)?{
          ?//?查看元素是否已經(jīng)存在
          ?existing,?exists?:=?knownEntries[entry.data]
          ?if?exists?{
          ????//?元素存在,就比較誰的時間靠后就用誰的時間
          ??if?existing.readyAt.After(entry.readyAt)?{
          ???existing.readyAt?=?entry.readyAt
          ??????//?時間變了需要重新調(diào)整優(yōu)先級隊列
          ???heap.Fix(q,?existing.index)
          ??}
          ??return
          ?}
          ??
          ??//?把元素放入有序隊列中
          ?heap.Push(q,?entry)
          ??//?并記錄在上面的?map?里面,用于判斷是否存在
          ?knownEntries[entry.data]?=?entry
          }

          到這里延時隊列核心代碼就分析完了,其實實現(xiàn)的原理很簡單,既然是延時隊列那肯定就有元素執(zhí)行的時間,根據(jù)這個時間的先后順序來構(gòu)造一個優(yōu)先級隊列,時間到了的話就把這個元素放到通用隊列中去進(jìn)行正常的處理就行。

          所以核心重點(diǎn)就是優(yōu)先隊列的實現(xiàn),而這里使用的優(yōu)先隊列是 golang 內(nèi)置的 heap 接口實現(xiàn),所以歸根結(jié)底底層都是數(shù)據(jù)結(jié)構(gòu)與算法的運(yùn)用




          K8S進(jìn)階訓(xùn)練營,點(diǎn)擊下方圖片了解詳情


          瀏覽 75
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产69久久久 | 久久偷拍熟女91 | 逼特逼视频在线免费观看 | 樱桃视频一区二区三区 | 激情五月综合网 |