go-zero 的自適應(yīng)熔斷器
上篇文章我們介紹了微服務(wù)的限流,詳細(xì)分析了計(jì)數(shù)器限流和令牌桶限流算法,這篇文章來(lái)說(shuō)說(shuō)熔斷。
熔斷和限流還不太一樣,限流是控制請(qǐng)求速率,只要還能承受,那么都會(huì)處理,但熔斷不是。
在一條調(diào)用鏈上,如果發(fā)現(xiàn)某個(gè)服務(wù)異常,比如響應(yīng)超時(shí)。那么調(diào)用者為了避免過(guò)多請(qǐng)求導(dǎo)致資源消耗過(guò)大,最終引發(fā)系統(tǒng)雪崩,會(huì)直接返回錯(cuò)誤,而不是瘋狂調(diào)用這個(gè)服務(wù)。
本篇文章會(huì)介紹主流熔斷器的工作原理,并且會(huì)借助 go-zero 源碼,分析 googleBreaker 是如何通過(guò)滑動(dòng)窗口來(lái)統(tǒng)計(jì)流量,并且最終執(zhí)行熔斷的。
工作原理
這部分主要介紹兩種熔斷器的工作原理,分別是 Netflix 開(kāi)源的 Hystrix,其也是 Spring Cloud 默認(rèn)的熔斷組件,和 Google 的自適應(yīng)的熔斷器。
Hystrix is no longer in active development, and is currently in maintenance mode.
注意,Hystrix 官方已經(jīng)宣布不再積極開(kāi)發(fā)了,目前處在維護(hù)模式。
Hystrix 官方推薦替代的開(kāi)源組件:Resilience4j,還有阿里開(kāi)源的 Sentinel 也是不錯(cuò)的替代品。
hystrixBreaker
Hystrix 采用了熔斷器模式,相當(dāng)于電路中的保險(xiǎn)絲,系統(tǒng)出現(xiàn)緊急問(wèn)題,立刻禁止所有請(qǐng)求,已達(dá)到保護(hù)系統(tǒng)的作用。

系統(tǒng)需要維護(hù)三種狀態(tài),分別是:
- 關(guān)閉: 默認(rèn)狀態(tài),所有請(qǐng)求全部能夠通過(guò)。當(dāng)請(qǐng)求失敗數(shù)量增加,失敗率超過(guò)閾值時(shí),會(huì)進(jìn)入到斷開(kāi)狀態(tài)。
- 斷開(kāi): 此狀態(tài)下,所有請(qǐng)求都會(huì)被攔截。當(dāng)經(jīng)過(guò)一段超時(shí)時(shí)間后,會(huì)進(jìn)入到半斷開(kāi)狀態(tài)。
- 半斷開(kāi): 此狀態(tài)下會(huì)允許一部分請(qǐng)求通過(guò),并統(tǒng)計(jì)成功數(shù)量,當(dāng)請(qǐng)求成功時(shí),恢復(fù)到關(guān)閉狀態(tài),否則繼續(xù)斷開(kāi)。
通過(guò)狀態(tài)的變更,可以有效防止系統(tǒng)雪崩的問(wèn)題。同時(shí),在半斷開(kāi)狀態(tài)下,又可以讓系統(tǒng)進(jìn)行自我修復(fù)。
googleBreaker
googleBreaker 實(shí)現(xiàn)了一種自適應(yīng)的熔斷模式,來(lái)看一下算法的計(jì)算公式,客戶(hù)端請(qǐng)求被拒絕的概率。

