Kubernetes 源碼學(xué)習(xí)之延時隊列

client-go 中的 workqueue,類似于 golang 語言中的 channel,主要用于并發(fā)程序之間的數(shù)據(jù)同步。Kubernetes 的控制器模型通過 client-go 的 informer watch 資源變化,當(dāng)資源發(fā)生變化時會通過回調(diào)函數(shù)將資源寫入隊列,由控制器中的消費(fèi)者完成業(yè)務(wù)處理。
延時隊列
client-go 中實現(xiàn)了多種隊列,包括通用隊列、延時隊列、限速隊列,本文首先介紹延時隊列的實現(xiàn)。延時隊列是在通用隊列基礎(chǔ)上進(jìn)行擴(kuò)展的,因為它本質(zhì)還是一個隊列,只是加了一個新的函數(shù)來進(jìn)行延遲,對應(yīng)的接口定義如下所示:
//?k8s.io/client-go/util/workqueue/queue.go
//?通用隊列接口定義
type?Interface?interface?{
?Add(item?interface{})??//?向隊列中添加一個元素
?Len()?int??//?獲取隊列長度
?Get()?(item?interface{},?shutdown?bool)??//?獲取隊列頭部的元素,第二個返回值表示隊列是否已經(jīng)關(guān)閉
?Done(item?interface{})??//?標(biāo)記隊列中元素已經(jīng)處理完
?ShutDown()??//?關(guān)閉隊列
?ShuttingDown()?bool??//?查詢隊列是否正在關(guān)閉
}
//?k8s.io/client-go/util/workqueue/delaying_queue.go
//?DelayingInterface?是一個延時隊列,可以在以后的時間來添加元素的接口
//?這使得它更容易在處理失敗后重新入隊列,而不至于陷入?hot-loop
type?DelayingInterface?interface?{
??//?擴(kuò)展通用隊列
?Interface
?//?在指定的時間后將元素添加到工作隊列中
?AddAfter(item?interface{},?duration?time.Duration)
}
延時隊列的定義很簡單,就是增加了一個函數(shù)來實現(xiàn)元素的延遲添加而已,接下來我們繼續(xù)來查看該接口的具體實現(xiàn)方式:
//?k8s.io/client-go/util/workqueue/delaying_queue.go
//?delayingType?包裝了?Interface?通用接口,并提供了延遲重新入隊列
type?delayingType?struct?{
?Interface??//?一個通用隊列
?//?時鐘用于跟蹤延遲觸發(fā)的時間
?clock?clock.Clock
?//?關(guān)閉信號
?stopCh?chan?struct{}
?//?用來保證只發(fā)出一次關(guān)閉信號
?stopOnce?sync.Once
?//?在觸發(fā)之前確保我們等待的時間不超過?maxWait
?heartbeat?clock.Ticker
?//?waitingForAddCh?是一個?buffered?channel,提供了一個緩沖通道
??//?延遲添加的元素封裝成?waitFor?放到?channel?中
?waitingForAddCh?chan?*waitFor
?//?記錄重試的次數(shù)
?metrics?retryMetrics
}
//?waitFor?持有要添加的數(shù)據(jù)和應(yīng)該添加的時間
type?waitFor?struct?{
?data????t??//?添加的元素數(shù)據(jù)
?readyAt?time.Time??//?在什么時間添加到隊列中
?index?int??//?優(yōu)先級隊列(heap)中的索引
}
在延時隊列的實現(xiàn) delayingType 結(jié)構(gòu)體中包含一個通用隊列 Interface 的實現(xiàn),然后最重要的一個屬性就是?waitingForAddCh,這是一個 buffered channel,將延遲添加的元素封裝成?waitFor?放到通道中,意思就是當(dāng)?shù)搅酥付ǖ臅r間后就將元素添加到通用隊列中去進(jìn)行處理,還沒有到時間的話就放到這個緩沖通道中。要了解是如何實現(xiàn)延時隊列的我們還需要了解另外一個數(shù)據(jù)結(jié)構(gòu),那就是?waitForPriorityQueue:
//?k8s.io/client-go/util/workqueue/delaying_queue.go
//?waitForPriorityQueue?為?waitFor?的元素集合實現(xiàn)了一個優(yōu)先級隊列
//?把需要延遲的元素放到一個隊列中,然后在隊列中按照元素的延時添加時間(readyAt)從小到大排序
//?其實這個優(yōu)先級隊列就是實現(xiàn)的?golang?中內(nèi)置的?container/heap/heap.go?中的?Interface?接口
//?最終實現(xiàn)的隊列就是?waitForPriorityQueue?這個集合是有序的,按照時間從小到大進(jìn)行排列
type?waitForPriorityQueue?[]*waitFor
waitForPriorityQueue?是一個有序的 waitFor 的集合,按照添加的時間從小到大進(jìn)行排列,這就形成了一個優(yōu)先隊列。
優(yōu)先隊列
其實這個優(yōu)先隊列是 golang 中內(nèi)置的?container/heap/heap.go?文件中的 Interface 接口(常說的數(shù)據(jù)結(jié)構(gòu)堆)的一個實現(xiàn),我們要想實現(xiàn)自己的隊列也完全可以去實現(xiàn)這個接口即可:
//?$GOROOT/src/container/heap/heap.go
//?堆接口定義
//?注意:這個接口中的 Push 和 Pop 函數(shù)是給 heap 包的實現(xiàn)調(diào)用的
//?任何實現(xiàn)了本接口的類型都可以用于構(gòu)建小頂堆
//?小頂堆可以通過?heap.Init?建立,數(shù)據(jù)是遞增順序或者空的話也是最小堆
//?小頂堆的約束條件是:
//?!h.Less(j,?i)?for?0?<=?i?
//?要從堆中添加和刪除元素,可以直接使用 heap.Push 和 heap.Pop 函數(shù)。
type?Interface?interface?{
?sort.Interface??//?擴(kuò)展排序接口
?Push(x?interface{})?
?Pop()?interface{}???
}
//?Init?初始化一個堆
//?使用任何堆操作之前應(yīng)先初始化,Init?函數(shù)對于堆的約束性是冪等的,并可能在任何時候堆的約束性被破壞時被調(diào)用
//?本函數(shù)復(fù)雜度為O(n),其中n等于h.Len()。
func?Init(h?Interface)?{
?//?構(gòu)建堆
?n?:=?h.Len()
?for?i?:=?n/2?-?1;?i?>=?0;?i--?{
??down(h,?i,?n)
?}
}
//?Push?將元素?x?添加到堆上
//?復(fù)雜度為?O(log?n),其中?n?=?h.Len()
func?Push(h?Interface,?x?interface{})?{
?h.Push(x)
??//?要保證堆的結(jié)構(gòu),所以添加進(jìn)來的元素要重新調(diào)整
??//?元素添加到最后,然后不斷上浮,因為要滿足任一節(jié)點(diǎn)的值要小于左右子樹的值
?up(h,?h.Len()-1)
}
//?將元素?j?重新排到正確的位置
func?up(h?Interface,?j?int)?{
?for?{
??i?:=?(j?-?1)?/?2?//?父節(jié)點(diǎn)的索引
????//?如果?j?就是父節(jié)點(diǎn)或者?j?>=?i(父節(jié)點(diǎn)元素?
??if?i?==?j?||?!h.Less(j,?i)?{
???break
??}
????//?父節(jié)點(diǎn)元素?>?j?的元素,就交換二者
??h.Swap(i,?j)
??j?=?i
?}
}
//?Pop?從堆中移除并返回最小元素(根據(jù)?Less)
//?復(fù)雜度為?O(log?n),其中?n?=?h.Len()
//?Pop?相當(dāng)于?Remove(h,?0)
func?Pop(h?Interface)?interface{}?{
?n?:=?h.Len()?-?1
??//?將最后一個元素填充到堆頂,Pop?是彈出堆頂?shù)脑?/span>
?h.Swap(0,?n)
??//?然后不斷的下沉這個元素
?down(h,?0,?n)
??//?調(diào)用外部的實現(xiàn)者,h.Pop()?實現(xiàn)中會刪除最后一個元素
?return?h.Pop()
}
func?down(h?Interface,?i0,?n?int)?bool?{
?i?:=?i0??//?父節(jié)點(diǎn)索引
?for?{
??j1?:=?2*i?+?1??//?左子節(jié)點(diǎn)的索引
??if?j1?>=?n?||?j1?0?{?//?下沉到最后一個節(jié)點(diǎn)就不處理了
???break
??}
??j?:=?j1??//?左子節(jié)點(diǎn)索引
??if?j2?:=?j1?+?1;?j2????j?=?j2?//?= 2*i + 2 ?//?右子節(jié)點(diǎn)索引
??}
????//?子節(jié)點(diǎn)?>=?父節(jié)點(diǎn)索引,那就不用處理了
??if?!h.Less(j,?i)?{
???break
??}
????//?子節(jié)點(diǎn)?
??h.Swap(i,?j)
??i?=?j
?}
?return?i?>?i0
}
//?重新調(diào)整結(jié)構(gòu)
func?Fix(h?Interface,?i?int)?{
?if?!down(h,?i,?h.Len())?{
??up(h,?i)
?}
}
//?$GOROOT/src/sort/sort.go
//?排序的接口定義
type?Interface?interface?{
?//?集合元素大小
?Len()?int
?//?比較索引?i?和?j?位置的元素大小
?Less(i,?j?int)?bool
?//?交換索引?i?和?j?位置的元素
?Swap(i,?j?int)
}
堆是一種經(jīng)過排序的完全二叉樹,其中任一非終端節(jié)點(diǎn)的數(shù)據(jù)值均不大于(或不小于)其左孩子和右孩子節(jié)點(diǎn)的值。golang 中內(nèi)置的堆是小頂堆(最小堆),任一節(jié)點(diǎn)的值是其子樹所有結(jié)點(diǎn)的最小值:


