Kubernetes client-go 源碼分析 - workqueue
概述
源碼版本信息
Project: kubernetes Branch: master Last commit id: d25d741c Date: 2021-09-26
自定義控制器涉及到的 client-go 組件整體工作流程,大致如下圖:

今天我們來詳細(xì)研究下 workqueue 相關(guān)代碼。client-go 的 util/workqueue 包里主要有三個(gè)隊(duì)列,分別是普通隊(duì)列,延時(shí)隊(duì)列,限速隊(duì)列,后一個(gè)隊(duì)列以前一個(gè)隊(duì)列的實(shí)現(xiàn)為基礎(chǔ),層層添加新功能,我們按照 Queue、DelayingQueue、RateLimitingQueue 的順序?qū)訉訐荛_來看限速隊(duì)列是如何實(shí)現(xiàn)的。
Queue
接口和結(jié)構(gòu)體
先看接口定義:
k8s.io/client-go/util/workqueue/queue.go:26
type?Interface?interface?{
???Add(item?interface{})??//?添加一個(gè)元素
???Len()?int??????????????//?元素個(gè)數(shù)
???Get()?(item?interface{},?shutdown?bool)?//?獲取一個(gè)元素,第二個(gè)返回值和?channel?類似,標(biāo)記隊(duì)列是否關(guān)閉了
???Done(item?interface{})?//?標(biāo)記一個(gè)元素已經(jīng)處理完
???ShutDown()?????????????//?關(guān)閉隊(duì)列
???ShuttingDown()?bool????//?是否正在關(guān)閉
}
這個(gè)基礎(chǔ)的隊(duì)列接口定義很清晰,我們繼續(xù)來看其實(shí)現(xiàn)的類型:
type?Type?struct?{
???queue?[]t????????????//?定義元素的處理順序,里面所有元素都應(yīng)該在?dirty?set?中有,而不能出現(xiàn)在?processing?set?中
???dirty?set????????????//?標(biāo)記所有需要被處理的元素
???processing?set???????//?當(dāng)前正在被處理的元素,當(dāng)處理完后需要檢查該元素是否在?dirty?set?中,如果有則添加到?queue?里
???cond?*sync.Cond??????//?條件鎖
???shuttingDown?bool????//?是否正在關(guān)閉
???metrics?queueMetrics
???unfinishedWorkUpdatePeriod?time.Duration
???clock??????????????????????clock.Clock
}
Queue 的工作邏輯大致是這樣,里面的三個(gè)屬性 queue、dirty、processing 都保存 items,但是含義有所不同:
queue:這是一個(gè) []t 類型,也就是一個(gè)切片,因?yàn)槠溆行颍赃@里當(dāng)作一個(gè)列表來存儲(chǔ) item 的處理順序。 dirty:這是一個(gè) set 類型,也就是一個(gè)集合,這個(gè)集合存儲(chǔ)的是所有需要處理的 item,這些 item 也會(huì)保存在 queue 中,但是 set 里是無需的,set 的特性是唯一。 processing:這也是一個(gè) set,存放的是當(dāng)前正在處理的 item,也就是說這個(gè) item 來自 queue 出隊(duì)的元素,同時(shí)這個(gè)元素會(huì)被從 dirty 中刪除。
下面分別介紹 set 類型和 Queue 接口的集合核心方法的實(shí)現(xiàn)。
set
上面提到的 dirty 和 processing 字段都是 set 類型,set 相關(guān)定義如下:
type?empty?struct{}
type?t?interface{}
type?set?map[t]empty
func?(s?set)?has(item?t)?bool?{
???_,?exists?:=?s[item]
???return?exists
}
func?(s?set)?insert(item?t)?{
???s[item]?=?empty{}
}
func?(s?set)?delete(item?t)?{
???delete(s,?item)
}
set 是一個(gè)空接口到空結(jié)構(gòu)體的 map,也就是實(shí)現(xiàn)了一個(gè)集合的功能,集合元素是 interface{} 類型,也就是可以存儲(chǔ)任意類型。而 map 的 value 是 struct{} 類型,也就是空。這里利用 map 的 key 唯一的特性實(shí)現(xiàn)了一個(gè)集合類型,附帶三個(gè)方法 has()、insert()、delete() 來實(shí)現(xiàn)集合相關(guān)操作。
Add()
Add() 方法用于標(biāo)記一個(gè) item 需要被處理,代碼如下:
func?(q?*Type)?Add(item?interface{})?{
???q.cond.L.Lock()
???defer?q.cond.L.Unlock()
???if?q.shuttingDown?{?//?如果?queue?正在被關(guān)閉,則返回
??????return
???}
???if?q.dirty.has(item)?{?//?如果?dirty?set?中已經(jīng)有了該?item,則返回
??????return
???}
???q.metrics.add(item)
???q.dirty.insert(item)?//?添加到?dirty?set?中
???if?q.processing.has(item)?{?//?如果正在被處理,則返回
??????return
???}
???q.queue?=?append(q.queue,?item)?//?如果沒有正在處理,則加到?q.queue?中
???q.cond.Signal()?//?通知某個(gè)?getter?有新?item?到來
}
Get()
func?(q?*Type)?Get()?(item?interface{},?shutdown?bool)?{
???q.cond.L.Lock()
???defer?q.cond.L.Unlock()
???for?len(q.queue)?==?0?&&?!q.shuttingDown?{?//?如果?q.queue?為空,并且沒有正在關(guān)閉,則等待下一個(gè)?item?的到來
??????q.cond.Wait()
???}
???if?len(q.queue)?==?0?{?//?這時(shí)候如果?q.queue?長(zhǎng)度還是?0,說明?q.shuttingDown?為?true,所以直接返回
??????return?nil,?true
???}
???item,?q.queue?=?q.queue[0],?q.queue[1:]?//?獲取?q.queue?第一個(gè)元素,同時(shí)更新?q.queue
???q.metrics.get(item)
???q.processing.insert(item)?//?剛才獲取到的?q.queue?第一個(gè)元素放到?processing?set?中
???q.dirty.delete(item)?//?dirty?set?中刪除該元素
???return?item,?false?//?返回?item
}
Done()
func?(q?*Type)?Done(item?interface{})?{
???q.cond.L.Lock()
???defer?q.cond.L.Unlock()
???q.metrics.done(item)
???q.processing.delete(item)?//?processing?set?中刪除該?item
???if?q.dirty.has(item)?{?//?如果?dirty?中還有,說明還需要再次處理,放到?q.queue?中
??????q.queue?=?append(q.queue,?item)
??????q.cond.Signal()?//?通知某個(gè)?getter?有新的?item
???}
}
DelayingQueue
接口和結(jié)構(gòu)體
還是先看接口定義:
k8s.io/client-go/util/workqueue/delaying_queue.go:30
type?DelayingInterface?interface?{
???Interface
???//?AddAfter?adds?an?item?to?the?workqueue?after?the?indicated?duration?has?passed
???AddAfter(item?interface{},?duration?time.Duration)
}
相比 Queue 這里只是多了一個(gè) AddAfter(item interface{}, duration time.Duration) 方法,望文生義,也就是延時(shí)添加 item。
結(jié)構(gòu)體定義:
type?delayingType?struct?{
???Interface???????????????//?用來嵌套普通?Queue
???clock?clock.Clock???????//?計(jì)時(shí)器
???stopCh?chan?struct{}
???stopOnce?sync.Once??????//?用來確保?ShutDown()?方法只執(zhí)行一次
???heartbeat?clock.Ticker??//?默認(rèn)10s的心跳,后面用在一個(gè)大循環(huán)里,避免沒有新?item?時(shí)一直卡住
???waitingForAddCh?chan?*waitFor??//?傳遞?waitFor?的?channel,默認(rèn)大小?1000
???metrics?retryMetrics
}
對(duì)于延時(shí)隊(duì)列,我們關(guān)注的入口方法肯定就是新增的 AddAfter() 了,看這個(gè)方法的具體的邏輯前我們先看下上面提到的 waitFor 類型。
waitFor
先看下 waitFor 結(jié)構(gòu)定義,代碼如下:
type?waitFor?struct?{
???data????t??????????//?準(zhǔn)備添加到隊(duì)列中的數(shù)據(jù)
???readyAt?time.Time??//?應(yīng)該被加入隊(duì)列的時(shí)間
???index?int??????????//?在?heap?中的索引
}
然后可以注意到有這樣一行代碼:
type?waitForPriorityQueue?[]*waitFor
這里定義了一個(gè) waitFor 的優(yōu)先級(jí)隊(duì)列,用最小堆的方式來實(shí)現(xiàn),這個(gè)類型實(shí)現(xiàn)了 heap.Interface 接口,我們具體看下源碼:
//?添加一個(gè)?item?到隊(duì)列中
func?(pq?*waitForPriorityQueue)?Push(x?interface{})?{
???n?:=?len(*pq)
???item?:=?x.(*waitFor)
???item.index?=?n
???*pq?=?append(*pq,?item)?//?添加到隊(duì)列的尾巴
}
//?從隊(duì)列尾巴移除一個(gè)?item
func?(pq?*waitForPriorityQueue)?Pop()?interface{}?{
???n?:=?len(*pq)
???item?:=?(*pq)[n-1]
???item.index?=?-1
???*pq?=?(*pq)[0:(n?-?1)]
???return?item
}
//?獲取隊(duì)列第一個(gè)?item
func?(pq?waitForPriorityQueue)?Peek()?interface{}?{
???return?pq[0]
}
NewDelayingQueue
接著看一下 DelayingQueue 相關(guān)的幾個(gè) New 函數(shù),理解了這里的邏輯,才能繼續(xù)往后面分析 AddAfter() 方法。
//?這里可以傳遞一個(gè)名字
func?NewNamedDelayingQueue(name?string)?DelayingInterface?{
???return?NewDelayingQueueWithCustomClock(clock.RealClock{},?name)
}
//?上面一個(gè)函數(shù)只是調(diào)用當(dāng)前函數(shù),附帶一個(gè)名字,這里加了一個(gè)指定?clock?的能力
func?NewDelayingQueueWithCustomClock(clock?clock.Clock,?name?string)?DelayingInterface?{
??return?newDelayingQueue(clock,?NewNamed(name),?name)?//?注意這里的?NewNamed()?函數(shù)
}
func?newDelayingQueue(clock?clock.Clock,?q?Interface,?name?string)?*delayingType?{
???ret?:=?&delayingType{
??????Interface:???????q,
??????clock:???????????clock,
??????heartbeat:???????clock.NewTicker(maxWait),?//?10s?一次心跳
??????stopCh:??????????make(chan?struct{}),
??????waitingForAddCh:?make(chan?*waitFor,?1000),
??????metrics:?????????newRetryMetrics(name),
???}
???go?ret.waitingLoop()?//?留意這里的函數(shù)調(diào)用
???return?ret
}
上面涉及到兩個(gè)細(xì)節(jié):
NewNamed(name) go ret.waitingLoop()
NewNamed() 函數(shù)用于創(chuàng)建一個(gè)前面提到的 Queue 的對(duì)應(yīng)類型 Type 對(duì)象,這個(gè)值被傳遞給了 newDelayingQueue() 函數(shù),進(jìn)而賦值給了 delayingType{} 對(duì)象的 Interface 字段,于是后面 delayingType 類型才能直接調(diào)用 Type 類型實(shí)現(xiàn)的方法。
func?NewNamed(name?string)?*Type?{
???rc?:=?clock.RealClock{}
???return?newQueue(
??????rc,
??????globalMetricsFactory.newQueueMetrics(name,?rc),
??????defaultUnfinishedWorkUpdatePeriod,
???)
}
waitingLoop() 方法邏輯不少,我們單獨(dú)放到下面一個(gè)小節(jié)。
waitingLoop()
這個(gè)方法是實(shí)現(xiàn)延時(shí)隊(duì)列的核心邏輯所在:
func?(q?*delayingType)?waitingLoop()?{
???defer?utilruntime.HandleCrash()
???//?隊(duì)列里沒有?item?時(shí)實(shí)現(xiàn)等待用的
???never?:=?make(<-chan?time.Time)
???var?nextReadyAtTimer?clock.Timer
???//?構(gòu)造一個(gè)有序隊(duì)列
???waitingForQueue?:=?&waitForPriorityQueue{}
???heap.Init(waitingForQueue)?//?這一行其實(shí)是多余的,等下提個(gè)?pr?給它刪掉
???//?這個(gè)?map?用來處理重復(fù)添加邏輯的,下面會(huì)講到
???waitingEntryByData?:=?map[t]*waitFor{}
???//?無限循環(huán)
???for?{
??????//?這個(gè)地方?Interface?是多余的,等下也提個(gè)?pr?把它刪掉吧
??????if?q.Interface.ShuttingDown()?{
?????????return
??????}
??????now?:=?q.clock.Now()
??????//?隊(duì)列里有?item?就開始循環(huán)
??????for?waitingForQueue.Len()?>?0?{
?????????//?獲取第一個(gè)?item
?????????entry?:=?waitingForQueue.Peek().(*waitFor)
?????????//?時(shí)間還沒到,先不處理
?????????if?entry.readyAt.After(now)?{
????????????break
?????????}
????????//?時(shí)間到了,pop 出第一個(gè)元素;注意 waitingForQueue.Pop()?是最后一個(gè) item,heap.Pop()?是第一個(gè)元素
?????????entry?=?heap.Pop(waitingForQueue).(*waitFor)
?????????//?將數(shù)據(jù)加到延時(shí)隊(duì)列里
?????????q.Add(entry.data)
?????????//?map?里刪除已經(jīng)加到延時(shí)隊(duì)列的?item
?????????delete(waitingEntryByData,?entry.data)
??????}
??????//?如果隊(duì)列中有?item,就用第一個(gè)?item?的等待時(shí)間初始化計(jì)時(shí)器,如果為空則一直等待
??????nextReadyAt?:=?never
??????if?waitingForQueue.Len()?>?0?{
?????????if?nextReadyAtTimer?!=?nil?{
????????????nextReadyAtTimer.Stop()
?????????}
?????????entry?:=?waitingForQueue.Peek().(*waitFor)
?????????nextReadyAtTimer?=?q.clock.NewTimer(entry.readyAt.Sub(now))
?????????nextReadyAt?=?nextReadyAtTimer.C()
??????}
??????select?{
??????case?<-q.stopCh:
?????????return
??????case?<-q.heartbeat.C():?//?心跳時(shí)間是?10s,到了就繼續(xù)下一輪循環(huán)
??????case?<-nextReadyAt:?//?第一個(gè)?item?的等到時(shí)間到了,繼續(xù)下一輪循環(huán)
??????case?waitEntry?:=?<-q.waitingForAddCh:?//?waitingForAddCh?收到新的?item
?????????//?如果時(shí)間沒到,就加到優(yōu)先級(jí)隊(duì)列里,如果時(shí)間到了,就直接加到延時(shí)隊(duì)列里
?????????if?waitEntry.readyAt.After(q.clock.Now())?{
????????????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
?????????}?else?{
????????????q.Add(waitEntry.data)
?????????}
?????????//?下面的邏輯就是將?waitingForAddCh?中的數(shù)據(jù)處理完
?????????drained?:=?false
?????????for?!drained?{
????????????select?{
????????????case?waitEntry?:=?<-q.waitingForAddCh:
???????????????if?waitEntry.readyAt.After(q.clock.Now())?{
??????????????????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
???????????????}?else?{
??????????????????q.Add(waitEntry.data)
???????????????}
????????????default:
???????????????drained?=?true
????????????}
?????????}
??????}
???}
}
上面函數(shù)還有一個(gè) insert() 調(diào)用,我們?cè)賮砜匆幌逻@個(gè)插入邏輯:
func?insert(q?*waitForPriorityQueue,?knownEntries?map[t]*waitFor,?entry?*waitFor)?{
???//?這里的主要邏輯是看一個(gè)?entry?是否存在,如果已經(jīng)存在,新的?entry?的?ready?時(shí)間更短,就更新時(shí)間
???existing,?exists?:=?knownEntries[entry.data]
???if?exists?{
??????if?existing.readyAt.After(entry.readyAt)?{
?????????existing.readyAt?=?entry.readyAt?//?如果存在就只更新時(shí)間
?????????heap.Fix(q,?existing.index)
??????}
??????return
???}
???//?如果不存在就丟到?q?里,同時(shí)在?map?里記錄一下,用于查重
???heap.Push(q,?entry)
???knownEntries[entry.data]?=?entry
}
AddAfter()
這個(gè)方法的作用是在指定的延時(shí)到達(dá)之后,在 workqueue 中添加一個(gè)元素,源碼如下:
func?(q?*delayingType)?AddAfter(item?interface{},?duration?time.Duration)?{
???if?q.ShuttingDown()?{?//?已經(jīng)在關(guān)閉中就直接返回
??????return
???}
???q.metrics.retry()
???if?duration?<=?0?{?//?如果時(shí)間到了,就直接添加
??????q.Add(item)
??????return
???}
???select?{
???case?<-q.stopCh:
?????//?構(gòu)造?waitFor{},丟到?waitingForAddCh
???case?q.waitingForAddCh?<-?&waitFor{data:?item,?readyAt:?q.clock.Now().Add(duration)}:
???}
}
RateLimitingQueue
最后一個(gè) workqueue 就是限速隊(duì)列,我們繼續(xù)來看。
接口和結(jié)構(gòu)體
先看接口定義:
k8s.io/client-go/util/workqueue/rate_limiting_queue.go:20
type?RateLimitingInterface?interface?{
???DelayingInterface???????????????????//?延時(shí)隊(duì)列里內(nèi)嵌了普通隊(duì)列,限速隊(duì)列里內(nèi)嵌了延時(shí)隊(duì)列
???AddRateLimited(item?interface{})????//?限速方式往隊(duì)列里加入一個(gè)元素
???Forget(item?interface{})????????????//?標(biāo)識(shí)一個(gè)元素結(jié)束重試
???NumRequeues(item?interface{})?int???//?標(biāo)識(shí)這個(gè)元素被處理里多少次了
}
然后看下兩個(gè) New 函數(shù)。
func?NewRateLimitingQueue(rateLimiter?RateLimiter)?RateLimitingInterface?{
???return?&rateLimitingType{
??????DelayingInterface:?NewDelayingQueue(),
??????rateLimiter:???????rateLimiter,
???}
}
func?NewNamedRateLimitingQueue(rateLimiter?RateLimiter,?name?string)?RateLimitingInterface?{
???return?&rateLimitingType{
??????DelayingInterface:?NewNamedDelayingQueue(name),
??????rateLimiter:???????rateLimiter,
???}
}
這里的區(qū)別就是里面的延時(shí)隊(duì)列有沒有指定的名字。注意到這里有一個(gè) RateLimiter 類型,后面要詳細(xì)講,另外 rateLimitingType 就是上面接口的具體實(shí)現(xiàn)類型了。
RateLimiter
RateLimiter 表示一個(gè)限速器,我們看下限速器是什么意思。先看接口定義:
k8s.io/client-go/util/workqueue/default_rate_limiters.go:27
type?RateLimiter?interface?{
???When(item?interface{})?time.Duration?//?返回一個(gè)?item?需要等待的時(shí)常
???Forget(item?interface{})?????????????//?標(biāo)識(shí)一個(gè)元素結(jié)束重試
???NumRequeues(item?interface{})?int????//?標(biāo)識(shí)這個(gè)元素被處理里多少次了
}
這個(gè)接口有五個(gè)實(shí)現(xiàn),分別叫做:
BucketRateLimiter ItemExponentialFailureRateLimiter ItemFastSlowRateLimiter MaxOfRateLimiter WithMaxWaitRateLimiter
下面分別來看:
BucketRateLimiter 這個(gè)限速器可說的不多,用了 golang 標(biāo)準(zhǔn)庫(kù)的 golang.org/x/time/rate.Limiter 實(shí)現(xiàn)。BucketRateLimiter 實(shí)例化的時(shí)候比如傳遞一個(gè) rate.NewLimiter(rate.Limit(10), 100) 進(jìn)去,表示令牌桶里最多有 100 個(gè)令牌,每秒發(fā)放 10 個(gè)令牌。
type?BucketRateLimiter?struct?{
???*rate.Limiter
}
var?_?RateLimiter?=?&BucketRateLimiter{}
func?(r?*BucketRateLimiter)?When(item?interface{})?time.Duration?{
???return?r.Limiter.Reserve().Delay()?//?過多久后給當(dāng)前?item?發(fā)放一個(gè)令牌
}
func?(r?*BucketRateLimiter)?NumRequeues(item?interface{})?int?{
???return?0
}
func?(r?*BucketRateLimiter)?Forget(item?interface{})?{
}
ItemExponentialFailureRateLimiter
Exponential 是指數(shù)的意思,從這個(gè)限速器的名字大概能猜到是失敗次數(shù)越多,限速越長(zhǎng)而且是指數(shù)級(jí)增長(zhǎng)的一種限速器。
結(jié)構(gòu)體定義如下,屬性含義基本可以望文生義。
type?ItemExponentialFailureRateLimiter?struct?{
???failuresLock?sync.Mutex
???failures?????map[interface{}]int
???baseDelay?time.Duration
???maxDelay??time.Duration
}
主要邏輯是 When() 函數(shù)是如何實(shí)現(xiàn)的:
func?(r?*ItemExponentialFailureRateLimiter)?When(item?interface{})?time.Duration?{
???r.failuresLock.Lock()
???defer?r.failuresLock.Unlock()
???exp?:=?r.failures[item]
???r.failures[item]?=?r.failures[item]?+?1?//?失敗次數(shù)加一
???//?每調(diào)用一次,exp?也就加了1,對(duì)應(yīng)到這里時(shí)?2^n?指數(shù)爆炸
???backoff?:=?float64(r.baseDelay.Nanoseconds())?*?math.Pow(2,?float64(exp))
???if?backoff?>?math.MaxInt64?{?//?如果超過了最大整型,就返回最大延時(shí),不然后面時(shí)間轉(zhuǎn)換溢出了
??????return?r.maxDelay
???}
???calculated?:=?time.Duration(backoff)
???if?calculated?>?r.maxDelay?{?//?如果超過最大延時(shí),則返回最大延時(shí)
??????return?r.maxDelay
???}
???return?calculated
}
另外兩個(gè)函數(shù)太簡(jiǎn)單了:
func?(r?*ItemExponentialFailureRateLimiter)?NumRequeues(item?interface{})?int?{
???r.failuresLock.Lock()
???defer?r.failuresLock.Unlock()
???return?r.failures[item]
}
func?(r?*ItemExponentialFailureRateLimiter)?Forget(item?interface{})?{
???r.failuresLock.Lock()
???defer?r.failuresLock.Unlock()
???delete(r.failures,?item)
}
ItemFastSlowRateLimiter 快慢限速器,也就是先快后慢,定義一個(gè)閾值,超過了就慢慢重試。先看類型定義:
type?ItemFastSlowRateLimiter?struct?{
???failuresLock?sync.Mutex
???failures?????map[interface{}]int
???maxFastAttempts?int????????????//?快速重試的次數(shù)
???fastDelay???????time.Duration??//?快重試間隔
???slowDelay???????time.Duration??//?慢重試間隔
}
同樣繼續(xù)來看具體的方法實(shí)現(xiàn):
func?(r?*ItemFastSlowRateLimiter)?When(item?interface{})?time.Duration?{
???r.failuresLock.Lock()
???defer?r.failuresLock.Unlock()
???r.failures[item]?=?r.failures[item]?+?1?//?標(biāo)識(shí)重試次數(shù)?+?1
???if?r.failures[item]?<=?r.maxFastAttempts?{?//?如果快重試次數(shù)沒有用完,則返回?fastDelay
??????return?r.fastDelay
???}
???return?r.slowDelay?//?反之返回?slowDelay
}
func?(r?*ItemFastSlowRateLimiter)?NumRequeues(item?interface{})?int?{
???r.failuresLock.Lock()
???defer?r.failuresLock.Unlock()
???return?r.failures[item]
}
func?(r?*ItemFastSlowRateLimiter)?Forget(item?interface{})?{
???r.failuresLock.Lock()
???defer?r.failuresLock.Unlock()
???delete(r.failures,?item)
}
MaxOfRateLimiter 這個(gè)限速器看著有點(diǎn)樂呵人,內(nèi)部放多個(gè)限速器,然后返回限速最狠的一個(gè)延時(shí):
type?MaxOfRateLimiter?struct?{
???limiters?[]RateLimiter
}
func?(r?*MaxOfRateLimiter)?When(item?interface{})?time.Duration?{
???ret?:=?time.Duration(0)
???for?_,?limiter?:=?range?r.limiters?{
??????curr?:=?limiter.When(item)
??????if?curr?>?ret?{
?????????ret?=?curr
??????}
???}
???return?ret
}
WithMaxWaitRateLimiter 這個(gè)限速器也很簡(jiǎn)單,就是在其他限速器上包裝一個(gè)最大延遲的屬性,如果到了最大延時(shí),則直接返回:
type?WithMaxWaitRateLimiter?struct?{
???limiter??RateLimiter???//?其他限速器
???maxDelay?time.Duration?//?最大延時(shí)
}
func?NewWithMaxWaitRateLimiter(limiter?RateLimiter,?maxDelay?time.Duration)?RateLimiter?{
???return?&WithMaxWaitRateLimiter{limiter:?limiter,?maxDelay:?maxDelay}
}
func?(w?WithMaxWaitRateLimiter)?When(item?interface{})?time.Duration?{
???delay?:=?w.limiter.When(item)
???if?delay?>?w.maxDelay?{
??????return?w.maxDelay?//?已經(jīng)超過了最大延時(shí),直接返回最大延時(shí)
???}
???return?delay
}
限速隊(duì)列的實(shí)現(xiàn)
看完了上面的限速器的概念,限速隊(duì)列的實(shí)現(xiàn)就很簡(jiǎn)單了:
func?(q?*rateLimitingType)?AddRateLimited(item?interface{})?{
???//?內(nèi)部存了一個(gè)延時(shí)隊(duì)列,通過限速器計(jì)算出一個(gè)等待時(shí)間,然后傳給延時(shí)隊(duì)列
???q.DelayingInterface.AddAfter(item,?q.rateLimiter.When(item))
}
func?(q?*rateLimitingType)?NumRequeues(item?interface{})?int?{
???return?q.rateLimiter.NumRequeues(item)
}
func?(q?*rateLimitingType)?Forget(item?interface{})?{
???q.rateLimiter.Forget(item)
}