參數(shù)很少,也比較好理解:
- requests:請(qǐng)求數(shù)量
- accepts:后端接收的請(qǐng)求數(shù)量
- K:敏感度,一般推薦 1.5-2 之間
通過(guò)分析公式,我們可以得到下面幾個(gè)結(jié)論,也就是產(chǎn)生熔斷的實(shí)際原理:
- 正常情況下,requests 和 accepts 是相等的,拒絕的概率就是 0,沒(méi)有產(chǎn)生熔斷
- 當(dāng)正常請(qǐng)求量,也就是 accepts 減少時(shí),概率會(huì)逐漸增加,當(dāng)概率大于 0 時(shí),就會(huì)產(chǎn)生熔斷。如果 accepts 等于 0 了,則完全熔斷。
- 當(dāng)服務(wù)恢復(fù)后,requests 和 accepts 的數(shù)量會(huì)同時(shí)增加,但由于 K * accepts 增長(zhǎng)的更快,所以概率又會(huì)很快變回到 0,相當(dāng)于關(guān)閉了熔斷。
總的來(lái)說(shuō),googleBreaker 的實(shí)現(xiàn)方案更加優(yōu)雅,而且參數(shù)也少,不用維護(hù)那么多的狀態(tài)。
go-zero 就是采用了 googleBreaker 的方案,下面就來(lái)分析代碼,看看到底是怎么實(shí)現(xiàn)的。
接口設(shè)計(jì)
接口定義這部分我個(gè)人感覺(jué)還是挺不好理解的,看了好多遍才理清了它們之間的關(guān)系。
其實(shí)看代碼和看書(shū)是一樣的,書(shū)越看越薄,代碼會(huì)越看越短。剛開(kāi)始看感覺(jué)代碼很長(zhǎng),隨著看懂的地方越來(lái)越多,明顯感覺(jué)代碼變短了。所以遇到不懂的代碼不要怕,反復(fù)看,總會(huì)看懂的。

