Go實現(xiàn)各類限流算法

前 言
在開發(fā)高并發(fā)系統(tǒng)時,我們可能會遇到接口訪問頻次過高,為了保證系統(tǒng)的高可用和穩(wěn)定性,這時候就需要做流量限制,你可能是用的?
Nginx?這種來控制請求,也可能是用了一些流行的類庫實現(xiàn)。限流是高并發(fā)系統(tǒng)的一大殺器,在設計限流算法之前我們先來了解一下它們是什么。
限 流
限流的目的是通過對并發(fā)訪問請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統(tǒng),一旦達到限制速率則可以拒絕服務、排隊或等待、降級等處理。通過對并發(fā)(或者一定時間窗口內)請求進行限速來保護系統(tǒng),一旦達到限制速率則拒絕服務(定向到錯誤頁或告知資源沒有了)、排隊等待(比如秒殺、評論、下單)、降級(返回兜底數(shù)據(jù)或默認數(shù)據(jù))。
如 圖:

如圖上的漫畫,在某個時間段流量上來了,服務的接口訪問頻率可能會非常快,如果我們沒有對接口訪問頻次做限制可能會導致服務器無法承受過高的壓力掛掉,這時候也可能會產生數(shù)據(jù)丟失,所以就要對其進行限流處理。
限流算法就可以幫助我們去控制每個接口或程序的函數(shù)被調用頻率,它有點兒像保險絲,防止系統(tǒng)因為超過訪問頻率或并發(fā)量而引起癱瘓。我們可能在調用某些第三方的接口的時候會看到類似這樣的響應頭:
X-RateLimit-Limit:?60?????????//每秒60次請求
X-RateLimit-Remaining:?22?????//當前還剩下多少次
X-RateLimit-Reset:?1612184024?//限制重置時間
上面的?HTTP Response?是通過響應頭告訴調用方服務端的限流頻次是怎樣的,保證后端的接口訪問上限。為了解決限流問題出現(xiàn)了很多的算法,它們都有不同的用途,通常的策略就是拒絕超出的請求,或者讓超出的請求排隊等待。
一般來說,限流的常用處理手段有:
計數(shù)器 滑動窗口 漏桶 令牌桶
計數(shù)器
計數(shù)器是一種最簡單限流算法,其原理就是:
在一段時間間隔內,對請求進行計數(shù),與閥值進行比較判斷是否需要限流,一旦到了時間臨界點,將計數(shù)器清零。這個就像你去坐車一樣,車廂規(guī)定了多少個位置,滿了就不讓上車了,不然就是超載了,被交警叔叔抓到了就要罰款的,如果我們的系統(tǒng)那就不是罰款的事情了,可能直接崩掉了。
可以在程序中設置一個變量? count,當過來一個請求我就將這個數(shù)+1,同時記錄請求時間。當下一個請求來的時候判斷? count?的計數(shù)值是否超過設定的頻次,以及當前請求的時間和第一次請求時間是否在?1?分鐘內。如果在? 1?分鐘內并且超過設定的頻次則證明請求過多,后面的請求就拒絕掉。如果該請求與第一個請求的間隔時間大于計數(shù)周期,且? count?值還在限流范圍內,就重置?count。
代碼實現(xiàn):
package?main
import?(
????"log"
????"sync"
????"time"
)
type?Counter?struct?{
????rate??int???????????//計數(shù)周期內最多允許的請求數(shù)
????begin?time.Time?????//計數(shù)開始時間
????cycle?time.Duration?//計數(shù)周期
????count?int???????????//計數(shù)周期內累計收到的請求數(shù)
????lock??sync.Mutex
}
func?(l?*Counter)?Allow()?bool?{
????l.lock.Lock()
????defer?l.lock.Unlock()
????if?l.count?==?l.rate-1?{
????????now?:=?time.Now()
????????if?now.Sub(l.begin)?>=?l.cycle?{
????????????//速度允許范圍內,?重置計數(shù)器
????????????l.Reset(now)
????????????return?true
????????}?else?{
????????????return?false
????????}
????}?else?{
????????//沒有達到速率限制,計數(shù)加1
????????l.count++
????????return?true
????}
}
func?(l?*Counter)?Set(r?int,?cycle?time.Duration)?{
????l.rate?=?r
????l.begin?=?time.Now()
????l.cycle?=?cycle
????l.count?=?0
}
func?(l?*Counter)?Reset(t?time.Time)?{
????l.begin?=?t
????l.count?=?0
}
func?main()?{
????var?wg?sync.WaitGroup
????var?lr?Counter
????lr.Set(3,?time.Second)?//?1s內最多請求3次
????for?i?:=?0;?i?10;?i++?{
????????wg.Add(1)
????????log.Println("創(chuàng)建請求:",?i)
????????go?func(i?int)?{
??????????if?lr.Allow()?{
??????????????log.Println("響應請求:",?i)
??????????}
??????????wg.Done()
????????}(i)
????????time.Sleep(200?*?time.Millisecond)
????}
????wg.Wait()
}
OutPut:
2021/02/01?21:16:12?創(chuàng)建請求:?0
2021/02/01?21:16:12?響應請求:?0
2021/02/01?21:16:12?創(chuàng)建請求:?1
2021/02/01?21:16:12?響應請求:?1
2021/02/01?21:16:12?創(chuàng)建請求:?2
2021/02/01?21:16:13?創(chuàng)建請求:?3
2021/02/01?21:16:13?創(chuàng)建請求:?4
2021/02/01?21:16:13?創(chuàng)建請求:?5
2021/02/01?21:16:13?響應請求:?5
2021/02/01?21:16:13?創(chuàng)建請求:?6
2021/02/01?21:16:13?響應請求:?6
2021/02/01?21:16:13?創(chuàng)建請求:?7
2021/02/01?21:16:13?響應請求:?7
2021/02/01?21:16:14?創(chuàng)建請求:?8
2021/02/01?21:16:14?創(chuàng)建請求:?9
可以看到我們設置的是每200ms創(chuàng)建一個請求,明顯高于1秒最多3個請求的限制,運行起來之后發(fā)現(xiàn)編號為?2、3、4、8、9?的請求被丟棄,說明限流成功。
那么問題來了,如果有個需求對于某個接口?/query?每分鐘最多允許訪問 200 次,假設有個用戶在第 59 秒的最后幾毫秒瞬間發(fā)送 200 個請求,當 59 秒結束后?Counter?清零了,他在下一秒的時候又發(fā)送 200 個請求。那么在 1 秒鐘內這個用戶發(fā)送了 2 倍的請求,這個是符合我們的設計邏輯的,這也是計數(shù)器方法的設計缺陷,系統(tǒng)可能會承受惡意用戶的大量請求,甚至擊穿系統(tǒng)。
如下圖:

這種方法雖然簡單,但也有個大問題就是沒有很好的處理單位時間的邊界。
滑動窗口
滑動窗口是針對計數(shù)器存在的臨界點缺陷,所謂?滑動窗口(Sliding window)?是一種流量控制技術,這個詞出現(xiàn)在?TCP?協(xié)議中。滑動窗口把固定時間片進行劃分,并且隨著時間的流逝,進行移動,固定數(shù)量的可以移動的格子,進行計數(shù)并判斷閥值。
如 圖:

上圖中我們用紅色的虛線代表一個時間窗口(一分鐘),每個時間窗口有?6?個格子,每個格子是?10?秒鐘。每過?10?秒鐘時間窗口向右移動一格,可以看紅色箭頭的方向。我們?yōu)槊總€格子都設置一個獨立的計數(shù)器?Counter,假如一個請求在?0:45?訪問了那么我們將第五個格子的計數(shù)器?+1(也是就是?0:40~0:50),在判斷限流的時候需要把所有格子的計數(shù)加起來和設定的頻次進行比較即可。
那么滑動窗口如何解決我們上面遇到的問題呢?來看下面的圖:

當用戶在0:59?秒鐘發(fā)送了?200個請求就會被第六個格子的計數(shù)器記錄?+200,當下一秒的時候時間窗口向右移動了一個,此時計數(shù)器已經(jīng)記錄了該用戶發(fā)送的?200?個請求,所以再發(fā)送的話就會觸發(fā)限流,則拒絕新的請求。
其實計數(shù)器就是滑動窗口啊,只不過只有一個格子而已,所以想讓限流做的更精確只需要劃分更多的格子就可以了,為了更精確我們也不知道到底該設置多少個格子,格子的數(shù)量影響著滑動窗口算法的精度,依然有時間片的概念,無法根本解決臨界點問題。
相關算法實現(xiàn)?github.com/RussellLuo/slidingwindow
漏 桶
漏桶算法(Leaky Bucket),原理就是一個固定容量的漏桶,按照固定速率流出水滴。用過水龍頭都知道,打開龍頭開關水就會流下滴到水桶里,而漏桶指的是水桶下面有個漏洞可以出水。如果水龍頭開的特別大那么水流速就會過大,這樣就可能導致水桶的水滿了然后溢出。
如 圖:

一個固定容量的桶,有水流進來,也有水流出去。對于流進來的水來說,我們無法預計一共有多少水會流進來,也無法預計水流的速度。但是對于流出去的水來說,這個桶可以固定水流出的速率(處理速度),從而達到?流量整形?和?流量控制?的效果。
代碼實現(xiàn):
type?LeakyBucket?struct?{
????rate???????float64?//固定每秒出水速率
????capacity???float64?//桶的容量
????water??????float64?//桶中當前水量
????lastLeakMs?int64???//桶上次漏水時間戳?ms
????lock?sync.Mutex
}
func?(l?*LeakyBucket)?Allow()?bool?{
????l.lock.Lock()
????defer?l.lock.Unlock()
????now?:=?time.Now().UnixNano()?/?1e6
????eclipse?:=?float64((now?-?l.lastLeakMs))?*?l.rate?/?1000?//先執(zhí)行漏水
????l.water?=?l.water?-?eclipse??????????????????????????????//計算剩余水量
????l.water?=?math.Max(0,?l.water)???????????????????????????//桶干了
????l.lastLeakMs?=?now
????if?(l.water?+?1)?????????//?嘗試加水,并且水還未滿
????????l.water++
????????return?true
????}?else?{
????????//?水滿,拒絕加水
????????return?false
????}
}
func?(l?*LeakyBucket)?Set(r,?c?float64)?{
????l.rate?=?r
????l.capacity?=?c
????l.water?=?0
????l.lastLeakMs?=?time.Now().UnixNano()?/?1e6
}
漏桶算法有以下特點:
漏桶具有固定容量,出水速率是固定常量(流出請求) 如果桶是空的,則不需流出水滴 可以以任意速率流入水滴到漏桶(流入請求) 如果流入水滴超出了桶的容量,則流入的水滴溢出(新請求被拒絕)
漏桶限制的是常量流出速率(即流出速率是一個固定常量值),所以最大的速率就是出水的速率,不能出現(xiàn)突發(fā)流量。
令牌桶算法
令牌桶算法(Token Bucket)是網(wǎng)絡流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一種算法。典型情況下,令牌桶算法用來控制發(fā)送到網(wǎng)絡上的數(shù)據(jù)的數(shù)目,并允許突發(fā)數(shù)據(jù)的發(fā)送。

我們有一個固定的桶,桶里存放著令牌(token)。一開始桶是空的,系統(tǒng)按固定的時間(rate)往桶里添加令牌,直到桶里的令牌數(shù)滿,多余的請求會被丟棄。當請求來的時候,從桶里移除一個令牌,如果桶是空的則拒絕請求或者阻塞。
實現(xiàn)代碼:
type?TokenBucket?struct?{
????rate?????????int64?//固定的token放入速率,?r/s
????capacity?????int64?//桶的容量
????tokens???????int64?//桶中當前token數(shù)量
????lastTokenSec?int64?//桶上次放token的時間戳?s
????lock?sync.Mutex
}
func?(l?*TokenBucket)?Allow()?bool?{
????l.lock.Lock()
????defer?l.lock.Unlock()
????now?:=?time.Now().Unix()
????l.tokens?=?l.tokens?+?(now-l.lastTokenSec)*l.rate?//?先添加令牌
????if?l.tokens?>?l.capacity?{
????????l.tokens?=?l.capacity
????}
????l.lastTokenSec?=?now
????if?l.tokens?>?0?{
????????//?還有令牌,領取令牌
????????l.tokens--
????????return?true
????}?else?{
????????//?沒有令牌,則拒絕
????????return?false
????}
}
func?(l?*TokenBucket)?Set(r,?c?int64)?{
????l.rate?=?r
????l.capacity?=?c
????l.tokens?=?0
????l.lastTokenSec?=?time.Now().Unix()
}
令牌桶有以下特點:
令牌按固定的速率被放入令牌桶中 桶中最多存放? B?個令牌,當桶滿時,新添加的令牌被丟棄或拒絕如果桶中的令牌不足? N?個,則不會刪除令牌,且請求將被限流(丟棄或阻塞等待)
令牌桶限制的是平均流入速率(允許突發(fā)請求,只要有令牌就可以處理,支持一次拿3個令牌,4個令牌...),并允許一定程度突發(fā)流量。
小 結
目前常用的是令牌桶這種,本文介紹了幾種常見的限流算法實現(xiàn),文章撰寫不易,點個關注不迷路。
