<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go 第三方庫源碼分析:uber-go/ratelimit

          共 6190字,需瀏覽 13分鐘

           ·

          2021-10-02 04:00

          https://github.com/uber-go/ratelimit 是一個(gè)漏桶限流器的實(shí)現(xiàn),

          rl := ratelimit.New(100// per second
          prev := time.Now()for i := 0; i < 10; i++ { now := rl.Take() fmt.Println(i, now.Sub(prev)) prev = now}

          在這個(gè)例子中,我們給定限流器每秒可以通過 100 個(gè)請求,也就是平均每個(gè)請求間隔 10ms。因此,最終會(huì)每 10ms 打印一行數(shù)據(jù)。輸出結(jié)果如下:

          // Output:// 0 0// 1 10ms// 2 10ms

          整個(gè)包中源碼如下:

          example_test.golimiter_atomic.golimiter_mutexbased.goratelimit.goratelimit_bench_test.goratelimit_test.go


          1,ratelimit.New

          先看下初始化的過程:

          // New returns a Limiter that will limit to the given RPS.func New(rate int, opts ...Option) Limiter {  return newAtomicBased(rate, opts...)}

          傳入的參數(shù)是1s內(nèi)產(chǎn)生的token數(shù)量:

          // newAtomicBased returns a new atomic based limiter.func newAtomicBased(rate int, opts ...Option) *atomicLimiter {  // TODO consider moving config building to the implementation  // independent code.  config := buildConfig(opts)  perRequest := config.per / time.Duration(rate)  l := &atomicLimiter{    perRequest: perRequest,    maxSlack:   -1 * time.Duration(config.slack) * perRequest,    clock:      config.clock,  }
          initialState := state{ last: time.Time{}, sleepFor: 0, } atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) return l}

          1,通過options修改配置參數(shù) config := buildConfig(opts)

          func buildConfig(opts []Option) config {  c := config{    clock: clock.New(),    slack: 10,    per:   time.Second,  }
          for _, opt := range opts { opt.apply(&c) } return c}

          可以看到,默認(rèn)情況下per是1s

          2,計(jì)算產(chǎn)生一個(gè)令牌話費(fèi)的時(shí)間(時(shí)間間隔

          perRequest := config.per / time.Duration(rate)

          3,初始化atomicLimiter,令牌產(chǎn)生時(shí)間間隔,時(shí)鐘

          type atomicLimiter struct {  state unsafe.Pointer  //lint:ignore U1000 Padding is unused but it is crucial to maintain performance  // of this rate limiter in case of collocation with other frequently accessed memory.  padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
          perRequest time.Duration maxSlack time.Duration clock Clock}

          4,記錄初始化狀態(tài):當(dāng)前時(shí)間和休眠時(shí)間

          type state struct {  last     time.Time  sleepFor time.Duration}

          完成初始化的流程后,我們就進(jìn)入了令牌產(chǎn)生的流程了。


          2,rl.Take

          Take是一個(gè)接口,返回當(dāng)前時(shí)間

          // Limiter is used to rate-limit some process, possibly across goroutines.// The process is expected to call Take() before every iteration, which// may block to throttle the goroutine.type Limiter interface {  // Take should block to make sure that the RPS is met.  Take() time.Time}

          atomicLimiter 實(shí)現(xiàn)了這個(gè)接口

          // Take blocks to ensure that the time spent between multiple// Take calls is on average time.Second/rate.func (t *atomicLimiter) Take() time.Time {  var (    newState state    taken    bool    interval time.Duration  )  for !taken {    now := t.clock.Now()
          previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer)
          newState = state{ last: now, sleepFor: oldState.sleepFor, }
          // If this is our first request, then we allow it. if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue }
          // sleepFor calculates how much time we should sleep based on // the perRequest budget and how long the last request took. // Since the request may take longer than the budget, this number // can get negative, and is summed across requests. newState.sleepFor += t.perRequest - now.Sub(oldState.last) // We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(interval) return newState.last}

          1,獲取當(dāng)前時(shí)間

          2,如果是初始化狀態(tài),也就是上一次訪問時(shí)間是0,那么設(shè)置上一次訪問時(shí)間是當(dāng)前時(shí)間,直接返回。

          3,計(jì)算睡眠的時(shí)間,睡眠時(shí)間=上一次記錄的睡眠時(shí)間+每個(gè)令牌產(chǎn)生的時(shí)間間隔-(當(dāng)前時(shí)間-上一次訪問時(shí)間),也就是訪問時(shí)間間隔

          4,如果睡眠時(shí)間小于maxSlack,說明請求量很小,距離上一次訪問時(shí)間已經(jīng)很久了,將睡眠時(shí)間修改成maxSlack否則無法應(yīng)對大量突發(fā)流量

          5,如果睡眠時(shí)間大于0,說明請求量比較大,需要等待一段時(shí)間才能返回,調(diào)用 t.clock.Sleep(t.sleepFor),進(jìn)入睡眠狀態(tài),同時(shí)修改上一次訪問時(shí)間和休眠時(shí)間

          6,如果小于等于0,說明請求量不大,可以立即返回,并記錄當(dāng)前時(shí)間

          mutexLimiter 也實(shí)現(xiàn)了上述接口:

          type mutexLimiter struct {  sync.Mutex  last       time.Time  sleepFor   time.Duration  perRequest time.Duration  maxSlack   time.Duration  clock      Clock}

          差別就是一個(gè)是基于互斥鎖實(shí)現(xiàn)的,一個(gè)是基于原子操作實(shí)現(xiàn)的

          func (t *mutexLimiter) Take() time.Time {  t.Lock()  defer t.Unlock()
          now := t.clock.Now()
          // If this is our first request, then we allow it. if t.last.IsZero() { t.last = now return t.last }
          // sleepFor calculates how much time we should sleep based on // the perRequest budget and how long the last request took. // Since the request may take longer than the budget, this number // can get negative, and is summed across requests. t.sleepFor += t.perRequest - now.Sub(t.last)
          // We shouldn't allow sleepFor to get too negative, since it would mean that // a service that slowed down a lot for a short period of time would get // a much higher RPS following that. if t.sleepFor < t.maxSlack { t.sleepFor = t.maxSlack }
          // If sleepFor is positive, then we should sleep now. if t.sleepFor > 0 { t.clock.Sleep(t.sleepFor) t.last = now.Add(t.sleepFor) t.sleepFor = 0 } else { t.last = now }
          return t.last}

           Leaky Bucket,每個(gè)請求的間隔是固定的,然而,在實(shí)際上的互聯(lián)網(wǎng)應(yīng)用中,流量經(jīng)常是突發(fā)性的。對于這種情況,uber-go 對 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念。我們先理解下整體背景: 假如我們要求每秒限定 100 個(gè)請求,平均每個(gè)請求間隔 10ms。但是實(shí)際情況下,有些請求間隔比較長,有些請求間隔比較短。

          (1)當(dāng) t.sleepFor > 0,代表此前的請求多余出來的時(shí)間,無法完全抵消此次的所需量,因此需要 sleep 相應(yīng)時(shí)間, 同時(shí)將 t.sleepFor 置為 0。

          (2)當(dāng) t.sleepFor < 0,說明此次請求間隔大于預(yù)期間隔,將多出來的時(shí)間累加到 t.sleepFor 即可。

          但是,對于某種情況,請求 1 完成后,請求 2 過了很久到達(dá) (好幾個(gè)小時(shí)都有可能),那么此時(shí)對于請求 2 的請求間隔 now.Sub(t.last),會(huì)非常大。以至于即使后面大量請求瞬時(shí)到達(dá),也無法抵消完這個(gè)時(shí)間。那這樣就失去了限流的意義。

          了防止這種情況,ratelimit 就引入了最大松弛量 (maxSlack) 的概念, 該值為負(fù)值,表示允許抵消的最長時(shí)間,防止以上情況的出現(xiàn)。



          推薦閱讀


          福利

          我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲取;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。


          瀏覽 110
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人精品视频99在线观看免费 | 国产又白又嫩又爽又黄 | 欧美亚在线| 中文字幕++中文字幕明步 | 内射网站 |