<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          go-zero 的自適應(yīng)熔斷器

          共 19732字,需瀏覽 40分鐘

           ·

          2023-08-28 20:28

          上篇文章我們介紹了微服務(wù)的限流,詳細(xì)分析了計(jì)數(shù)器限流和令牌桶限流算法,這篇文章來(lái)說(shuō)說(shuō)熔斷。

          熔斷和限流還不太一樣,限流是控制請(qǐng)求速率,只要還能承受,那么都會(huì)處理,但熔斷不是。

          在一條調(diào)用鏈上,如果發(fā)現(xiàn)某個(gè)服務(wù)異常,比如響應(yīng)超時(shí)。那么調(diào)用者為了避免過(guò)多請(qǐng)求導(dǎo)致資源消耗過(guò)大,最終引發(fā)系統(tǒng)雪崩,會(huì)直接返回錯(cuò)誤,而不是瘋狂調(diào)用這個(gè)服務(wù)。

          本篇文章會(huì)介紹主流熔斷器的工作原理,并且會(huì)借助 go-zero 源碼,分析 googleBreaker 是如何通過(guò)滑動(dòng)窗口來(lái)統(tǒng)計(jì)流量,并且最終執(zhí)行熔斷的。

          工作原理

          這部分主要介紹兩種熔斷器的工作原理,分別是 Netflix 開(kāi)源的 Hystrix,其也是 Spring Cloud 默認(rèn)的熔斷組件,和 Google 的自適應(yīng)的熔斷器。

          Hystrix is no longer in active development, and is currently in maintenance mode.

          注意,Hystrix 官方已經(jīng)宣布不再積極開(kāi)發(fā)了,目前處在維護(hù)模式。

          Hystrix 官方推薦替代的開(kāi)源組件:Resilience4j,還有阿里開(kāi)源的 Sentinel 也是不錯(cuò)的替代品。

          hystrixBreaker

          Hystrix 采用了熔斷器模式,相當(dāng)于電路中的保險(xiǎn)絲,系統(tǒng)出現(xiàn)緊急問(wèn)題,立刻禁止所有請(qǐng)求,已達(dá)到保護(hù)系統(tǒng)的作用。

          35f7259e1f12604e719036aee498b187.webp

          系統(tǒng)需要維護(hù)三種狀態(tài),分別是:

          • 關(guān)閉: 默認(rèn)狀態(tài),所有請(qǐng)求全部能夠通過(guò)。當(dāng)請(qǐng)求失敗數(shù)量增加,失敗率超過(guò)閾值時(shí),會(huì)進(jìn)入到斷開(kāi)狀態(tài)。
          • 斷開(kāi): 此狀態(tài)下,所有請(qǐng)求都會(huì)被攔截。當(dāng)經(jīng)過(guò)一段超時(shí)時(shí)間后,會(huì)進(jìn)入到半斷開(kāi)狀態(tài)。
          • 半斷開(kāi): 此狀態(tài)下會(huì)允許一部分請(qǐng)求通過(guò),并統(tǒng)計(jì)成功數(shù)量,當(dāng)請(qǐng)求成功時(shí),恢復(fù)到關(guān)閉狀態(tài),否則繼續(xù)斷開(kāi)。

          通過(guò)狀態(tài)的變更,可以有效防止系統(tǒng)雪崩的問(wèn)題。同時(shí),在半斷開(kāi)狀態(tài)下,又可以讓系統(tǒng)進(jìn)行自我修復(fù)。

          googleBreaker

          googleBreaker 實(shí)現(xiàn)了一種自適應(yīng)的熔斷模式,來(lái)看一下算法的計(jì)算公式,客戶(hù)端請(qǐng)求被拒絕的概率

          71175e9b99d70feb83a88934de635491.webp

          參數(shù)很少,也比較好理解:

          1. requests:請(qǐng)求數(shù)量
          2. accepts:后端接收的請(qǐng)求數(shù)量
          3. K:敏感度,一般推薦 1.5-2 之間

          通過(guò)分析公式,我們可以得到下面幾個(gè)結(jié)論,也就是產(chǎn)生熔斷的實(shí)際原理:

          1. 正常情況下,requests 和 accepts 是相等的,拒絕的概率就是 0,沒(méi)有產(chǎn)生熔斷
          2. 當(dāng)正常請(qǐng)求量,也就是 accepts 減少時(shí),概率會(huì)逐漸增加,當(dāng)概率大于 0 時(shí),就會(huì)產(chǎn)生熔斷。如果 accepts 等于 0 了,則完全熔斷。
          3. 當(dāng)服務(wù)恢復(fù)后,requests 和 accepts 的數(shù)量會(huì)同時(shí)增加,但由于 K * accepts 增長(zhǎng)的更快,所以概率又會(huì)很快變回到 0,相當(dāng)于關(guān)閉了熔斷。

          總的來(lái)說(shuō),googleBreaker 的實(shí)現(xiàn)方案更加優(yōu)雅,而且參數(shù)也少,不用維護(hù)那么多的狀態(tài)。

          go-zero 就是采用了 googleBreaker 的方案,下面就來(lái)分析代碼,看看到底是怎么實(shí)現(xiàn)的。

          接口設(shè)計(jì)

          接口定義這部分我個(gè)人感覺(jué)還是挺不好理解的,看了好多遍才理清了它們之間的關(guān)系。

          其實(shí)看代碼和看書(shū)是一樣的,書(shū)越看越薄,代碼會(huì)越看越短。剛開(kāi)始看感覺(jué)代碼很長(zhǎng),隨著看懂的地方越來(lái)越多,明顯感覺(jué)代碼變短了。所以遇到不懂的代碼不要怕,反復(fù)看,總會(huì)看懂的。

          d27bd51d7f36d6f701aa5343f9f314ec.webp

          首先來(lái)看一下 breaker 部分的 UML 圖,有了這張圖,很多地方看起來(lái)還是相對(duì)清晰的,下面來(lái)詳細(xì)分析。

          這里用到了靜態(tài)代理模式,也可以說(shuō)是接口裝飾器,接下來(lái)就看看到底是怎么定義的:

                
                // core/breaker/breaker.go
          internalThrottle interface {
              allow() (internalPromise, error)
              doReq(req func() errorfallback func(err error) erroracceptable Acceptableerror
          }

          // core/breaker/googlebreaker.go
          type googleBreaker struct {
              k     float64
              stat  *collection.RollingWindow
              proba *mathx.Proba
          }

          這個(gè)接口是最終實(shí)現(xiàn)熔斷方法的接口,由 googleBreaker 結(jié)構(gòu)體實(shí)現(xiàn)。

                
                // core/breaker/breaker.go
          throttle interface {
              allow() (Promise, error)
              doReq(req func() errorfallback func(err error) erroracceptable Acceptableerror
          }

          type loggedThrottle struct {
              name string
              internalThrottle
              errWin *errorWindow
          }

          func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
              return loggedThrottle{
                  name:             name,
                  internalThrottle: t,
                  errWin:           new(errorWindow),
              }
          }

          這個(gè)是實(shí)現(xiàn)了日志收集的結(jié)構(gòu)體,首先它實(shí)現(xiàn)了 throttle 接口,然后它包含了一個(gè)字段 internalThrottle,相當(dāng)于具體的熔斷方法是代理給 internalThrottle 來(lái)做的。

                
                // core/breaker/breaker.go
          func (lt loggedThrottle) allow() (Promise, error) {
              promise, err := lt.internalThrottle.allow()
              return promiseWithReason{
                  promise: promise,
                  errWin:  lt.errWin,
              }, lt.logError(err)
          }

          func (lt loggedThrottle) doReq(req func() errorfallback func(err error) erroracceptable Acceptableerror {
              return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
                  accept := acceptable(err)
                  if !accept && err != nil {
                      lt.errWin.add(err.Error())
                  }
                  return accept
              }))
          }

          所以當(dāng)它執(zhí)行相應(yīng)方法時(shí),都是直接調(diào)用 internalThrottle 接口的方法,然后再加上自己的邏輯。

          這也就是代理所起到的作用,在不改變?cè)椒ǖ幕A(chǔ)上,擴(kuò)展原方法的功能。

                
                // core/breaker/breaker.go
          circuitBreaker struct {
              name string
              throttle
          }

          // NewBreaker returns a Breaker object.
          // opts can be used to customize the Breaker.
          func NewBreaker(opts ...Option) Breaker {
              var b circuitBreaker
              for _, opt := range opts {
                  opt(&b)
              }
              if len(b.name) == 0 {
                  b.name = stringx.Rand()
              }
              b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())

              return &b
          }

          最終的熔斷器又將功能代理給了 throttle

          這就是它們之間的關(guān)系,如果感覺(jué)有點(diǎn)亂的話(huà),就反復(fù)看,看的次數(shù)多了,就清晰了。

          日志收集

          上文介紹過(guò)了,loggedThrottle 是為了記錄日志而設(shè)計(jì)的代理層,這部分內(nèi)容來(lái)分析一下是如何記錄日志的。

                
                // core/breaker/breaker.go
          type errorWindow struct {
              // 記錄日志的數(shù)組
              reasons [numHistoryReasons]string
              // 索引
              index   int
              // 數(shù)組元素?cái)?shù)量,小于等于 numHistoryReasons
              count   int
              lock    sync.Mutex
          }

          func (ew *errorWindow) add(reason string) {
              ew.lock.Lock()
              // 記錄錯(cuò)誤日志內(nèi)容
              ew.reasons[ew.index] = fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
              // 對(duì) numHistoryReasons 進(jìn)行取余來(lái)得到數(shù)組索引
              ew.index = (ew.index + 1) % numHistoryReasons
              ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
              ew.lock.Unlock()
          }

          func (ew *errorWindow) String() string {
              var reasons []string

              ew.lock.Lock()
              // reverse order
              for i := ew.index - 1; i >= ew.index-ew.count; i-- {
                  reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
              }
              ew.lock.Unlock()

              return strings.Join(reasons, "\n")
          }

          核心就是這里采用了一個(gè)環(huán)形數(shù)組,通過(guò)維護(hù)兩個(gè)字段來(lái)實(shí)現(xiàn),分別是 indexcount

          count 表示數(shù)組中元素的個(gè)數(shù),最大值是數(shù)組的長(zhǎng)度;index 是索引,每次 +1,然后對(duì)數(shù)組長(zhǎng)度取余得到新索引。

          我之前有一次面試就讓我設(shè)計(jì)一個(gè)環(huán)形數(shù)組,當(dāng)時(shí)答的還不是很好,這次算是學(xué)會(huì)了。

          滑動(dòng)窗口

          一般來(lái)說(shuō),想要判斷是否需要觸發(fā)熔斷,那么首先要知道一段時(shí)間的請(qǐng)求數(shù)量,一段時(shí)間內(nèi)的數(shù)量統(tǒng)計(jì)可以使用滑動(dòng)窗口來(lái)實(shí)現(xiàn)。

          首先看一下滑動(dòng)窗口的定義:

                
                // core/collection/rollingwindow.go

          type RollingWindow struct {
              lock          sync.RWMutex
              // 窗口大小
              size          int
              // 窗口數(shù)據(jù)容器
              win           *window
              // 時(shí)間間隔
              interval      time.Duration
              // 游標(biāo),用于定位當(dāng)前應(yīng)該寫(xiě)入哪個(gè) bucket
              offset        int
              // 匯總數(shù)據(jù)時(shí),是否忽略當(dāng)前正在寫(xiě)入桶的數(shù)據(jù)
              // 某些場(chǎng)景下因?yàn)楫?dāng)前正在寫(xiě)入的桶數(shù)據(jù)并沒(méi)有經(jīng)過(guò)完整的窗口時(shí)間間隔
              // 可能導(dǎo)致當(dāng)前桶的統(tǒng)計(jì)并不準(zhǔn)確
              ignoreCurrent bool
              // 最后寫(xiě)入桶的時(shí)間
              // 用于計(jì)算下一次寫(xiě)入數(shù)據(jù)間隔最后一次寫(xiě)入數(shù)據(jù)的之間
              // 經(jīng)過(guò)了多少個(gè)時(shí)間間隔
              lastTime      time.Duration // start time of the last bucket
          }

          再來(lái)看一下 window 的結(jié)構(gòu):

                
                type Bucket struct {
              // 桶內(nèi)值的和
              Sum   float64
              // 桶內(nèi) add 次數(shù)
              Count int64
          }

          func (b *Bucket) add(v float64) {
              b.Sum += v
              b.Count++
          }

          func (b *Bucket) reset() {
              b.Sum = 0
              b.Count = 0
          }

          type window struct {
              // 桶,一個(gè)桶就是一個(gè)時(shí)間間隔
              buckets []*Bucket
              // 窗口大小,也就是桶的數(shù)量
              size    int
          }

          有了這兩個(gè)結(jié)構(gòu)之后,我們就可以畫(huà)出這個(gè)滑動(dòng)窗口了,如圖所示。

          330b618cac5eb1daf05dbc2c82a26792.webp

          現(xiàn)在來(lái)看一下向窗口中添加數(shù)據(jù),是怎樣一個(gè)過(guò)程。

                
                func (rw *RollingWindow) Add(v float64) {
              rw.lock.Lock()
              defer rw.lock.Unlock()
              // 獲取當(dāng)前寫(xiě)入下標(biāo)
              rw.updateOffset()
              // 向 bucket 中寫(xiě)入數(shù)據(jù)
              rw.win.add(rw.offset, v)
          }

          func (rw *RollingWindow) span() int {
              // 計(jì)算距離 lastTime 經(jīng)過(guò)了多少個(gè)時(shí)間間隔,也就是多少個(gè)桶
              offset := int(timex.Since(rw.lastTime) / rw.interval)
              // 如果在窗口范圍內(nèi),返回實(shí)際值,否則返回窗口大小
              if 0 <= offset && offset < rw.size {
                  return offset
              }

              return rw.size
          }

          func (rw *RollingWindow) updateOffset() {
              // 經(jīng)過(guò)了多少個(gè)時(shí)間間隔,也就是多少個(gè)桶
              span := rw.span()
              // 還在同一單元時(shí)間內(nèi)不需要更新
              if span <= 0 {
                  return
              }

              offset := rw.offset
              // reset expired buckets
              // 這里是清除過(guò)期桶的數(shù)據(jù)
              // 也是對(duì)數(shù)組大小進(jìn)行取余的方式,類(lèi)似上文介紹的環(huán)形數(shù)組
              for i := 0; i < span; i++ {
                  rw.win.resetBucket((offset + i + 1) % rw.size)
              }

              // 更新游標(biāo)
              rw.offset = (offset + span) % rw.size
              now := timex.Now()
              // align to interval time boundary
              // 這里應(yīng)該是一個(gè)時(shí)間的對(duì)齊,保持在桶內(nèi)指向位置是一致的
              rw.lastTime = now - (now-rw.lastTime)%rw.interval
          }

          // 向桶內(nèi)添加數(shù)據(jù)
          func (w *window) add(offset int, v float64) {
              // 根據(jù) offset 對(duì)數(shù)組大小取余得到索引,然后添加數(shù)據(jù)
              w.buckets[offset%w.size].add(v)
          }

          // 重置桶數(shù)據(jù)
          func (w *window) resetBucket(offset int) {
              w.buckets[offset%w.size].reset()
          }

          我畫(huà)了一張圖,來(lái)模擬整個(gè)滑動(dòng)過(guò)程:

          6ffb6693733334fefddda323ec47c315.webp

          主要經(jīng)歷 4 個(gè)步驟:

          1. 計(jì)算當(dāng)前時(shí)間距離上次添加時(shí)間經(jīng)過(guò)了多少個(gè)時(shí)間間隔,也就是多少個(gè) bucket
          2. 清理過(guò)期桶數(shù)據(jù)
          3. 更新 offset,更新 offset 的過(guò)程實(shí)際就是模擬窗口滑動(dòng)的過(guò)程
          4. 添加數(shù)據(jù)

          比如上圖,剛開(kāi)始 offset 指向了 bucket[1],經(jīng)過(guò)了兩個(gè) span 之后,bucket[2]bucket[3] 會(huì)被清空,同時(shí),新的 offset 會(huì)指向 bucket[3],新添加的數(shù)據(jù)會(huì)寫(xiě)入到 bucket[3]

          再來(lái)看看數(shù)據(jù)統(tǒng)計(jì),也就是窗口內(nèi)的有效數(shù)據(jù)量是多少。

                
                // Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
          func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
              rw.lock.RLock()
              defer rw.lock.RUnlock()

              var diff int
              span := rw.span()
              // ignore current bucket, because of partial data
              if span == 0 && rw.ignoreCurrent {
                  diff = rw.size - 1
              } else {
                  diff = rw.size - span
              }
              // 需要統(tǒng)計(jì)的 bucket 數(shù)量,窗口大小減去 span 數(shù)量
              if diff > 0 {
                  // 獲取統(tǒng)計(jì)的起始位置,span 是已經(jīng)被重置的 bucket
                  offset := (rw.offset + span + 1) % rw.size
                  rw.win.reduce(offset, diff, fn)
              }
          }

          func (w *window) reduce(start, count int, fn func(b *Bucket)) {
              for i := 0; i < count; i++ {
                  // 自定義統(tǒng)計(jì)函數(shù)
                  fn(w.buckets[(start+i)%w.size])
              }
          }

          統(tǒng)計(jì)出窗口數(shù)據(jù)之后,就可以判斷是否需要熔斷了。

          執(zhí)行熔斷

          接下來(lái)就是執(zhí)行熔斷了,主要就是看看自適應(yīng)熔斷是如何實(shí)現(xiàn)的。

                
                // core/breaker/googlebreaker.go

          const (
              // 250ms for bucket duration
              window     = time.Second * 10
              buckets    = 40
              k          = 1.5
              protection = 5
          )

          窗口的定義部分,整個(gè)窗口是 10s,然后分成 40 個(gè) bucket,每個(gè) bucket 就是 250ms。

                
                // googleBreaker is a netflixBreaker pattern from google.
          // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
          type googleBreaker struct {
              k     float64
              stat  *collection.RollingWindow
              proba *mathx.Proba
          }

          func (b *googleBreaker) accept() error {
              // 獲取最近一段時(shí)間的統(tǒng)計(jì)數(shù)據(jù)
              accepts, total := b.history()
              // 根據(jù)上文提到的算法來(lái)計(jì)算一個(gè)概率
              weightedAccepts := b.k * float64(accepts)
              // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
              dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
              // 如果小于等于 0 直接通過(guò),不熔斷
              if dropRatio <= 0 {
                  return nil
              }

              // 隨機(jī)產(chǎn)生 0.0-1.0 之間的隨機(jī)數(shù)與上面計(jì)算出來(lái)的熔斷概率相比較
              // 如果隨機(jī)數(shù)比熔斷概率小則進(jìn)行熔斷
              if b.proba.TrueOnProba(dropRatio) {
                  return ErrServiceUnavailable
              }

              return nil
          }

          func (b *googleBreaker) history() (accepts, total int64) {
              b.stat.Reduce(func(b *collection.Bucket) {
                  accepts += int64(b.Sum)
                  total += b.Count
              })

              return
          }

          以上就是自適應(yīng)熔斷的邏輯,通過(guò)概率的比較來(lái)隨機(jī)淘汰掉部分請(qǐng)求,然后隨著服務(wù)恢復(fù),淘汰的請(qǐng)求會(huì)逐漸變少,直至不淘汰。

                
                func (b *googleBreaker) allow() (internalPromise, error) {
              if err := b.accept(); err != nil {
                  return nil, err
              }

              // 返回一個(gè) promise 異步回調(diào)對(duì)象,可由開(kāi)發(fā)者自行決定是否上報(bào)結(jié)果到熔斷器
              return googlePromise{
                  b: b,
              }, nil
          }

          // req - 熔斷對(duì)象方法
          // fallback - 自定義快速失敗函數(shù),可對(duì)熔斷產(chǎn)生的err進(jìn)行包裝后返回
          // acceptable - 對(duì)本次未熔斷時(shí)執(zhí)行請(qǐng)求的結(jié)果進(jìn)行自定義的判定,比如可以針對(duì)http.code,rpc.code,body.code
          func (b *googleBreaker) doReq(req func() errorfallback func(err error) erroracceptable Acceptableerror {
              if err := b.accept(); err != nil {
                  // 熔斷中,如果有自定義的fallback則執(zhí)行
                  if fallback != nil {
                      return fallback(err)
                  }

                  return err
              }

              defer func() {
                  // 如果執(zhí)行req()過(guò)程發(fā)生了panic,依然判定本次執(zhí)行失敗上報(bào)至熔斷器
                  if e := recover(); e != nil {
                      b.markFailure()
                      panic(e)
                  }
              }()

              err := req()
              // 上報(bào)結(jié)果
              if acceptable(err) {
                  b.markSuccess()
              } else {
                  b.markFailure()
              }

              return err
          }

          熔斷器對(duì)外暴露兩種類(lèi)型的方法:

          1、簡(jiǎn)單場(chǎng)景直接判斷對(duì)象是否被熔斷,執(zhí)行請(qǐng)求后必須需手動(dòng)上報(bào)執(zhí)行結(jié)果至熔斷器。

                
                
                  func (b *googleBreaker) allow() (internalPromise, error)
                  

          2、復(fù)雜場(chǎng)景下支持自定義快速失敗,自定義判定請(qǐng)求是否成功的熔斷方法,自動(dòng)上報(bào)執(zhí)行結(jié)果至熔斷器。

                
                
                  func (b *googleBreaker) doReq(req func() errorfallback func(err error) erroracceptable Acceptableerror
                  

          個(gè)人感覺(jué),熔斷這部分代碼,相較于前幾篇文章,理解起來(lái)是更困難的。但其中的一些設(shè)計(jì)思想,和底層的實(shí)現(xiàn)原理也是非常值得學(xué)習(xí)的,希望這篇文章能夠?qū)Υ蠹矣袔椭?/p>

          以上就是本文的全部?jī)?nèi)容,如果覺(jué)得還不錯(cuò)的話(huà)歡迎點(diǎn)贊轉(zhuǎn)發(fā)關(guān)注,感謝支持。


          參考文章:

          • https://juejin.cn/post/7030997067560386590
          • https://go-zero.dev/docs/tutorials/service/governance/breaker
          • https://sre.google/sre-book/handling-overload/
          • https://martinfowler.com/bliki/CircuitBreaker.html

          推薦閱讀:

          瀏覽 40
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  资优生的性爱大对决 | 成人免费性生活视频 | 人妻av中文字幕 日本高清不卡视频 | 人妻大香蕉 | 久操网免费视频在线观看 |