Go 第三方庫(kù)源碼分析:juju/ratelimit
https://github.com/juju/ratelimit 是一個(gè)基于令牌桶算法的限流器:令牌桶就是想象有一個(gè)固定大小的桶,系統(tǒng)會(huì)以恒定速率向桶中放 Token,桶滿則暫時(shí)不放。漏桶算法和令牌桶算法的主要區(qū)別在于,"漏桶算法"能夠強(qiáng)行限制數(shù)據(jù)的傳輸速率(或請(qǐng)求頻率),而"令牌桶算法"在能夠限制數(shù)據(jù)的平均傳輸速率外,還允許某種程度的突發(fā)傳輸。
首先看下如何使用:
import "github.com/juju/ratelimit"var tokenBucket ratelimit.Bucket = nilfunc init() {// func NewBucket(fillInterval time.Duration, capacity int64) *Bucket// fillInterval令牌填充的時(shí)間間隔// capacity令牌桶的最大容量tokenBucket = ratelimit.NewBucket(200time.Millisecond, 20)}func Handler() {available := tokenBucket.TakeAvailable(1)if available <= 0 {// 限流處理}// handling}
下面看下源碼實(shí)現(xiàn),juju/ratelimit實(shí)現(xiàn)很簡(jiǎn)單,一共只有兩個(gè)源碼文件和一個(gè)測(cè)試文件:
ratelimit.goratelimit_test.goreader.go
下面我們分析下常用的這兩個(gè)接口的實(shí)現(xiàn):
1,ratelimit.NewBucket
傳入的兩個(gè)參數(shù)分別是產(chǎn)生令牌的的間隔和桶的容量。
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {return NewBucketWithClock(fillInterval, capacity, nil)}
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)}
默認(rèn)一個(gè)間隔周期內(nèi)就產(chǎn)生一個(gè)token,如果是高并發(fā)情況下,可以通過(guò)參數(shù)quantum控制產(chǎn)生多個(gè)。第三個(gè)參數(shù)是一個(gè)clock interface,主要是方便mock測(cè)試,如果傳nil用的就是realClock{}
// Clock represents the passage of time in a way that// can be faked out for tests.type Clock interface {// Now returns the current time.Now() time.Time// Sleep sleeps for at least the given duration.Sleep(d time.Duration)}
realClock是實(shí)現(xiàn)了上述接口的結(jié)構(gòu)體:
// realClock implements Clock in terms of standard time functions.type realClock struct{}// Now implements Clock.Now by calling time.Now.func (realClock) Now() time.Time {return time.Now()}// Now implements Clock.Sleep by calling time.Sleep.func (realClock) Sleep(d time.Duration) {time.Sleep(d)}
上面幾個(gè)函數(shù)僅僅是對(duì)這個(gè)函數(shù)的一個(gè)簡(jiǎn)單包裝,加上默認(rèn)參數(shù),方便一般場(chǎng)景的使用,最終都是調(diào)用了這個(gè)函數(shù)
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {if clock == nil {clock = realClock{}}if fillInterval <= 0 {panic("token bucket fill interval is not > 0")}if capacity <= 0 {panic("token bucket capacity is not > 0")}if quantum <= 0 {panic("token bucket quantum is not > 0")}return &Bucket{clock: clock,startTime: clock.Now(),latestTick: 0,fillInterval: fillInterval,capacity: capacity,quantum: quantum,availableTokens: capacity,}}
出來(lái)參數(shù)檢驗(yàn)外,最后生成了結(jié)構(gòu)體Bucket的指針
type Bucket struct {clock Clock// startTime holds the moment when the bucket was// first created and ticks began.startTime time.Time// capacity holds the overall capacity of the bucket.capacity int64// quantum holds how many tokens are added on// each tick.quantum int64// fillInterval holds the interval between each tick.fillInterval time.Duration// mu guards the fields below it.mu sync.Mutex// availableTokens holds the number of available// tokens as of the associated latestTick.// It will be negative when there are consumers// waiting for tokens.availableTokens int64// latestTick holds the latest tick for which// we know the number of tokens in the bucket.latestTick int64}
Bucket里面出了存儲(chǔ)初始化必要的參數(shù)外,多了兩個(gè)變量:
availableTokens:當(dāng)前可用的令牌數(shù)量
latestTick:從程序運(yùn)行到上一次訪問(wèn)的時(shí)候,一共產(chǎn)生了多少次計(jì)數(shù)(如果quantum等于1的話 ,就是一共產(chǎn)生的令牌數(shù)量)
2,TakeAvailable
有一個(gè)參數(shù),每次取的token數(shù)量,一般是一個(gè),為了并發(fā)安全,一般會(huì)加鎖:
func (tb *Bucket) TakeAvailable(count int64) int64 {tb.mu.Lock()defer tb.mu.Unlock()return tb.takeAvailable(tb.clock.Now(), count)}
調(diào)用了令牌桶計(jì)算的核心函數(shù)takeAvailable,第一個(gè)參數(shù)表示是當(dāng)前時(shí)間,用于計(jì)算一共產(chǎn)生了多少個(gè)token:
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {if count <= 0 {return 0}tb.adjustavailableTokens(tb.currentTick(now))if tb.availableTokens <= 0 {return 0}if count > tb.availableTokens {count = tb.availableTokens}tb.availableTokens -= countreturn count}
其中tb.adjustavailableTokens(tb.currentTick(now))用于計(jì)算修改可用token數(shù)量availableTokens,如果availableTokens<=0,說(shuō)明限流了;如果輸入的count比availableTokens,我么最多只能獲取availableTokens個(gè)token,獲取后,我們把a(bǔ)vailableTokens減去已經(jīng)使用的token數(shù)量。
func (tb *Bucket) currentTick(now time.Time) int64 {return int64(now.Sub(tb.startTime) / tb.fillInterval)}
計(jì)算出了從開(kāi)始運(yùn)行到,當(dāng)前時(shí)間內(nèi)時(shí)間一共跳變了多少次,也就是一共產(chǎn)生了多少次令牌。
func (tb *Bucket) adjustavailableTokens(tick int64) {lastTick := tb.latestTicktb.latestTick = tickif tb.availableTokens >= tb.capacity {return}tb.availableTokens += (tick - lastTick) * tb.quantumif tb.availableTokens > tb.capacity {tb.availableTokens = tb.capacity}return}
1,如果可用token數(shù)量大于等于令牌桶的容量,說(shuō)明很長(zhǎng)時(shí)間沒(méi)有流量來(lái)獲取token了,不用處理。
2,計(jì)算上一次獲取token 到現(xiàn)時(shí)刻,產(chǎn)生的token數(shù)量,把它加到availableTokens上
3,如果availableTokens數(shù)量比capacity大,說(shuō)明溢出了,修改availableTokens為capacity。
以上就是令牌桶算法的核心邏輯。當(dāng)然,這個(gè)包還封裝了一些其他的靈活的取令牌的接口,比如
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {tb.mu.Lock()defer tb.mu.Unlock()return tb.take(tb.clock.Now(), count, maxWait)}
這個(gè)函數(shù)就是獲取,在maxWait time.Duration超時(shí)的前提下,產(chǎn)生count個(gè)token,需要等待的時(shí)間間隔。
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {if count <= 0 {return 0, true}tick := tb.currentTick(now)tb.adjustavailableTokens(tick)avail := tb.availableTokens - countif avail >= 0 {tb.availableTokens = availreturn 0, true}// Round up the missing tokens to the nearest multiple// of quantum - the tokens won't be available until// that tick.// endTick holds the tick when all the requested tokens will// become available.endTick := tick + (-avail+tb.quantum-1)/tb.quantumendTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)waitTime := endTime.Sub(now)if waitTime > maxWait {return 0, false}tb.availableTokens = availreturn waitTime, true}
函數(shù)的前半部分和takeAvailable一模一樣,后面邏輯表示,如果令牌不夠的情況下:
1,計(jì)算還缺多少個(gè)令牌
2,計(jì)算缺這么多令牌需要跳變多少次
3,計(jì)算跳變這些次數(shù)需要的時(shí)間
4,判斷需要的時(shí)間是否超時(shí)
還有一個(gè)wait接口,用來(lái)計(jì)算,獲取count個(gè)令牌需要的時(shí)間,然后sleep這么長(zhǎng)時(shí)間。
func (tb *Bucket) Wait(count int64) {if d := tb.Take(count); d > 0 {tb.clock.Sleep(d)}}
以上就是令牌桶算法的核心源碼實(shí)現(xiàn),
ratelimit/reader.go里面實(shí)現(xiàn)了基于上述限流器實(shí)現(xiàn)的讀限速和寫(xiě)限速,原理是通過(guò)讀寫(xiě)buff的長(zhǎng)度來(lái)控制Wait函數(shù)的等待時(shí)間,實(shí)現(xiàn)讀寫(xiě)限速的
func (r *reader) Read(buf []byte) (int, error) {n, err := r.r.Read(buf)if n <= 0 {return n, err}r.bucket.Wait(int64(n))return n, err}
推薦閱讀
