Kubernetes 源碼學(xué)習(xí)之限速隊(duì)列

前面我們?cè)敿?xì)分析了 client-go 中的延遲隊(duì)列的實(shí)現(xiàn),接下來就是限速隊(duì)列的實(shí)現(xiàn),限速隊(duì)列在我們?nèi)粘?yīng)用中非常廣泛,其原理也比較簡(jiǎn)單,利用延遲隊(duì)列的特性,延遲某個(gè)元素的插入時(shí)間來達(dá)到限速的目的。
所以限速隊(duì)列是擴(kuò)展的延遲隊(duì)列,在其基礎(chǔ)上增加了 AddRateLimited、Forget、NumRequeues 3個(gè)方法。限速隊(duì)列接口的定義如下所示:
//?k8s.io/client-go/util/workqueue/rate_limiting_queue.go
//?RateLimitingInterface?是對(duì)加入隊(duì)列的元素進(jìn)行速率限制的接口
type?RateLimitingInterface?interface?{
??//?延時(shí)隊(duì)列
?DelayingInterface?
?//?在限速器說ok后,將元素item添加到工作隊(duì)列中
?AddRateLimited(item?interface{})
?//?丟棄指定的元素
?Forget(item?interface{})
??//?查詢?cè)胤湃腙?duì)列的次數(shù)
?NumRequeues(item?interface{})?int
}
很明顯我們可以看出來限速隊(duì)列是在延時(shí)隊(duì)列基礎(chǔ)上進(jìn)行的擴(kuò)展,接下來我們查看下限速隊(duì)列的實(shí)現(xiàn)結(jié)構(gòu):
//?k8s.io/client-go/util/workqueue/rate_limiting_queue.go
//?限速隊(duì)列的實(shí)現(xiàn)
type?rateLimitingType?struct?{
??//?同樣集成了延遲隊(duì)列
?DelayingInterface
??//?因?yàn)槭窍匏訇?duì)列,所以在里面定義一個(gè)限速器
?rateLimiter?RateLimiter
}
//?通過限速器獲取延遲時(shí)間,然后加入到延時(shí)隊(duì)列
func?(q?*rateLimitingType)?AddRateLimited(item?interface{})?{
?q.DelayingInterface.AddAfter(item,?q.rateLimiter.When(item))
}
//?直接通過限速器獲取元素放入隊(duì)列的次數(shù)
func?(q?*rateLimitingType)?NumRequeues(item?interface{})?int?{
?return?q.rateLimiter.NumRequeues(item)
}
//?直接通過限速器丟棄指定的元素
func?(q?*rateLimitingType)?Forget(item?interface{})?{
?q.rateLimiter.Forget(item)
}
我們可以看到限速隊(duì)列的實(shí)現(xiàn)非常簡(jiǎn)單,就是通過包含的限速器去實(shí)現(xiàn)各種限速的功能,所以我們要真正去了解的是限速器的實(shí)現(xiàn)原理。
限速器
限速器當(dāng)然也是一種接口抽象,我們可以實(shí)現(xiàn)各種各樣的限速器,甚至不限速也可以,該接口定義如下所示:
//?k8s.io/client-go/util/workqueue/default_rate_limiter.go
//?限速器接口
type?RateLimiter?interface?{
?//?獲取指定的元素需要等待的時(shí)間
?When(item?interface{})?time.Duration
?//?釋放指定元素,表示該元素已經(jīng)被處理了
?Forget(item?interface{})
?//?返回某個(gè)對(duì)象被重新入隊(duì)多少次,監(jiān)控用
?NumRequeues(item?interface{})?int
}
可以看到和上面的限速隊(duì)列的擴(kuò)展接口方法非常類似,索引在實(shí)現(xiàn)的時(shí)候我們都是直接調(diào)用限速器對(duì)應(yīng)的實(shí)現(xiàn)方法。接下來我們來看看幾種限速器的具體實(shí)現(xiàn)。
BucketRateLimiter
第一個(gè)要了解的限速器使用非常頻繁 - BucketRateLimiter(令牌桶限速器),這是一個(gè)固定速率(qps)的限速器,該限速器是利用 golang.org/x/time/rate 庫來實(shí)現(xiàn)的,令牌桶算法內(nèi)部實(shí)現(xiàn)了一個(gè)存放 token(令牌)的“桶”,初始時(shí)“桶”是空的,token 會(huì)以固定速率往“桶”里填充,直到將其填滿為止,多余的 token 會(huì)被丟棄。每個(gè)元素都會(huì)從令牌桶得到一個(gè) token,只有得到 token 的元素才允許通過,而沒有得到 token 的元素處于等待狀態(tài)。令牌桶算法通過控制發(fā)放 token 來達(dá)到限速目的。
比如抽獎(jiǎng)、搶優(yōu)惠、投票、報(bào)名……等場(chǎng)景,在面對(duì)突然到來的上百倍流量峰值,除了消息隊(duì)列,預(yù)留容量以外,我們可以考慮做峰值限流。因?yàn)閷?duì)于大部分營(yíng)銷類活動(dòng),消息限流(對(duì)被限流的消息直接丟棄,并直接回復(fù):“系統(tǒng)繁忙,請(qǐng)稍后再試。”)并不會(huì)對(duì)營(yíng)銷的結(jié)果有太大影響。
令牌桶是有一個(gè)固定大小的桶,系統(tǒng)會(huì)以恒定的速率向桶中放 Token,桶滿了就暫時(shí)不放了,而用戶則從桶中取 Token,如果有剩余的 Token 就可以一直取,如果沒有剩余的 Token,則需要等到系統(tǒng)中放置了 Token 才行。

