<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          ETCD源碼分析Client端啟動流程分析

          共 10926字,需瀏覽 22分鐘

           ·

          2021-09-17 21:21

          ETCD源碼基于v3.5,在分析之前,需要搭建好源碼分析的環(huán)境。首先,從GitHub的倉庫中克隆下ETCD的源碼,再利用docker搭建我們的ETCD測試集群,命令如下:

          REGISTRY=quay.io/coreos/etcd
          NAME_1=etcd-node-0
          NAME_2=etcd-node-1
          NAME_3=etcd-node-2
          # IP在不同機器上不同,請查看docker的子網(wǎng)網(wǎng)段
          HOST_1=172.20.0.2
          HOST_2=172.20.0.3
          HOST_3=172.20.0.4
          PORT_1=2379
          PORT_2=12379
          PORT_3=22379
          PORT_C_1=2380
          PORT_C_2=12380
          PORT_C_3=22380
          CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_C_1},${NAME_2}=http://${HOST_2}:${PORT_C_2},${NAME_3}=http://${HOST_3}:${PORT_C_3}
          # 需要保證目錄存在并可寫
          DATA_DIR=/var/folders/

          # 需要創(chuàng)建docker網(wǎng)絡,用于模擬集群網(wǎng)絡分區(qū)的情況。
          docker network create etcd_cluster

          docker run \
          -p $PORT_1:$PORT_1 \
          -p $PORT_C_1:$PORT_C_1 \
          --volume "${DATA_DIR}${NAME_1}:/etcd-data" \
          --name ${NAME_1} \
          --network etcd_cluster \
          ${REGISTRY}:v3.5.0 \
          /usr/local/bin/etcd \
          --name ${NAME_1} \
          --data-dir /etcd-data \
          --listen-client-urls http://0.0.0.0:$PORT_1 \
          --advertise-client-urls http://$HOST_1:$PORT_1 \
          --listen-peer-urls http://0.0.0.0:$PORT_C_1 \
          --initial-advertise-peer-urls http://$HOST_1:$PORT_C_1 \
          --initial-cluster ${CLUSTER} \
          --initial-cluster-token tkn \
          --initial-cluster-state new \
          --log-level info \
          --logger zap \
          --log-outputs stderr

          docker run \
          -p $PORT_2:$PORT_2 \
          -p $PORT_C_2:$PORT_C_2 \
          --volume=${DATA_DIR}${NAME_2}:/etcd-data \
          --name ${NAME_2} \
          --network etcd_cluster \
          ${REGISTRY}:v3.5.0 \
          /usr/local/bin/etcd \
          --name ${NAME_2} \
          --data-dir /etcd-data \
          --listen-client-urls http://0.0.0.0:$PORT_2 \
          --advertise-client-urls http://$HOST_2:$PORT_2 \
          --listen-peer-urls http://0.0.0.0:$PORT_C_2 \
          --initial-advertise-peer-urls http://$HOST_2:$PORT_C_2 \
          --initial-cluster ${CLUSTER} \
          --initial-cluster-token tkn \
          --initial-cluster-state new \
          --log-level info \
          --logger zap \
          --log-outputs stderr

          docker run \
          -p $PORT_3:$PORT_3 \
          -p $PORT_C_3:$PORT_C_3 \
          --volume=${DATA_DIR}${NAME_3}:/etcd-data \
          --name ${NAME_3} \
          --network etcd_cluster \
          ${REGISTRY}:v3.5.0 \
          /usr/local/bin/etcd \
          --name ${NAME_3} \
          --data-dir /etcd-data \
          --listen-client-urls http://0.0.0.0:$PORT_3 \
          --advertise-client-urls http://$HOST_3:$PORT_3 \
          --listen-peer-urls http://0.0.0.0:$PORT_C_3 \
          --initial-advertise-peer-urls http://$HOST_3:$PORT_C_3 \
          --initial-cluster ${CLUSTER} \
          --initial-cluster-token tkn \
          --initial-cluster-state new \
          --log-level info \
          --logger zap \
          --log-outputs stderr

          復制代碼

          如上,我們創(chuàng)建了三個ETCD節(jié)點,組成了一個集群。接下來我們正式進入源碼分析流程。

          ETCD Client啟動流程分析

          我們先看一段啟動代碼樣例:

                  cli, err := clientv3.New(clientv3.Config{
          Endpoints: exampleEndpoints(),
          DialTimeout: dialTimeout,
          })
          if err != nil {
          log.Fatal(err)
          }
          defer cli.Close()

          ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
          _, err = cli.Put(ctx, "sample_key", "sample_value")
          cancel()
          if err != nil {
          log.Fatal(err)
          }
          復制代碼

          一個最簡單的程序只需要提供集群的所有節(jié)點的ip和端口就能訪問,這里需要注意的是,一定要填寫ETCD集群的所有節(jié)點,這樣才能有故障轉(zhuǎn)移、負載均衡的特性。或者運行一個ETCD的代理節(jié)點(ETCD網(wǎng)關)負責轉(zhuǎn)發(fā)請求,這樣只填寫代理節(jié)點ip即可,當然性能上會有所損失。

          一、ETCD的Client啟動流程分析

          接下來我們看看Client是如何被創(chuàng)建出來的:


          func newClient(cfg *Config) (*Client, error) {
          // -----A-----
          ctx, cancel := context.WithCancel(baseCtx)
          client := &Client{
          conn: nil,
          cfg: *cfg,
          creds: creds,
          ctx: ctx,
          cancel: cancel,
          mu: new(sync.RWMutex),
          callOpts: defaultCallOpts,
          lgMu: new(sync.RWMutex),
          }
          // -----A-----

          // -----B-----
          client.resolver = resolver.New(cfg.Endpoints...)

          conn, err := client.dialWithBalancer()
          if err != nil {
          client.cancel()
          client.resolver.Close()
          return nil, err
          }
          client.conn = conn
          // -----B-----

          // -----C-----
          client.Cluster = NewCluster(client)
          client.KV = NewKV(client)
          client.Lease = NewLease(client)
          client.Watcher = NewWatcher(client)
          client.Auth = NewAuth(client)
          client.Maintenance = NewMaintenance(client)

          ...
          // -----C-----

          return client, nil
          }

          復制代碼
          A段代碼分析

          首先來看第A段代碼,其主要是初始化了一個client的實例,并把Config結構體傳遞給它,那么Config中包含了什么配置項呢?

          type Config struct {
          // ETCD服務器地址,注意需要提供ETCD集群所有節(jié)點的ip
          Endpoints []string `json:"endpoints"`

          // 設置了此間隔時間,每 AutoSyncInterval 時間ETCD客戶端都會
          // 自動向ETCD服務端請求最新的ETCD集群的所有節(jié)點列表
          //
          // 默認為0,即不請求
          AutoSyncInterval time.Duration `json:"auto-sync-interval"`

          // 建立底層的GRPC連接的超時時間
          DialTimeout time.Duration `json:"dial-timeout"`

          // 這個配置和下面的 DialKeepAliveTimeoutt
          // 都是用來打開GRPC提供的 KeepAlive
          // 功能,作用主要是保持底層TCP連接的有效性,
          // 及時發(fā)現(xiàn)連接斷開的異常。
          //
          // 默認不打開 keepalive
          DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`

          // 客戶端發(fā)送 keepalive 的 ping 后,等待服務端的 ping ack 包的時長
          // 超過此時長會報 `translation is closed`
          DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`

          // 也是 keepalive 中的設置,
          // true則表示無論有沒有活躍的GRPC連接,都執(zhí)行ping
          // false的話,沒有活躍的連接也就不會發(fā)送ping。
          PermitWithoutStream bool `json:"permit-without-stream"`

          // 最大可發(fā)送字節(jié)數(shù),默認為2MB
          // 也就是說,我們ETCD的一條KV記錄最大不能超過2MB,
          // 如果要設置超過2MB的KV值,
          // 只修改這個配置也是無效的,因為ETCD服務端那邊的限制也是2MB。
          // 需要先修改ETCD服務端啟動參數(shù):`--max-request-bytes`,再修改此值。
          MaxCallSendMsgSize int

          // 最大可接收的字節(jié)數(shù),默認為`Int.MaxInt32`
          // 一般不需要改動
          MaxCallRecvMsgSize int

          // HTTPS證書配置
          TLS *tls.Config

          // 上下文,一般用于取消操作
          ctx.Context

          // 設置此值,會拒絕連接到低版本的ETCD
          // 什么是低版本呢?
          // 寫死了,小于v3.2的版本都是低版本。
          RejectOldCluster bool `json:"reject-old-cluster"`

          // GRPC 的連接配置,具體可參考GRPC文檔
          DialOptions []grpc.DialOption

          // zap包的Logger配置
          // ETCD用的日志包就是zap
          Logger *zap.Logger
          LogConfig *zap.Config

          ...
          }
          復制代碼

          還有一些常用配置項,比較簡單,這里就不再列出了。

          B段代碼分析

          本段是整個代碼的核心部分,主要做了兩件事:

          1. 創(chuàng)建了 resolver 用于解析ETCD服務的地址
            resolver(解析器)其實是grpc中的概念,比如:DNS解析器,域名轉(zhuǎn)化為真實的ip;服務注冊中心,也是一種把服務名轉(zhuǎn)化為真實ip的解析服務。

            具體的概念就不展開了,如果對grpc這方面比較感興趣,文末會推薦一個講的很好的grpc源碼分析博客。

            總之,etcd自己寫了一個解析器,就在resolver包里,這個解析器提供了以下幾個功能:

            1. 把Endpoints里的ETCD服務器地址傳給grpc框架,這里,因為ETCD自己實現(xiàn)的解析器不支持DNS解析,所以Endpoints只能是ip地址或者unix套接字。

            2. 告訴grpc,如果Endpoints有多個,負載均衡的策略是輪詢,這點很重要。

          2. dialWithBalancer() 建立了到ETCD的服務端鏈接

          func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
          creds := c.credentialsForEndpoint(c.Endpoints()[0])
          opts := append(dopts, grpc.WithResolvers(c.resolver))
          return c.dial(creds, opts...)
          }
          復制代碼

          這個用于建立到ETCD服務端的連接的方法名很有意思,雖然叫dialWithBalancer但內(nèi)部代碼很簡單,可以看到里面并無Balancer(負載均衡器)的出現(xiàn)。但其實因為上面說到,ETCD使用了自己的resolver,其內(nèi)部已經(jīng)寫好了負載均衡策略:round_robin。所以這里通過grpc.WithResolvers()把resolver傳進去,也是達到了負載均衡的效果。

          接下來進入dial(),這個方法雖然有些長,但整體邏輯是非常清晰的,省略無關代碼后,其內(nèi)部是做了以下幾件事:

          func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
          // 首先,ETCD通過這行代碼,向GRPC框架加入了一些自己的
          // 配置,比如:KeepAlive特性(配置里提到的配置項)、
          // TLS證書配置、還有最重要的重試策略。
          opts, err := c.dialSetupOpts(creds, dopts...)
          ...

          // context 的一段經(jīng)典樣例代碼
          // 問:如果我同時把非零的DialTimeout和
          // 帶超時的 context 傳給客戶端,
          // 到底以哪個超時為準?
          // 答:這里新建了子context(dctx),父context和DialTimeout
          // 哪個先到deadline,就以哪個為準。
          dctx := c.ctx
          if c.cfg.DialTimeout > 0 {
          var cancel context.CancelFunc
          // 同時包含父context和DialTimeout
          // 哪個先倒時間就以哪個為準。
          dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
          defer cancel()
          }

          // 最終調(diào)用grpc.DialContext()建立連接
          conn, err := grpc.DialContext(dctx, target, opts...)
          ...
          return conn, nil
          }
          復制代碼
          C段代碼分析

          C段代碼無非就是做一些功能接口的初始化,比如:KV接口(用于提供Put、Get等)、Wathcer接口(用于監(jiān)聽Key)等,具體如何初始化到分析各接口再講。

          再回到啟動流程,初始化功能完畢后,就是getToken了,這個token是我們開啟了ETCD的賬號密碼功能后,通過賬號密碼獲取到了token,然后才能訪問ETCD提供的GRPC接口。

          然后是提供 RejectOldCluster 和 autoSync 功能,這個在介紹Config時也提過,這里就不再贅述了。

          ETCD Client重試策略分析

          對ETCD客戶端提供的自動重試策略的分析,是本文的重點。自動重試是ETCD能提供高可用特性的重要保證,在往下分析之前,一定要記住以下兩個概念:

          1. 自動重試不會在ETCD集群的同一節(jié)點上進行,這跟我們平常做的重試不同,因為前面說了ETCD是通過GRPC框架提供對集群訪問的負載均衡策略的,所以會輪詢的重試集群的每個節(jié)點。

          2. 自動重試只會重試一些特定的錯誤,比如:codes.Unavailable

          接下來,就讓我們來看看ETCD是如何利用GRPC提供的攔截器做自動重試的,學會這個,我們也能在自己的GRPC項目中用上同樣的套路:

              // 這段代碼在dialWithBalancer->dial->dialSetupOpts中
          rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))

          opts = append(opts,
          grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
          grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
          )

          復制代碼

          看以上的代碼,要自動重試只需兩步:

          1. 創(chuàng)建backoff函數(shù),也就是計算重試等待時間的函數(shù)。

          2. 通過WithXXXInterceptor(),注冊重試攔截器,這樣每次GRPC有請求都會回調(diào)該攔截器。

          這里,grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),我們看到Stream的重試攔截器,其最大重試次數(shù)設置為了0(withMax()),也就是不重試,這其實是故意為之,因為Client端的Stream重試不被支持。(Client端需要重試Stream,需要自己做單獨處理,不能通過攔截器。)

          那我們首先看看如何計算等待時間:

          // waitBetween 重試間隔時長
          // jitterFraction 隨機抖動率,
          // 比如:默認重試間隔為25ms,抖動率:0.1,
          // 那么實際重試間隔就在 25土2.5ms 之間。
          // attempt 實際重試了多少次
          func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration,
          jitterFraction float64) backoffFunc {

          return func(attempt uint) time.Duration {
          n := uint(len(c.Endpoints()))
          quorum := (n/2 + 1)
          if attempt%quorum == 0 {
          c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
          return jitterUp(waitBetween, jitterFraction)
          }
          c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
          return 0
          }
          }
          復制代碼

          可以看到roundRobinQuorumBackoff返回了一個閉包,內(nèi)部是重試間隔時長計算邏輯,這個邏輯說來也簡單:

              1. 若重試次數(shù)已經(jīng)達到集群的法定人數(shù)(quorum),則真正的計算間隔時長,
          間隔時長到期后,才進行重試。
          2. 否則,直接返回0,也就是馬上重試。
          復制代碼

          還記得剛才說的必須記住的兩個概念嗎?其中一點就是負載均衡策略寫死是輪詢,而這個重試邏輯一定要配合負載均衡是輪詢策略,達到的效果是:假如你訪問集群中的一臺節(jié)點失敗,可能是那臺節(jié)點出問題了,但如果整個集群是好的,這時候馬上重試,輪詢到下臺節(jié)點就行。

          但是,如果重試多次,集群大多數(shù)節(jié)點(法定人數(shù))都失敗了,那應該是集群出問題了,這時候就需要計算間隔時間,等會兒再重試看看問題能不能解決。

          這里也可以看到ETCD的Client端,考慮的細節(jié)問題是非常多的,一個簡單的重試時間計算,也能進行邏輯上的小小優(yōu)化。

          那么重試攔截器又是如何實現(xiàn)的呢?接著看攔截器的相關代碼:

          func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
          ...
          // 如果最大重試次數(shù)設置為0,那就不重試。
          if callOpts.max == 0 {
          return invoker(ctx, method, req, reply, cc, grpcOpts...)
          }
          var lastErr error
          // 開始重試計數(shù)
          for attempt := uint(0); attempt < callOpts.max; attempt++ {
          // 計算重試間隔時間,并阻塞代碼,等待
          // 這里最終會調(diào)用到 roundRobinQuorumBackoff 來計算時間
          if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
          return err
          }

          // 再次重新執(zhí)行GRPC請求
          lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
          if lastErr == nil {
          // 重試成功,退出
          return nil
          }

          // 這段代碼分析了兩種情況
          // 1. 服務端返回了 Context Error(超時、被取消),直接重試
          // 2. 客戶端的 Context 也出現(xiàn)了Error
          if isContextError(lastErr) {
          if ctx.Err() != nil {
          // 客戶端本身的ctx也報錯了,不重試了,退出。
          return lastErr
          }
          // 服務端返回,直接重試
          continue
          }

          if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
          // 是AuthToken不正確,重新獲取Token
          gterr := c.getToken(ctx)
          ...
          continue
          }
          // 只有在特定錯誤才重試(code.Unavailable)
          // 否則返回Err,不重試。
          if !isSafeRetry(c.lg, lastErr, callOpts) {
          return lastErr
          }
          }
          return lastErr
          }
          }
          復制代碼

          代碼做了一定程度的精簡,但是主要流程都是保留的。

          由此,ETCD的整體重試流程也介紹完畢了。

          總結

          通過對ETCD整個啟動流程的代碼分析,我們可以總結出以下幾點:

          1. Endpoints 用來做負載均衡和重試策略計算法定人數(shù),一定要填寫集群的全部節(jié)點,
          或者打開AutoSync功能。

          2. ETCD 自己編寫了GRPC的resolver和balancer,可以借鑒到GRPC的相關項目中去。
          resolver只能解析ip和unix套接字,balancer策略寫死是輪詢策略。

          3. ETCD 重試流程只重試部分錯誤,所以不要完全指望ETCD的自動重試,一定要自己做好錯誤處理。

          復制代碼

          啟動流程圖,其中列出的函數(shù)就是整個啟動流程上的重要函數(shù):

          Config

          New

          newClient

          resolver.New

          dialWithBanlancer

          dial

          grpc.DialContext

          最后,本文涉及到一些GRPC的基礎知識,不了解的小伙伴可以去(blog.csdn.net/u011582922/… )這里看看,講的很詳細。


          作者:FengY_HYY
          鏈接:https://juejin.cn/post/7007992988672458765
          來源:掘金
          著作權歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權,非商業(yè)轉(zhuǎn)載請注明出處。



          瀏覽 68
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  九九视频网| 最近中文字幕免费MV第一季歌词十 | 婷婷激情网站 | 靠逼网站免费观看 | 被黑人操逼视频 |