一文講透自適應熔斷的原理和實現(xiàn)
// 為什么需要熔斷
微服務集群中,每個應用基本都會依賴一定數(shù)量的外部服務。有可能隨時都會遇到網(wǎng)絡連接緩慢,超時,依賴服務過載,服務不可用的情況,在高并發(fā)場景下如果此時調(diào)用方不做任何處理,繼續(xù)持續(xù)請求故障服務的話很容易引起整個微服務集群雪崩。比如高并發(fā)場景的用戶訂單服務,一般需要依賴一下服務:
商品服務
賬戶服務
庫存服務

假如此時 賬戶服務 過載,訂單服務持續(xù)請求賬戶服務只能被動的等待賬戶服務報錯或者請求超時,進而導致訂單請求被大量堆積,這些無效請求依然會占用系統(tǒng)資源:cpu,內(nèi)存,數(shù)據(jù)連接...導致訂單服務整體不可用。即使賬戶服務恢復了訂單服務也無法自我恢復。

這時如果有一個主動保護機制應對這種場景的話訂單服務至少可以保證自身的運行狀態(tài),等待賬戶服務恢復時訂單服務也同步自我恢復,這種自我保護機制在服務治理中叫熔斷機制。
熔斷
熔斷是調(diào)用方自我保護的機制(客觀上也能保護被調(diào)用方),熔斷對象是外部服務。
降級
降級是被調(diào)用方(服務提供者)的防止因自身資源不足導致過載的自我保護機制,降級對象是自身。

熔斷這一詞來源時我們?nèi)粘I铍娐防锩娴娜蹟嗥鳎斬撦d過高時(電流過大)保險絲會自行熔斷防止電路被燒壞,很多技術都是來自生活場景的提煉。
// 工作原理

熔斷器一般具有三個狀態(tài):
關閉:默認狀態(tài),請求能被到達目標服務,同時統(tǒng)計在窗口時間成功和失敗次數(shù),如果達到錯誤率閾值將會進入斷開狀態(tài)。
斷開:此狀態(tài)下將會直接返回錯誤,如果有 fallback 配置則直接調(diào)用 fallback 方法。
半斷開:進行斷開狀態(tài)會維護一個超市時間,到達超時時間開始進入 半斷開 狀態(tài),嘗試允許一部門請求正常通過并統(tǒng)計成功數(shù)量,如果請求正常則認為此時目標服務已恢復進入 關閉 狀態(tài),否則進入 斷開 狀態(tài)。半斷開 狀態(tài)存在的目的在于實現(xiàn)了自我修復,同時防止正在恢復的服務再次被大量打垮。
使用較多的熔斷組件:
hystrix circuit breaker(不再維護)
hystrix-go
resilience4j(推薦)
sentinel(推薦)
什么是自適應熔斷
基于上面提到的熔斷器原理,項目中我們要使用好熔斷器通常需要準備以下參數(shù):
錯誤比例閾值:達到該閾值進入 斷開 狀態(tài)。
斷開狀態(tài)超時時間:超時后進入 半斷開 狀態(tài)。
半斷開狀態(tài)允許請求數(shù)量。
窗口時間大小。
實際上可選的配置參數(shù)還有非常非常多,參考?https://resilience4j.readme.io/docs/circuitbreaker
對于經(jīng)驗不夠豐富的開發(fā)人員而言,這些參數(shù)設置多少合適心里其實并沒有底。
那么有沒有一種自適應的熔斷算法能讓我們不關注參數(shù),只要簡單配置就能滿足大部分場景?
其實是有的,google sre提供了一種自適應熔斷算法來計算丟棄請求的概率:

