<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kubernetes 源碼學(xué)習(xí)之限速隊(duì)列

          共 8116字,需瀏覽 17分鐘

           ·

          2020-09-28 06:55

          前面我們?cè)敿?xì)分析了 client-go 中的延遲隊(duì)列的實(shí)現(xiàn),接下來就是限速隊(duì)列的實(shí)現(xiàn),限速隊(duì)列在我們?nèi)粘?yīng)用中非常廣泛,其原理也比較簡(jiǎn)單,利用延遲隊(duì)列的特性,延遲某個(gè)元素的插入時(shí)間來達(dá)到限速的目的。

          所以限速隊(duì)列是擴(kuò)展的延遲隊(duì)列,在其基礎(chǔ)上增加了 AddRateLimitedForgetNumRequeues 3個(gè)方法。限速隊(duì)列接口的定義如下所示:

          //?k8s.io/client-go/util/workqueue/rate_limiting_queue.go

          //?RateLimitingInterface?是對(duì)加入隊(duì)列的元素進(jìn)行速率限制的接口
          type?RateLimitingInterface?interface?{
          ??//?延時(shí)隊(duì)列
          ?DelayingInterface?

          ?//?在限速器說ok后,將元素item添加到工作隊(duì)列中
          ?AddRateLimited(item?interface{})

          ?//?丟棄指定的元素
          ?Forget(item?interface{})

          ??//?查詢?cè)胤湃腙?duì)列的次數(shù)
          ?NumRequeues(item?interface{})?int
          }

          很明顯我們可以看出來限速隊(duì)列是在延時(shí)隊(duì)列基礎(chǔ)上進(jìn)行的擴(kuò)展,接下來我們查看下限速隊(duì)列的實(shí)現(xiàn)結(jié)構(gòu):

          //?k8s.io/client-go/util/workqueue/rate_limiting_queue.go

          //?限速隊(duì)列的實(shí)現(xiàn)
          type?rateLimitingType?struct?{
          ??//?同樣集成了延遲隊(duì)列
          ?DelayingInterface
          ??//?因?yàn)槭窍匏訇?duì)列,所以在里面定義一個(gè)限速器
          ?rateLimiter?RateLimiter
          }

          //?通過限速器獲取延遲時(shí)間,然后加入到延時(shí)隊(duì)列
          func?(q?*rateLimitingType)?AddRateLimited(item?interface{})?{
          ?q.DelayingInterface.AddAfter(item,?q.rateLimiter.When(item))
          }

          //?直接通過限速器獲取元素放入隊(duì)列的次數(shù)
          func?(q?*rateLimitingType)?NumRequeues(item?interface{})?int?{
          ?return?q.rateLimiter.NumRequeues(item)
          }

          //?直接通過限速器丟棄指定的元素
          func?(q?*rateLimitingType)?Forget(item?interface{})?{
          ?q.rateLimiter.Forget(item)
          }

          我們可以看到限速隊(duì)列的實(shí)現(xiàn)非常簡(jiǎn)單,就是通過包含的限速器去實(shí)現(xiàn)各種限速的功能,所以我們要真正去了解的是限速器的實(shí)現(xiàn)原理。

          限速器

          限速器當(dāng)然也是一種接口抽象,我們可以實(shí)現(xiàn)各種各樣的限速器,甚至不限速也可以,該接口定義如下所示:

          //?k8s.io/client-go/util/workqueue/default_rate_limiter.go

          //?限速器接口
          type?RateLimiter?interface?{
          ?//?獲取指定的元素需要等待的時(shí)間
          ?When(item?interface{})?time.Duration
          ?//?釋放指定元素,表示該元素已經(jīng)被處理了
          ?Forget(item?interface{})
          ?//?返回某個(gè)對(duì)象被重新入隊(duì)多少次,監(jiān)控用
          ?NumRequeues(item?interface{})?int
          }

          可以看到和上面的限速隊(duì)列的擴(kuò)展接口方法非常類似,索引在實(shí)現(xiàn)的時(shí)候我們都是直接調(diào)用限速器對(duì)應(yīng)的實(shí)現(xiàn)方法。接下來我們來看看幾種限速器的具體實(shí)現(xiàn)。

          BucketRateLimiter

          第一個(gè)要了解的限速器使用非常頻繁 - BucketRateLimiter(令牌桶限速器),這是一個(gè)固定速率(qps)的限速器,該限速器是利用 golang.org/x/time/rate 庫來實(shí)現(xiàn)的,令牌桶算法內(nèi)部實(shí)現(xiàn)了一個(gè)存放 token(令牌)的“桶”,初始時(shí)“桶”是空的,token 會(huì)以固定速率往“桶”里填充,直到將其填滿為止,多余的 token 會(huì)被丟棄。每個(gè)元素都會(huì)從令牌桶得到一個(gè) token,只有得到 token 的元素才允許通過,而沒有得到 token 的元素處于等待狀態(tài)。令牌桶算法通過控制發(fā)放 token 來達(dá)到限速目的。

          比如抽獎(jiǎng)、搶優(yōu)惠、投票、報(bào)名……等場(chǎng)景,在面對(duì)突然到來的上百倍流量峰值,除了消息隊(duì)列,預(yù)留容量以外,我們可以考慮做峰值限流。因?yàn)閷?duì)于大部分營(yíng)銷類活動(dòng),消息限流(對(duì)被限流的消息直接丟棄,并直接回復(fù):“系統(tǒng)繁忙,請(qǐng)稍后再試。”)并不會(huì)對(duì)營(yíng)銷的結(jié)果有太大影響。

          令牌桶是有一個(gè)固定大小的桶,系統(tǒng)會(huì)以恒定的速率向桶中放 Token,桶滿了就暫時(shí)不放了,而用戶則從桶中取 Token,如果有剩余的 Token 就可以一直取,如果沒有剩余的 Token,則需要等到系統(tǒng)中放置了 Token 才行。

          golang 中就自帶了一個(gè)令牌桶限速器的實(shí)現(xiàn),我們可以使用以下方法構(gòu)造一個(gè)限速器對(duì)象:

          limiter?:=?NewLimiter(10,?1);

          該構(gòu)造函數(shù)包含兩個(gè)參數(shù):

          1. 第一個(gè)參數(shù)是?r Limit。代表每秒可以向 Token 桶中產(chǎn)生多少 token,Limit 實(shí)際上是 float64 的別名。
          2. 第二個(gè)參數(shù)是?b int。b 代表 Token 桶的容量大小。

          上面我們構(gòu)造出的限速器含義就是,其令牌桶大小為 1,以每秒 10 個(gè) Token 的速率向桶中放置 Token。

          除了直接指定每秒產(chǎn)生的 Token 個(gè)數(shù)外,還可以用 Every 方法來指定向 Token 桶中放置 Token 的間隔,例如:

          limit?:=?Every(100?*?time.Millisecond)
          limiter?:=?NewLimiter(limit,?1)

          以上就表示每 100ms 往桶中放一個(gè) Token,本質(zhì)上也就是一秒鐘產(chǎn)生 10 個(gè)。

          Limiter 提供了三類方法供用戶消費(fèi) Token,用戶可以每次消費(fèi)一個(gè) Token,也可以一次性消費(fèi)多個(gè) Token。而每種方法代表了當(dāng) Token 不足時(shí),各自不同的對(duì)應(yīng)手段。

          Wait/WaitN

          func?(lim?*Limiter)?Wait(ctx?context.Context)?(err?error)
          func?(lim?*Limiter)?WaitN(ctx?context.Context,?n?int)?(err?error)

          Wait 實(shí)際上就是?WaitN(ctx,1)。當(dāng)使用 Wait 方法消費(fèi) Token 時(shí),如果此時(shí)桶內(nèi) Token 不足時(shí) (小于 N),那么 Wait 方法將會(huì)阻塞一段時(shí)間,直至 Token 滿足條件,當(dāng)然如果充足則直接返回。我們可以看到,Wait 方法有一個(gè) context 參數(shù),我們可以設(shè)置 context 的 Deadline 或者 Timeout,來決定此次 Wait 的最長(zhǎng)時(shí)間。

          Allow/AllowN

          func?(lim?*Limiter)?Allow()?bool
          func?(lim?*Limiter)?AllowN(now?time.Time,?n?int)?bool

          Allow 實(shí)際上就是?AllowN(time.Now(),1)。AllowN 方法表示,截止到某一時(shí)刻,目前桶中數(shù)目是否至少為 n 個(gè),滿足則返回 true,同時(shí)從桶中消費(fèi) n 個(gè) token。反之返回不消費(fèi) Token,false。通常對(duì)應(yīng)這樣的線上場(chǎng)景,如果請(qǐng)求速率過快,就直接丟到某些請(qǐng)求。

          Reserve/ReserveN

          func?(lim?*Limiter)?Reserve()?*Reservation
          func?(lim?*Limiter)?ReserveN(now?time.Time,?n?int)?*Reservation

          Reserve 相當(dāng)于?ReserveN(time.Now(), 1)。ReserveN 的用法就相對(duì)來說復(fù)雜一些,當(dāng)調(diào)用完成后,無論 Token 是否充足,都會(huì)返回一個(gè) Reservation 對(duì)象指針。

          你可以調(diào)用該對(duì)象的 Delay() 方法,該方法返回了需要等待的時(shí)間,如果等待時(shí)間為 0,則說明不用等待。必須等到等待時(shí)間之后,才能進(jìn)行接下來的工作。或者,如果不想等待,可以調(diào)用 Cancel() 方法,該方法會(huì)將 Token 歸還。

          動(dòng)態(tài)調(diào)整速率

          此外 Limiter 還支持調(diào)整速率和桶大小:

          1. SetLimit(Limit) 改變放入 Token 的速率
          2. SetBurst(int) 改變 Token 桶大小

          有了這兩個(gè)方法,可以根據(jù)現(xiàn)有環(huán)境和條件,根據(jù)我們的需求,動(dòng)態(tài)的改變 Token 桶大小和速率。

          令牌桶限速器實(shí)現(xiàn)

          了解了令牌桶如何使用后,接下來就可以直接查看下令牌桶限速器是如何實(shí)現(xiàn)的:

          //?k8s.io/client-go/util/workqueue/default_rate_limiter.go

          //?令牌桶限速器,固定速率(qps)
          type?BucketRateLimiter?struct?{
          ??//?golang?自帶的?Limiter?
          ?*rate.Limiter
          }

          var?_?RateLimiter?=?&BucketRateLimiter{}

          func?(r?*BucketRateLimiter)?When(item?interface{})?time.Duration?{
          ?//?獲取需要等待的時(shí)間(延遲),而且這個(gè)延遲是一個(gè)相對(duì)固定的周期
          ??return?r.Limiter.Reserve().Delay()
          }

          func?(r?*BucketRateLimiter)?NumRequeues(item?interface{})?int?{
          ?//?固定頻率,不需要重試
          ??return?0
          }

          func?(r?*BucketRateLimiter)?Forget(item?interface{})?{
          ??//?不需要重試,因此也不需要忘記
          }

          令牌桶限速器里面直接包裝一個(gè)令牌桶 Limiter 對(duì)象,直接通過 Limiter.Reserve().Delay() 方法就可以獲取元素需要延遲的時(shí)間,再使用這個(gè)限速器的時(shí)候,默認(rèn)初始化參數(shù)為:

          //?k8s.io/client-go/util/workqueue/default_rate_limiter.go

          BucketRateLimiter{Limiter:?rate.NewLimiter(rate.Limit(10),?100)}

          通過 rate.NewLimiter 實(shí)例化,傳入 r 和 b 兩個(gè)參數(shù),r 表示每秒往“”桶中填充 token 的數(shù)量,b 表示令牌桶的大小,這里可以看到默認(rèn)的參數(shù)為速率為10,即每秒放入10個(gè) bucket,桶容量大小為100。

          ItemExponentialFailureRateLimiter

          ItemExponentialFailureRateLimiter(指數(shù)增長(zhǎng)限速器) 是比較常用的限速器,從字面意思解釋是元素錯(cuò)誤次數(shù)指數(shù)遞增限速器,他會(huì)根據(jù)元素錯(cuò)誤次數(shù)逐漸累加等待時(shí)間,具體實(shí)現(xiàn)如下:

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?當(dāng)對(duì)象處理失敗的時(shí)候,其再次入隊(duì)的等待時(shí)間?×?2,到?MaxDelay?為止,直到超過最大失敗次數(shù)
          type?ItemExponentialFailureRateLimiter?struct?{
          ????//?修改失敗次數(shù)用到的鎖
          ????failuresLock?sync.Mutex???????????
          ????//?記錄每個(gè)元素錯(cuò)誤次數(shù)
          ????failures?????map[interface{}]int??
          ????//?元素延遲基數(shù)
          ????baseDelay?time.Duration???????????
          ????//?元素最大的延遲時(shí)間
          ????maxDelay??time.Duration???????????
          }

          //?實(shí)現(xiàn)限速器的When接口
          func?(r?*ItemExponentialFailureRateLimiter)?When(item?interface{})?time.Duration?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()

          ????//?累加錯(cuò)誤計(jì)數(shù)
          ????exp?:=?r.failures[item]
          ????r.failures[item]?=?r.failures[item]?+?1
          ?
          ????//?通過錯(cuò)誤次數(shù)計(jì)算延遲時(shí)間,公式是2^i?*?baseDelay,按指數(shù)遞增
          ????backoff?:=?float64(r.baseDelay.Nanoseconds())?*?math.Pow(2,?float64(exp))
          ????if?backoff?>?math.MaxInt64?{
          ????????//?最大延遲時(shí)間
          ????????return?r.maxDelay
          ????}

          ????//?計(jì)算后的延遲值和最大延遲值之間取最小值
          ????calculated?:=?time.Duration(backoff)
          ????if?calculated?>?r.maxDelay?{
          ????????return?r.maxDelay
          ????}
          ?
          ????return?calculated
          }

          //?元素錯(cuò)誤次數(shù),直接從?failures?中取
          func?(r?*ItemExponentialFailureRateLimiter)?NumRequeues(item?interface{})?int?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????return?r.failures[item]
          }

          //??直接從?failures?刪除指定元素
          func?(r?*ItemExponentialFailureRateLimiter)?Forget(item?interface{})?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????delete(r.failures,?item)
          }

          從上面 When() 函數(shù)中的算法實(shí)現(xiàn)來看,該限速器是出現(xiàn)錯(cuò)誤后不斷嘗試的過程,而且隨著嘗試次數(shù)的增加按照指數(shù)增加延遲時(shí)間

          ItemFastSlowRateLimiter

          ItemFastSlowRateLimiter (快慢限速器)和 ItemExponentialFailureRateLimiter 很像,都是用于錯(cuò)誤嘗試的,但是 ItemFastSlowRateLimiter 的限速策略是嘗試次數(shù)超過閾值用長(zhǎng)延遲,否則用短延遲,不過該限速器很少使用。

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?快慢限速器,先以?fastDelay?為周期進(jìn)行嘗試,超過?maxFastAttempts?次數(shù)后,按照?slowDelay?為周期進(jìn)行嘗試
          type?ItemFastSlowRateLimiter?struct?{
          ????failuresLock?sync.Mutex??????????
          ????//?錯(cuò)誤次數(shù)計(jì)數(shù)
          ????failures?????map[interface{}]int?

          ????//?錯(cuò)誤嘗試閾值
          ????maxFastAttempts?int????????
          ????//?短延遲時(shí)間??????
          ????fastDelay???????time.Duration????
          ????//?長(zhǎng)延遲時(shí)間
          ????slowDelay???????time.Duration????
          }

          func?(r?*ItemFastSlowRateLimiter)?When(item?interface{})?time.Duration?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????//?一樣累加錯(cuò)誤計(jì)數(shù)
          ????r.failures[item]?=?r.failures[item]?+?1

          ????//?錯(cuò)誤次數(shù)還未超過閾值
          ????if?r.failures[item]?<=?r.maxFastAttempts?{
          ????????//?用短延遲
          ????????return?r.fastDelay
          ????}
          ????//?超過了用長(zhǎng)延遲
          ???return?r.slowDelay
          }

          func?(r?*ItemFastSlowRateLimiter)?NumRequeues(item?interface{})?int?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????return?r.failures[item]
          }

          func?(r?*ItemFastSlowRateLimiter)?Forget(item?interface{})?{
          ????r.failuresLock.Lock()
          ????defer?r.failuresLock.Unlock()
          ?
          ????delete(r.failures,?item)
          }

          MaxOfRateLimiter

          MaxOfRateLimiter 也可以叫混合限速器,他內(nèi)部有多個(gè)限速器,選擇所有限速器中速度最慢(延遲最大)的一種方案。比如內(nèi)部有三個(gè)限速器,When() 接口返回的就是三個(gè)限速器里面延遲最大的。

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?混合限速器,選擇所有限速器中速度最慢的一種方案
          type?MaxOfRateLimiter?struct?{
          ????//?限速器數(shù)組
          ????limiters?[]RateLimiter???
          }

          func?(r?*MaxOfRateLimiter)?When(item?interface{})?time.Duration?{
          ????ret?:=?time.Duration(0)
          ????//?獲取所有限速器里面時(shí)間最大的延遲時(shí)間
          ????for?_,?limiter?:=?range?r.limiters?{
          ????????curr?:=?limiter.When(item)
          ????????if?curr?>?ret?{
          ????????????ret?=?curr
          ????????}
          ????}
          ????return?ret
          }

          func?(r?*MaxOfRateLimiter)?NumRequeues(item?interface{})?int?{
          ?ret?:=?0
          ????//?Requeues?次數(shù)也是取最大值
          ????for?_,?limiter?:=?range?r.limiters?{
          ????????curr?:=?limiter.NumRequeues(item)
          ????????if?curr?>?ret?{
          ????????????ret?=?curr
          ????????}
          ????}
          ????return?ret
          }

          func?(r?*MaxOfRateLimiter)?Forget(item?interface{})?{
          ????//?循環(huán)遍歷?Forget
          ????for?_,?limiter?:=?range?r.limiters?{
          ????????limiter.Forget(item)
          ????}
          }

          混合限速器的實(shí)現(xiàn)非常簡(jiǎn)單,而在 Kubernetes 中默認(rèn)的控制器限速器初始化就是使用的混合限速器:

          //?k8s.io/client-go/util/workqueue/default_rate_limiters.go

          //?實(shí)例化默認(rèn)的限速器,由?ItemExponentialFailureRateLimiter?和
          //?BucketRateLimiter?組成的混合限速器
          func?DefaultControllerRateLimiter()?RateLimiter?{
          ?return?NewMaxOfRateLimiter(
          ??NewItemExponentialFailureRateLimiter(5*time.Millisecond,?1000*time.Second),
          ??//?10?qps,?100?bucket?容量
          ??&BucketRateLimiter{Limiter:?rate.NewLimiter(rate.Limit(10),?100)},
          ?)
          }

          func?NewItemExponentialFailureRateLimiter(baseDelay?time.Duration,?maxDelay?time.Duration)?RateLimiter?{
          ?return?&ItemExponentialFailureRateLimiter{
          ??failures:??map[interface{}]int{},
          ??baseDelay:?baseDelay,
          ??maxDelay:??maxDelay,
          ?}
          }

          到這里我們就將限速隊(duì)列分析完了,接下來我們需要了解下 WorkQueue 在控制器中是如何使用的。




          K8S進(jìn)階訓(xùn)練營(yíng),點(diǎn)擊下方圖片了解詳情


          瀏覽 113
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国内精品在线播放 | 久久久久理论 | 影音先锋av男人站 | 欧美噜噜噜 | 亚洲视频在线视频看视频在线 |