首先來(lái)看一下 breaker 部分的 UML 圖,有了這張圖,很多地方看起來(lái)還是相對(duì)清晰的,下面來(lái)詳細(xì)分析。
這里用到了靜態(tài)代理模式,也可以說(shuō)是接口裝飾器,接下來(lái)就看看到底是怎么定義的:
// core/breaker/breaker.go
internalThrottle interface {
allow() (internalPromise, error)
doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
// core/breaker/googlebreaker.go
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
這個(gè)接口是最終實(shí)現(xiàn)熔斷方法的接口,由 googleBreaker 結(jié)構(gòu)體實(shí)現(xiàn)。
// core/breaker/breaker.go
throttle interface {
allow() (Promise, error)
doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
type loggedThrottle struct {
name string
internalThrottle
errWin *errorWindow
}
func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
return loggedThrottle{
name: name,
internalThrottle: t,
errWin: new(errorWindow),
}
}
這個(gè)是實(shí)現(xiàn)了日志收集的結(jié)構(gòu)體,首先它實(shí)現(xiàn)了 throttle 接口,然后它包含了一個(gè)字段 internalThrottle,相當(dāng)于具體的熔斷方法是代理給 internalThrottle 來(lái)做的。
// core/breaker/breaker.go
func (lt loggedThrottle) allow() (Promise, error) {
promise, err := lt.internalThrottle.allow()
return promiseWithReason{
promise: promise,
errWin: lt.errWin,
}, lt.logError(err)
}
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
accept := acceptable(err)
if !accept && err != nil {
lt.errWin.add(err.Error())
}
return accept
}))
}
所以當(dāng)它執(zhí)行相應(yīng)方法時(shí),都是直接調(diào)用 internalThrottle 接口的方法,然后再加上自己的邏輯。
這也就是代理所起到的作用,在不改變?cè)椒ǖ幕A(chǔ)上,擴(kuò)展原方法的功能。
// core/breaker/breaker.go
circuitBreaker struct {
name string
throttle
}
// NewBreaker returns a Breaker object.
// opts can be used to customize the Breaker.
func NewBreaker(opts ...Option) Breaker {
var b circuitBreaker
for _, opt := range opts {
opt(&b)
}
if len(b.name) == 0 {
b.name = stringx.Rand()
}
b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
return &b
}
最終的熔斷器又將功能代理給了 throttle。
這就是它們之間的關(guān)系,如果感覺(jué)有點(diǎn)亂的話(huà),就反復(fù)看,看的次數(shù)多了,就清晰了。
日志收集
上文介紹過(guò)了,loggedThrottle 是為了記錄日志而設(shè)計(jì)的代理層,這部分內(nèi)容來(lái)分析一下是如何記錄日志的。
// core/breaker/breaker.go
type errorWindow struct {
// 記錄日志的數(shù)組
reasons [numHistoryReasons]string
// 索引
index int
// 數(shù)組元素?cái)?shù)量,小于等于 numHistoryReasons
count int
lock sync.Mutex
}
func (ew *errorWindow) add(reason string) {
ew.lock.Lock()
// 記錄錯(cuò)誤日志內(nèi)容
ew.reasons[ew.index] = fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
// 對(duì) numHistoryReasons 進(jìn)行取余來(lái)得到數(shù)組索引
ew.index = (ew.index + 1) % numHistoryReasons
ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
ew.lock.Unlock()
}
func (ew *errorWindow) String() string {
var reasons []string
ew.lock.Lock()
// reverse order
for i := ew.index - 1; i >= ew.index-ew.count; i-- {
reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
}
ew.lock.Unlock()
return strings.Join(reasons, "\n")
}
核心就是這里采用了一個(gè)環(huán)形數(shù)組,通過(guò)維護(hù)兩個(gè)字段來(lái)實(shí)現(xiàn),分別是 index 和 count。
count 表示數(shù)組中元素的個(gè)數(shù),最大值是數(shù)組的長(zhǎng)度;index 是索引,每次 +1,然后對(duì)數(shù)組長(zhǎng)度取余得到新索引。
我之前有一次面試就讓我設(shè)計(jì)一個(gè)環(huán)形數(shù)組,當(dāng)時(shí)答的還不是很好,這次算是學(xué)會(huì)了。
滑動(dòng)窗口
一般來(lái)說(shuō),想要判斷是否需要觸發(fā)熔斷,那么首先要知道一段時(shí)間的請(qǐng)求數(shù)量,一段時(shí)間內(nèi)的數(shù)量統(tǒng)計(jì)可以使用滑動(dòng)窗口來(lái)實(shí)現(xiàn)。
首先看一下滑動(dòng)窗口的定義:
// core/collection/rollingwindow.go
type RollingWindow struct {
lock sync.RWMutex
// 窗口大小
size int
// 窗口數(shù)據(jù)容器
win *window
// 時(shí)間間隔
interval time.Duration
// 游標(biāo),用于定位當(dāng)前應(yīng)該寫(xiě)入哪個(gè) bucket
offset int
// 匯總數(shù)據(jù)時(shí),是否忽略當(dāng)前正在寫(xiě)入桶的數(shù)據(jù)
// 某些場(chǎng)景下因?yàn)楫?dāng)前正在寫(xiě)入的桶數(shù)據(jù)并沒(méi)有經(jīng)過(guò)完整的窗口時(shí)間間隔
// 可能導(dǎo)致當(dāng)前桶的統(tǒng)計(jì)并不準(zhǔn)確
ignoreCurrent bool
// 最后寫(xiě)入桶的時(shí)間
// 用于計(jì)算下一次寫(xiě)入數(shù)據(jù)間隔最后一次寫(xiě)入數(shù)據(jù)的之間
// 經(jīng)過(guò)了多少個(gè)時(shí)間間隔
lastTime time.Duration // start time of the last bucket
}
再來(lái)看一下 window 的結(jié)構(gòu):
type Bucket struct {
// 桶內(nèi)值的和
Sum float64
// 桶內(nèi) add 次數(shù)
Count int64
}
func (b *Bucket) add(v float64) {
b.Sum += v
b.Count++
}
func (b *Bucket) reset() {
b.Sum = 0
b.Count = 0
}
type window struct {
// 桶,一個(gè)桶就是一個(gè)時(shí)間間隔
buckets []*Bucket
// 窗口大小,也就是桶的數(shù)量
size int
}
有了這兩個(gè)結(jié)構(gòu)之后,我們就可以畫(huà)出這個(gè)滑動(dòng)窗口了,如圖所示。

