<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kubernetes 源碼學習之限速隊列

          共 8247字,需瀏覽 17分鐘

           ·

          2020-09-29 00:48

          前面我們詳細分析了 client-go 中的延遲隊列的實現(xiàn),接下來就是限速隊列的實現(xiàn),限速隊列在我們?nèi)粘弥蟹浅V泛,其原理也比較簡單,利用延遲隊列的特性,延遲某個元素的插入時間來達到限速的目的。

          所以限速隊列是擴展的延遲隊列,在其基礎(chǔ)上增加了 AddRateLimitedForget、NumRequeues 3個方法。限速隊列接口的定義如下所示:

          //?k8s.io/client-go/util/workqueue/rate_limiting_queue.go

          //?RateLimitingInterface?是對加入隊列的元素進行速率限制的接口
          type?RateLimitingInterface?interface?{
          ??//?延時隊列
          ?DelayingInterface?

          ?//?在限速器說ok后,將元素item添加到工作隊列中
          ?AddRateLimited(item?interface{})

          ?//?丟棄指定的元素
          ?Forget(item?interface{})

          ??//?查詢元素放入隊列的次數(shù)
          ?NumRequeues(item?interface{})?int
          }

          很明顯我們可以看出來限速隊列是在延時隊列基礎(chǔ)上進行的擴展,接下來我們查看下限速隊列的實現(xiàn)結(jié)構(gòu):

          //?k8s.io/client-go/util/workqueue/rate_limiting_queue.go

          //?限速隊列的實現(xiàn)
          type?rateLimitingType?struct?{
          ??//?同樣集成了延遲隊列
          ?DelayingInterface
          ??//?因為是限速隊列,所以在里面定義一個限速器
          ?rateLimiter?RateLimiter
          }

          //?通過限速器獲取延遲時間,然后加入到延時隊列
          func?(q?*rateLimitingType)?AddRateLimited(item?interface{})?{
          ?q.DelayingInterface.AddAfter(item,?q.rateLimiter.When(item))
          }

          //?直接通過限速器獲取元素放入隊列的次數(shù)
          func?(q?*rateLimitingType)?NumRequeues(item?interface{})?int?{
          ?return?q.rateLimiter.NumRequeues(item)
          }

          //?直接通過限速器丟棄指定的元素
          func?(q?*rateLimitingType)?Forget(item?interface{})?{
          ?q.rateLimiter.Forget(item)
          }

          我們可以看到限速隊列的實現(xiàn)非常簡單,就是通過包含的限速器去實現(xiàn)各種限速的功能,所以我們要真正去了解的是限速器的實現(xiàn)原理。

          限速器

          限速器當然也是一種接口抽象,我們可以實現(xiàn)各種各樣的限速器,甚至不限速也可以,該接口定義如下所示:

          //?k8s.io/client-go/util/workqueue/default_rate_limiter.go

          //?限速器接口
          type?RateLimiter?interface?{
          ?//?獲取指定的元素需要等待的時間
          ?When(item?interface{})?time.Duration
          ?//?釋放指定元素,表示該元素已經(jīng)被處理了
          ?Forget(item?interface{})
          ?//?返回某個對象被重新入隊多少次,監(jiān)控用
          ?NumRequeues(item?interface{})?int
          }

          可以看到和上面的限速隊列的擴展接口方法非常類似,索引在實現(xiàn)的時候我們都是直接調(diào)用限速器對應的實現(xiàn)方法。接下來我們來看看幾種限速器的具體實現(xiàn)。

          BucketRateLimiter

          第一個要了解的限速器使用非常頻繁 - BucketRateLimiter(令牌桶限速器),這是一個固定速率(qps)的限速器,該限速器是利用 golang.org/x/time/rate 庫來實現(xiàn)的,令牌桶算法內(nèi)部實現(xiàn)了一個存放 token(令牌)的“桶”,初始時“桶”是空的,token 會以固定速率往“桶”里填充,直到將其填滿為止,多余的 token 會被丟棄。每個元素都會從令牌桶得到一個 token,只有得到 token 的元素才允許通過,而沒有得到 token 的元素處于等待狀態(tài)。令牌桶算法通過控制發(fā)放 token 來達到限速目的。

          比如抽獎、搶優(yōu)惠、投票、報名……等場景,在面對突然到來的上百倍流量峰值,除了消息隊列,預留容量以外,我們可以考慮做峰值限流。因為對于大部分營銷類活動,消息限流(對被限流的消息直接丟棄,并直接回復:“系統(tǒng)繁忙,請稍后再試?!保┎⒉粫I銷的結(jié)果有太大影響。

          令牌桶是有一個固定大小的桶,系統(tǒng)會以恒定的速率向桶中放 Token,桶滿了就暫時不放了,而用戶則從桶中取 Token,如果有剩余的 Token 就可以一直取,如果沒有剩余的 Token,則需要等到系統(tǒng)中放置了 Token 才行。

          golang 中就自帶了一個令牌桶限速器的實現(xiàn),我們可以使用以下方法構(gòu)造一個限速器對象:

          limiter?:=?NewLimiter(10,?1);

          該構(gòu)造函數(shù)包含兩個參數(shù):

          1. 第一個參數(shù)是?r Limit。代表每秒可以向 Token 桶中產(chǎn)生多少 token,Limit 實際上是 float64 的別名。
          2. 第二個參數(shù)是?b int。b 代表 Token 桶的容量大小。

          上面我們構(gòu)造出的限速器含義就是,其令牌桶大小為 1,以每秒 10 個 Token 的速率向桶中放置 Token。

          除了直接指定每秒產(chǎn)生的 Token 個數(shù)外,還可以用 Every 方法來指定向 Token 桶中放置 Token 的間隔,例如:

          limit?:=?Every(100?*?time.Millisecond)
          limiter?:=?NewLimiter(limit,?1)

          以上就表示每 100ms 往桶中放一個 Token,本質(zhì)上也就是一秒鐘產(chǎn)生 10 個。

          Limiter 提供了三類方法供用戶消費 Token,用戶可以每次消費一個 Token,也可以一次性消費多個 Token。而每種方法代表了當 Token 不足時,各自不同的對應手段。

          Wait/WaitN

          func?(lim?*Limiter)?Wait(ctx?context.Context)?(err?error)
          func?(lim?*Limiter)?WaitN(ctx?context.Context,?n?int)?(err?error)

          Wait 實際上就是?WaitN(ctx,1)。當使用 Wait 方法消費 Token 時,如果此時桶內(nèi) Token 不足時 (小于 N),那么 Wait 方法將會阻塞一段時間,直至 Token 滿足條件,當然如果充足則直接返回。我們可以看到,Wait 方法有一個 context 參數(shù),我們可以設置 context 的 Deadline 或者 Timeout,來決定此次 Wait 的最長時間。

          Allow/AllowN

          func?(lim?*Limiter)?Allow()?bool
          func?(lim?*Limiter)?AllowN(now?time.Time,?n?int)?bool

          Allow 實際上就是?AllowN(time.Now(),1)。AllowN 方法表示,截止到某一時刻,目前桶中數(shù)目是否至少為 n 個,滿足則返回 true,同時從桶中消費 n 個 token。反之返回不消費 Token,false。通常對應這樣的線上場景,如果請求速率過快,就直接丟到某些請求。

          Reserve/ReserveN

          func?(lim?*Limiter)?Reserve()?*Reservation
          func?(lim?*Limiter)?ReserveN(now?time.Time,?n?int)?*Reservation

          Reserve 相當于?ReserveN(time.Now(), 1)。ReserveN 的用法就相對來說復雜一些,當調(diào)用完成后,無論 Token 是否充足,都會返回一個 Reservation 對象指針。

          你可以調(diào)用該對象的 Delay() 方法,該方法返回了需要等待的時間,如果等待時間為 0,則說明不用等待。必須等到等待時間之后,才能進行接下來的工作。或者,如果不想等待,可以調(diào)用 Cancel() 方法,該方法會將 Token 歸還。

          動態(tài)調(diào)整速率

          此外 Limiter 還支持調(diào)整速率和桶大?。?/p>

          1. SetLimit(Limit) 改變放入 Token 的速率
          2. SetBurst(int) 改變 Token 桶大小

          有了這兩個方法,可以根據(jù)現(xiàn)有環(huán)境和條件,根據(jù)我們的需求,動態(tài)的改變 Token 桶大小和速率。

          令牌桶限速器實現(xiàn)

          了解了令牌桶如何使用后,接下來就可以直接查看下令牌桶限速器是如何實現(xiàn)的:

          //?k8s.io/client-go/util/workqueue/default_rate_limiter.go

          //?令牌桶限速器,固定速率(qps)
          type?BucketRateLimiter?struct?{
          ??//?golang?自帶的?Limiter?
          ?*rate.Limiter
          }

          var?_?RateLimiter?=?&BucketRateLimiter{}

          func?(r?*BucketRateLimiter)?When(item?interface{})?time.Duration?{
          ?//?獲取需要等待的時間(延遲),而且這個延遲是一個相對固定的周期
          ??return?r.Limiter.Reserve().Delay()
          }

          func?(r?*BucketRateLimiter)?NumRequeues(item?interface{})?int?{
          ?//?固定頻率,不需要重試
          ??return?0
          }

          func?(r?*BucketRateLimiter)?Forget(item?interface{})?{
          ??//?不需要重試,因此也不需要忘記
          }

          令牌桶限速器里面直接包裝一個令牌桶 Limiter 對象,直接通過 Limiter.Reserve().Delay() 方法就可以獲取元素需要延遲的時間,再使用這個限速器的時候,默認初始化參數(shù)為:

          //?k8s.io/client-go/util/workqueue/default_rate_limiter.go

          BucketRateLimiter{Limiter:?rate.NewLimiter(rate.Limit(10),?100)}

          通過 rate.NewLimiter 實例化,傳入 r 和 b 兩個參數(shù),r 表示每秒往“”桶中填充 token 的數(shù)量,b 表示令牌桶的大小,這里可以看到默認的參數(shù)為速率為10,即每秒放入10個 bucket,桶容量大小為100。

          ItemExponentialFailureRateLimiter

          ItemExponentialFailureRateLimiter(指數(shù)增長限速器) 是比較常用的限速器,從字面意思解釋是元素錯誤次數(shù)指數(shù)遞增限速器,他會根據(jù)元素錯誤次數(shù)逐漸累加等待時間,具體實現(xiàn)如下:

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?當對象處理失敗的時候,其再次入隊的等待時間?×?2,到?MaxDelay?為止,直到超過最大失敗次數(shù)
          type?ItemExponentialFailureRateLimiter?struct?{
          ????//?修改失敗次數(shù)用到的鎖
          ????failuresLock?sync.Mutex???????????
          ????//?記錄每個元素錯誤次數(shù)
          ????failures?????map[interface{}]int??
          ????//?元素延遲基數(shù)
          ????baseDelay?time.Duration???????????
          ????//?元素最大的延遲時間
          ????maxDelay??time.Duration???????????
          }

          //?實現(xiàn)限速器的When接口
          func?(r?*ItemExponentialFailureRateLimiter)?When(item?interface{})?time.Duration?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()

          ????//?累加錯誤計數(shù)
          ????exp?:=?r.failures[item]
          ????r.failures[item]?=?r.failures[item]?+?1
          ?
          ????//?通過錯誤次數(shù)計算延遲時間,公式是2^i?*?baseDelay,按指數(shù)遞增
          ????backoff?:=?float64(r.baseDelay.Nanoseconds())?*?math.Pow(2,?float64(exp))
          ????if?backoff?>?math.MaxInt64?{
          ????????//?最大延遲時間
          ????????return?r.maxDelay
          ????}

          ????//?計算后的延遲值和最大延遲值之間取最小值
          ????calculated?:=?time.Duration(backoff)
          ????if?calculated?>?r.maxDelay?{
          ????????return?r.maxDelay
          ????}
          ?
          ????return?calculated
          }

          //?元素錯誤次數(shù),直接從?failures?中取
          func?(r?*ItemExponentialFailureRateLimiter)?NumRequeues(item?interface{})?int?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????return?r.failures[item]
          }

          //??直接從?failures?刪除指定元素
          func?(r?*ItemExponentialFailureRateLimiter)?Forget(item?interface{})?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????delete(r.failures,?item)
          }

          從上面 When() 函數(shù)中的算法實現(xiàn)來看,該限速器是出現(xiàn)錯誤后不斷嘗試的過程,而且隨著嘗試次數(shù)的增加按照指數(shù)增加延遲時間

          ItemFastSlowRateLimiter

          ItemFastSlowRateLimiter (快慢限速器)和 ItemExponentialFailureRateLimiter 很像,都是用于錯誤嘗試的,但是 ItemFastSlowRateLimiter 的限速策略是嘗試次數(shù)超過閾值用長延遲,否則用短延遲,不過該限速器很少使用。

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?快慢限速器,先以?fastDelay?為周期進行嘗試,超過?maxFastAttempts?次數(shù)后,按照?slowDelay?為周期進行嘗試
          type?ItemFastSlowRateLimiter?struct?{
          ????failuresLock?sync.Mutex??????????
          ????//?錯誤次數(shù)計數(shù)
          ????failures?????map[interface{}]int?

          ????//?錯誤嘗試閾值
          ????maxFastAttempts?int????????
          ????//?短延遲時間??????
          ????fastDelay???????time.Duration????
          ????//?長延遲時間
          ????slowDelay???????time.Duration????
          }

          func?(r?*ItemFastSlowRateLimiter)?When(item?interface{})?time.Duration?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????//?一樣累加錯誤計數(shù)
          ????r.failures[item]?=?r.failures[item]?+?1

          ????//?錯誤次數(shù)還未超過閾值
          ????if?r.failures[item]?<=?r.maxFastAttempts?{
          ????????//?用短延遲
          ????????return?r.fastDelay
          ????}
          ????//?超過了用長延遲
          ???return?r.slowDelay
          }

          func?(r?*ItemFastSlowRateLimiter)?NumRequeues(item?interface{})?int?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????return?r.failures[item]
          }

          func?(r?*ItemFastSlowRateLimiter)?Forget(item?interface{})?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????delete(r.failures,?item)
          }

          MaxOfRateLimiter

          MaxOfRateLimiter 也可以叫混合限速器,他內(nèi)部有多個限速器,選擇所有限速器中速度最慢(延遲最大)的一種方案。比如內(nèi)部有三個限速器,When() 接口返回的就是三個限速器里面延遲最大的。

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?混合限速器,選擇所有限速器中速度最慢的一種方案
          type?MaxOfRateLimiter?struct?{
          ????//?限速器數(shù)組
          ????limiters?[]RateLimiter???
          }

          func?(r?*MaxOfRateLimiter)?When(item?interface{})?time.Duration?{
          ????ret?:=?time.Duration(0)
          ????//?獲取所有限速器里面時間最大的延遲時間
          ????for?_,?limiter?:=?range?r.limiters?{
          ????????curr?:=?limiter.When(item)
          ????????if?curr?>?ret?{
          ????????????ret?=?curr
          ????????}
          ????}
          ????return?ret
          }

          func?(r?*MaxOfRateLimiter)?NumRequeues(item?interface{})?int?{
          ?ret?:=?0
          ????//?Requeues?次數(shù)也是取最大值
          ????for?_,?limiter?:=?range?r.limiters?{
          ????????curr?:=?limiter.NumRequeues(item)
          ????????if?curr?>?ret?{
          ????????????ret?=?curr
          ????????}
          ????}
          ????return?ret
          }

          func?(r?*MaxOfRateLimiter)?Forget(item?interface{})?{
          ????//?循環(huán)遍歷?Forget
          ????for?_,?limiter?:=?range?r.limiters?{
          ????????limiter.Forget(item)
          ????}
          }

          混合限速器的實現(xiàn)非常簡單,而在 Kubernetes 中默認的控制器限速器初始化就是使用的混合限速器:

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?實例化默認的限速器,由?ItemExponentialFailureRateLimiter?和
          //?BucketRateLimiter?組成的混合限速器
          func?DefaultControllerRateLimiter()?RateLimiter?{
          ?return?NewMaxOfRateLimiter(
          ??NewItemExponentialFailureRateLimiter(5*time.Millisecond,?1000*time.Second),
          ??//?10?qps,?100?bucket?容量
          ??&BucketRateLimiter{Limiter:?rate.NewLimiter(rate.Limit(10),?100)},
          ?)
          }

          func?NewItemExponentialFailureRateLimiter(baseDelay?time.Duration,?maxDelay?time.Duration)?RateLimiter?{
          ?return?&ItemExponentialFailureRateLimiter{
          ??failures:??map[interface{}]int{},
          ??baseDelay:?baseDelay,
          ??maxDelay:??maxDelay,
          ?}
          }

          到這里我們就將限速隊列分析完了,接下來我們需要了解下 WorkQueue 在控制器中是如何使用的。


          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學習資料禮包(下圖只是部分),同時還包含學習建議:入門看什么,進階看什么。

          關(guān)注公眾號 「polarisxu」,回復?ebook?獲??;還可以回復「進群」,和數(shù)萬 Gopher 交流學習。



          瀏覽 78
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  福利色在线播放 | 欧美大黄片 | iGAO激情在线视频入口 | 亚洲熟妇性ⅩXXX交潮喷 | 吸咬奶头狂揉60分钟视频 |