堆又被稱為優(yōu)先隊列,盡管名為優(yōu)先隊列,但堆并不是隊列。因為隊列中允許的操作是先進(jìn)先出(FIFO),在隊尾插入元素,在隊頭取出元素。而堆雖然在堆底插入元素,在堆頂取出元素,但是堆中元素的排列不是按照到來的先后順序,而是按照一定的優(yōu)先順序排列的。
下圖是插入一個元素的示意圖:


延時隊列實現(xiàn)
接下來我們來看下?waitForPriorityQueue?是如何實現(xiàn)這個優(yōu)先隊列的:
//?k8s.io/client-go/util/workqueue/delaying_queue.go
//?獲取隊列長度,pq?就是一個?waitFor?集合,直接返回長度即可
func?(pq?waitForPriorityQueue)?Len()?int?{
?return?len(pq)
}
//?判斷索引?i?和?j?上的元素大小
func?(pq?waitForPriorityQueue)?Less(i,?j?int)?bool?{
??//?根據(jù)時間先后順序來決定先后順序
??//?i?位置的元素時間在?j?之前,則證明索引?i?的元素小于索引?j?的元素
?return?pq[i].readyAt.Before(pq[j].readyAt)
}
//?交換索引?i?和?j?的元素
func?(pq?waitForPriorityQueue)?Swap(i,?j?int)?{
??//?交換元素
?pq[i],?pq[j]?=?pq[j],?pq[i]
??//?更新元素里面的索引信息
?pq[i].index?=?i
?pq[j].index?=?j
}
//?添加元素到隊列中
//?要注意不應(yīng)該直接調(diào)用?Push?函數(shù),而應(yīng)該使用?`heap.Push`
func?(pq?*waitForPriorityQueue)?Push(x?interface{})?{
?n?:=?len(*pq)
?item?:=?x.(*waitFor)
?item.index?=?n
?*pq?=?append(*pq,?item)
}
//?從隊列中彈出最后一個元素
//?要注意不應(yīng)該直接調(diào)用?Pop?函數(shù),而應(yīng)該使用?`heap.Pop`
func?(pq?*waitForPriorityQueue)?Pop()?interface{}?{
?n?:=?len(*pq)
?item?:=?(*pq)[n-1]
?item.index?=?-1
?*pq?=?(*pq)[0:(n?-?1)]
?return?item
}
//?直接獲取隊列開頭的元素,不會刪除元素或改變隊列
func?(pq?waitForPriorityQueue)?Peek()?interface{}?{
?return?pq[0]
}
上面就是 waitForPriorityQueue 這個優(yōu)先隊列的實現(xiàn),接下來我們就來分析延時隊列的具體實現(xiàn)了,因為延時隊列集成通用隊列,所以這里只對新增的函數(shù)做說明:
//?k8s.io/client-go/util/workqueue/delaying_queue.go
//?在指定的延遲時間之后將元素?item?添加到隊列中
func?(q?*delayingType)?AddAfter(item?interface{},?duration?time.Duration)?{
?//?如果隊列關(guān)閉了則直接退出
?if?q.ShuttingDown()?{
??return
?}
?q.metrics.retry()
?//?如果延遲的時間<=0,則相當(dāng)于通用隊列一樣添加元素
?if?duration?<=?0?{
??q.Add(item)
??return
?}
?
??//?select?沒有?default?case,所以可能會被阻塞
?select?{
??//?如果調(diào)用了?ShutDown()?則解除阻塞
?case?<-q.stopCh:
??//?把元素封裝成?waitFor?傳給?waitingForAddCh
?case?q.waitingForAddCh?<-?&waitFor{data:?item,?readyAt:?q.clock.Now().Add(duration)}:
?}
}
AddAfter 的函數(shù)實現(xiàn)比較簡單,就是把元素和添加的時間封裝成一個 waitFor 對象,然后發(fā)送給 ?waitingForAddCh?通道,所以具體怎么添加的元素需要查看如何從這個通道消費(fèi)數(shù)據(jù)的地方,也就是 waitingLoop 函數(shù),這個函數(shù)在實例化?DelayingInterface?后就用一個單獨(dú)的協(xié)程啟動了:
//?k8s.io/client-go/util/workqueue/delaying_queue.go
//?waitingLoop?一直運(yùn)行直到工作隊列關(guān)閉為止
//?并對要添加的元素列表進(jìn)行檢查
func?(q?*delayingType)?waitingLoop()?{
?defer?utilruntime.HandleCrash()
?//?創(chuàng)建一個占位符通道,當(dāng)列表中沒有元素的時候利用這個變量實現(xiàn)長時間等待
?never?:=?make(<-chan?time.Time)
?//?構(gòu)造一個定時器,當(dāng)?shù)却犃蓄^部的元素準(zhǔn)備好時,該定時器就會失效
?var?nextReadyAtTimer?clock.Timer
?
??//?構(gòu)造一個優(yōu)先級隊列
?waitingForQueue?:=?&waitForPriorityQueue{}
??//?構(gòu)造小頂堆結(jié)構(gòu)
?heap.Init(waitingForQueue)
?
??//?用來避免元素重復(fù)添加,如果重復(fù)添加了就只更新時間
?waitingEntryByData?:=?map[t]*waitFor{}
??
??//?死循環(huán)
?for?{
????//?隊列如果關(guān)閉了,則直接退出
??if?q.Interface.ShuttingDown()?{
???return
??}
????
????//?獲取當(dāng)前時間
??now?:=?q.clock.Now()
??//?如果優(yōu)先隊列中有元素的話
??for?waitingForQueue.Len()?>?0?{
??????//?獲取第一個元素
???entry?:=?waitingForQueue.Peek().(*waitFor)
??????//?如果第一個元素指定的時間還沒到時間,則跳出循環(huán)
??????//?因為第一個元素是時間最小的
???if?entry.readyAt.After(now)?{
????break
???}
??????//?時間已經(jīng)過了,那就把它從優(yōu)先隊列中拿出來放入通用隊列中
??????//?同時要把元素從上面提到的?map?中刪除,因為不用再判斷重復(fù)添加了
???entry?=?heap.Pop(waitingForQueue).(*waitFor)
???q.Add(entry.data)
???delete(waitingEntryByData,?entry.data)
??}
??nextReadyAt?:=?never
????//?如果優(yōu)先隊列中還有元素,那就用第一個元素指定的時間減去當(dāng)前時間作為等待時間
????//?因為優(yōu)先隊列是用時間排序的,后面的元素需要等待的時間更長,所以先處理排序靠前面的元素
??if?waitingForQueue.Len()?>?0?{
???if?nextReadyAtTimer?!=?nil?{
????nextReadyAtTimer.Stop()
???}
??????//?獲取第一個元素
???entry?:=?waitingForQueue.Peek().(*waitFor)
??????//?第一個元素的時間減去當(dāng)前時間作為等待時間
???nextReadyAtTimer?=?q.clock.NewTimer(entry.readyAt.Sub(now))
???nextReadyAt?=?nextReadyAtTimer.C()
??}
??select?{
????//?退出信號
??case?<-q.stopCh:
???return
????//?定時器,每過一段時間沒有任何數(shù)據(jù),那就再執(zhí)行一次大循環(huán)
??case?<-q.heartbeat.C():
????//?上面的等待時間信號,時間到了就有信號
????//?激活這個case,然后繼續(xù)循環(huán),添加準(zhǔn)備好了的元素
??case?<-nextReadyAt:
????
????//?AddAfter?函數(shù)中放入到通道中的元素,這里從通道中獲取數(shù)據(jù)
??case?waitEntry?:=?<-q.waitingForAddCh:
??????//?如果時間已經(jīng)過了就直接放入通用隊列,沒過就插入到有序隊列
???if?waitEntry.readyAt.After(q.clock.Now())?{
????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
???}?else?{
????????//?放入通用隊列
????q.Add(waitEntry.data)
???}
??????//?下面就是把channel里面的元素全部取出來
??????//?如果沒有數(shù)據(jù)了就直接退出
???drained?:=?false
???for?!drained?{
????select?{
????case?waitEntry?:=?<-q.waitingForAddCh:
?????if?waitEntry.readyAt.After(q.clock.Now())?{
??????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
?????}?else?{
??????q.Add(waitEntry.data)
?????}
????default:
?????drained?=?true
????}
???}
??}
?}
}
//?插入元素到有序隊列,如果已經(jīng)存在了則更新時間
func?insert(q?*waitForPriorityQueue,?knownEntries?map[t]*waitFor,?entry?*waitFor)?{
?//?查看元素是否已經(jīng)存在
?existing,?exists?:=?knownEntries[entry.data]
?if?exists?{
????//?元素存在,就比較誰的時間靠后就用誰的時間
??if?existing.readyAt.After(entry.readyAt)?{
???existing.readyAt?=?entry.readyAt
??????//?時間變了需要重新調(diào)整優(yōu)先級隊列
???heap.Fix(q,?existing.index)
??}
??return
?}
??
??//?把元素放入有序隊列中
?heap.Push(q,?entry)
??//?并記錄在上面的?map?里面,用于判斷是否存在
?knownEntries[entry.data]?=?entry
}
到這里延時隊列核心代碼就分析完了,其實實現(xiàn)的原理很簡單,既然是延時隊列那肯定就有元素執(zhí)行的時間,根據(jù)這個時間的先后順序來構(gòu)造一個優(yōu)先級隊列,時間到了的話就把這個元素放到通用隊列中去進(jìn)行正常的處理就行。

所以核心重點(diǎn)就是優(yōu)先隊列的實現(xiàn),而這里使用的優(yōu)先隊列是 golang 內(nèi)置的 heap 接口實現(xiàn),所以歸根結(jié)底底層都是數(shù)據(jù)結(jié)構(gòu)與算法的運(yùn)用。
K8S進(jìn)階訓(xùn)練營,點(diǎn)擊下方圖片了解詳情