現(xiàn)在來(lái)看一下向窗口中添加數(shù)據(jù),是怎樣一個(gè)過(guò)程。
func (rw *RollingWindow) Add(v float64) {
rw.lock.Lock()
defer rw.lock.Unlock()
// 獲取當(dāng)前寫(xiě)入下標(biāo)
rw.updateOffset()
// 向 bucket 中寫(xiě)入數(shù)據(jù)
rw.win.add(rw.offset, v)
}
func (rw *RollingWindow) span() int {
// 計(jì)算距離 lastTime 經(jīng)過(guò)了多少個(gè)時(shí)間間隔,也就是多少個(gè)桶
offset := int(timex.Since(rw.lastTime) / rw.interval)
// 如果在窗口范圍內(nèi),返回實(shí)際值,否則返回窗口大小
if 0 <= offset && offset < rw.size {
return offset
}
return rw.size
}
func (rw *RollingWindow) updateOffset() {
// 經(jīng)過(guò)了多少個(gè)時(shí)間間隔,也就是多少個(gè)桶
span := rw.span()
// 還在同一單元時(shí)間內(nèi)不需要更新
if span <= 0 {
return
}
offset := rw.offset
// reset expired buckets
// 這里是清除過(guò)期桶的數(shù)據(jù)
// 也是對(duì)數(shù)組大小進(jìn)行取余的方式,類(lèi)似上文介紹的環(huán)形數(shù)組
for i := 0; i < span; i++ {
rw.win.resetBucket((offset + i + 1) % rw.size)
}
// 更新游標(biāo)
rw.offset = (offset + span) % rw.size
now := timex.Now()
// align to interval time boundary
// 這里應(yīng)該是一個(gè)時(shí)間的對(duì)齊,保持在桶內(nèi)指向位置是一致的
rw.lastTime = now - (now-rw.lastTime)%rw.interval
}
// 向桶內(nèi)添加數(shù)據(jù)
func (w *window) add(offset int, v float64) {
// 根據(jù) offset 對(duì)數(shù)組大小取余得到索引,然后添加數(shù)據(jù)
w.buckets[offset%w.size].add(v)
}
// 重置桶數(shù)據(jù)
func (w *window) resetBucket(offset int) {
w.buckets[offset%w.size].reset()
}
我畫(huà)了一張圖,來(lái)模擬整個(gè)滑動(dòng)過(guò)程:

