kube-apiserver 重啟引起的全量對象更新
現(xiàn)象
k8s master進(jìn)行線上升級,notifier利用client-go提供的informer機(jī)制注冊了EndPoint的Update Handler,當(dāng)kube-apiserver重啟時觸發(fā)了大量的update事件,觸發(fā)依賴的第三方服務(wù)限流。
原因排查
在測試環(huán)境進(jìn)行了測試,并且在注冊update事件處理函數(shù)中調(diào)用 reflect.DeepEqual(old, new) 進(jìn)行了比較,發(fā)現(xiàn)返回true,即old與new完全相同卻產(chǎn)生了update事件。
接下來就是到事件產(chǎn)生的地方去尋找原因,主要有兩個地方,一個是reflect的ListAndWatch,相當(dāng)于元數(shù)據(jù)的生產(chǎn)者,另一個是sharedIndexedInformer的HandleDeltas,消費(fèi)元數(shù)據(jù)并生成對應(yīng)類型的事件分發(fā)下去,接下來分別看
HandleDeltas (事件來源)
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Added, Updated:isSync := d.Type == Syncs.cacheMutationDetector.AddObject(d.Object)// 重點(diǎn)關(guān)注if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, isSync)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err}s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil}
很容易看出來,當(dāng)delta類型為非Delete時,informer會從自己的indexer(帶索引的緩存)中獲取指定的object是否存在(注意這里其實(shí)是從object計算出key,然后用key尋找到的),如果存在則更新緩存且分發(fā)一個update事件。可以繼續(xù)看后續(xù)對分發(fā)的這個notification的處理,都是直接處理,沒有任何去重邏輯。到這里就可以理解為啥會收到全量的update事件了,正式因為此時緩存里已經(jīng)有了對應(yīng)數(shù)據(jù),而在分發(fā)事件時并沒有比較緩存中的object是否和新來的object一致就直接當(dāng)成update處理了,導(dǎo)致客戶端收到全量的更新事件。那問題又來了,為什么重啟apiserver時會往deltafifo里全量扔一遍數(shù)據(jù),正常不應(yīng)該是從最后的resourceVersion開始重新watch嗎?繼續(xù)看下面的處理
ListAndWatch (全量數(shù)據(jù)的來源)
// 代碼位置 k8s.io/client-go/tools/cache/reflector.go// ListAndWatch first lists all items and get the resource version at the moment of call,// and then use the resource version to watch.// It returns error if ListAndWatch didn't even try to initialize watch.func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)var resourceVersion string// Explicitly set "0" as resource version - it's fine for the List()// to be served from cache and potentially be delayed relative to// etcd contents. Reflector framework will catch up via Watch() eventually.options := metav1.ListOptions{ResourceVersion: "0"}if err := func() error {initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})defer initTrace.LogIfLong(10 * time.Second)var list runtime.Objectvar err errorlistCh := make(chan struct{}, 1)panicCh := make(chan interface{}, 1)go func() {defer func() {if r := recover(); r != nil {panicCh <- r}}()// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first// list request will return the full response.pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)}))if r.WatchListPageSize != 0 {pager.PageSize = r.WatchListPageSize}// Pager falls back to full list if paginated list calls fail due to an "Expired" error.list, err = pager.List(context.Background(), options)close(listCh)}()select {case <-stopCh:return nilcase r := <-panicCh:panic(r)case <-listCh:}if err != nil {return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)}initTrace.Step("Objects listed")listMetaInterface, err := meta.ListAccessor(list)if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)}resourceVersion = listMetaInterface.GetResourceVersion()initTrace.Step("Resource version extracted")items, err := meta.ExtractList(list)if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)}initTrace.Step("Objects extracted")if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)}initTrace.Step("SyncWith done")r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")return nil}(); err != nil {return err}resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)go func() {resyncCh, cleanup := r.resyncChan()defer func() {cleanup() // Call the last one written into cleanup}()for {select {case <-resyncCh:case <-stopCh:returncase <-cancelCh:return}if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)if err := r.store.Resync(); err != nil {resyncerrc <- errreturn}}cleanup()resyncCh, cleanup = r.resyncChan()}}()for {// give the stopCh a chance to stop the loop, even in case of continue statements further down on errorsselect {case <-stopCh:return nildefault:}timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))options = metav1.ListOptions{ResourceVersion: resourceVersion,// We want to avoid situations of hanging watchers. Stop any wachers that do not// receive any events within the timeout window.TimeoutSeconds: &timeoutSeconds,// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.// Reflector doesn't assume bookmarks are returned at all (if the server do not support// watch bookmarks, it will ignore this field).AllowWatchBookmarks: true,}w, err := r.listerWatcher.Watch(options)if err != nil {switch err {case io.EOF:// watch closed normallycase io.ErrUnexpectedEOF:klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)default:// 這里報出來的錯誤utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))}// If this is "connection refused" error, it means that most likely apiserver is not responsive.// It doesn't make sense to re-list all objects because most likely we will be able to restart// watch where we ended.// If that's the case wait and resend watch request.// 關(guān)鍵點(diǎn):正常應(yīng)該執(zhí)行continue,結(jié)果卻執(zhí)行到了return nilif urlError, ok := err.(*url.Error); ok {if opError, ok := urlError.Err.(*net.OpError); ok {if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {time.Sleep(time.Second)continue}}}return nil}if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {if err != errorStopRequested {switch {case apierrs.IsResourceExpired(err):klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)default:klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)}}return nil}}}
其中有一段注釋寫的很明白,如果是"connection refused" error, 意味著很可能是apiserver無響應(yīng)了,這個時候需要做的是等一段時間然后從結(jié)束的resourceVersion開始重新watch而不是重新執(zhí)行relist。看注釋的話其實(shí)是沒有問題的,符合正常邏輯,即斷開了之后重新watch而不用同步全量數(shù)據(jù),但為什么還會收到全量的upade事件呢,原因就在下面的判斷邏輯
if urlError, ok := err.(*url.Error); ok {if opError, ok := urlError.Err.(*net.OpError); ok {if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {time.Sleep(time.Second)continue}}}
這里不太容易看出來有什么問題,猛一看肯定覺得沒問題,確實(shí)是connection refused error啊,其實(shí)不是的。實(shí)際返回的錯誤是
&url.Error{Err: &net.OpError{Err: &os.SyscallError{Err: &syscall.ECONNREFUSED}}}而上面代碼判斷的是如下的error
&url.Error{Err: &net.OpError{Err: &syscall.ECONNREFUSED}}正因為判斷失效,沒有執(zhí)行到continue而是直接return nil,這就會導(dǎo)致整個函數(shù)退出,然后外層有一個循環(huán)開始重新調(diào)用ListAndWatch,然后開始list,開始watch,全量數(shù)據(jù)就是這么來的。正常我們是期望直接執(zhí)行到continue,跳過list直接watch的,就不會有全量同步的問題了。
總結(jié)
至此,已經(jīng)清楚了具體的原因,ListAndWatch的修改很簡單,已經(jīng)給官方提了 pull request:https://github.com/kubernetes/kubernetes/pull/81634 修復(fù)這個問題,并合入1.16中,自1.16之后功能正常。
K8S 進(jìn)階訓(xùn)練營
點(diǎn)擊屏末 | 閱讀原文 | 即刻學(xué)習(xí)
