Go官方庫源碼分析:time/rate
這是golang 源碼中實(shí)現(xiàn)的限流器,是基于令牌桶算法的:
官方地址: golang.org/x/time/rate
github地址:github.com/golang/time/rate
???r?:=?rate.Every(100?*?time.Millisecond)???limit?:=?rate.NewLimiter(r,?20)for {if limit.AllowN(time.Now(), 8) {log.Info("log:event happen")} else {log.Info("log:event not allow")}}
一秒內(nèi)產(chǎn)生10 個(gè)令牌,桶的容量是20,當(dāng)前時(shí)刻取8個(gè)token
源碼很簡單只有兩個(gè)文件:
rate.gorate_test.go
1,NewLimiter
// NewLimiter returns a new Limiter that allows events up to rate r and permits// bursts of at most b tokens.func NewLimiter(r Limit, b int) *Limiter {return &Limiter{limit: r,burst: b,}}
簡單構(gòu)造了一個(gè)limiter對象
type Limiter struct {mu sync.Mutexlimit Limitburst inttokens float64// last is the last time the limiter's tokens field was updatedlast time.Time// lastEvent is the latest time of a rate-limited event (past or future)lastEvent time.Time}
記錄了上一次分發(fā)token的時(shí)間,和上一次請求token的時(shí)間
func Every(interval time.Duration) Limit {if interval <= 0 {return Inf}return 1 / Limit(interval.Seconds())}
僅僅做了從時(shí)間間隔向頻率的轉(zhuǎn)換。
2,AllowN/Allow
// Allow is shorthand for AllowN(time.Now(), 1).func (lim *Limiter) Allow() bool {return lim.AllowN(time.Now(), 1)}// AllowN reports whether n events may happen at time now.// Use this method if you intend to drop / skip events that exceed the rate limit.// Otherwise use Reserve or Wait.func (lim *Limiter) AllowN(now time.Time, n int) bool {return lim.reserveN(now, n, 0).ok}
底層都是調(diào)用了reserveN函數(shù),maxFutureReserve參數(shù)傳的是0
// reserveN is a helper method for AllowN, ReserveN, and WaitN.// maxFutureReserve specifies the maximum reservation wait duration allowed.// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()if lim.limit == Inf {lim.mu.Unlock()return Reservation{ok: true,lim: lim,tokens: n,timeToAct: now,}}now, last, tokens := lim.advance(now)// Calculate the remaining number of tokens resulting from the request.tokens -= float64(n)// Calculate the wait durationvar waitDuration time.Durationif tokens < 0 {waitDuration = lim.limit.durationFromTokens(-tokens)}// Decide resultok := n <= lim.burst && waitDuration <= maxFutureReserve// Prepare reservationr := Reservation{ok: ok,lim: lim,limit: lim.limit,}if ok {r.tokens = nr.timeToAct = now.Add(waitDuration)}// Update stateif ok {lim.last = nowlim.tokens = tokenslim.lastEvent = r.timeToAct} else {lim.last = last}lim.mu.Unlock()return r}
1,如果lim.limit == Inf,返回Reservation對象
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.// A Reservation may be canceled, which may enable the Limiter to permit additional events.type Reservation struct {ok boollim *Limitertokens inttimeToAct time.Time// This is the Limit at reservation time, it can change later.limit Limit}
2,獲取當(dāng)前時(shí)間,上一次產(chǎn)生token的時(shí)間和,產(chǎn)生的token
// advance calculates and returns an updated state for lim resulting from the passage of time.// lim is not changed.// advance requires that lim.mu is held.func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {last := lim.lastif now.Before(last) {last = now}// Calculate the new number of tokens, due to time that passed.elapsed := now.Sub(last)delta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + deltaif burst := float64(lim.burst); tokens > burst {tokens = burst}return now, last, tokens}
A,如果當(dāng)前時(shí)間比上一次獲取token時(shí)間早(說明有請求在等待獲取token),那么更新當(dāng)前時(shí)間為上一次獲取token時(shí)間(和上一個(gè)請求一起等)
B,計(jì)算從上一次獲取token到現(xiàn)在的時(shí)間間隔
C,計(jì)算產(chǎn)生的token增量
delta := lim.limit.tokensFromDuration(elapsed)type Limit float64// tokensFromDuration is a unit conversion function from a time duration to the number of tokens// which could be accumulated during that duration at a rate of limit tokens per second.func (limit Limit) tokensFromDuration(d time.Duration) float64 {return d.Seconds() * float64(limit)}
也就是時(shí)間間隔的秒數(shù)乘以每秒產(chǎn)生的token數(shù)量。
D,計(jì)算總的token數(shù)量
E,如果桶已經(jīng)滿了,丟棄多余的token
3,扣減本次請求需要的token
4,如果token數(shù)不夠,計(jì)算需要等待的時(shí)間間隔
5,如果請求的token數(shù)量比桶的容量小,并且可以等待的時(shí)間大于需要等待的時(shí)間說明這個(gè)請求是合法的。
ok?:=?n?<=?lim.burst?&&?waitDuration?<=?maxFutureReserve6,構(gòu)造Reservation對象,存儲當(dāng)前l(fā)imiter對象到lim
7,如果請求合法,存儲當(dāng)前請求需要的token數(shù)量和需要等待的時(shí)間(當(dāng)前時(shí)間+等待時(shí)間間隔)
8,如果合法,更新當(dāng)前l(fā)imiter的上一次獲取token時(shí)間為當(dāng)前時(shí)間,獲取的token數(shù)量為扣減后剩余的token數(shù)量,獲取token時(shí)間為將來能夠真正獲取token的時(shí)間點(diǎn)。
9,否則更新limiter的上一次獲取token時(shí)間為本次計(jì)算的上一次獲取token時(shí)間。
上面就是獲取token的所有代碼實(shí)現(xiàn)。
Limiter提供了三類方法供用戶消費(fèi)Token,用戶可以每次消費(fèi)一個(gè)Token,也可以一次性消費(fèi)多個(gè)Token。
1,AllowN 方法表示,截止到某一時(shí)刻,目前桶中數(shù)目是否至少為 n 個(gè),滿足則返回 true,同時(shí)從桶中消費(fèi) n 個(gè) token。反之返回不消費(fèi) token,false。也就是前面介紹的方法。
func (lim *Limiter) Allow() boolfunc (lim *Limiter) AllowN(now time.Time, n int) bool
2,當(dāng)使用 Wait 方法消費(fèi) token 時(shí),如果此時(shí)桶內(nèi) token 數(shù)組不足 (小于 N),那么 Wait 方法將會阻塞一段時(shí)間,直至 token 滿足條件。如果充足則直接返回。
func (lim *Limiter) Wait(ctx context.Context) (err error)func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
// WaitN blocks until lim permits n events to happen.// It returns an error if n exceeds the Limiter's burst size, the Context is// canceled, or the expected wait time exceeds the Context's Deadline.// The burst limit is ignored if the rate limit is Inf.func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// Check if ctx is already cancelledselect {case <-ctx.Done():return ctx.Err()default:}// Determine wait limitnow := time.Now()waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(now)}// Reserver := lim.reserveN(now, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// Wait if necessarydelay := r.DelayFrom(now)if delay == 0 {return nil}t := time.NewTimer(delay)defer t.Stop()select {case <-t.C:// We can proceed.return nilcase <-ctx.Done():// Context was canceled before we could proceed. Cancel the// reservation, which may permit other events to proceed sooner.r.Cancel()return ctx.Err()}}
A,如果請求數(shù)量超出了桶的容量,直接報(bào)錯(cuò)
B,通過ctx.Deadline()計(jì)算允許等待的時(shí)間間隔
C,調(diào)用r := lim.reserveN(now, n, waitLimit)?獲取Reserve對象
D,如果reserve對象表示不能成功(超出桶的容量,超出時(shí)間限制),返回錯(cuò)誤
E,計(jì)算需要等待的時(shí)間,timeToAct表示能夠獲取token的時(shí)間。
// DelayFrom returns the duration for which the reservation holder must wait// before taking the reserved action. Zero duration means act immediately.// InfDuration means the limiter cannot grant the tokens requested in this// Reservation within the maximum wait time.func (r *Reservation) DelayFrom(now time.Time) time.Duration {if !r.ok {return InfDuration}delay := r.timeToAct.Sub(now)if delay < 0 {return 0}return delay}
F,啟動定時(shí)器等待。
3,ReserveN 的用法就相對來說復(fù)雜一些,當(dāng)調(diào)用完成后,無論 token 是否充足,都會返回一個(gè) Reservation * 對象。
你可以調(diào)用該對象的 Delay() 方法,該方法返回了需要等待的時(shí)間。如果等待時(shí)間為 0,則說明不用等待。
必須等到等待時(shí)間之后,才能進(jìn)行接下來的工作。
或者,如果不想等待,可以調(diào)用 Cancel() 方法,該方法會將 token 歸還。
func (lim *Limiter) Reserve() *Reservationfunc (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
這個(gè)方法比較原始直接返回Reserve對象,交給用戶處理
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {r := lim.reserveN(now, n, InfDuration)return &r}
推薦閱讀
