漏桶、令牌桶限流算法的Go語言實現
限流
限流又稱為流量控制(流控),通常是指限制到達系統的并發(fā)請求數。
我們生活中也會經常遇到限流的場景,比如:某景區(qū)限制每日進入景區(qū)的游客數量為 8 萬人;沙河地鐵站早高峰通過站外排隊逐一放行的方式限制同一時間進入車站的旅客數量等。
限流雖然會影響部分用戶的使用體驗,但是卻能在一定程度上保障系統的穩(wěn)定性,不至于崩潰(大家都沒了用戶體驗)。
而互聯網上類似需要限流的業(yè)務場景也有很多,比如電商系統的秒殺、微博上突發(fā)熱點新聞、雙十一購物節(jié)、12306 搶票等等。這些場景下的用戶請求量通常會激增,遠遠超過平時正常的請求量,此時如果不加任何限制很容易就會將后端服務打垮,影響服務的穩(wěn)定性。
此外,一些廠商公開的 API 服務通常也會限制用戶的請求次數,比如百度地圖開放平臺等會根據用戶的付費情況來限制用戶的請求數等。

常用的限流策略
漏桶
漏桶法限流很好理解,假設我們有一個水桶按固定的速率向下方滴落一滴水,無論有多少請求,請求的速率有多大,都按照固定的速率流出,對應到系統中就是按照固定的速率處理請求。

漏桶法的關鍵點在于漏桶始終按照固定的速率運行,但是它并不能很好的處理有大量突發(fā)請求的場景,畢竟在某些場景下我們可能需要提高系統的處理效率,而不是一味的按照固定速率處理請求。
關于漏桶的實現,uber 團隊有一個開源的https://github.com/uber-go/ratelimit實現。使用方法也比較簡單,Take() 方法會返回漏桶下一次滴水的時間。
import?(
?"fmt"
?"time"
?"go.uber.org/ratelimit"
)
func?main()?{
????rl?:=?ratelimit.New(100)?//?per?second
????prev?:=?time.Now()
????for?i?:=?0;?i?10;?i++?{
????????now?:=?rl.Take()
????????fmt.Println(i,?now.Sub(prev))
????????prev?=?now
????}
????//?Output:
????//?0?0
????//?1?10ms
????//?2?10ms
????//?3?10ms
????//?4?10ms
????//?5?10ms
????//?6?10ms
????//?7?10ms
????//?8?10ms
????//?9?10ms
}
它的源碼實現也比較簡單,這里大致說一下關鍵的地方,有興趣的同學可以自己去看一下完整的源碼。
限制器是一個接口類型,其要求實現一個Take()方法:
type?Limiter?interface?{
?//?Take方法應該阻塞已確保滿足?RPS
?Take()?time.Time
}
實現限制器接口的結構體定義如下,這里可以重點留意下maxSlack字段,它在后面的Take()方法中的處理。
type?limiter?struct?{
?sync.Mutex????????????????//?鎖
?last???????time.Time??????//?上一次的時刻
?sleepFor???time.Duration??//?需要等待的時間
?perRequest?time.Duration??//?每次的時間間隔
?maxSlack???time.Duration??//?最大的富余量
?clock??????Clock??????????//?時鐘
}
limiter結構體實現Limiter接口的Take()方法內容如下:
//?Take?會阻塞確保兩次請求之間的時間走完
//?Take?調用平均數為?time.Second/rate.
func?(t?*limiter)?Take()?time.Time?{
?t.Lock()
?defer?t.Unlock()
?now?:=?t.clock.Now()
?//?如果是第一次請求就直接放行
?if?t.last.IsZero()?{
??t.last?=?now
??return?t.last
?}
?//?sleepFor?根據?perRequest?和上一次請求的時刻計算應該sleep的時間
?//?由于每次請求間隔的時間可能會超過perRequest,?所以這個數字可能為負數,并在多個請求之間累加
?t.sleepFor?+=?t.perRequest?-?now.Sub(t.last)
?//?我們不應該讓sleepFor負的太多,因為這意味著一個服務在短時間內慢了很多隨后會得到更高的RPS。
?if?t.sleepFor???t.sleepFor?=?t.maxSlack
?}
?//?如果?sleepFor?是正值那么就?sleep
?if?t.sleepFor?>?0?{
??t.clock.Sleep(t.sleepFor)
??t.last?=?now.Add(t.sleepFor)
??t.sleepFor?=?0
?}?else?{
??t.last?=?now
?}
?return?t.last
}
上面的代碼根據記錄每次請求的間隔時間和上一次請求的時刻來計算當次請求需要阻塞的時間——sleepFor,這里需要留意的是sleepFor的值可能為負,在經過間隔時間長的兩次訪問之后會導致隨后大量的請求被放行,所以代碼中針對這個場景有專門的優(yōu)化處理。maxSlack默認值可以通過創(chuàng)建限制器的New函數看到。
func?New(rate?int,?opts?...Option)?Limiter?{
?l?:=?&limiter{
??perRequest:?time.Second?/?time.Duration(rate),
??maxSlack:???-10?*?time.Second?/?time.Duration(rate),
?}
?for?_,?opt?:=?range?opts?{
??opt(l)
?}
?if?l.clock?==?nil?{
??l.clock?=?clock.New()
?}
?return?l
}
令牌桶
令牌桶其實和漏桶的原理類似,令牌桶按固定的速率往桶里放入令牌,并且只要能從桶里取出令牌就能通過,令牌桶支持突發(fā)流量的快速處理。

