ETCD源碼分析Client端啟動流程分析
ETCD源碼基于v3.5,在分析之前,需要搭建好源碼分析的環(huán)境。首先,從GitHub的倉庫中克隆下ETCD的源碼,再利用docker搭建我們的ETCD測試集群,命令如下:
REGISTRY=quay.io/coreos/etcd
NAME_1=etcd-node-0
NAME_2=etcd-node-1
NAME_3=etcd-node-2
# IP在不同機器上不同,請查看docker的子網(wǎng)網(wǎng)段
HOST_1=172.20.0.2
HOST_2=172.20.0.3
HOST_3=172.20.0.4
PORT_1=2379
PORT_2=12379
PORT_3=22379
PORT_C_1=2380
PORT_C_2=12380
PORT_C_3=22380
CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_C_1},${NAME_2}=http://${HOST_2}:${PORT_C_2},${NAME_3}=http://${HOST_3}:${PORT_C_3}
# 需要保證目錄存在并可寫
DATA_DIR=/var/folders/
# 需要創(chuàng)建docker網(wǎng)絡,用于模擬集群網(wǎng)絡分區(qū)的情況。
docker network create etcd_cluster
docker run \
-p $PORT_1:$PORT_1 \
-p $PORT_C_1:$PORT_C_1 \
--volume "${DATA_DIR}${NAME_1}:/etcd-data" \
--name ${NAME_1} \
--network etcd_cluster \
${REGISTRY}:v3.5.0 \
/usr/local/bin/etcd \
--name ${NAME_1} \
--data-dir /etcd-data \
--listen-client-urls http://0.0.0.0:$PORT_1 \
--advertise-client-urls http://$HOST_1:$PORT_1 \
--listen-peer-urls http://0.0.0.0:$PORT_C_1 \
--initial-advertise-peer-urls http://$HOST_1:$PORT_C_1 \
--initial-cluster ${CLUSTER} \
--initial-cluster-token tkn \
--initial-cluster-state new \
--log-level info \
--logger zap \
--log-outputs stderr
docker run \
-p $PORT_2:$PORT_2 \
-p $PORT_C_2:$PORT_C_2 \
--volume=${DATA_DIR}${NAME_2}:/etcd-data \
--name ${NAME_2} \
--network etcd_cluster \
${REGISTRY}:v3.5.0 \
/usr/local/bin/etcd \
--name ${NAME_2} \
--data-dir /etcd-data \
--listen-client-urls http://0.0.0.0:$PORT_2 \
--advertise-client-urls http://$HOST_2:$PORT_2 \
--listen-peer-urls http://0.0.0.0:$PORT_C_2 \
--initial-advertise-peer-urls http://$HOST_2:$PORT_C_2 \
--initial-cluster ${CLUSTER} \
--initial-cluster-token tkn \
--initial-cluster-state new \
--log-level info \
--logger zap \
--log-outputs stderr
docker run \
-p $PORT_3:$PORT_3 \
-p $PORT_C_3:$PORT_C_3 \
--volume=${DATA_DIR}${NAME_3}:/etcd-data \
--name ${NAME_3} \
--network etcd_cluster \
${REGISTRY}:v3.5.0 \
/usr/local/bin/etcd \
--name ${NAME_3} \
--data-dir /etcd-data \
--listen-client-urls http://0.0.0.0:$PORT_3 \
--advertise-client-urls http://$HOST_3:$PORT_3 \
--listen-peer-urls http://0.0.0.0:$PORT_C_3 \
--initial-advertise-peer-urls http://$HOST_3:$PORT_C_3 \
--initial-cluster ${CLUSTER} \
--initial-cluster-token tkn \
--initial-cluster-state new \
--log-level info \
--logger zap \
--log-outputs stderr
復制代碼如上,我們創(chuàng)建了三個ETCD節(jié)點,組成了一個集群。接下來我們正式進入源碼分析流程。
ETCD Client啟動流程分析
我們先看一段啟動代碼樣例:
cli, err := clientv3.New(clientv3.Config{
Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
log.Fatal(err)
}
復制代碼一個最簡單的程序只需要提供集群的所有節(jié)點的ip和端口就能訪問,這里需要注意的是,一定要填寫ETCD集群的所有節(jié)點,這樣才能有故障轉(zhuǎn)移、負載均衡的特性。或者運行一個ETCD的代理節(jié)點(ETCD網(wǎng)關)負責轉(zhuǎn)發(fā)請求,這樣只填寫代理節(jié)點ip即可,當然性能上會有所損失。
一、ETCD的Client啟動流程分析
接下來我們看看Client是如何被創(chuàng)建出來的:
func newClient(cfg *Config) (*Client, error) {
// -----A-----
ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
}
// -----A-----
// -----B-----
client.resolver = resolver.New(cfg.Endpoints...)
conn, err := client.dialWithBalancer()
if err != nil {
client.cancel()
client.resolver.Close()
return nil, err
}
client.conn = conn
// -----B-----
// -----C-----
client.Cluster = NewCluster(client)
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)
...
// -----C-----
return client, nil
}
復制代碼A段代碼分析
首先來看第A段代碼,其主要是初始化了一個client的實例,并把Config結構體傳遞給它,那么Config中包含了什么配置項呢?
type Config struct {
// ETCD服務器地址,注意需要提供ETCD集群所有節(jié)點的ip
Endpoints []string `json:"endpoints"`
// 設置了此間隔時間,每 AutoSyncInterval 時間ETCD客戶端都會
// 自動向ETCD服務端請求最新的ETCD集群的所有節(jié)點列表
//
// 默認為0,即不請求
AutoSyncInterval time.Duration `json:"auto-sync-interval"`
// 建立底層的GRPC連接的超時時間
DialTimeout time.Duration `json:"dial-timeout"`
// 這個配置和下面的 DialKeepAliveTimeoutt
// 都是用來打開GRPC提供的 KeepAlive
// 功能,作用主要是保持底層TCP連接的有效性,
// 及時發(fā)現(xiàn)連接斷開的異常。
//
// 默認不打開 keepalive
DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`
// 客戶端發(fā)送 keepalive 的 ping 后,等待服務端的 ping ack 包的時長
// 超過此時長會報 `translation is closed`
DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`
// 也是 keepalive 中的設置,
// true則表示無論有沒有活躍的GRPC連接,都執(zhí)行ping
// false的話,沒有活躍的連接也就不會發(fā)送ping。
PermitWithoutStream bool `json:"permit-without-stream"`
// 最大可發(fā)送字節(jié)數(shù),默認為2MB
// 也就是說,我們ETCD的一條KV記錄最大不能超過2MB,
// 如果要設置超過2MB的KV值,
// 只修改這個配置也是無效的,因為ETCD服務端那邊的限制也是2MB。
// 需要先修改ETCD服務端啟動參數(shù):`--max-request-bytes`,再修改此值。
MaxCallSendMsgSize int
// 最大可接收的字節(jié)數(shù),默認為`Int.MaxInt32`
// 一般不需要改動
MaxCallRecvMsgSize int
// HTTPS證書配置
TLS *tls.Config
// 上下文,一般用于取消操作
ctx.Context
// 設置此值,會拒絕連接到低版本的ETCD
// 什么是低版本呢?
// 寫死了,小于v3.2的版本都是低版本。
RejectOldCluster bool `json:"reject-old-cluster"`
// GRPC 的連接配置,具體可參考GRPC文檔
DialOptions []grpc.DialOption
// zap包的Logger配置
// ETCD用的日志包就是zap
Logger *zap.Logger
LogConfig *zap.Config
...
}
復制代碼還有一些常用配置項,比較簡單,這里就不再列出了。
B段代碼分析
本段是整個代碼的核心部分,主要做了兩件事:
創(chuàng)建了 resolver 用于解析ETCD服務的地址
resolver(解析器)其實是grpc中的概念,比如:DNS解析器,域名轉(zhuǎn)化為真實的ip;服務注冊中心,也是一種把服務名轉(zhuǎn)化為真實ip的解析服務。具體的概念就不展開了,如果對grpc這方面比較感興趣,文末會推薦一個講的很好的grpc源碼分析博客。
總之,etcd自己寫了一個解析器,就在resolver包里,這個解析器提供了以下幾個功能:
把Endpoints里的ETCD服務器地址傳給grpc框架,這里,因為ETCD自己實現(xiàn)的解析器不支持DNS解析,所以Endpoints只能是ip地址或者unix套接字。
告訴grpc,如果Endpoints有多個,負載均衡的策略是輪詢,這點很重要。
dialWithBalancer() 建立了到ETCD的服務端鏈接
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
creds := c.credentialsForEndpoint(c.Endpoints()[0])
opts := append(dopts, grpc.WithResolvers(c.resolver))
return c.dial(creds, opts...)
}
復制代碼這個用于建立到ETCD服務端的連接的方法名很有意思,雖然叫dialWithBalancer但內(nèi)部代碼很簡單,可以看到里面并無Balancer(負載均衡器)的出現(xiàn)。但其實因為上面說到,ETCD使用了自己的resolver,其內(nèi)部已經(jīng)寫好了負載均衡策略:round_robin。所以這里通過grpc.WithResolvers()把resolver傳進去,也是達到了負載均衡的效果。
接下來進入dial(),這個方法雖然有些長,但整體邏輯是非常清晰的,省略無關代碼后,其內(nèi)部是做了以下幾件事:
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
// 首先,ETCD通過這行代碼,向GRPC框架加入了一些自己的
// 配置,比如:KeepAlive特性(配置里提到的配置項)、
// TLS證書配置、還有最重要的重試策略。
opts, err := c.dialSetupOpts(creds, dopts...)
...
// context 的一段經(jīng)典樣例代碼
// 問:如果我同時把非零的DialTimeout和
// 帶超時的 context 傳給客戶端,
// 到底以哪個超時為準?
// 答:這里新建了子context(dctx),父context和DialTimeout
// 哪個先到deadline,就以哪個為準。
dctx := c.ctx
if c.cfg.DialTimeout > 0 {
var cancel context.CancelFunc
// 同時包含父context和DialTimeout
// 哪個先倒時間就以哪個為準。
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel()
}
// 最終調(diào)用grpc.DialContext()建立連接
conn, err := grpc.DialContext(dctx, target, opts...)
...
return conn, nil
}
復制代碼C段代碼分析
C段代碼無非就是做一些功能接口的初始化,比如:KV接口(用于提供Put、Get等)、Wathcer接口(用于監(jiān)聽Key)等,具體如何初始化到分析各接口再講。
再回到啟動流程,初始化功能完畢后,就是getToken了,這個token是我們開啟了ETCD的賬號密碼功能后,通過賬號密碼獲取到了token,然后才能訪問ETCD提供的GRPC接口。
然后是提供 RejectOldCluster 和 autoSync 功能,這個在介紹Config時也提過,這里就不再贅述了。
ETCD Client重試策略分析
對ETCD客戶端提供的自動重試策略的分析,是本文的重點。自動重試是ETCD能提供高可用特性的重要保證,在往下分析之前,一定要記住以下兩個概念:
自動重試不會在ETCD集群的同一節(jié)點上進行,這跟我們平常做的重試不同,因為前面說了ETCD是通過GRPC框架提供對集群訪問的負載均衡策略的,所以會輪詢的重試集群的每個節(jié)點。
自動重試只會重試一些特定的錯誤,比如:codes.Unavailable
接下來,就讓我們來看看ETCD是如何利用GRPC提供的攔截器做自動重試的,學會這個,我們也能在自己的GRPC項目中用上同樣的套路:
// 這段代碼在dialWithBalancer->dial->dialSetupOpts中
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
opts = append(opts,
grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
)
復制代碼看以上的代碼,要自動重試只需兩步:
創(chuàng)建backoff函數(shù),也就是計算重試等待時間的函數(shù)。
通過WithXXXInterceptor(),注冊重試攔截器,這樣每次GRPC有請求都會回調(diào)該攔截器。
這里,grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),我們看到Stream的重試攔截器,其最大重試次數(shù)設置為了0(withMax()),也就是不重試,這其實是故意為之,因為Client端的Stream重試不被支持。(Client端需要重試Stream,需要自己做單獨處理,不能通過攔截器。)
那我們首先看看如何計算等待時間:
// waitBetween 重試間隔時長
// jitterFraction 隨機抖動率,
// 比如:默認重試間隔為25ms,抖動率:0.1,
// 那么實際重試間隔就在 25土2.5ms 之間。
// attempt 實際重試了多少次
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration,
jitterFraction float64) backoffFunc {
return func(attempt uint) time.Duration {
n := uint(len(c.Endpoints()))
quorum := (n/2 + 1)
if attempt%quorum == 0 {
c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
return jitterUp(waitBetween, jitterFraction)
}
c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
return 0
}
}
復制代碼可以看到roundRobinQuorumBackoff返回了一個閉包,內(nèi)部是重試間隔時長計算邏輯,這個邏輯說來也簡單:
1. 若重試次數(shù)已經(jīng)達到集群的法定人數(shù)(quorum),則真正的計算間隔時長,
間隔時長到期后,才進行重試。
2. 否則,直接返回0,也就是馬上重試。
復制代碼還記得剛才說的必須記住的兩個概念嗎?其中一點就是負載均衡策略寫死是輪詢,而這個重試邏輯一定要配合負載均衡是輪詢策略,達到的效果是:假如你訪問集群中的一臺節(jié)點失敗,可能是那臺節(jié)點出問題了,但如果整個集群是好的,這時候馬上重試,輪詢到下臺節(jié)點就行。
但是,如果重試多次,集群大多數(shù)節(jié)點(法定人數(shù))都失敗了,那應該是集群出問題了,這時候就需要計算間隔時間,等會兒再重試看看問題能不能解決。
這里也可以看到ETCD的Client端,考慮的細節(jié)問題是非常多的,一個簡單的重試時間計算,也能進行邏輯上的小小優(yōu)化。
那么重試攔截器又是如何實現(xiàn)的呢?接著看攔截器的相關代碼:
func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
...
// 如果最大重試次數(shù)設置為0,那就不重試。
if callOpts.max == 0 {
return invoker(ctx, method, req, reply, cc, grpcOpts...)
}
var lastErr error
// 開始重試計數(shù)
for attempt := uint(0); attempt < callOpts.max; attempt++ {
// 計算重試間隔時間,并阻塞代碼,等待
// 這里最終會調(diào)用到 roundRobinQuorumBackoff 來計算時間
if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
return err
}
// 再次重新執(zhí)行GRPC請求
lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
if lastErr == nil {
// 重試成功,退出
return nil
}
// 這段代碼分析了兩種情況
// 1. 服務端返回了 Context Error(超時、被取消),直接重試
// 2. 客戶端的 Context 也出現(xiàn)了Error
if isContextError(lastErr) {
if ctx.Err() != nil {
// 客戶端本身的ctx也報錯了,不重試了,退出。
return lastErr
}
// 服務端返回,直接重試
continue
}
if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
// 是AuthToken不正確,重新獲取Token
gterr := c.getToken(ctx)
...
continue
}
// 只有在特定錯誤才重試(code.Unavailable)
// 否則返回Err,不重試。
if !isSafeRetry(c.lg, lastErr, callOpts) {
return lastErr
}
}
return lastErr
}
}
復制代碼代碼做了一定程度的精簡,但是主要流程都是保留的。
由此,ETCD的整體重試流程也介紹完畢了。
總結
通過對ETCD整個啟動流程的代碼分析,我們可以總結出以下幾點:
1. Endpoints 用來做負載均衡和重試策略計算法定人數(shù),一定要填寫集群的全部節(jié)點,
或者打開AutoSync功能。
2. ETCD 自己編寫了GRPC的resolver和balancer,可以借鑒到GRPC的相關項目中去。
resolver只能解析ip和unix套接字,balancer策略寫死是輪詢策略。
3. ETCD 重試流程只重試部分錯誤,所以不要完全指望ETCD的自動重試,一定要自己做好錯誤處理。
復制代碼啟動流程圖,其中列出的函數(shù)就是整個啟動流程上的重要函數(shù):
最后,本文涉及到一些GRPC的基礎知識,不了解的小伙伴可以去(blog.csdn.net/u011582922/… )這里看看,講的很詳細。
作者:FengY_HYY
鏈接:https://juejin.cn/post/7007992988672458765
來源:掘金
著作權歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權,非商業(yè)轉(zhuǎn)載請注明出處。
