<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kubernetes client-go 源碼分析 - workqueue

          共 6991字,需瀏覽 14分鐘

           ·

          2021-10-14 00:17


          概述

          源碼版本信息

          • Project: kubernetes
          • Branch: master
          • Last commit id: d25d741c
          • Date: 2021-09-26

          自定義控制器涉及到的 client-go 組件整體工作流程,大致如下圖:

          今天我們來詳細(xì)研究下 workqueue 相關(guān)代碼。client-go 的 util/workqueue 包里主要有三個(gè)隊(duì)列,分別是普通隊(duì)列,延時(shí)隊(duì)列,限速隊(duì)列,后一個(gè)隊(duì)列以前一個(gè)隊(duì)列的實(shí)現(xiàn)為基礎(chǔ),層層添加新功能,我們按照 Queue、DelayingQueue、RateLimitingQueue 的順序?qū)訉訐荛_來看限速隊(duì)列是如何實(shí)現(xiàn)的。

          Queue

          接口和結(jié)構(gòu)體

          先看接口定義:

          • k8s.io/client-go/util/workqueue/queue.go:26
          type?Interface?interface?{
          ???Add(item?interface{})??//?添加一個(gè)元素
          ???Len()?int??????????????//?元素個(gè)數(shù)
          ???Get()?(item?interface{},?shutdown?bool)?//?獲取一個(gè)元素,第二個(gè)返回值和?channel?類似,標(biāo)記隊(duì)列是否關(guān)閉了
          ???Done(item?interface{})?//?標(biāo)記一個(gè)元素已經(jīng)處理完
          ???ShutDown()?????????????//?關(guān)閉隊(duì)列
          ???ShuttingDown()?bool????//?是否正在關(guān)閉
          }

          這個(gè)基礎(chǔ)的隊(duì)列接口定義很清晰,我們繼續(xù)來看其實(shí)現(xiàn)的類型:

          type?Type?struct?{
          ???queue?[]t????????????//?定義元素的處理順序,里面所有元素都應(yīng)該在?dirty?set?中有,而不能出現(xiàn)在?processing?set?中
          ???dirty?set????????????//?標(biāo)記所有需要被處理的元素
          ???processing?set???????//?當(dāng)前正在被處理的元素,當(dāng)處理完后需要檢查該元素是否在?dirty?set?中,如果有則添加到?queue?里

          ???cond?*sync.Cond??????//?條件鎖
          ???shuttingDown?bool????//?是否正在關(guān)閉
          ???metrics?queueMetrics
          ???unfinishedWorkUpdatePeriod?time.Duration
          ???clock??????????????????????clock.Clock
          }

          Queue 的工作邏輯大致是這樣,里面的三個(gè)屬性 queue、dirty、processing 都保存 items,但是含義有所不同:

          • queue:這是一個(gè) []t 類型,也就是一個(gè)切片,因?yàn)槠溆行颍赃@里當(dāng)作一個(gè)列表來存儲(chǔ) item 的處理順序。
          • dirty:這是一個(gè) set 類型,也就是一個(gè)集合,這個(gè)集合存儲(chǔ)的是所有需要處理的 item,這些 item 也會(huì)保存在 queue 中,但是 set 里是無需的,set 的特性是唯一。
          • processing:這也是一個(gè) set,存放的是當(dāng)前正在處理的 item,也就是說這個(gè) item 來自 queue 出隊(duì)的元素,同時(shí)這個(gè)元素會(huì)被從 dirty 中刪除。

          下面分別介紹 set 類型和 Queue 接口的集合核心方法的實(shí)現(xiàn)。

          set

          上面提到的 dirty 和 processing 字段都是 set 類型,set 相關(guān)定義如下:

          type?empty?struct{}
          type?t?interface{}
          type?set?map[t]empty

          func?(s?set)?has(item?t)?bool?{
          ???_,?exists?:=?s[item]
          ???return?exists
          }

          func?(s?set)?insert(item?t)?{
          ???s[item]?=?empty{}
          }

          func?(s?set)?delete(item?t)?{
          ???delete(s,?item)
          }

          set 是一個(gè)空接口到空結(jié)構(gòu)體的 map,也就是實(shí)現(xiàn)了一個(gè)集合的功能,集合元素是 interface{} 類型,也就是可以存儲(chǔ)任意類型。而 map 的 value 是 struct{} 類型,也就是空。這里利用 map 的 key 唯一的特性實(shí)現(xiàn)了一個(gè)集合類型,附帶三個(gè)方法 has()、insert()delete() 來實(shí)現(xiàn)集合相關(guān)操作。

          Add()

          Add() 方法用于標(biāo)記一個(gè) item 需要被處理,代碼如下:

          func?(q?*Type)?Add(item?interface{})?{
          ???q.cond.L.Lock()
          ???defer?q.cond.L.Unlock()
          ???if?q.shuttingDown?{?//?如果?queue?正在被關(guān)閉,則返回
          ??????return
          ???}
          ???if?q.dirty.has(item)?{?//?如果?dirty?set?中已經(jīng)有了該?item,則返回
          ??????return
          ???}

          ???q.metrics.add(item)

          ???q.dirty.insert(item)?//?添加到?dirty?set?中
          ???if?q.processing.has(item)?{?//?如果正在被處理,則返回
          ??????return
          ???}

          ???q.queue?=?append(q.queue,?item)?//?如果沒有正在處理,則加到?q.queue?中
          ???q.cond.Signal()?//?通知某個(gè)?getter?有新?item?到來
          }

          Get()

          func?(q?*Type)?Get()?(item?interface{},?shutdown?bool)?{
          ???q.cond.L.Lock()
          ???defer?q.cond.L.Unlock()
          ???for?len(q.queue)?==?0?&&?!q.shuttingDown?{?//?如果?q.queue?為空,并且沒有正在關(guān)閉,則等待下一個(gè)?item?的到來
          ??????q.cond.Wait()
          ???}
          ???if?len(q.queue)?==?0?{?//?這時(shí)候如果?q.queue?長(zhǎng)度還是?0,說明?q.shuttingDown?為?true,所以直接返回
          ??????return?nil,?true
          ???}

          ???item,?q.queue?=?q.queue[0],?q.queue[1:]?//?獲取?q.queue?第一個(gè)元素,同時(shí)更新?q.queue

          ???q.metrics.get(item)

          ???q.processing.insert(item)?//?剛才獲取到的?q.queue?第一個(gè)元素放到?processing?set?中
          ???q.dirty.delete(item)?//?dirty?set?中刪除該元素

          ???return?item,?false?//?返回?item
          }

          Done()

          func?(q?*Type)?Done(item?interface{})?{
          ???q.cond.L.Lock()
          ???defer?q.cond.L.Unlock()

          ???q.metrics.done(item)

          ???q.processing.delete(item)?//?processing?set?中刪除該?item
          ???if?q.dirty.has(item)?{?//?如果?dirty?中還有,說明還需要再次處理,放到?q.queue?中
          ??????q.queue?=?append(q.queue,?item)
          ??????q.cond.Signal()?//?通知某個(gè)?getter?有新的?item
          ???}
          }

          DelayingQueue

          接口和結(jié)構(gòu)體

          還是先看接口定義:

          • k8s.io/client-go/util/workqueue/delaying_queue.go:30
          type?DelayingInterface?interface?{
          ???Interface
          ???//?AddAfter?adds?an?item?to?the?workqueue?after?the?indicated?duration?has?passed
          ???AddAfter(item?interface{},?duration?time.Duration)
          }

          相比 Queue 這里只是多了一個(gè) AddAfter(item interface{}, duration time.Duration) 方法,望文生義,也就是延時(shí)添加 item。

          結(jié)構(gòu)體定義:

          type?delayingType?struct?{
          ???Interface???????????????//?用來嵌套普通?Queue
          ???clock?clock.Clock???????//?計(jì)時(shí)器
          ???stopCh?chan?struct{}
          ???stopOnce?sync.Once??????//?用來確保?ShutDown()?方法只執(zhí)行一次
          ???heartbeat?clock.Ticker??//?默認(rèn)10s的心跳,后面用在一個(gè)大循環(huán)里,避免沒有新?item?時(shí)一直卡住
          ???waitingForAddCh?chan?*waitFor??//?傳遞?waitFor?的?channel,默認(rèn)大小?1000
          ???metrics?retryMetrics
          }

          對(duì)于延時(shí)隊(duì)列,我們關(guān)注的入口方法肯定就是新增的 AddAfter() 了,看這個(gè)方法的具體的邏輯前我們先看下上面提到的 waitFor 類型。

          waitFor

          先看下 waitFor 結(jié)構(gòu)定義,代碼如下:

          type?waitFor?struct?{
          ???data????t??????????//?準(zhǔn)備添加到隊(duì)列中的數(shù)據(jù)
          ???readyAt?time.Time??//?應(yīng)該被加入隊(duì)列的時(shí)間
          ???index?int??????????//?在?heap?中的索引
          }

          然后可以注意到有這樣一行代碼:

          type?waitForPriorityQueue?[]*waitFor

          這里定義了一個(gè) waitFor 的優(yōu)先級(jí)隊(duì)列,用最小堆的方式來實(shí)現(xiàn),這個(gè)類型實(shí)現(xiàn)了 heap.Interface 接口,我們具體看下源碼:

          //?添加一個(gè)?item?到隊(duì)列中
          func?(pq?*waitForPriorityQueue)?Push(x?interface{})?{
          ???n?:=?len(*pq)
          ???item?:=?x.(*waitFor)
          ???item.index?=?n
          ???*pq?=?append(*pq,?item)?//?添加到隊(duì)列的尾巴
          }

          //?從隊(duì)列尾巴移除一個(gè)?item
          func?(pq?*waitForPriorityQueue)?Pop()?interface{}?{
          ???n?:=?len(*pq)
          ???item?:=?(*pq)[n-1]
          ???item.index?=?-1
          ???*pq?=?(*pq)[0:(n?-?1)]
          ???return?item
          }

          //?獲取隊(duì)列第一個(gè)?item
          func?(pq?waitForPriorityQueue)?Peek()?interface{}?{
          ???return?pq[0]
          }

          NewDelayingQueue

          接著看一下 DelayingQueue 相關(guān)的幾個(gè) New 函數(shù),理解了這里的邏輯,才能繼續(xù)往后面分析 AddAfter() 方法。

          //?這里可以傳遞一個(gè)名字
          func?NewNamedDelayingQueue(name?string)?DelayingInterface?{
          ???return?NewDelayingQueueWithCustomClock(clock.RealClock{},?name)
          }

          //?上面一個(gè)函數(shù)只是調(diào)用當(dāng)前函數(shù),附帶一個(gè)名字,這里加了一個(gè)指定?clock?的能力
          func?NewDelayingQueueWithCustomClock(clock?clock.Clock,?name?string)?DelayingInterface?{
          ??return?newDelayingQueue(clock,?NewNamed(name),?name)?//?注意這里的?NewNamed()?函數(shù)
          }

          func?newDelayingQueue(clock?clock.Clock,?q?Interface,?name?string)?*delayingType?{
          ???ret?:=?&delayingType{
          ??????Interface:???????q,
          ??????clock:???????????clock,
          ??????heartbeat:???????clock.NewTicker(maxWait),?//?10s?一次心跳
          ??????stopCh:??????????make(chan?struct{}),
          ??????waitingForAddCh:?make(chan?*waitFor,?1000),
          ??????metrics:?????????newRetryMetrics(name),
          ???}

          ???go?ret.waitingLoop()?//?留意這里的函數(shù)調(diào)用
          ???return?ret
          }

          上面涉及到兩個(gè)細(xì)節(jié):

          • NewNamed(name)
          • go ret.waitingLoop()

          NewNamed() 函數(shù)用于創(chuàng)建一個(gè)前面提到的 Queue 的對(duì)應(yīng)類型 Type 對(duì)象,這個(gè)值被傳遞給了 newDelayingQueue() 函數(shù),進(jìn)而賦值給了 delayingType{} 對(duì)象的 Interface 字段,于是后面 delayingType 類型才能直接調(diào)用 Type 類型實(shí)現(xiàn)的方法。

          func?NewNamed(name?string)?*Type?{
          ???rc?:=?clock.RealClock{}
          ???return?newQueue(
          ??????rc,
          ??????globalMetricsFactory.newQueueMetrics(name,?rc),
          ??????defaultUnfinishedWorkUpdatePeriod,
          ???)
          }

          waitingLoop() 方法邏輯不少,我們單獨(dú)放到下面一個(gè)小節(jié)。

          waitingLoop()

          這個(gè)方法是實(shí)現(xiàn)延時(shí)隊(duì)列的核心邏輯所在:

          func?(q?*delayingType)?waitingLoop()?{
          ???defer?utilruntime.HandleCrash()
          ???//?隊(duì)列里沒有?item?時(shí)實(shí)現(xiàn)等待用的
          ???never?:=?make(<-chan?time.Time)
          ???var?nextReadyAtTimer?clock.Timer
          ???//?構(gòu)造一個(gè)有序隊(duì)列
          ???waitingForQueue?:=?&waitForPriorityQueue{}
          ???heap.Init(waitingForQueue)?//?這一行其實(shí)是多余的,等下提個(gè)?pr?給它刪掉

          ???//?這個(gè)?map?用來處理重復(fù)添加邏輯的,下面會(huì)講到
          ???waitingEntryByData?:=?map[t]*waitFor{}
          ???//?無限循環(huán)
          ???for?{
          ??????//?這個(gè)地方?Interface?是多余的,等下也提個(gè)?pr?把它刪掉吧
          ??????if?q.Interface.ShuttingDown()?{
          ?????????return
          ??????}

          ??????now?:=?q.clock.Now()
          ??????//?隊(duì)列里有?item?就開始循環(huán)
          ??????for?waitingForQueue.Len()?>?0?{
          ?????????//?獲取第一個(gè)?item
          ?????????entry?:=?waitingForQueue.Peek().(*waitFor)
          ?????????//?時(shí)間還沒到,先不處理
          ?????????if?entry.readyAt.After(now)?{
          ????????????break
          ?????????}
          ????????//?時(shí)間到了,pop 出第一個(gè)元素;注意 waitingForQueue.Pop()?是最后一個(gè) item,heap.Pop()?是第一個(gè)元素
          ?????????entry?=?heap.Pop(waitingForQueue).(*waitFor)
          ?????????//?將數(shù)據(jù)加到延時(shí)隊(duì)列里
          ?????????q.Add(entry.data)
          ?????????//?map?里刪除已經(jīng)加到延時(shí)隊(duì)列的?item
          ?????????delete(waitingEntryByData,?entry.data)
          ??????}

          ??????//?如果隊(duì)列中有?item,就用第一個(gè)?item?的等待時(shí)間初始化計(jì)時(shí)器,如果為空則一直等待
          ??????nextReadyAt?:=?never
          ??????if?waitingForQueue.Len()?>?0?{
          ?????????if?nextReadyAtTimer?!=?nil?{
          ????????????nextReadyAtTimer.Stop()
          ?????????}
          ?????????entry?:=?waitingForQueue.Peek().(*waitFor)
          ?????????nextReadyAtTimer?=?q.clock.NewTimer(entry.readyAt.Sub(now))
          ?????????nextReadyAt?=?nextReadyAtTimer.C()
          ??????}

          ??????select?{
          ??????case?<-q.stopCh:
          ?????????return
          ??????case?<-q.heartbeat.C():?//?心跳時(shí)間是?10s,到了就繼續(xù)下一輪循環(huán)
          ??????case?<-nextReadyAt:?//?第一個(gè)?item?的等到時(shí)間到了,繼續(xù)下一輪循環(huán)
          ??????case?waitEntry?:=?<-q.waitingForAddCh:?//?waitingForAddCh?收到新的?item
          ?????????//?如果時(shí)間沒到,就加到優(yōu)先級(jí)隊(duì)列里,如果時(shí)間到了,就直接加到延時(shí)隊(duì)列里
          ?????????if?waitEntry.readyAt.After(q.clock.Now())?{
          ????????????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
          ?????????}?else?{
          ????????????q.Add(waitEntry.data)
          ?????????}
          ?????????//?下面的邏輯就是將?waitingForAddCh?中的數(shù)據(jù)處理完
          ?????????drained?:=?false
          ?????????for?!drained?{
          ????????????select?{
          ????????????case?waitEntry?:=?<-q.waitingForAddCh:
          ???????????????if?waitEntry.readyAt.After(q.clock.Now())?{
          ??????????????????insert(waitingForQueue,?waitingEntryByData,?waitEntry)
          ???????????????}?else?{
          ??????????????????q.Add(waitEntry.data)
          ???????????????}
          ????????????default:
          ???????????????drained?=?true
          ????????????}
          ?????????}
          ??????}
          ???}
          }

          上面函數(shù)還有一個(gè) insert() 調(diào)用,我們?cè)賮砜匆幌逻@個(gè)插入邏輯:

          func?insert(q?*waitForPriorityQueue,?knownEntries?map[t]*waitFor,?entry?*waitFor)?{
          ???//?這里的主要邏輯是看一個(gè)?entry?是否存在,如果已經(jīng)存在,新的?entry?的?ready?時(shí)間更短,就更新時(shí)間
          ???existing,?exists?:=?knownEntries[entry.data]
          ???if?exists?{
          ??????if?existing.readyAt.After(entry.readyAt)?{
          ?????????existing.readyAt?=?entry.readyAt?//?如果存在就只更新時(shí)間
          ?????????heap.Fix(q,?existing.index)
          ??????}

          ??????return
          ???}
          ???//?如果不存在就丟到?q?里,同時(shí)在?map?里記錄一下,用于查重
          ???heap.Push(q,?entry)
          ???knownEntries[entry.data]?=?entry
          }

          AddAfter()

          這個(gè)方法的作用是在指定的延時(shí)到達(dá)之后,在 workqueue 中添加一個(gè)元素,源碼如下:

          func?(q?*delayingType)?AddAfter(item?interface{},?duration?time.Duration)?{
          ???if?q.ShuttingDown()?{?//?已經(jīng)在關(guān)閉中就直接返回
          ??????return
          ???}

          ???q.metrics.retry()

          ???if?duration?<=?0?{?//?如果時(shí)間到了,就直接添加
          ??????q.Add(item)
          ??????return
          ???}

          ???select?{
          ???case?<-q.stopCh:
          ?????//?構(gòu)造?waitFor{},丟到?waitingForAddCh
          ???case?q.waitingForAddCh?<-?&waitFor{data:?item,?readyAt:?q.clock.Now().Add(duration)}:
          ???}
          }

          RateLimitingQueue

          最后一個(gè) workqueue 就是限速隊(duì)列,我們繼續(xù)來看。

          接口和結(jié)構(gòu)體

          先看接口定義:

          • k8s.io/client-go/util/workqueue/rate_limiting_queue.go:20
          type?RateLimitingInterface?interface?{
          ???DelayingInterface???????????????????//?延時(shí)隊(duì)列里內(nèi)嵌了普通隊(duì)列,限速隊(duì)列里內(nèi)嵌了延時(shí)隊(duì)列
          ???AddRateLimited(item?interface{})????//?限速方式往隊(duì)列里加入一個(gè)元素
          ???Forget(item?interface{})????????????//?標(biāo)識(shí)一個(gè)元素結(jié)束重試
          ???NumRequeues(item?interface{})?int???//?標(biāo)識(shí)這個(gè)元素被處理里多少次了
          }

          然后看下兩個(gè) New 函數(shù)。

          func?NewRateLimitingQueue(rateLimiter?RateLimiter)?RateLimitingInterface?{
          ???return?&rateLimitingType{
          ??????DelayingInterface:?NewDelayingQueue(),
          ??????rateLimiter:???????rateLimiter,
          ???}
          }

          func?NewNamedRateLimitingQueue(rateLimiter?RateLimiter,?name?string)?RateLimitingInterface?{
          ???return?&rateLimitingType{
          ??????DelayingInterface:?NewNamedDelayingQueue(name),
          ??????rateLimiter:???????rateLimiter,
          ???}
          }

          這里的區(qū)別就是里面的延時(shí)隊(duì)列有沒有指定的名字。注意到這里有一個(gè) RateLimiter 類型,后面要詳細(xì)講,另外 rateLimitingType 就是上面接口的具體實(shí)現(xiàn)類型了。

          RateLimiter

          RateLimiter 表示一個(gè)限速器,我們看下限速器是什么意思。先看接口定義:

          • k8s.io/client-go/util/workqueue/default_rate_limiters.go:27
          type?RateLimiter?interface?{
          ???When(item?interface{})?time.Duration?//?返回一個(gè)?item?需要等待的時(shí)常
          ???Forget(item?interface{})?????????????//?標(biāo)識(shí)一個(gè)元素結(jié)束重試
          ???NumRequeues(item?interface{})?int????//?標(biāo)識(shí)這個(gè)元素被處理里多少次了
          }

          這個(gè)接口有五個(gè)實(shí)現(xiàn),分別叫做:

          • BucketRateLimiter
          • ItemExponentialFailureRateLimiter
          • ItemFastSlowRateLimiter
          • MaxOfRateLimiter
          • WithMaxWaitRateLimiter

          下面分別來看:

          • BucketRateLimiter 這個(gè)限速器可說的不多,用了 golang 標(biāo)準(zhǔn)庫(kù)的 golang.org/x/time/rate.Limiter 實(shí)現(xiàn)。BucketRateLimiter 實(shí)例化的時(shí)候比如傳遞一個(gè) rate.NewLimiter(rate.Limit(10), 100) 進(jìn)去,表示令牌桶里最多有 100 個(gè)令牌,每秒發(fā)放 10 個(gè)令牌。
          type?BucketRateLimiter?struct?{
          ???*rate.Limiter
          }

          var?_?RateLimiter?=?&BucketRateLimiter{}

          func?(r?*BucketRateLimiter)?When(item?interface{})?time.Duration?{
          ???return?r.Limiter.Reserve().Delay()?//?過多久后給當(dāng)前?item?發(fā)放一個(gè)令牌
          }

          func?(r?*BucketRateLimiter)?NumRequeues(item?interface{})?int?{
          ???return?0
          }

          func?(r?*BucketRateLimiter)?Forget(item?interface{})?{
          }
          • ItemExponentialFailureRateLimiter

            Exponential 是指數(shù)的意思,從這個(gè)限速器的名字大概能猜到是失敗次數(shù)越多,限速越長(zhǎng)而且是指數(shù)級(jí)增長(zhǎng)的一種限速器。

            結(jié)構(gòu)體定義如下,屬性含義基本可以望文生義。

          type?ItemExponentialFailureRateLimiter?struct?{
          ???failuresLock?sync.Mutex
          ???failures?????map[interface{}]int

          ???baseDelay?time.Duration
          ???maxDelay??time.Duration
          }

          主要邏輯是 When() 函數(shù)是如何實(shí)現(xiàn)的:

          func?(r?*ItemExponentialFailureRateLimiter)?When(item?interface{})?time.Duration?{
          ???r.failuresLock.Lock()
          ???defer?r.failuresLock.Unlock()

          ???exp?:=?r.failures[item]
          ???r.failures[item]?=?r.failures[item]?+?1?//?失敗次數(shù)加一

          ???//?每調(diào)用一次,exp?也就加了1,對(duì)應(yīng)到這里時(shí)?2^n?指數(shù)爆炸
          ???backoff?:=?float64(r.baseDelay.Nanoseconds())?*?math.Pow(2,?float64(exp))
          ???if?backoff?>?math.MaxInt64?{?//?如果超過了最大整型,就返回最大延時(shí),不然后面時(shí)間轉(zhuǎn)換溢出了
          ??????return?r.maxDelay
          ???}

          ???calculated?:=?time.Duration(backoff)
          ???if?calculated?>?r.maxDelay?{?//?如果超過最大延時(shí),則返回最大延時(shí)
          ??????return?r.maxDelay
          ???}

          ???return?calculated
          }

          另外兩個(gè)函數(shù)太簡(jiǎn)單了:

          func?(r?*ItemExponentialFailureRateLimiter)?NumRequeues(item?interface{})?int?{
          ???r.failuresLock.Lock()
          ???defer?r.failuresLock.Unlock()

          ???return?r.failures[item]
          }

          func?(r?*ItemExponentialFailureRateLimiter)?Forget(item?interface{})?{
          ???r.failuresLock.Lock()
          ???defer?r.failuresLock.Unlock()

          ???delete(r.failures,?item)
          }
          • ItemFastSlowRateLimiter 快慢限速器,也就是先快后慢,定義一個(gè)閾值,超過了就慢慢重試。先看類型定義:
          type?ItemFastSlowRateLimiter?struct?{
          ???failuresLock?sync.Mutex
          ???failures?????map[interface{}]int

          ???maxFastAttempts?int????????????//?快速重試的次數(shù)
          ???fastDelay???????time.Duration??//?快重試間隔
          ???slowDelay???????time.Duration??//?慢重試間隔
          }

          同樣繼續(xù)來看具體的方法實(shí)現(xiàn):

          func?(r?*ItemFastSlowRateLimiter)?When(item?interface{})?time.Duration?{
          ???r.failuresLock.Lock()
          ???defer?r.failuresLock.Unlock()

          ???r.failures[item]?=?r.failures[item]?+?1?//?標(biāo)識(shí)重試次數(shù)?+?1

          ???if?r.failures[item]?<=?r.maxFastAttempts?{?//?如果快重試次數(shù)沒有用完,則返回?fastDelay
          ??????return?r.fastDelay
          ???}

          ???return?r.slowDelay?//?反之返回?slowDelay
          }

          func?(r?*ItemFastSlowRateLimiter)?NumRequeues(item?interface{})?int?{
          ???r.failuresLock.Lock()
          ???defer?r.failuresLock.Unlock()

          ???return?r.failures[item]
          }

          func?(r?*ItemFastSlowRateLimiter)?Forget(item?interface{})?{
          ???r.failuresLock.Lock()
          ???defer?r.failuresLock.Unlock()

          ???delete(r.failures,?item)
          }
          • MaxOfRateLimiter 這個(gè)限速器看著有點(diǎn)樂呵人,內(nèi)部放多個(gè)限速器,然后返回限速最狠的一個(gè)延時(shí):
          type?MaxOfRateLimiter?struct?{
          ???limiters?[]RateLimiter
          }

          func?(r?*MaxOfRateLimiter)?When(item?interface{})?time.Duration?{
          ???ret?:=?time.Duration(0)
          ???for?_,?limiter?:=?range?r.limiters?{
          ??????curr?:=?limiter.When(item)
          ??????if?curr?>?ret?{
          ?????????ret?=?curr
          ??????}
          ???}

          ???return?ret
          }
          • WithMaxWaitRateLimiter 這個(gè)限速器也很簡(jiǎn)單,就是在其他限速器上包裝一個(gè)最大延遲的屬性,如果到了最大延時(shí),則直接返回:
          type?WithMaxWaitRateLimiter?struct?{
          ???limiter??RateLimiter???//?其他限速器
          ???maxDelay?time.Duration?//?最大延時(shí)
          }

          func?NewWithMaxWaitRateLimiter(limiter?RateLimiter,?maxDelay?time.Duration)?RateLimiter?{
          ???return?&WithMaxWaitRateLimiter{limiter:?limiter,?maxDelay:?maxDelay}
          }

          func?(w?WithMaxWaitRateLimiter)?When(item?interface{})?time.Duration?{
          ???delay?:=?w.limiter.When(item)
          ???if?delay?>?w.maxDelay?{
          ??????return?w.maxDelay?//?已經(jīng)超過了最大延時(shí),直接返回最大延時(shí)
          ???}

          ???return?delay
          }

          限速隊(duì)列的實(shí)現(xiàn)

          看完了上面的限速器的概念,限速隊(duì)列的實(shí)現(xiàn)就很簡(jiǎn)單了:

          func?(q?*rateLimitingType)?AddRateLimited(item?interface{})?{
          ???//?內(nèi)部存了一個(gè)延時(shí)隊(duì)列,通過限速器計(jì)算出一個(gè)等待時(shí)間,然后傳給延時(shí)隊(duì)列
          ???q.DelayingInterface.AddAfter(item,?q.rateLimiter.When(item))
          }

          func?(q?*rateLimitingType)?NumRequeues(item?interface{})?int?{
          ???return?q.rateLimiter.NumRequeues(item)
          }

          func?(q?*rateLimitingType)?Forget(item?interface{})?{
          ???q.rateLimiter.Forget(item)
          }


          瀏覽 52
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  www.操| 翔田千里成人AV片 | 久aaa| 久久久久电影 | 日韩最新高清无码 |