Go Third-Party Library Source Code Analysis: uber-go/ratelimit
https://github.com/uber-go/ratelimit is an implementation of a leaky-bucket rate limiter. A basic usage example:
rl := ratelimit.New(100) // per second

prev := time.Now()
for i := 0; i < 10; i++ {
    now := rl.Take()
    fmt.Println(i, now.Sub(prev))
    prev = now
}
In this example the limiter allows 100 requests per second, i.e. an average interval of 10ms between requests, so one line is printed roughly every 10ms. The output looks like this:
// Output:
// 0 0
// 1 10ms
// 2 10ms
The package contains the following source files:
example_test.go
limiter_atomic.go
limiter_mutexbased.go
ratelimit.go
ratelimit_bench_test.go
ratelimit_test.go
1. ratelimit.New
Let's start with the initialization process:
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
    return newAtomicBased(rate, opts...)
}
The argument is the number of tokens generated per second:
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
    // TODO consider moving config building to the implementation
    // independent code.
    config := buildConfig(opts)
    perRequest := config.per / time.Duration(rate)
    l := &atomicLimiter{
        perRequest: perRequest,
        maxSlack:   -1 * time.Duration(config.slack) * perRequest,
        clock:      config.clock,
    }

    initialState := state{
        last:     time.Time{},
        sleepFor: 0,
    }
    atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
    return l
}
1. Build the configuration from the options: config := buildConfig(opts)
func buildConfig(opts []Option) config {
    c := config{
        clock: clock.New(),
        slack: 10,
        per:   time.Second,
    }

    for _, opt := range opts {
        opt.apply(&c)
    }
    return c
}
As you can see, per defaults to one second and slack defaults to 10 (a worked example with these defaults follows the state struct below).
2. Compute the time it takes to produce one token, i.e. the interval between requests:
perRequest := config.per / time.Duration(rate)
3. Initialize the atomicLimiter with the token interval, the maximum slack, and the clock:
type atomicLimiter struct {
    state unsafe.Pointer
    //lint:ignore U1000 Padding is unused but it is crucial to maintain performance
    // of this rate limiter in case of collocation with other frequently accessed memory.
    padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.

    perRequest time.Duration
    maxSlack   time.Duration
    clock      Clock
}
4. Record the initial state: the time of the last request and the time to sleep for:
type state struct {
    last     time.Time
    sleepFor time.Duration
}
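To make these numbers concrete, here is a minimal sketch (not library code) that simply re-runs the arithmetic from newAtomicBased with the default configuration and rate = 100. Note that per and slack can also be changed through the Option arguments to New (the upstream library documents options such as Per for choosing a different window; treat the exact option names as something to verify against your version):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Defaults from buildConfig: per = time.Second, slack = 10; rate passed to New = 100.
    rate := 100
    per := time.Second
    slack := 10

    perRequest := per / time.Duration(rate)            // 10ms: one token every 10ms
    maxSlack := -1 * time.Duration(slack) * perRequest // -100ms: at most 100ms of accumulated credit

    fmt.Println(perRequest, maxSlack) // prints "10ms -100ms"
}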
With initialization done, we can move on to how tokens are handed out.
2. rl.Take
Take is declared on the Limiter interface; it blocks if necessary and returns the time at which the request is allowed to proceed:
// Limiter is used to rate-limit some process, possibly across goroutines.
// The process is expected to call Take() before every iteration, which
// may block to throttle the goroutine.
type Limiter interface {
    // Take should block to make sure that the RPS is met.
    Take() time.Time
}
atomicLimiter implements this interface:
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
    var (
        newState state
        taken    bool
        interval time.Duration
    )
    for !taken {
        now := t.clock.Now()

        previousStatePointer := atomic.LoadPointer(&t.state)
        oldState := (*state)(previousStatePointer)

        newState = state{
            last:     now,
            sleepFor: oldState.sleepFor,
        }

        // If this is our first request, then we allow it.
        if oldState.last.IsZero() {
            taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
            continue
        }

        // sleepFor calculates how much time we should sleep based on
        // the perRequest budget and how long the last request took.
        // Since the request may take longer than the budget, this number
        // can get negative, and is summed across requests.
        newState.sleepFor += t.perRequest - now.Sub(oldState.last)
        // We shouldn't allow sleepFor to get too negative, since it would mean that
        // a service that slowed down a lot for a short period of time would get
        // a much higher RPS following that.
        if newState.sleepFor < t.maxSlack {
            newState.sleepFor = t.maxSlack
        }
        if newState.sleepFor > 0 {
            newState.last = newState.last.Add(newState.sleepFor)
            interval, newState.sleepFor = newState.sleepFor, 0
        }
        taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
    }
    t.clock.Sleep(interval)
    return newState.last
}
1. Get the current time.
2. If this is the first request (the last-request time is zero), set the last-request time to now via compare-and-swap and return immediately.
3. Otherwise compute the sleep duration: sleepFor = previously accumulated sleepFor + perRequest - (now - last), i.e. the token interval minus the time elapsed since the previous request.
4. If sleepFor falls below maxSlack, traffic has been sparse and the previous request was a long time ago; clamp sleepFor to maxSlack so that the accumulated credit stays bounded, otherwise a later burst of traffic could pass through almost unthrottled.
5. If sleepFor is greater than 0, requests are arriving faster than the configured rate and the caller has to wait: last and sleepFor are updated in the new state, then t.clock.Sleep(interval) puts the goroutine to sleep before returning.
6. If sleepFor is less than or equal to 0, the request is within the rate, so Take returns immediately and records the current time as last.
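Because Take may be called from many goroutines at once (the Limiter comment above says "possibly across goroutines", and the CAS loop is what keeps that cheap), the typical pattern is to share a single limiter across workers. A minimal usage sketch, assuming the module's documented import path go.uber.org/ratelimit:

package main

import (
    "fmt"
    "sync"
    "time"

    "go.uber.org/ratelimit"
)

func main() {
    rl := ratelimit.New(50) // 50 requests per second, shared by all workers

    var wg sync.WaitGroup
    start := time.Now()
    for g := 0; g < 5; g++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for i := 0; i < 10; i++ {
                now := rl.Take() // blocks until this request fits within the global rate
                fmt.Printf("worker %d request %d at %v\n", id, i, now.Sub(start))
            }
        }(g)
    }
    wg.Wait()
}

Regardless of which worker calls Take, the combined output should show requests spaced roughly 20ms apart (1s / 50).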
mutexLimiter also implements this interface:
type mutexLimiter struct {
    sync.Mutex
    last       time.Time
    sleepFor   time.Duration
    perRequest time.Duration
    maxSlack   time.Duration
    clock      Clock
}
The only difference is that this variant is guarded by a mutex, while the other relies on atomic compare-and-swap of the state pointer.
func (t *mutexLimiter) Take() time.Time {
    t.Lock()
    defer t.Unlock()

    now := t.clock.Now()

    // If this is our first request, then we allow it.
    if t.last.IsZero() {
        t.last = now
        return t.last
    }

    // sleepFor calculates how much time we should sleep based on
    // the perRequest budget and how long the last request took.
    // Since the request may take longer than the budget, this number
    // can get negative, and is summed across requests.
    t.sleepFor += t.perRequest - now.Sub(t.last)

    // We shouldn't allow sleepFor to get too negative, since it would mean that
    // a service that slowed down a lot for a short period of time would get
    // a much higher RPS following that.
    if t.sleepFor < t.maxSlack {
        t.sleepFor = t.maxSlack
    }

    // If sleepFor is positive, then we should sleep now.
    if t.sleepFor > 0 {
        t.clock.Sleep(t.sleepFor)
        t.last = now.Add(t.sleepFor)
        t.sleepFor = 0
    } else {
        t.last = now
    }

    return t.last
}
With a plain Leaky Bucket, the interval between requests is fixed. In real-world Internet traffic, however, requests often arrive in bursts. To deal with this, uber-go refines the Leaky Bucket by introducing a maximum slack (maxSlack). Some background first: suppose we limit to 100 requests per second, an average of 10ms between requests. In practice some requests arrive farther apart and some closer together.
(1) When t.sleepFor > 0, the time saved by earlier requests is not enough to cover the interval this request needs, so we sleep for the remaining time and reset t.sleepFor to 0.
(2) When t.sleepFor < 0, the gap since the previous request was longer than the expected interval, and the surplus is simply accumulated into t.sleepFor.
However, consider the case where request 2 arrives a very long time after request 1 completes (possibly hours later). The gap now.Sub(t.last) becomes so large that even a massive burst of subsequent requests could never use up the accumulated credit, which defeats the purpose of rate limiting.
To prevent this, ratelimit introduces the maximum slack (maxSlack). It is a negative value that caps how much time can be offset, avoiding the situation above.
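The following sketch (an illustration, not part of the library) shows maxSlack in action. With the default slack of 10 and a rate of 100 per second, perRequest is 10ms and maxSlack is -100ms, so after a long idle period at most about 100ms of credit (roughly ten requests) is carried over before the limiter falls back to the 10ms cadence:

package main

import (
    "fmt"
    "time"

    "go.uber.org/ratelimit"
)

func main() {
    rl := ratelimit.New(100) // 10ms per request; default slack of 10 gives maxSlack = -100ms

    rl.Take()                          // the first request is always allowed
    time.Sleep(500 * time.Millisecond) // idle far longer than the 10ms budget

    prev := time.Now()
    for i := 0; i < 20; i++ {
        now := rl.Take()
        // Expect near-zero gaps for roughly the first ten requests (the retained credit),
        // then ~10ms gaps once the credit is used up.
        fmt.Println(i, now.Sub(prev))
        prev = now
    }
}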