算法參數(shù):
requests:窗口時間內(nèi)的請求總數(shù)
accepts:正常請求數(shù)量
K:敏感度,K 越小越容易丟請求,一般推薦 1.5-2 之間
算法解釋:
正常情況下 requests=accepts,所以概率是 0。
隨著正常請求數(shù)量減少,當達到 requests == K* accepts 繼續(xù)請求時,概率 P 會逐漸比 0 大開始按照概率逐漸丟棄一些請求,如果故障嚴重則丟包會越來越多,假如窗口時間內(nèi) accepts==0 則完全熔斷。
當應用逐漸恢復正常時,accepts、requests 同時都在增加,但是 K*accepts 會比 requests 增加的更快,所以概率很快就會歸 0,關閉熔斷。
// 代碼實現(xiàn)
接下來思考一個熔斷器如何實現(xiàn)。
初步思路是:
無論什么熔斷器都得依靠指標統(tǒng)計來轉換狀態(tài),而統(tǒng)計指標一般要求是最近的一段時間內(nèi)的數(shù)據(jù)(太久的數(shù)據(jù)沒有參考意義也浪費空間),所以通常采用一個 滑動時間窗口 數(shù)據(jù)結構 來存儲統(tǒng)計數(shù)據(jù)。同時熔斷器的狀態(tài)也需要依靠指標統(tǒng)計來實現(xiàn)可觀測性,我們實現(xiàn)任何系統(tǒng)第一步需要考慮就是可觀測性,不然系統(tǒng)就是一個黑盒。
外部服務請求結果各式各樣,所以需要提供一個自定義的判斷方法,判斷請求是否成功。可能是 http.code 、rpc.code、body.code,熔斷器需要實時收集此數(shù)據(jù)。
當外部服務被熔斷時使用者往往需要自定義快速失敗的邏輯,考慮提供自定義的 fallback() 功能。
下面來逐步分析 go-zero 的源碼實現(xiàn):
core/breaker/breaker.go
熔斷器接口定義
兵馬未動,糧草先行,明確了需求后就可以開始規(guī)劃定義接口了,接口是我們編碼思維抽象的第一步也是最重要的一步。
核心定義包含兩種類型的方法:
Allow():需要手動回調(diào)請求結果至熔斷器,相當于手動擋。
DoXXX():自動回調(diào)請求結果至熔斷器,相當于自動擋,實際上 DoXXX() 類型方法最后都是調(diào)用
DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
//?自定義判定執(zhí)行結果
Acceptable?func(err?error)?bool
//?手動回調(diào)
Promise?interface?{
????//?Accept?tells?the?Breaker?that?the?call?is?successful.
????//?請求成功
????Accept()
????//?Reject?tells?the?Breaker?that?the?call?is?failed.
????//?請求失敗
????Reject(reason?string)
}???
Breaker?interface?{
????//?熔斷器名稱
????Name()?string
????//?熔斷方法,執(zhí)行請求時必須手動上報執(zhí)行結果
????//?適用于簡單無需自定義快速失敗,無需自定義判定請求結果的場景
????//?相當于手動擋。。。
????Allow()?(Promise,?error)
????//?熔斷方法,自動上報執(zhí)行結果
????//?自動擋。。。
????Do(req?func()?error)?error
????//?熔斷方法
????//?acceptable?-?支持自定義判定執(zhí)行結果
????DoWithAcceptable(req?func()?error,?acceptable?Acceptable)?error
????//?熔斷方法
????//?fallback?-?支持自定義快速失敗
????DoWithFallback(req?func()?error,?fallback?func(err?error)?error)?error
????//?熔斷方法
????//?fallback?-?支持自定義快速失敗
????//?acceptable?-?支持自定義判定執(zhí)行結果
????DoWithFallbackAcceptable(req?func()?error,?fallback?func(err?error)?error,?acceptable?Acceptable)?error
}
熔斷器實現(xiàn)
circuitBreaker 繼承 throttle,實際上這里相當于靜態(tài)代理,代理模式可以在不改變原有對象的基礎上增強功能,后面我們會看到 go-zero 這樣做的原因是為了收集熔斷器錯誤數(shù)據(jù),也就是為了實現(xiàn)可觀測性。
熔斷器實現(xiàn)采用靜態(tài)代理模式,看起來稍微有點繞腦。