對于從桶里取不到令牌的場景,我們可以選擇等待也可以直接拒絕并返回。
對于令牌桶的 Go 語言實現,大家可以參照https://github.com/juju/ratelimit。
這個庫支持多種令牌桶模式,并且使用起來也比較簡單。?
創(chuàng)建令牌桶的方法:
//?創(chuàng)建指定填充速率和容量大小的令牌桶
func?NewBucket(fillInterval?time.Duration,?capacity?int64)?*Bucket
//?創(chuàng)建指定填充速率、容量大小和每次填充的令牌數的令牌桶
func?NewBucketWithQuantum(fillInterval?time.Duration,?capacity,?quantum?int64)?*Bucket
//?創(chuàng)建填充速度為指定速率和容量大小的令牌桶
//?NewBucketWithRate(0.1,?200)?表示每秒填充20個令牌
func?NewBucketWithRate(rate?float64,?capacity?int64)?*Bucket
取出令牌的方法:
//?非阻塞的取token
func?(tb?*Bucket)?Take(count?int64)?time.Duration
func?(tb?*Bucket)?TakeAvailable(count?int64)?int64
//?最多等maxWait時間取token
func?(tb?*Bucket)?TakeMaxDuration(count?int64,?maxWait?time.Duration)?(time.Duration,?bool)
//?阻塞的取token
func?(tb?*Bucket)?Wait(count?int64)
func?(tb?*Bucket)?WaitMaxDuration(count?int64,?maxWait?time.Duration)?bool
雖說是令牌桶,但是我們沒有必要真的去生成令牌放到桶里,我們只需要每次來取令牌的時候計算一下,當前是否有足夠的令牌可以使用就可以了,具體的計算公式如下。
當前令牌數?=?上一次剩余的令牌數?+?(本次取令牌的時刻-上一次取令牌的時刻)/放置令牌的時間間隔?*?每次放置的令牌數
github.com/juju/ratelimit這個庫中關于令牌數計算的具體實現如下:
func?(tb?*Bucket)?adjustavailableTokens(tick?int64)?{
?if?tb.availableTokens?>=?tb.capacity?{
??return
?}
?tb.availableTokens?+=?(tick?-?tb.latestTick)?*?tb.quantum
?if?tb.availableTokens?>?tb.capacity?{
??tb.availableTokens?=?tb.capacity
?}
?tb.latestTick?=?tick
?return
}
獲取令牌的TakeAvailable函數關鍵部分的源碼如下:
func?(tb?*Bucket)?takeAvailable(now?time.Time,?count?int64)?int64?{
?if?count?<=?0?{
??return?0
?}
?tb.adjustavailableTokens(tb.currentTick(now))
?if?tb.availableTokens?<=?0?{
??return?0
?}
?if?count?>?tb.availableTokens?{
??count?=?tb.availableTokens
?}
?tb.availableTokens?-=?count
?return?count
}
大家從代碼中也可以看到其實令牌桶的實現并沒有很復雜。
gin 框架中使用限流中間件
在 gin 框架構建的項目中,我們可以將限流組件定義成中間件。
這里使用令牌桶作為限流策略,編寫一個限流中間件如下:
func?RateLimitMiddleware(fillInterval?time.Duration,?cap?int64)?func(c?*gin.Context)?{
?bucket?:=?ratelimit.NewBucket(fillInterval,?cap)
?return?func(c?*gin.Context)?{
??//?如果取不到令牌就返回響應
??if?bucket.TakeAvailable(1)?==?0?{
???c.String(http.StatusOK,?"rate?limit...")
???c.Abort()
???return
??}
??c.Next()
?}
}
對于該限流中間件的注冊位置,我們可以按照不同的限流策略將其添加到不同的地方,例如:
如果要對全站限流就可以添加成全局的中間件。 如果是某一組路由需要限流,那么就只需添加到對應的路由組即可。
推薦閱讀
站長 polarisxu
自己的原創(chuàng)文章
不限于 Go 技術
職場和創(chuàng)業(yè)經驗
Go語言中文網
每天為你
分享 Go 知識
Go愛好者值得關注
