
Full object updates triggered by a kube-apiserver restart

9,121 characters, about a 19-minute read


          2021-04-06 19:52

Symptom

During an online upgrade of the k8s masters, a notifier service that had registered an Update handler for Endpoints via client-go's informer mechanism received a flood of update events when kube-apiserver restarted, which tripped the rate limiter of a downstream third-party service.

Root cause investigation

We reproduced the issue in a test environment and called reflect.DeepEqual(old, new) inside the registered update handler. It returned true: update events were being produced even though old and new were completely identical.
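The diagnostic above can be sketched in plain Go. The endpoint struct below is a hypothetical stand-in for the real *v1.Endpoints object, just to show what the handler observed: two distinct objects that are deeply equal, yet delivered as an update.

```go
package main

import (
	"fmt"
	"reflect"
)

// endpoint is a hypothetical stand-in for the real *v1.Endpoints object.
type endpoint struct {
	Name            string
	ResourceVersion string
	Addresses       []string
}

// spuriousUpdate reports whether an update event carries no actual change,
// which is what the handler observed after the kube-apiserver restart.
func spuriousUpdate(oldObj, newObj interface{}) bool {
	return reflect.DeepEqual(oldObj, newObj)
}

func main() {
	oldObj := &endpoint{Name: "svc-a", ResourceVersion: "100", Addresses: []string{"10.0.0.1"}}
	newObj := &endpoint{Name: "svc-a", ResourceVersion: "100", Addresses: []string{"10.0.0.1"}}
	// Distinct pointers, identical contents: DeepEqual compares the pointees.
	fmt.Println(spuriousUpdate(oldObj, newObj)) // prints true
}
```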

The next step was to look at where the events are produced. There are two main places: the Reflector's ListAndWatch, which acts as the producer of the data, and sharedIndexInformer's HandleDeltas, which consumes the deltas and dispatches events of the corresponding type. Let's look at each in turn.

HandleDeltas (where events are dispatched)

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			// key point: the object is already present in the local cache
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

It is easy to see that when the delta type is anything other than Deleted, the informer checks whether the object already exists in its indexer (an indexed cache; note that it actually computes a key from the object and looks that key up). If it exists, the cache is updated and an update event is dispatched. Following the subsequent handling of that notification, it is processed as-is, with no deduplication anywhere. This explains why we receive a full set of update events: the cache already holds the corresponding objects, and when dispatching, the informer never compares the cached object against the incoming one before treating it as an update, so clients receive an update for every object. But that raises the next question: why does a kube-apiserver restart push the full data set through the DeltaFIFO again? Shouldn't the reflector normally resume watching from the last resourceVersion? Let's continue with the code below.
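Since HandleDeltas does no deduplication, one common consumer-side mitigation (not part of the original code paths above) is to drop updates whose resourceVersion did not change: an object that was merely re-listed or resynced keeps its resourceVersion. A minimal sketch, using a hypothetical stand-in type where a real handler would call metav1.Object's GetResourceVersion():

```go
package main

import "fmt"

// meta is a hypothetical stand-in for an object's metadata; a real handler
// would assert to metav1.Object and call GetResourceVersion() instead.
type meta struct{ ResourceVersion string }

// shouldHandleUpdate filters the no-op updates dispatched after a relist
// or resync: an object that did not actually change keeps its resourceVersion.
func shouldHandleUpdate(oldObj, newObj meta) bool {
	return oldObj.ResourceVersion != newObj.ResourceVersion
}

func main() {
	fmt.Println(shouldHandleUpdate(meta{"100"}, meta{"100"})) // relist echo: prints false
	fmt.Println(shouldHandleUpdate(meta{"100"}, meta{"101"})) // real change: prints true
}
```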

ListAndWatch (where the full data set comes from)

// Code location: k8s.io/client-go/tools/cache/reflector.go

// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
	var resourceVersion string

	// Explicitly set "0" as resource version - it's fine for the List()
	// to be served from cache and potentially be delayed relative to
	// etcd contents. Reflector framework will catch up via Watch() eventually.
	options := metav1.ListOptions{ResourceVersion: "0"}

	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			if r.WatchListPageSize != 0 {
				pager.PageSize = r.WatchListPageSize
			}
			// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
			list, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
		}
		initTrace.Step("Objects listed")
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
		}
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
		}
		initTrace.Step("Objects extracted")
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		initTrace.Step("SyncWith done")
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any wachers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server do not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
			default:
				// the error we observed in the logs is reported here
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
			}
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case wait and resend watch request.
			// Key point: this should reach continue, but instead falls through to return nil
			if urlError, ok := err.(*url.Error); ok {
				if opError, ok := urlError.Err.(*net.OpError); ok {
					if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
						time.Sleep(time.Second)
						continue
					}
				}
			}
			return nil
		}

		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case apierrs.IsResourceExpired(err):
					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
				}
			}
			return nil
		}
	}
}

One of the comments there spells it out: if this is a "connection refused" error, the apiserver is most likely unresponsive, and the right thing to do is wait a while and restart the watch from the last resourceVersion rather than relist everything. The comment describes the correct logic: after a disconnect, re-watch without syncing the full data set. So why do we still receive full update events? The answer is in the check below.

if urlError, ok := err.(*url.Error); ok {
	if opError, ok := urlError.Err.(*net.OpError); ok {
		if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
			time.Sleep(time.Second)
			continue
		}
	}
}

The problem here is not easy to spot. At first glance the check looks correct, since this really is a connection-refused error, but it is not matched. The error actually returned is

          &url.Error{Err: &net.OpError{Err: &os.SyscallError{Err: &syscall.ECONNREFUSED}}}

while the code above matches an error of the shape

          &url.Error{Err: &net.OpError{Err: &syscall.ECONNREFUSED}}

Because the check fails to match, continue is never reached and the function hits return nil instead, so ListAndWatch exits entirely. An outer loop then calls ListAndWatch again, which lists and then watches, and that list is where the full data set comes from. The expected behavior is to hit continue, skip the list, and go straight to watch, which would avoid the full resync.

Summary

At this point the root cause is clear. The fix to ListAndWatch is simple; a pull request was submitted upstream (https://github.com/kubernetes/kubernetes/pull/81634) to fix the issue and was merged into 1.16, so the behavior is correct from 1.16 onward.

