<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go 第三方庫(kù)源碼分析:juju/ratelimit

          共 6371字,需瀏覽 13分鐘

           ·

          2021-09-13 20:14

          https://github.com/juju/ratelimit 是一個(gè)基于令牌桶算法的限流器:令牌桶就是想象有一個(gè)固定大小的桶,系統(tǒng)會(huì)以恒定速率向桶中放 Token,桶滿則暫時(shí)不放。漏桶算法和令牌桶算法的主要區(qū)別在于,"漏桶算法"能夠強(qiáng)行限制數(shù)據(jù)的傳輸速率(或請(qǐng)求頻率),而"令牌桶算法"在能夠限制數(shù)據(jù)的平均傳輸速率外,還允許某種程度的突發(fā)傳輸。


          首先看下如何使用:

          import "github.com/juju/ratelimit"
          var tokenBucket ratelimit.Bucket = nil
          func init() { // func NewBucket(fillInterval time.Duration, capacity int64) *Bucket // fillInterval令牌填充的時(shí)間間隔 // capacity令牌桶的最大容量 tokenBucket = ratelimit.NewBucket(200time.Millisecond, 20)}
          func Handler() { available := tokenBucket.TakeAvailable(1) if available <= 0 { // 限流處理 } // handling}

          下面看下源碼實(shí)現(xiàn),juju/ratelimit實(shí)現(xiàn)很簡(jiǎn)單,一共只有兩個(gè)源碼文件和一個(gè)測(cè)試文件:

          ratelimit.goratelimit_test.goreader.go

          下面我們分析下常用的這兩個(gè)接口的實(shí)現(xiàn):


          1,ratelimit.NewBucket

          傳入的兩個(gè)參數(shù)分別是產(chǎn)生令牌的的間隔和桶的容量

          func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {  return NewBucketWithClock(fillInterval, capacity, nil)}
          func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {  return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)}

          默認(rèn)一個(gè)間隔周期內(nèi)就產(chǎn)生一個(gè)token,如果是高并發(fā)情況下,可以通過(guò)參數(shù)quantum控制產(chǎn)生多個(gè)。第三個(gè)參數(shù)是一個(gè)clock  interface,主要是方便mock測(cè)試,如果傳nil用的就是realClock{}

          // Clock represents the passage of time in a way that// can be faked out for tests.type Clock interface {  // Now returns the current time.  Now() time.Time  // Sleep sleeps for at least the given duration.  Sleep(d time.Duration)}

          realClock是實(shí)現(xiàn)了上述接口的結(jié)構(gòu)體:

          // realClock implements Clock in terms of standard time functions.type realClock struct{}
          // Now implements Clock.Now by calling time.Now.func (realClock) Now() time.Time { return time.Now()}
          // Now implements Clock.Sleep by calling time.Sleep.func (realClock) Sleep(d time.Duration) { time.Sleep(d)}

          上面幾個(gè)函數(shù)僅僅是對(duì)這個(gè)函數(shù)的一個(gè)簡(jiǎn)單包裝,加上默認(rèn)參數(shù),方便一般場(chǎng)景的使用,最終都是調(diào)用了這個(gè)函數(shù)

          func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {  if clock == nil {    clock = realClock{}  }  if fillInterval <= 0 {    panic("token bucket fill interval is not > 0")  }  if capacity <= 0 {    panic("token bucket capacity is not > 0")  }  if quantum <= 0 {    panic("token bucket quantum is not > 0")  }  return &Bucket{    clock:           clock,    startTime:       clock.Now(),    latestTick:      0,    fillInterval:    fillInterval,    capacity:        capacity,    quantum:         quantum,    availableTokens: capacity,  }}

          出來(lái)參數(shù)檢驗(yàn)外,最后生成了結(jié)構(gòu)體Bucket的指針

          type Bucket struct {  clock Clock
          // startTime holds the moment when the bucket was // first created and ticks began. startTime time.Time
          // capacity holds the overall capacity of the bucket. capacity int64
          // quantum holds how many tokens are added on // each tick. quantum int64
          // fillInterval holds the interval between each tick. fillInterval time.Duration
          // mu guards the fields below it. mu sync.Mutex
          // availableTokens holds the number of available // tokens as of the associated latestTick. // It will be negative when there are consumers // waiting for tokens. availableTokens int64
          // latestTick holds the latest tick for which // we know the number of tokens in the bucket. latestTick int64}

          Bucket里面出了存儲(chǔ)初始化必要的參數(shù)外,多了兩個(gè)變量:

          availableTokens當(dāng)前可用的令牌數(shù)量

          latestTick從程序運(yùn)行到上一次訪問(wèn)的時(shí)候,一共產(chǎn)生了多少次計(jì)數(shù)(如果quantum等于1的話 ,就是一共產(chǎn)生的令牌數(shù)量


          2,TakeAvailable

          有一個(gè)參數(shù),每次取的token數(shù)量,一般是一個(gè),為了并發(fā)安全,一般會(huì)加鎖:

          func (tb *Bucket) TakeAvailable(count int64) int64 {  tb.mu.Lock()  defer tb.mu.Unlock()  return tb.takeAvailable(tb.clock.Now(), count)}

          調(diào)用了令牌桶計(jì)算的核心函數(shù)takeAvailable,第一個(gè)參數(shù)表示是當(dāng)前時(shí)間,用于計(jì)算一共產(chǎn)生了多少個(gè)token:

          func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {  if count <= 0 {    return 0  }  tb.adjustavailableTokens(tb.currentTick(now))  if tb.availableTokens <= 0 {    return 0  }  if count > tb.availableTokens {    count = tb.availableTokens  }  tb.availableTokens -= count  return count}

          其中tb.adjustavailableTokens(tb.currentTick(now))用于計(jì)算修改可用token數(shù)量availableTokens,如果availableTokens<=0,說(shuō)明限流了;如果輸入的count比availableTokens,我么最多只能獲取availableTokens個(gè)token,獲取后,我們把a(bǔ)vailableTokens減去已經(jīng)使用的token數(shù)量。

          func (tb *Bucket) currentTick(now time.Time) int64 {  return int64(now.Sub(tb.startTime) / tb.fillInterval)}

          計(jì)算出了從開(kāi)始運(yùn)行到,當(dāng)前時(shí)間內(nèi)時(shí)間一共跳變了多少次,也就是一共產(chǎn)生了多少次令牌。

          func (tb *Bucket) adjustavailableTokens(tick int64) {  lastTick := tb.latestTick  tb.latestTick = tick  if tb.availableTokens >= tb.capacity {    return  }  tb.availableTokens += (tick - lastTick) * tb.quantum  if tb.availableTokens > tb.capacity {    tb.availableTokens = tb.capacity  }  return}

          1,如果可用token數(shù)量大于等于令牌桶的容量,說(shuō)明很長(zhǎng)時(shí)間沒(méi)有流量來(lái)獲取token了,不用處理。

          2,計(jì)算上一次獲取token 到現(xiàn)時(shí)刻,產(chǎn)生的token數(shù)量,把它加到availableTokens上

          3,如果availableTokens數(shù)量比capacity大,說(shuō)明溢出了,修改availableTokens為capacity。


          以上就是令牌桶算法的核心邏輯。當(dāng)然,這個(gè)包還封裝了一些其他的靈活的取令牌的接口,比如

          func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {  tb.mu.Lock()  defer tb.mu.Unlock()  return tb.take(tb.clock.Now(), count, maxWait)}

          這個(gè)函數(shù)就是獲取,在maxWait time.Duration超時(shí)的前提下,產(chǎn)生count個(gè)token,需要等待的時(shí)間間隔。

          func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {  if count <= 0 {    return 0, true  }
          tick := tb.currentTick(now) tb.adjustavailableTokens(tick) avail := tb.availableTokens - count if avail >= 0 { tb.availableTokens = avail return 0, true } // Round up the missing tokens to the nearest multiple // of quantum - the tokens won't be available until // that tick.
          // endTick holds the tick when all the requested tokens will // become available. endTick := tick + (-avail+tb.quantum-1)/tb.quantum endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) waitTime := endTime.Sub(now) if waitTime > maxWait { return 0, false } tb.availableTokens = avail return waitTime, true}

          函數(shù)的前半部分和takeAvailable一模一樣,后面邏輯表示,如果令牌不夠的情況下:

          1,計(jì)算還缺多少個(gè)令牌

          2,計(jì)算缺這么多令牌需要跳變多少次

          3,計(jì)算跳變這些次數(shù)需要的時(shí)間

          4,判斷需要的時(shí)間是否超時(shí)

          還有一個(gè)wait接口,用來(lái)計(jì)算,獲取count個(gè)令牌需要的時(shí)間,然后sleep這么長(zhǎng)時(shí)間。

          func (tb *Bucket) Wait(count int64) {  if d := tb.Take(count); d > 0 {    tb.clock.Sleep(d)  }}

          以上就是令牌桶算法的核心源碼實(shí)現(xiàn),

          ratelimit/reader.go

          里面實(shí)現(xiàn)了基于上述限流器實(shí)現(xiàn)的讀限速和寫(xiě)限速,原理是通過(guò)讀寫(xiě)buff的長(zhǎng)度來(lái)控制Wait函數(shù)的等待時(shí)間,實(shí)現(xiàn)讀寫(xiě)限速的

          func (r *reader) Read(buf []byte) (int, error) {  n, err := r.r.Read(buf)  if n <= 0 {    return n, err  }  r.bucket.Wait(int64(n))  return n, err}


          推薦閱讀


          福利

          我為大家整理了一份從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。

          瀏覽 40
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费高清无码 | 午夜免费性爱视频 | 北条麻妃影音先锋 | 亚洲高清成人电影 | 超碰人插人摸 |