//?熔斷器結構體
circuitBreaker?struct?{
????name?string
????//?實際上?circuitBreaker熔斷功能都代理給?throttle來實現(xiàn)
????throttle
}
//?熔斷器接口
throttle?interface?{
????//?熔斷方法
????allow()?(Promise,?error)
????//?熔斷方法
????//?DoXXX()方法最終都會該方法
????doReq(req?func()?error,?fallback?func(err?error)?error,?acceptable?Acceptable)?error
}
func?(cb?*circuitBreaker)?Allow()?(Promise,?error)?{
????return?cb.throttle.allow()
}
func?(cb?*circuitBreaker)?Do(req?func()?error)?error?{
??return?cb.throttle.doReq(req,?nil,?defaultAcceptable)
}
func?(cb?*circuitBreaker)?DoWithAcceptable(req?func()?error,?acceptable?Acceptable)?error?{
??return?cb.throttle.doReq(req,?nil,?acceptable)
}
func?(cb?*circuitBreaker)?DoWithFallback(req?func()?error,?fallback?func(err?error)?error)?error?{
??return?cb.throttle.doReq(req,?fallback,?defaultAcceptable)
}
func?(cb?*circuitBreaker)?DoWithFallbackAcceptable(req?func()?error,?fallback?func(err?error)?error,
??acceptable?Acceptable)?error?{
????return?cb.throttle.doReq(req,?fallback,?acceptable)
}???????
throttle 接口實現(xiàn)類:
loggedThrottle 增加了為了收集錯誤日志的滾動窗口,目的是為了收集當請求失敗時的錯誤日志。
//?帶日志功能的熔斷器
type?loggedThrottle?struct?{
????//?名稱
????name?string
????//?代理對象
????internalThrottle
????//?滾動窗口,滾動收集數(shù)據(jù),相當于環(huán)形數(shù)組
????errWin?*errorWindow
}
//?熔斷方法
func?(lt?loggedThrottle)?allow()?(Promise,?error)?{
????promise,?err?:=?lt.internalThrottle.allow()
????return?promiseWithReason{
????????promise:?promise,
????????errWin:??lt.errWin,
????},?lt.logError(err)
}
//?熔斷方法
func?(lt?loggedThrottle)?doReq(req?func()?error,?fallback?func(err?error)?error,?acceptable?Acceptable)?error?{
????return?lt.logError(lt.internalThrottle.doReq(req,?fallback,?func(err?error)?bool?{
????????accept?:=?acceptable(err)
????????if?!accept?{
????????????lt.errWin.add(err.Error())
????????}
????????return?accept
????}))
}
func?(lt?loggedThrottle)?logError(err?error)?error?{
????if?err?==?ErrServiceUnavailable?{
????????//?if?circuit?open,?not?possible?to?have?empty?error?window
????????stat.Report(fmt.Sprintf(
????????????"proc(%s/%d),?callee:?%s,?breaker?is?open?and?requests?dropped\nlast?errors:\n%s",
????????????proc.ProcessName(),?proc.Pid(),?lt.name,?lt.errWin))
????}
????return?err
}??
錯誤日志收集 errorWindow
errorWindow 是一個環(huán)形數(shù)組,新數(shù)據(jù)不斷滾動覆蓋最舊的數(shù)據(jù),通過取余實現(xiàn)。
//?滾動窗口
type?errorWindow?struct?{
????reasons?[numHistoryReasons]string
????index???int
????count???int
????lock????sync.Mutex
}
//?添加數(shù)據(jù)
func?(ew?*errorWindow)?add(reason?string)?{
????ew.lock.Lock()
????//?添加錯誤日志
????ew.reasons[ew.index]?=?fmt.Sprintf("%s?%s",?timex.Time().Format(timeFormat),?reason)
????//?更新index,為下一次寫入數(shù)據(jù)做準備
????//?這里用的取模實現(xiàn)了滾動功能
????ew.index?=?(ew.index?+?1)?%?numHistoryReasons
????//?統(tǒng)計數(shù)量
????ew.count?=?mathx.MinInt(ew.count+1,?numHistoryReasons)
????ew.lock.Unlock()
}
//?格式化錯誤日志
func?(ew?*errorWindow)?String()?string?{
????var?reasons?[]string
????ew.lock.Lock()
????//?reverse?order
????for?i?:=?ew.index?-?1;?i?>=?ew.index-ew.count;?i--?{
????????reasons?=?append(reasons,?ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
????}
????ew.lock.Unlock()
????return?strings.Join(reasons,?"\n")
}
看到這里我們還沒看到實際的熔斷器實現(xiàn),實際上真正的熔斷操作被代理給了 internalThrottle 對象。
internalThrottle?interface?{
????allow()?(internalPromise,?error)
????doReq(req?func()?error,?fallback?func(err?error)?error,?acceptable?Acceptable)?error
}
internalThrottle 接口實現(xiàn) googleBreaker 結構體定義
type?googleBreaker?struct?{
????//?敏感度,go-zero中默認值為1.5
????k?float64
????//?滑動窗口,用于記錄最近一段時間內(nèi)的請求總數(shù),成功總數(shù)
????stat?*collection.RollingWindow
????//?概率生成器
????//?隨機產(chǎn)生0.0-1.0之間的雙精度浮點數(shù)
????proba?*mathx.Proba
}
可以看到熔斷器屬性其實非常簡單,數(shù)據(jù)統(tǒng)計采用的是滑動時間窗口來實現(xiàn)。
RollingWindow 滑動窗口
滑動窗口屬于比較通用的數(shù)據(jù)結構,常用于最近一段時間內(nèi)的行為數(shù)據(jù)統(tǒng)計。
它的實現(xiàn)非常有意思,尤其是如何模擬窗口滑動過程。
先來看滑動窗口的結構體定義:
RollingWindow?struct?{
????//?互斥鎖
????lock?sync.RWMutex
????//?滑動窗口數(shù)量
????size?int
????//?窗口,數(shù)據(jù)容器
????win?*window
????//?滑動窗口單元時間間隔
????interval?time.Duration
????//?游標,用于定位當前應該寫入哪個bucket
????offset?int
????//?匯總數(shù)據(jù)時,是否忽略當前正在寫入桶的數(shù)據(jù)
????//?某些場景下因為當前正在寫入的桶數(shù)據(jù)并沒有經(jīng)過完整的窗口時間間隔
????//?可能導致當前桶的統(tǒng)計并不準確
????ignoreCurrent?bool
????//?最后寫入桶的時間
????//?用于計算下一次寫入數(shù)據(jù)間隔最后一次寫入數(shù)據(jù)的之間
????//?經(jīng)過了多少個時間間隔
????lastTime??????time.Duration?
}

window 是數(shù)據(jù)的實際存儲位置,其實就是一個數(shù)組,提供向指定 offset 添加數(shù)據(jù)與清除操作。數(shù)組里面按照 internal 時間間隔分隔成多個 bucket。
//?時間窗口
type?window?struct?{
????//?桶
????//?一個桶標識一個時間間隔
????buckets?[]*Bucket
????//?窗口大小
????size?int
}
//?添加數(shù)據(jù)
//?offset?-?游標,定位寫入bucket位置
//?v?-?行為數(shù)據(jù)
func?(w?*window)?add(offset?int,?v?float64)?{
????w.buckets[offset%w.size].add(v)
}
//?匯總數(shù)據(jù)
//?fn?-?自定義的bucket統(tǒng)計函數(shù)
func?(w?*window)?reduce(start,?count?int,?fn?func(b?*Bucket))?{
????for?i?:=?0;?i?????????fn(w.buckets[(start+i)%w.size])
????}
}
//?清理特定bucket
func?(w?*window)?resetBucket(offset?int)?{
????w.buckets[offset%w.size].reset()
}
//?桶
type?Bucket?struct?{
????//?當前桶內(nèi)值之和
????Sum?float64
????//?當前桶的add總次數(shù)
????Count?int64
}
//?向桶添加數(shù)據(jù)
func?(b?*Bucket)?add(v?float64)?{
????//?求和
????b.Sum?+=?v
????//?次數(shù)+1
????b.Count++
}
//?桶數(shù)據(jù)清零
func?(b?*Bucket)?reset()?{
????b.Sum?=?0
????b.Count?=?0
}
window 添加數(shù)據(jù):
計算當前時間距離上次添加時間經(jīng)過了多少個 時間間隔,實際上就是過期了幾個 bucket。
清理過期桶的數(shù)據(jù)
更新 offset,更新 offset 的過程實際上就是在模擬窗口滑動
添加數(shù)據(jù)

