<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          kubernetes client-go源碼閱讀10之Informer

          共 22550字,需瀏覽 46分鐘

           ·

          2023-08-23 21:44

          只要讀k8s源代碼一定會(huì)讀informer的代碼的,因?yàn)閕nformer相當(dāng)優(yōu)秀,大多數(shù)分布式項(xiàng)目(比如OpenStack)在解決組件間通信的問(wèn)題時(shí)都會(huì)選擇如kafka,rabbitmaq之類的消息隊(duì)列,但是k8s不走尋常路,選擇了自己解決,解決的方案是informer。

          假設(shè)我們沒(méi)有informer,那么我們應(yīng)該如何從api server獲取數(shù)據(jù)?

          一般而言,我們有兩種方式, 一是全量獲取,二是增量獲取,兩種都各有優(yōu)缺點(diǎn),前者優(yōu)點(diǎn)是,每次可以獲取全量的最新狀態(tài), 邏輯簡(jiǎn)單,但是缺點(diǎn)很明顯,如果請(qǐng)求頻次過(guò)于頻繁,就會(huì)有比較大的性能消耗,  如果頻次過(guò)低就不夠?qū)崟r(shí),但是依舊有比較大的性能消耗,想象一個(gè)100節(jié)點(diǎn)的集群,1000個(gè)deployment,  1000個(gè)ReplicaSet,  5000千個(gè)pod,  加個(gè)每個(gè)對(duì)象都只占5k,  就接近50MB, 這顯然會(huì)占用比較多的帶寬,這是讓人難以接受的,而且數(shù)據(jù)的時(shí)效性不夠高也是難以接受的,所以對(duì)于一個(gè)中大型集群而言,不能使用這種方式。

          第二種方式是增量獲取更新,這種方式的優(yōu)點(diǎn)是時(shí)效性高占用資源低,但是相較于第一種方式而言,實(shí)現(xiàn)起來(lái)稍顯復(fù)雜,復(fù)雜度在于兩點(diǎn),一是我們需要有健壯的容錯(cuò)機(jī)制,比如出錯(cuò)怎么辦? 如果跳過(guò)可能導(dǎo)致狀態(tài)不一致, 比如漏掉一個(gè)更新的請(qǐng)求, 那么對(duì)應(yīng)的資源一直得不到正確的處理, 所以我們需要一種重試機(jī)制, 二是, 我們需要緩存全量的數(shù)據(jù)用于快速的檢索,  比如定時(shí)輪訓(xùn)的檢查資源,但我們不可能總是等收到增量更新才開始業(yè)務(wù)邏輯,所以增量更新的邏輯比較復(fù)雜,  并且增量更新不能單獨(dú)存在, 因?yàn)槲覀冃枰康馁Y源, 所以需要配合第一種方式。

          那么怎么平衡這兩種獲取資源的方式呢?  k8s的選擇是,**我全都要!!!**。

          e7c2ec1fecd7e3a27b99d143ef9a2bf3.webp

          我全要圖片

          快速入門

          一般來(lái)說(shuō)informer會(huì)跟workque, controller在一起,這點(diǎn)從k8s的源代碼可以很明顯的看到,不過(guò)為了簡(jiǎn)單起見,這里只看informer的部分。

                
                package main

          import (
           "context"
           "flag"
           "fmt"
           "os"
           "os/signal"
           "path/filepath"
           "syscall"
           "time"

           "k8s.io/client-go/util/homedir"
           "k8s.io/klog/v2"

           v1 "k8s.io/api/core/v1"
           "k8s.io/apimachinery/pkg/fields"
           "k8s.io/client-go/kubernetes"
           "k8s.io/client-go/tools/cache"
           "k8s.io/client-go/tools/clientcmd"
          )

          func main() {
           var kubeconfig *string
           var master string

           if home := homedir.HomeDir(); home != "" {
            kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube""config"), "(optional) absolute path to the kubeconfig file")
           } else {
            kubeconfig = flag.String("kubeconfig""""absolute path to the kubeconfig file")
           }
           flag.StringVar(&master, "master""""master url")
           flag.Parse()

           config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
           if err != nil {
            klog.Fatal(err)
           }

           clientset, err := kubernetes.NewForConfig(config)
           if err != nil {
            klog.Fatal(err)
           }

           // 1.
           podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

           // 2.
           _, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                  // 3.
            AddFunc: func(obj interface{}) {
             key, err := cache.MetaNamespaceKeyFunc(obj)
             if err == nil {
              fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Add: ", key)
             }
            },
            UpdateFunc: func(old interface{}, new interface{}) {
             key, err := cache.MetaNamespaceKeyFunc(new)
             if err == nil {
              fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Update: ", key)
             }
            },
            DeleteFunc: func(obj interface{}) {
                      // 4.
             key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
             if err == nil {
              fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Delete: ", key)
             }
            },
           })
              // 5. 
           ctx, cancel := context.WithCancel(context.Background())
           defer cancel()
           ch := make(chan os.Signal, 1)
           signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
           go func() {
            <-ch
            klog.Info("Received termination, signaling shutdown")
            cancel()
           }()
              // 6.
           informer.Run(ctx.Done())
          }

          輸出如下:

                
                2023-06-03 14:19:21 Add:  default/example-fcsjwbzzf2
          2023-06-03 14:20:21 Update:  default/example-fcsjwbzzf2
          2023-06-03 14:21:21 Update:  default/example-fcsjwbzzf2

          注意: 每分鐘以Update的形式再次調(diào)用UpdateFunc

          實(shí)名吐槽Golang的時(shí)間格式化!!!

          代碼分解如下:

          1. 創(chuàng)建ListWatch對(duì)象,用于獲取資源最新列表及后續(xù)更新
          2. 創(chuàng)建informer對(duì)象,傳入必要的參數(shù)
          3. 注冊(cè)各種回調(diào)函數(shù), 如AddFunc等
          4. 刪除的對(duì)象和其他對(duì)象不同,所以需要不同的方法來(lái)獲取key
          5. 設(shè)置退出信號(hào)量,k8s的慣用操作了
          6. 啟動(dòng)informer

          通過(guò)上面的代碼可以知道,創(chuàng)建informer有兩件比較重要的事情,一是創(chuàng)建ListWatch,二是注冊(cè)回調(diào)函數(shù)。

          ListWatch

          ListWatch就如名字指明的那樣,List,Watch,前者是拉取指定資源的資源列表,比如default命名空間下的所有Pod資源,Watch是在前者拉取完成之后開始監(jiān)聽之后所有的資源變化(前者會(huì)得到一個(gè)版本號(hào),watch可以借助這個(gè)版本號(hào),只獲取版本號(hào)之后的資源),比如新增,更新,刪除等變化。

                
                podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

          func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
              // 1.
           optionsModifier := func(options *metav1.ListOptions) {
            options.FieldSelector = fieldSelector.String()
           }
           return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
          }

          func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
              // 2.
           listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
            optionsModifier(&options)
            return c.Get().
             Namespace(namespace).
             Resource(resource).
             VersionedParams(&options, metav1.ParameterCodec).
             Do(context.TODO()).
             Get()
           }
              // 3.
           watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
            options.Watch = true
            optionsModifier(&options)
            return c.Get().
             Namespace(namespace).
             Resource(resource).
             VersionedParams(&options, metav1.ParameterCodec).
             Watch(context.TODO())
           }
           return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
          }

          代碼分解如下:

          1. 設(shè)置Options, Options以函數(shù)的形式傳入也是k8s一個(gè)比較常用的模式了。
          2. 簡(jiǎn)單的在靜態(tài)客戶端的Get方法上包裝一層函數(shù)
          3. 簡(jiǎn)單的在靜態(tài)客戶端的Watch方法上包裝一層函數(shù)

          可以看到ListWatch的內(nèi)部構(gòu)造并不復(fù)雜,僅僅是將GetWatch方法組合起來(lái)而已。

          Informer

          因?yàn)楸疚闹饕治鰅nformer,所以會(huì)略過(guò)其中Store的部分,我們暫且將其作為一個(gè)存儲(chǔ)的黑盒子即可,以后有文章再詳細(xì)說(shuō)明。

                
                _, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
              // 略過(guò)代碼部分
          }

          func NewInformer(
           lw ListerWatcher,
           objType runtime.Object,
           resyncPeriod time.Duration,
           h ResourceEventHandler,
          )
           (Store, Controller)
           {
           // 1.
           clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
           return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
          }
                                           
          func newInformer(
           lw ListerWatcher,
           objType runtime.Object,
           resyncPeriod time.Duration,
           h ResourceEventHandler,
           clientState Store,
          )
           Controller
           {
           // 2.
           fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
            KnownObjects:          clientState,
            EmitDeltaTypeReplaced: true,
           })

           cfg := &Config{
            Queue:            fifo,
            ListerWatcher:    lw,
            ObjectType:       objType,
            FullResyncPeriod: resyncPeriod,
            RetryOnError:     false,
            // 3.
            Process: func(obj interface{}) error {
             // from oldest to newest
             for _, d := range obj.(Deltas) {
              switch d.Type {
              case Sync, Replaced, Added, Updated:
               if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                if err := clientState.Update(d.Object); err != nil {
                 return err
                }
                h.OnUpdate(old, d.Object)
               } else {
                if err := clientState.Add(d.Object); err != nil {
                 return err
                }
                h.OnAdd(d.Object)
               }
              case Deleted:
               if err := clientState.Delete(d.Object); err != nil {
                return err
               }
               h.OnDelete(d.Object)
              }
             }
             return nil
            },
           }
           return New(cfg)
          }
                                           
          func New(c *Config) Controller {
           ctlr := &controller{
            config: *c,
            clock:  &clock.RealClock{},
           }
           return ctlr
          }

          代碼分解如下:

          1. 創(chuàng)建一個(gè)Store, 它用來(lái)存儲(chǔ)informer獲取到的資源
          2. 將Store再包裝一層(k8s的傳統(tǒng)操作了),提供先入先出(fifo)的功能
          3. informer處理的主函數(shù),根據(jù)對(duì)象類型調(diào)用對(duì)應(yīng)的回調(diào)函數(shù),以及將對(duì)象更新到綁定的Store

          這里有一個(gè)值得注意的點(diǎn),informer是一個(gè)符合Controller接口的對(duì)象,閱讀過(guò)k8s源代碼或者寫過(guò)operator的對(duì)controller應(yīng)該不會(huì)陌生,這是k8s比較重要的對(duì)象了,或者說(shuō)模式。

          總的來(lái)說(shuō),informer的初始化過(guò)程還是比較清晰的,主要分為兩步,創(chuàng)建隊(duì)列(fifo),配置處理邏輯(Process),既然初始化不復(fù)雜,那么復(fù)雜的就是Run方法。

          Run

          那么看看informer怎么運(yùn)行的吧

                
                informer.Run(ctx.Done())

          func (c *controller) Run(stopCh <-chan struct{}) {
           defer utilruntime.HandleCrash()
           go func() {
            <-stopCh
            c.config.Queue.Close()
           }()
              // 1.
           r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
           )
              // 2.
           r.ShouldResync = c.config.ShouldResync
           r.WatchListPageSize = c.config.WatchListPageSize
           r.clock = c.clock
           if c.config.WatchErrorHandler != nil {
            r.watchErrorHandler = c.config.WatchErrorHandler
           }
           c.reflectorMutex.Lock()
           c.reflector = r
           c.reflectorMutex.Unlock()
           var wg wait.Group
           // 3.
           wg.StartWithChannel(stopCh, r.Run)
              // 4.
           wait.Until(c.processLoop, time.Second, stopCh)
           wg.Wait()
          }

          代碼分解如下:

          1. 創(chuàng)建Reflector, reflector負(fù)責(zé)和apiserver通信,不斷的將數(shù)據(jù)同步給informer
          2. 配置Reflector的各項(xiàng)參數(shù)
          3. 啟動(dòng)Reflector
          4. 啟動(dòng)informer的主循環(huán)

          由于processLoop比較簡(jiǎn)單,我們先看看它的源代碼。

                
                func (c *controller) processLoop() {
           for {
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
            if err != nil {
             if err == ErrFIFOClosed {
              return
             }
             if c.config.RetryOnError {
              // This is the safe way to re-enqueue.
              c.config.Queue.AddIfNotPresent(obj)
             }
            }
           }
          }

          代碼很簡(jiǎn)單,應(yīng)該不需要特別的說(shuō)明,就是傳入之前的Process方法用于處理隊(duì)列傳入的各個(gè)對(duì)象。

          Reflector

          Reflector的初始化并不復(fù)雜,代碼如下

                
                r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
          )

          func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
           return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
          }

          func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
           // 省略其他代碼
           r.setExpectedType(expectedType)
           return r
          }

          func (r *Reflector) setExpectedType(expectedType interface{}) {
           r.expectedType = reflect.TypeOf(expectedType)
           if obj, ok := expectedType.(*unstructured.Unstructured); ok {
            gvk := obj.GroupVersionKind()
            r.expectedGVK = &gvk
            r.expectedTypeName = gvk.String()
           }
          }

          上面的代碼唯一值得提的是setExpectedType,k8s的對(duì)象總是要知道gvk的。

          然后就是Reflector的運(yùn)行邏輯

                
                
          func (r *Reflector) Run(stopCh <-chan struct{}) {
              // 1.
           wait.BackoffUntil(func() {
                  // 2.
            if err := r.ListAndWatch(stopCh); err != nil {
             r.watchErrorHandler(r, err)
            }
           }, r.backoffManager, true, stopCh)
          }

          代碼分解如下:

          1. client-go提供的重試幫助函數(shù),只要沒(méi)有收到終止信號(hào)就會(huì)不斷的重試傳入的方法
          2. List And Watch, 獲取列表并監(jiān)聽資源更新

          重頭戲就是ListAndWatch了。

                
                func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
           var resourceVersion string
              // 1.
           options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

              // 2.
           if err := func() error {
            var list runtime.Object
            var paginatedResult bool
            var err error
            listCh := make(chan struct{}, 1)
            panicCh := make(chan interface{}, 1)
            go func() {
                      // 3.
             pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
              return r.listerWatcher.List(opts)
             }))
                      
             // 4.
             list, paginatedResult, err = pager.List(context.Background(), options)
             close(listCh)
            }()
            // 5.
            items, err := meta.ExtractList(list)
                  // 6.
            if err := r.syncWith(items, resourceVersion); err != nil {
             return fmt.Errorf("unable to sync list result: %v", err)
            }
                  // 7.
            r.setLastSyncResourceVersion(resourceVersion)
            return nil
           }(); err != nil {
            return err
           }

           resyncerrc := make(chan error, 1)
           cancelCh := make(chan struct{})
           defer close(cancelCh)
              // 8.
           go func() {
            resyncCh, cleanup := r.resyncChan()
                  if r.ShouldResync == nil || r.ShouldResync() {
                      klog.V(4).Infof("%s: forcing resync", r.name)
                      if err := r.store.Resync(); err != nil {
                          resyncerrc <- err
                          return
                      }
                  }
                  cleanup()
            resyncCh, cleanup = r.resyncChan()
            }
           }()
           // 9.
           for {
                  // 10.
            w, err := r.listerWatcher.Watch(options)
            // 11.
            if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
             return nil
            }
           }
          }

          func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
           found := make([]interface{}, 0len(items))
           for _, item := range items {
            found = append(found, item)
           }
           return r.store.Replace(found, resourceVersion)
          }

          代碼分解如下:

          1. 設(shè)置資源版本號(hào)ResourceVersion, k8s提供了一種以版本號(hào)過(guò)濾的資源的方式,如果是首次,那么是0,如果因?yàn)榫W(wǎng)絡(luò)等原因重試,就可以增量的獲取遺落的資源列表,而不需再次全量的獲取一遍
          2. 將List的邏輯放在一個(gè)匿名函數(shù)中統(tǒng)一處理錯(cuò)誤,常見操作了。
          3. 構(gòu)建一個(gè)分頁(yè)器,分批獲取資源列表。
          4. 開始獲取,這里的List其實(shí)就是調(diào)用之前傳入的lw.List
          5. 獲取列表,這一步會(huì)檢查列表對(duì)象是否合法以及做一定的轉(zhuǎn)換。
          6. 將數(shù)據(jù)同步到Store,使用它的Replace方法,這可以在上面源代碼的最后看到具體操作。
          7. 設(shè)置ResourceVersion,如果后續(xù)出錯(cuò),就可以從這個(gè)資源版本開始了
          8. resync, 就是將Store里面的數(shù)據(jù)以Update的事件形式再次傳入informer,會(huì)觸發(fā)UpdateFunc回調(diào)函數(shù)。
          9. 監(jiān)聽的循環(huán)
          10. 通過(guò)之前傳入的lw的Watch方法,獲得watch.Interface,這個(gè)接口會(huì)不斷的給出變更對(duì)象
          11. 處理上一步傳來(lái)的事件。

          最后就是Reflector的核心方法了。

                
                func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
          loop:
           for {
            select {
            case <-stopCh:
             return errorStopRequested
            case err := <-errc:
             return err
                  // 1.
            case event, ok := <-w.ResultChan():
             if !ok {
              break loop
             }
                      // 2.
             if event.Type == watch.Error {
              return apierrors.FromObject(event.Object)
             }
             if r.expectedType != nil {
              if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
               utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
               continue
              }
             }
             if r.expectedGVK != nil {
              if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
               utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
               continue
              }
             }
                      // 3.
             meta, err := meta.Accessor(event.Object)
             newResourceVersion := meta.GetResourceVersion()
             switch event.Type {
             case watch.Added:
              err := r.store.Add(event.Object)
             case watch.Modified:
              err := r.store.Update(event.Object)
             case watch.Deleted:
              err := r.store.Delete(event.Object)
             case watch.Bookmark:
             default:
              utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
             }
             *resourceVersion = newResourceVersion
             r.setLastSyncResourceVersion(newResourceVersion)
             if rvu, ok := r.store.(ResourceVersionUpdater); ok {
              rvu.UpdateResourceVersion(newResourceVersion)
             }
             eventCount++
            }
           }
           return nil
          }

          代碼分解如下:

          1. 獲取監(jiān)聽到的事件
          2. 判斷事件是否正常,是否符合預(yù)期的GVK等
          3. 不同的事件以不同方法更新,這樣可以觸發(fā)不同的回調(diào)函數(shù)

          總的來(lái)說(shuō),reflector做的事情就是將數(shù)據(jù)更新到Store里面,而Informer會(huì)不斷的從Store里面讀取數(shù)據(jù),當(dāng)讀到數(shù)據(jù)后就調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)。

          總結(jié)

          Informer是k8s里面非常重要的數(shù)據(jù)同步機(jī)制,理解了Informer就可以很容易找到k8s相關(guān)組件的主要業(yè)務(wù)邏輯了,可以想象的到,業(yè)務(wù)邏輯一定注冊(cè)在回調(diào)函數(shù)中,不過(guò)真實(shí)的代碼要多了一層抽象,因?yàn)閗8s源代碼里回調(diào)函數(shù)邏輯一般是判斷一下就扔進(jìn)workqueue了。

          client-go的代碼部分差不多結(jié)束了,后面閱讀k8s的源代碼。

          瀏覽 56
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产资源在线播放 | 熟妇三区 | 天天免费看黄片 | 久久久99国产精品免费 | 人人操人人 |