Etcd Watch 居然會丟事件?
v1.27 的 K8s,在 kube-apiserver 的日志中會看到 "etcd event received with PrevKv=nil" 的字樣,資源對象被刪除后在 Etcd 中已經(jīng)不存在了但在 Reflector store 中仍然存在,可以在 Informer 或者 watchCache 中看到對應(yīng)的對象,依賴 Informer 的組件也不會感知到資源對象被刪除,通過 List API 設(shè)置 RV="0" 去 kube-apiserver 的 watchCache 中獲取的話也可以看到已經(jīng)被刪除的對象仍然存在。
根因先說結(jié)論,etcd watch 實現(xiàn)存在 bug,會導(dǎo)致客戶端在使用 etcd client watch 時出現(xiàn)丟事件的現(xiàn)象。
Etcd Watchetcd 本身是客戶端 + 服務(wù)端實現(xiàn)的 CS 架構(gòu),協(xié)議層通過 raft 算法保證了服務(wù)的強(qiáng)一致性和高可用性,同時 etcd 還提供了針對于存儲數(shù)據(jù)的 watch 監(jiān)聽回調(diào)功能。所謂 watch 機(jī)制,指的是應(yīng)用方可以針對存儲在 etcd 中特定范圍的數(shù)據(jù)創(chuàng)建 watch 監(jiān)聽器,在 watch 過程中,當(dāng)對應(yīng)數(shù)據(jù)發(fā)生變化時,etcd 會根據(jù) watch 記錄追溯到應(yīng)用方,對變更事件進(jìn)行同步。不清楚 etcd watch 流程的話,墻裂推薦先看小徐先生的編程世界公眾號的兩篇文章。
客戶端
以下小節(jié)直接摘自《etcd watch 機(jī)制源碼解析--客戶端篇》
創(chuàng)建 watch 鏈路
![]()
當(dāng)應(yīng)用方首次嘗試通過 etcd 客戶端發(fā)起創(chuàng)建 watch 的請求時,首先會進(jìn)行 etcd 客戶端與 etcd 服務(wù)端間通信架構(gòu)的初始化,在之后的請求中可以統(tǒng)一復(fù)用. 在這部分準(zhǔn)備工作中,客戶端側(cè)會創(chuàng)建并異步運行 grpc 長連接代理對象 watcherGrpcStream;同時會啟動協(xié)程 serveWatchClient,持續(xù)輪詢處理來自 etcd 服務(wù)端的響應(yīng).
在創(chuàng)建 watch 的主鏈路中,etcd 客戶端會創(chuàng)建好一個 channel(稱之為 upch) 提前返回給應(yīng)用方,后續(xù) watch 監(jiān)聽的數(shù)據(jù)發(fā)生變更時,應(yīng)用方可以通過這個 channel 接收到 watch 回調(diào)事件.
接下來,etcd 客戶端會通過 watcherGrpcStream 將創(chuàng)建 watch 的請求通過 grpc 長連接推送到 etcd 服務(wù)端,并且 watcherGrpcStream 會針對每個 watch 異步啟動一個 watchSubStream 進(jìn)行對應(yīng) watch 下回調(diào)事件的監(jiān)聽和處理.
watch 回調(diào)鏈路
![]()
每當(dāng) watch 監(jiān)聽的數(shù)據(jù)發(fā)生變更后,etcd 服務(wù)端會通過 grpc 長連接將變更事件推送到 etcd 客戶端.
etcd 客戶端會通過常駐的 serveWatchClient 協(xié)程接收到 watch 回調(diào)事件,接下來 watcherGrpcStream 會根據(jù)回調(diào)事件所屬的 watch 將其分配給對應(yīng)的 watchSubStream,最終通過 endpointManager 的周轉(zhuǎn),并通過應(yīng)用方持有的 upch,將回調(diào)事件推送到應(yīng)用方的手中.
服務(wù)端
以下小節(jié)直接摘自《etcd watch 機(jī)制源碼解析--服務(wù)端篇》
創(chuàng)建 watch
![]()
- 首先,etcd 客戶端通過 grpc 長連接向 etcd 服務(wù)端發(fā)送創(chuàng)建 watch 請求
- serverWatchStream 中持續(xù)運行的讀協(xié)程 recvLoop 通過 grpc 長連接接收到創(chuàng)建 watch 請求
- recvLoop 通過 watchStream,將創(chuàng)建 watch 的請求傳遞至底層的 watchableStore 模塊
- watchableStore 根據(jù)新增的 watcher 是否需要回溯歷史變更記錄,會將 watcher 添加到 synced group 或者 unsynced group 當(dāng)中存儲
- revLoop 接收到來自 watchableStore 的響應(yīng)后,會通過 ctrlStream 將響應(yīng)數(shù)據(jù)發(fā)往寫協(xié)程 sendLoop
- sendLoop 通過 ctrlStream 接收到響應(yīng)后,通過 grpc 長連接將其發(fā)往 etcd 客戶端
watch 回調(diào)
![]()
- 首先需要明確的是,不管是 etcd 客戶端和 etcd 服務(wù)端節(jié)點建立的 grpc 長連接還是創(chuàng)建的 watcher 監(jiān)聽器,其生效范圍都局限于某個特定的 etcd 服務(wù)端節(jié)點當(dāng)中.
- 在一個 etcd 服務(wù)端節(jié)點發(fā)生寫狀態(tài)機(jī)數(shù)據(jù)的動作時,會執(zhí)行 notify 動作,將變更的數(shù)據(jù)和 watchableStore synced group 中的 watcher 監(jiān)聽器進(jìn)行 join,產(chǎn)生出一個批次的 watch 回調(diào)事件
- watchableStore 通過 watchStream 將 watch 回調(diào)事件發(fā)往上層的 serverWatchStream
- serverWatchStream 的 sendLoop 協(xié)程接收到 watch 回調(diào)事件,通過 grpc 長連接將其發(fā)送到 etcd 客戶端
WatchableStore 刷新
![]()
watchableStore 模塊的存儲介質(zhì)分為三部分,都是完全基于內(nèi)存實現(xiàn)的存儲,并且不同節(jié)點之間這部分?jǐn)?shù)據(jù)內(nèi)容是相互獨立的:
- synced:用于存儲 watcher 監(jiān)聽器. 存放著的 watcher 是監(jiān)聽數(shù)據(jù)無需回溯歷史變更記錄、倘若有新數(shù)據(jù)變更事件發(fā)生即可立即發(fā)起 watch 回調(diào)的監(jiān)聽器
- unsynced:同樣用于存儲 watcher 監(jiān)聽器. 存放著的 watch 是監(jiān)聽數(shù)據(jù)仍存在歷史變更記錄需要回溯,因此新數(shù)據(jù)變更事件發(fā)生時也無法立即發(fā)起回調(diào)的這部分監(jiān)聽器
- victims:用于臨時存放一部分 watch 回調(diào)事件. 這部分回調(diào)事件是由于在通過 watchStream 發(fā)往上層途中發(fā)現(xiàn) channel 容量不足,為避免 notify 協(xié)程陷入阻塞,而選擇先將這部分變更事件追加到一個容量無上限的 victims 列表當(dāng)中
創(chuàng)建連接
調(diào)用 clientv3.New 創(chuàng)建新的 client 時會最終調(diào)用 grpc.DialContext 函數(shù)創(chuàng)建一個 grpc connection 出來。
創(chuàng)建流
通過 client.Watch 發(fā)起一次 Watch 請求,如下示例代碼
client, err := clientv3.New(clientv3.Config{
DialTimeout: 15 * time.Second,
DialKeepAliveTime: 15 * time.Second,
DialKeepAliveTimeout: 15 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
// use chained interceptors so that the default (retry and backoff) interceptors are added.
// otherwise they will be overwritten by the metric interceptor.
//
// these optional interceptors will be placed after the default ones.
// which seems to be what we want as the metrics will be collected on each attempt (retry)
grpc.WithChainUnaryInterceptor(grpcprom.UnaryClientInterceptor),
grpc.WithChainStreamInterceptor(grpcprom.StreamClientInterceptor),
},
Endpoints: []string{"https://127.0.0.1:2379"},
TLS: tlsConfig,
})
if err != nil {
panic(err)
}
wc := client.Watch(clientv3.WithRequireLeader(context.TODO()), keyPrefix)
這里就是是利用已經(jīng)建立起來的連接發(fā)起了一次 watch 請求。etcd 支持基于流的多路復(fù)用,與傳統(tǒng)基于連接的多路復(fù)用不同。如下代碼
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)
var filters []pb.WatchCreateRequest_FilterType
if ow.filterPut {
filters = append(filters, pb.WatchCreateRequest_NOPUT)
}
if ow.filterDelete {
filters = append(filters, pb.WatchCreateRequest_NODELETE)
}
wr := &watchRequest{
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
fragment: ow.fragment,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
ctxKey := streamKeyFromCtx(ctx)
var closeCh chan WatchResponse
for {
// find or allocate appropriate grpc watch stream
w.mu.Lock()
if w.streams == nil {
// closed
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
...
}
}
etcd 客戶端會先通過 streamKeyFromCtx(ctx) 獲取一個 key,一個 key 對應(yīng)一個 grpc stream。kube-apiserver cacher 在訪問 etcd 時的使用方式為每種資源類型有一個自己的 etcd client,也就是會為每種資源類型建立一個連接。但在 v1.27 中糾正了 RV="" 在 watch 請求中的語義,在這之前此類請求并不會穿透到 etcd,而在 v1.27 開始此類請求將直接穿透到 etcd。同一種資源類型的 watch 使用的是相同的 etcd client,且調(diào)用 client.Watch 時傳入的 ctx 也一樣,也就是同一種資源最終只對應(yīng)一個 grpc stream。從上面介紹的服務(wù)端實現(xiàn)可以看出針對每個 grpc stream 會有一個對應(yīng)的serverWatchStream 和 watchStream 對象。這就導(dǎo)致
- etcd 客戶端:同一種 k8s 資源的 watch 請求使用了同一個 grpc stream
- etcd 服務(wù)端:同一種 k8s 資源的所有 watcher 共用了同一個 ch (來自 watchStream)
最終的結(jié)果就是 serverWatchStream 通過 sendLoop 不斷的消費 ch 里面的數(shù)據(jù)并寫回到 grpc stream,眾多的 watcher 通過 notify 收到時間變更后將事件寫到 ch 里面。相當(dāng)于多個生產(chǎn)者,一個消費者,ch 是一個帶緩沖區(qū)的 chan,長度 hard code 為 128。當(dāng) watcher 越來越多后,會出現(xiàn)生產(chǎn)速度大于消費速度,最終導(dǎo)致生產(chǎn)者 watcher 在寫 ch 時阻塞從而被放入 unsynced 中。
丟事件
WatchableStore 會在后臺啟動兩個 goroutine:syncWatchersLoop 和 syncVictimsLoop 定時的執(zhí)行 watchers 同步,刷新的任務(wù),對應(yīng)上文 WatchableStore 刷新部分。
syncWatchersLoop
每 100ms 嘗試從 unsynced 里面找到固定數(shù)量的 watch,嘗試重新為這些 watcher 發(fā)送對應(yīng)的 events。關(guān)鍵代碼如下
func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64)
for w := range wg.watchers {
if w.minRev > curRev {
// after network partition, possibly choosing future revision watcher from restore operation
// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
// do not panic when such watcher had been moved from "synced" watcher during restore operation
if !w.restore {
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
}
// mark 'restore' done, since it's chosen
w.restore = false
}
if w.minRev < compactRev {
select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
w.compacted = true
wg.delete(w)
default:
// retry next time
}
continue
}
if minRev > w.minRev {
minRev = w.minRev
}
}
return minRev
}
每個 watcher 對象維護(hù)了自己的 minRev 屬性,代表這個 watcher 接受的 event 的最小 RV,用來避免 event 的重復(fù)發(fā)送,注意 minRev 并不代表已經(jīng)發(fā)送成功的最大的 RV,不管發(fā)送成功與否,minRev 都會被更新,發(fā)送失敗的 watcher 和 events 會被保存在 watcherBatch 結(jié)構(gòu)中,最終保存到 victims 中。在 syncVictimsLoop 中處理 victims,繼續(xù)嘗試發(fā)給對應(yīng)的 watcher 發(fā)送對應(yīng)的 events,并根據(jù)結(jié)果同步 watcher 的狀態(tài)。
上述有一段關(guān)鍵代碼 if w.minRev < compactRev 代表 watcher 可接受的 event 的最小 Rev 比最新一次壓縮后的 Rev 還要小,意味著從 minRev 到 compactRev 的 event 已經(jīng)無法返回了,所以命中此邏輯之后會給 ch 發(fā)一個 WatchResponse 并且給 CompactionRevision 賦值,etcd 客戶端收到這個 WatchResponse 之后會判斷 CompactRevision>0 的話,代表 etcd server 端發(fā)生了壓縮,無法繼續(xù)正常返回數(shù)據(jù),會返回一個 ErrCompacted 報錯。
上述邏輯沒有問題,考慮到了異常情況。現(xiàn)實是上述代碼由于其他 bug 導(dǎo)致可能不會觸發(fā),如下
func (s *watchableStore) syncWatchers() int {
...
// in order to find key-value pairs from unsynced watchers, we need to
// find min revision index, and these revisions can be used to
// query the backend store of key-value pairs
curRev := s.store.currentRev
compactionRev := s.store.compactMainRev
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, minBytes)
revToBytes(revision{main: curRev + 1}, maxBytes)
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(s.store.lg, wg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
var victims watcherBatch
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
w.minRev = curRev + 1
// send to ch
...
}
}
先通過 unsynced.choose 找到需要同步的 watchers 和其中最小的 minRev,獲取 minRev 之后的所有 events,將對應(yīng)的 events 發(fā)送給對應(yīng)的 watcher。問題在 w.minRev = curRev + 1,執(zhí)行到這里的時候有可能已經(jīng) w.minRev < compactedRev 了,也就是說 etcd server 端在 s.unsynced.choose 執(zhí)行結(jié)束后,在遍歷 wg.watchers 前進(jìn)行了壓縮的操作了。這里直接執(zhí)行 w.minRev 的賦值操作將有可能導(dǎo)致 wminRev 不再小于 compactedRev,如果接下來的 send 由于仍然阻塞而沒有執(zhí)行成功,這個 watcher 會被放在 civtims 中,最后在同步 victims 后如果這個 watcher 發(fā)送成功一部分 event 但沒有全部發(fā)送完,會被再次放入 unsynced 中,當(dāng)再次走到 syncWathers 中,通過 chooseAll 去找 unsynced 的 watcher 時,就不會觸發(fā)里面 w.minRev < compactRev 的邏輯,也就是說最終導(dǎo)致一個本該結(jié)束的 watcher 無法正常結(jié)束,繼續(xù)從 curRev + 1 返回 events,compactedRev 到 curRev 的 events 就會丟失。
PrevKV == nil
經(jīng)過上面分析,在 etcd server 端壓縮后,可能出現(xiàn)無法正常結(jié)束異常 watcher 的情況,同時由于在 watherBatch 中保存了未發(fā)送成功的 watcher 和 events,所以 serverWatchStream 仍然可能會收到異常 watcher 的異常 event,如下
func (sws *serverWatchStream) sendLoop() {
...
for {
select {
case wresp, ok := <-sws.watchStream.Chan():
if !ok {
return
}
// TODO: evs is []mvccpb.Event type
// either return []*mvccpb.Event from the mvcc package
// or define protocol buffer with []mvccpb.Event.
evs := wresp.Events
events := make([]*mvccpb.Event, len(evs))
sws.mu.RLock()
needPrevKV := sws.prevKV[wresp.WatchID]
sws.mu.RUnlock()
for i := range evs {
events[i] = &evs[i]
if needPrevKV && !IsCreateEvent(evs[i]) {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(context.TODO(), evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
}
}
}
...
}
調(diào)用 Range 方法尋找此 event 的上一個版本,由于中間進(jìn)行了壓縮,找不到了,會返回如下報錯:mvcc: required revision has been compacted,但是這里只判斷了正常情況下去給 event.PrevKV 賦值,未考慮異常情況,最終在 kube-apiserver 側(cè)就會看到 "etcd event received with PrevKv=nil" 的報錯。這是一個比較低級的問題,golang 任何返回 err 的地方都應(yīng)該去判斷 err 是否為 nil,這里居然沒有判斷。
問題修復(fù)
判斷 if w.minRev < compectedRev,則直接跳過下面所有邏輯的執(zhí)行,等待下一次 chooseAll 執(zhí)行時返回給客戶端帶了 CompactRevision 的 WatchResponse,參考 Fix watch event loss #17555[1],已經(jīng)合入 master,并 backport 回了 release-3.4 和 release-3.5 上。
總結(jié)Etcd 側(cè)丟事件的問題雖然修復(fù)了,但由于基于流的復(fù)用,仍然存在事件處理延遲的問題,以及阻塞時 etcd server 內(nèi)存暴漲的問題,關(guān)于后者,有另外一個 PR 在解決此問題,后續(xù)會再安排一篇專門解釋這個問題。對于前者,在前一篇中介紹了 k8s 側(cè)已經(jīng)通過給 cacher 發(fā)起的 watch 請求設(shè)置特定的 ctx,最終會和直接訪問 etcd 的請求區(qū)分開,別分使用兩個不同的 grpc stream。這樣就解決了 kube-apiserver cache 因為 etcd 丟事件導(dǎo)致的問題,但那些 RV="" 的 watch 請求之間仍然共用一個 grpc stream,雖然不會丟事件了,但這種請求多了之后仍然存在 event 接收延遲的風(fēng)險。
對于眾多的 k8s 用戶來說:
- 如果使用的是 v1.25(不包括) 之前的版本,那么無需擔(dān)心此問題,因為 k8s 側(cè)有兜底,即使不升級 etcd 也可以完全避免丟事件的問題;
- 如果使用的是 v1.25 ~ v1.27(不包括)的版本,k8s 側(cè)兜底邏輯失效,存在丟事件的風(fēng)險,但由于這些版本的 watch 請求不會穿透到 etcd,理論上丟事件發(fā)生的概率會非常低,可以忽略;
- 如果使用的是 v1.27 到 v1.30 (不包括)的 k8s 版本,墻裂建議升級 etcd,使用修復(fù)此問題的版本;
- 如果使用的是 v1.30 及之后的版本(應(yīng)該不會有,太新了),為 kube-apiserver 關(guān)閉這個 featuregate
WatchFromStorageWithoutResourceVersion
如果對上述內(nèi)容有任何疑問,歡迎加筆者微信討論 參考資料 [1]
pr#17555: https://github.com/etcd-io/etcd/pull/17555