//?添加數(shù)據(jù)
func?(rw?*RollingWindow)?Add(v?float64)?{
????rw.lock.Lock()
????defer?rw.lock.Unlock()
????//?獲取當前寫入的下標
????rw.updateOffset()
????//?添加數(shù)據(jù)
????rw.win.add(rw.offset,?v)
}
//?計算當前距離最后寫入數(shù)據(jù)經(jīng)過多少個單元時間間隔
//?實際上指的就是經(jīng)過多少個桶
func?(rw?*RollingWindow)?span()?int?{
????offset?:=?int(timex.Since(rw.lastTime)?/?rw.interval)
????if?0?<=?offset?&&?offset?????????return?offset
????}
????//?大于時間窗口時?返回窗口大小即可
????return?rw.size
}
//?更新當前時間的offset
//?實現(xiàn)窗口滑動
func?(rw?*RollingWindow)?updateOffset()?{
????//?經(jīng)過span個桶的時間
????span :=?rw.span()
????//?還在同一單元時間內(nèi)不需要更新
????if?span <=?0?{
????????return
????}
????offset?:=?rw.offset
????//?既然經(jīng)過了span個桶的時間沒有寫入數(shù)據(jù)
????//?那么這些桶內(nèi)的數(shù)據(jù)就不應該繼續(xù)保留了,屬于過期數(shù)據(jù)清空即可
????//?可以看到這里全部用的?%?取余操作,可以實現(xiàn)按照下標周期性寫入
????//?如果超出下標了那就從頭開始寫,確保新數(shù)據(jù)一定能夠正常寫入
????//?類似循環(huán)數(shù)組的效果
????for?i?:=?0;?i?????????rw.win.resetBucket((offset?+?i?+?1)?%?rw.size)
????}
????//?更新offset
????rw.offset?=?(offset?+?span)?%?rw.size
????now?:=?timex.Now()
????//?更新操作時間
????//?這里很有意思
????rw.lastTime?=?now?-?(now-rw.lastTime)%rw.interval
}
window 統(tǒng)計數(shù)據(jù):
//?歸納匯總數(shù)據(jù)
func?(rw?*RollingWindow)?Reduce(fn?func(b?*Bucket))?{
????rw.lock.RLock()
????defer?rw.lock.RUnlock()
????var?diff?int
????span :=?rw.span()
????//?當前時間截止前,未過期桶的數(shù)量
????if?span ==?0?&&?rw.ignoreCurrent?{
????????diff?=?rw.size?-?1
????}?else?{
????????diff?=?rw.size?-?span
????}
????if?diff?>?0?{
????????//?rw.offset?-?rw.offset+span之間的桶數(shù)據(jù)是過期的不應該計入統(tǒng)計
????????offset?:=?(rw.offset?+?span +?1)?%?rw.size
????????//?匯總數(shù)據(jù)
????????rw.win.reduce(offset,?diff,?fn)
????}
}
googleBreaker 判斷是否應該熔斷
收集滑動窗口內(nèi)的統(tǒng)計數(shù)據(jù)
計算熔斷概率
//?按照最近一段時間的請求數(shù)據(jù)計算是否熔斷
func?(b?*googleBreaker)?accept()?error?{
????//?獲取最近一段時間的統(tǒng)計數(shù)據(jù)
????accepts,?total?:=?b.history()
????//?計算動態(tài)熔斷概率
????weightedAccepts?:=?b.k?*?float64(accepts)
????//?https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
????dropRatio?:=?math.Max(0,?(float64(total-protection)-weightedAccepts)/float64(total+1))
????//?概率為0,通過
????if?dropRatio?<=?0?{
????????return?nil
????}
????//?隨機產(chǎn)生0.0-1.0之間的隨機數(shù)與上面計算出來的熔斷概率相比較
????//?如果隨機數(shù)比熔斷概率小則進行熔斷
????if?b.proba.TrueOnProba(dropRatio)?{
????????return?ErrServiceUnavailable
????}
????return?nil
}
googleBreaker 熔斷邏輯實現(xiàn)
熔斷器對外暴露兩種類型的方法
簡單場景直接判斷對象是否被熔斷,執(zhí)行請求后必須需手動上報執(zhí)行結果至熔斷器。
func (b *googleBreaker) allow() (internalPromise, error)
復雜場景下支持自定義快速失敗,自定義判定請求是否成功的熔斷方法,自動上報執(zhí)行結果至熔斷器。
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error)
Acceptable 參數(shù)目的是自定義判斷請求是否成功。
Acceptable?func(err?error)?bool
//?熔斷方法
//?返回一個promise異步回調(diào)對象,可由開發(fā)者自行決定是否上報結果到熔斷器
func?(b?*googleBreaker)?allow()?(internalPromise,?error)?{
????if?err?:=?b.accept();?err?!=?nil?{
????????return?nil,?err
????}
????return?googlePromise{
????????b:?b,
????},?nil
}
//?熔斷方法
//?req?-?熔斷對象方法
//?fallback?-?自定義快速失敗函數(shù),可對熔斷產(chǎn)生的err進行包裝后返回
//?acceptable?-?對本次未熔斷時執(zhí)行請求的結果進行自定義的判定,比如可以針對http.code,rpc.code,body.code
func?(b?*googleBreaker)?doReq(req?func()?error,?fallback?func(err?error)?error,?acceptable?Acceptable)?error?{
????//?判定是否熔斷
????if?err?:=?b.accept();?err?!=?nil?{
????????//?熔斷中,如果有自定義的fallback則執(zhí)行
????????if?fallback?!=?nil?{
????????????return?fallback(err)
????????}
????????return?err
????}
????//?如果執(zhí)行req()過程發(fā)生了panic,依然判定本次執(zhí)行失敗上報至熔斷器
????defer?func()?{
????????if?e?:=?recover();?e?!=?nil?{
????????????b.markFailure()
????????????panic(e)
????????}
????}()
????//?執(zhí)行請求
????err?:=?req()
????//?判定請求成功
????if?acceptable(err)?{
????????b.markSuccess()
????}?else?{
????????b.markFailure()
????}
????return?err
}
//?上報成功
func?(b?*googleBreaker)?markSuccess()?{
????b.stat.Add(1)
}
//?上報失敗
func?(b?*googleBreaker)?markFailure()?{
????b.stat.Add(0)
}
//?統(tǒng)計數(shù)據(jù)
func?(b?*googleBreaker)?history()?(accepts,?total?int64)?{
????b.stat.Reduce(func(b?*collection.Bucket)?{
????????accepts?+=?int64(b.Sum)
????????total?+=?b.Count
????})
????return
}
// 資料
微軟 azure 關于熔斷器設計模式(https://docs.microsoft.com/en-us/previous-versions/msp-n-p/dn589784(v=pandp.10)
索尼參考微軟的文檔開源的熔斷器實現(xiàn)?(https://github.com/sony/gobreaker)
go-zero 自適應熔斷器文檔(https://go-zero.dev/cn/breaker-algorithms.html)
// 項目地址
https://github.com/zeromicro/go-zero
歡迎使用?go-zero?并?star?支持我們!
想要了解關于 Go 的更多資訊,還可以通過掃描的方式,進群一起探討哦~