golang 中就自帶了一個(gè)令牌桶限速器的實(shí)現(xiàn),我們可以使用以下方法構(gòu)造一個(gè)限速器對(duì)象:
limiter?:=?NewLimiter(10,?1);
該構(gòu)造函數(shù)包含兩個(gè)參數(shù):
第一個(gè)參數(shù)是? r Limit。代表每秒可以向 Token 桶中產(chǎn)生多少 token,Limit 實(shí)際上是 float64 的別名。第二個(gè)參數(shù)是? b int。b 代表 Token 桶的容量大小。
上面我們構(gòu)造出的限速器含義就是,其令牌桶大小為 1,以每秒 10 個(gè) Token 的速率向桶中放置 Token。
除了直接指定每秒產(chǎn)生的 Token 個(gè)數(shù)外,還可以用 Every 方法來指定向 Token 桶中放置 Token 的間隔,例如:
limit?:=?Every(100?*?time.Millisecond)
limiter?:=?NewLimiter(limit,?1)
以上就表示每 100ms 往桶中放一個(gè) Token,本質(zhì)上也就是一秒鐘產(chǎn)生 10 個(gè)。
Limiter 提供了三類方法供用戶消費(fèi) Token,用戶可以每次消費(fèi)一個(gè) Token,也可以一次性消費(fèi)多個(gè) Token。而每種方法代表了當(dāng) Token 不足時(shí),各自不同的對(duì)應(yīng)手段。
Wait/WaitN
func?(lim?*Limiter)?Wait(ctx?context.Context)?(err?error)
func?(lim?*Limiter)?WaitN(ctx?context.Context,?n?int)?(err?error)
Wait 實(shí)際上就是?WaitN(ctx,1)。當(dāng)使用 Wait 方法消費(fèi) Token 時(shí),如果此時(shí)桶內(nèi) Token 不足時(shí) (小于 N),那么 Wait 方法將會(huì)阻塞一段時(shí)間,直至 Token 滿足條件,當(dāng)然如果充足則直接返回。我們可以看到,Wait 方法有一個(gè) context 參數(shù),我們可以設(shè)置 context 的 Deadline 或者 Timeout,來決定此次 Wait 的最長(zhǎng)時(shí)間。
Allow/AllowN
func?(lim?*Limiter)?Allow()?bool
func?(lim?*Limiter)?AllowN(now?time.Time,?n?int)?bool
Allow 實(shí)際上就是?AllowN(time.Now(),1)。AllowN 方法表示,截止到某一時(shí)刻,目前桶中數(shù)目是否至少為 n 個(gè),滿足則返回 true,同時(shí)從桶中消費(fèi) n 個(gè) token。反之返回不消費(fèi) Token,false。通常對(duì)應(yīng)這樣的線上場(chǎng)景,如果請(qǐng)求速率過快,就直接丟到某些請(qǐng)求。
Reserve/ReserveN
func?(lim?*Limiter)?Reserve()?*Reservation
func?(lim?*Limiter)?ReserveN(now?time.Time,?n?int)?*Reservation
Reserve 相當(dāng)于?ReserveN(time.Now(), 1)。ReserveN 的用法就相對(duì)來說復(fù)雜一些,當(dāng)調(diào)用完成后,無論 Token 是否充足,都會(huì)返回一個(gè) Reservation 對(duì)象指針。
你可以調(diào)用該對(duì)象的 Delay() 方法,該方法返回了需要等待的時(shí)間,如果等待時(shí)間為 0,則說明不用等待。必須等到等待時(shí)間之后,才能進(jìn)行接下來的工作。或者,如果不想等待,可以調(diào)用 Cancel() 方法,該方法會(huì)將 Token 歸還。
動(dòng)態(tài)調(diào)整速率
此外 Limiter 還支持調(diào)整速率和桶大小:
SetLimit(Limit)改變放入 Token 的速率SetBurst(int)改變 Token 桶大小
有了這兩個(gè)方法,可以根據(jù)現(xiàn)有環(huán)境和條件,根據(jù)我們的需求,動(dòng)態(tài)的改變 Token 桶大小和速率。
令牌桶限速器實(shí)現(xiàn)
了解了令牌桶如何使用后,接下來就可以直接查看下令牌桶限速器是如何實(shí)現(xiàn)的:
//?k8s.io/client-go/util/workqueue/default_rate_limiter.go
//?令牌桶限速器,固定速率(qps)
type?BucketRateLimiter?struct?{
??//?golang?自帶的?Limiter?
?*rate.Limiter
}
var?_?RateLimiter?=?&BucketRateLimiter{}
func?(r?*BucketRateLimiter)?When(item?interface{})?time.Duration?{
?//?獲取需要等待的時(shí)間(延遲),而且這個(gè)延遲是一個(gè)相對(duì)固定的周期
??return?r.Limiter.Reserve().Delay()
}
func?(r?*BucketRateLimiter)?NumRequeues(item?interface{})?int?{
?//?固定頻率,不需要重試
??return?0
}
func?(r?*BucketRateLimiter)?Forget(item?interface{})?{
??//?不需要重試,因此也不需要忘記
}
令牌桶限速器里面直接包裝一個(gè)令牌桶 Limiter 對(duì)象,直接通過 Limiter.Reserve().Delay() 方法就可以獲取元素需要延遲的時(shí)間,再使用這個(gè)限速器的時(shí)候,默認(rèn)初始化參數(shù)為:
//?k8s.io/client-go/util/workqueue/default_rate_limiter.go
BucketRateLimiter{Limiter:?rate.NewLimiter(rate.Limit(10),?100)}
通過 rate.NewLimiter 實(shí)例化,傳入 r 和 b 兩個(gè)參數(shù),r 表示每秒往“”桶中填充 token 的數(shù)量,b 表示令牌桶的大小,這里可以看到默認(rèn)的參數(shù)為速率為10,即每秒放入10個(gè) bucket,桶容量大小為100。
ItemExponentialFailureRateLimiter
ItemExponentialFailureRateLimiter(指數(shù)增長(zhǎng)限速器) 是比較常用的限速器,從字面意思解釋是元素錯(cuò)誤次數(shù)指數(shù)遞增限速器,他會(huì)根據(jù)元素錯(cuò)誤次數(shù)逐漸累加等待時(shí)間,具體實(shí)現(xiàn)如下:
//?k8s.io/client-go/util/workqueue/default_rate_limiters.go
//?當(dāng)對(duì)象處理失敗的時(shí)候,其再次入隊(duì)的等待時(shí)間?×?2,到?MaxDelay?為止,直到超過最大失敗次數(shù)
type?ItemExponentialFailureRateLimiter?struct?{
????//?修改失敗次數(shù)用到的鎖
????failuresLock?sync.Mutex???????????
????//?記錄每個(gè)元素錯(cuò)誤次數(shù)
????failures?????map[interface{}]int??
????//?元素延遲基數(shù)
????baseDelay?time.Duration???????????
????//?元素最大的延遲時(shí)間
????maxDelay??time.Duration???????????
}
//?實(shí)現(xiàn)限速器的When接口
func?(r?*ItemExponentialFailureRateLimiter)?When(item?interface{})?time.Duration?{
????r.failuresLock.Lock()
????defer?r.failuresLock.Unlock()
????//?累加錯(cuò)誤計(jì)數(shù)
????exp?:=?r.failures[item]
????r.failures[item]?=?r.failures[item]?+?1
?
????//?通過錯(cuò)誤次數(shù)計(jì)算延遲時(shí)間,公式是2^i?*?baseDelay,按指數(shù)遞增
????backoff?:=?float64(r.baseDelay.Nanoseconds())?*?math.Pow(2,?float64(exp))
????if?backoff?>?math.MaxInt64?{
????????//?最大延遲時(shí)間
????????return?r.maxDelay
????}
????//?計(jì)算后的延遲值和最大延遲值之間取最小值
????calculated?:=?time.Duration(backoff)
????if?calculated?>?r.maxDelay?{
????????return?r.maxDelay
????}
?
????return?calculated
}
//?元素錯(cuò)誤次數(shù),直接從?failures?中取
func?(r?*ItemExponentialFailureRateLimiter)?NumRequeues(item?interface{})?int?{
????r.failuresLock.Lock()
????defer?r.failuresLock.Unlock()
?
????return?r.failures[item]
}
//??直接從?failures?刪除指定元素
func?(r?*ItemExponentialFailureRateLimiter)?Forget(item?interface{})?{
????r.failuresLock.Lock()
????defer?r.failuresLock.Unlock()
?
????delete(r.failures,?item)
}
從上面 When() 函數(shù)中的算法實(shí)現(xiàn)來看,該限速器是出現(xiàn)錯(cuò)誤后不斷嘗試的過程,而且隨著嘗試次數(shù)的增加按照指數(shù)增加延遲時(shí)間。
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter (快慢限速器)和 ItemExponentialFailureRateLimiter 很像,都是用于錯(cuò)誤嘗試的,但是 ItemFastSlowRateLimiter 的限速策略是嘗試次數(shù)超過閾值用長(zhǎng)延遲,否則用短延遲,不過該限速器很少使用。
//?k8s.io/client-go/util/workqueue/default_rate_limiters.go
//?快慢限速器,先以?fastDelay?為周期進(jìn)行嘗試,超過?maxFastAttempts?次數(shù)后,按照?slowDelay?為周期進(jìn)行嘗試
type?ItemFastSlowRateLimiter?struct?{
????failuresLock?sync.Mutex??????????
????//?錯(cuò)誤次數(shù)計(jì)數(shù)
????failures?????map[interface{}]int?
????//?錯(cuò)誤嘗試閾值
????maxFastAttempts?int????????
????//?短延遲時(shí)間??????
????fastDelay???????time.Duration????
????//?長(zhǎng)延遲時(shí)間
????slowDelay???????time.Duration????
}
func?(r?*ItemFastSlowRateLimiter)?When(item?interface{})?time.Duration?{
????r.failuresLock.Lock()
????defer?r.failuresLock.Unlock()
?
????//?一樣累加錯(cuò)誤計(jì)數(shù)
????r.failures[item]?=?r.failures[item]?+?1
????//?錯(cuò)誤次數(shù)還未超過閾值
????if?r.failures[item]?<=?r.maxFastAttempts?{
????????//?用短延遲
????????return?r.fastDelay
????}
????//?超過了用長(zhǎng)延遲
???return?r.slowDelay
}
func?(r?*ItemFastSlowRateLimiter)?NumRequeues(item?interface{})?int?{
????r.failuresLock.Lock()
????defer?r.failuresLock.Unlock()
?
????return?r.failures[item]
}
func?(r?*ItemFastSlowRateLimiter)?Forget(item?interface{})?{
????r.failuresLock.Lock()
????defer?r.failuresLock.Unlock()
?
????delete(r.failures,?item)
}
MaxOfRateLimiter
MaxOfRateLimiter 也可以叫混合限速器,他內(nèi)部有多個(gè)限速器,選擇所有限速器中速度最慢(延遲最大)的一種方案。比如內(nèi)部有三個(gè)限速器,When() 接口返回的就是三個(gè)限速器里面延遲最大的。
//?k8s.io/client-go/util/workqueue/default_rate_limiters.go
//?混合限速器,選擇所有限速器中速度最慢的一種方案
type?MaxOfRateLimiter?struct?{
????//?限速器數(shù)組
????limiters?[]RateLimiter???
}
func?(r?*MaxOfRateLimiter)?When(item?interface{})?time.Duration?{
????ret?:=?time.Duration(0)
????//?獲取所有限速器里面時(shí)間最大的延遲時(shí)間
????for?_,?limiter?:=?range?r.limiters?{
????????curr?:=?limiter.When(item)
????????if?curr?>?ret?{
????????????ret?=?curr
????????}
????}
????return?ret
}
func?(r?*MaxOfRateLimiter)?NumRequeues(item?interface{})?int?{
?ret?:=?0
????//?Requeues?次數(shù)也是取最大值
????for?_,?limiter?:=?range?r.limiters?{
????????curr?:=?limiter.NumRequeues(item)
????????if?curr?>?ret?{
????????????ret?=?curr
????????}
????}
????return?ret
}
func?(r?*MaxOfRateLimiter)?Forget(item?interface{})?{
????//?循環(huán)遍歷?Forget
????for?_,?limiter?:=?range?r.limiters?{
????????limiter.Forget(item)
????}
}
混合限速器的實(shí)現(xiàn)非常簡(jiǎn)單,而在 Kubernetes 中默認(rèn)的控制器限速器初始化就是使用的混合限速器:
//?k8s.io/client-go/util/workqueue/default_rate_limiters.go
//?實(shí)例化默認(rèn)的限速器,由?ItemExponentialFailureRateLimiter?和
//?BucketRateLimiter?組成的混合限速器
func?DefaultControllerRateLimiter()?RateLimiter?{
?return?NewMaxOfRateLimiter(
??NewItemExponentialFailureRateLimiter(5*time.Millisecond,?1000*time.Second),
??//?10?qps,?100?bucket?容量
??&BucketRateLimiter{Limiter:?rate.NewLimiter(rate.Limit(10),?100)},
?)
}
func?NewItemExponentialFailureRateLimiter(baseDelay?time.Duration,?maxDelay?time.Duration)?RateLimiter?{
?return?&ItemExponentialFailureRateLimiter{
??failures:??map[interface{}]int{},
??baseDelay:?baseDelay,
??maxDelay:??maxDelay,
?}
}
到這里我們就將限速隊(duì)列分析完了,接下來我們需要了解下 WorkQueue 在控制器中是如何使用的。
K8S進(jìn)階訓(xùn)練營(yíng),點(diǎn)擊下方圖片了解詳情