主要經(jīng)歷 4 個(gè)步驟:
- 計(jì)算當(dāng)前時(shí)間距離上次添加時(shí)間經(jīng)過(guò)了多少個(gè)時(shí)間間隔,也就是多少個(gè) bucket
- 清理過(guò)期桶數(shù)據(jù)
- 更新 offset,更新 offset 的過(guò)程實(shí)際就是模擬窗口滑動(dòng)的過(guò)程
- 添加數(shù)據(jù)
比如上圖,剛開(kāi)始 offset 指向了 bucket[1],經(jīng)過(guò)了兩個(gè) span 之后,bucket[2] 和 bucket[3] 會(huì)被清空,同時(shí),新的 offset 會(huì)指向 bucket[3],新添加的數(shù)據(jù)會(huì)寫(xiě)入到 bucket[3]。
再來(lái)看看數(shù)據(jù)統(tǒng)計(jì),也就是窗口內(nèi)的有效數(shù)據(jù)量是多少。
// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
rw.lock.RLock()
defer rw.lock.RUnlock()
var diff int
span := rw.span()
// ignore current bucket, because of partial data
if span == 0 && rw.ignoreCurrent {
diff = rw.size - 1
} else {
diff = rw.size - span
}
// 需要統(tǒng)計(jì)的 bucket 數(shù)量,窗口大小減去 span 數(shù)量
if diff > 0 {
// 獲取統(tǒng)計(jì)的起始位置,span 是已經(jīng)被重置的 bucket
offset := (rw.offset + span + 1) % rw.size
rw.win.reduce(offset, diff, fn)
}
}
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
for i := 0; i < count; i++ {
// 自定義統(tǒng)計(jì)函數(shù)
fn(w.buckets[(start+i)%w.size])
}
}
統(tǒng)計(jì)出窗口數(shù)據(jù)之后,就可以判斷是否需要熔斷了。
執(zhí)行熔斷
接下來(lái)就是執(zhí)行熔斷了,主要就是看看自適應(yīng)熔斷是如何實(shí)現(xiàn)的。
// core/breaker/googlebreaker.go
const (
// 250ms for bucket duration
window = time.Second * 10
buckets = 40
k = 1.5
protection = 5
)
窗口的定義部分,整個(gè)窗口是 10s,然后分成 40 個(gè) bucket,每個(gè) bucket 就是 250ms。
// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
func (b *googleBreaker) accept() error {
// 獲取最近一段時(shí)間的統(tǒng)計(jì)數(shù)據(jù)
accepts, total := b.history()
// 根據(jù)上文提到的算法來(lái)計(jì)算一個(gè)概率
weightedAccepts := b.k * float64(accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
// 如果小于等于 0 直接通過(guò),不熔斷
if dropRatio <= 0 {
return nil
}
// 隨機(jī)產(chǎn)生 0.0-1.0 之間的隨機(jī)數(shù)與上面計(jì)算出來(lái)的熔斷概率相比較
// 如果隨機(jī)數(shù)比熔斷概率小則進(jìn)行熔斷
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts += int64(b.Sum)
total += b.Count
})
return
}
以上就是自適應(yīng)熔斷的邏輯,通過(guò)概率的比較來(lái)隨機(jī)淘汰掉部分請(qǐng)求,然后隨著服務(wù)恢復(fù),淘汰的請(qǐng)求會(huì)逐漸變少,直至不淘汰。
func (b *googleBreaker) allow() (internalPromise, error) {
if err := b.accept(); err != nil {
return nil, err
}
// 返回一個(gè) promise 異步回調(diào)對(duì)象,可由開(kāi)發(fā)者自行決定是否上報(bào)結(jié)果到熔斷器
return googlePromise{
b: b,
}, nil
}
// req - 熔斷對(duì)象方法
// fallback - 自定義快速失敗函數(shù),可對(duì)熔斷產(chǎn)生的err進(jìn)行包裝后返回
// acceptable - 對(duì)本次未熔斷時(shí)執(zhí)行請(qǐng)求的結(jié)果進(jìn)行自定義的判定,比如可以針對(duì)http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
if err := b.accept(); err != nil {
// 熔斷中,如果有自定義的fallback則執(zhí)行
if fallback != nil {
return fallback(err)
}
return err
}
defer func() {
// 如果執(zhí)行req()過(guò)程發(fā)生了panic,依然判定本次執(zhí)行失敗上報(bào)至熔斷器
if e := recover(); e != nil {
b.markFailure()
panic(e)
}
}()
err := req()
// 上報(bào)結(jié)果
if acceptable(err) {
b.markSuccess()
} else {
b.markFailure()
}
return err
}
熔斷器對(duì)外暴露兩種類(lèi)型的方法:
1、簡(jiǎn)單場(chǎng)景直接判斷對(duì)象是否被熔斷,執(zhí)行請(qǐng)求后必須需手動(dòng)上報(bào)執(zhí)行結(jié)果至熔斷器。
func (b *googleBreaker) allow() (internalPromise, error)
2、復(fù)雜場(chǎng)景下支持自定義快速失敗,自定義判定請(qǐng)求是否成功的熔斷方法,自動(dòng)上報(bào)執(zhí)行結(jié)果至熔斷器。
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
個(gè)人感覺(jué),熔斷這部分代碼,相較于前幾篇文章,理解起來(lái)是更困難的。但其中的一些設(shè)計(jì)思想,和底層的實(shí)現(xiàn)原理也是非常值得學(xué)習(xí)的,希望這篇文章能夠?qū)Υ蠹矣袔椭?/p>
以上就是本文的全部?jī)?nèi)容,如果覺(jué)得還不錯(cuò)的話(huà)歡迎點(diǎn)贊,轉(zhuǎn)發(fā)和關(guān)注,感謝支持。
參考文章:
- https://juejin.cn/post/7030997067560386590
- https://go-zero.dev/docs/tutorials/service/governance/breaker
- https://sre.google/sre-book/handling-overload/
- https://martinfowler.com/bliki/CircuitBreaker.html
推薦閱讀:
