client-go 之 Reflector 源碼分析
圖片來源:https://unsplash.com/photos/mFl5WwGJnTs
前面我們說了 Informer 通過對 APIServer 的資源對象執(zhí)行 List 和 Watch 操作,把獲取到的數據存儲在本地的緩存中,其中實現這個的核心功能就是 Reflector,我們可以稱其為反射器,從名字我們可以看出來它的主要功能就是反射,就是將 Etcd 里面的數據反射到本地存儲(DeltaFIFO)中。Reflector 首先通過 List 操作獲取所有的資源對象數據,保存到本地存儲,然后通過 Watch 操作監(jiān)控資源的變化,觸發(fā)相應的事件處理,比如前面示例中的 Add 事件、Update 事件、Delete 事件。
Reflector 結構體的定義位于 staging/src/k8s.io/client-go/tools/cache/reflector.go 下面:
// k8s.io/client-go/tools/cache/reflector.go// Reflector(反射器) 監(jiān)聽指定的資源,將所有的變化都反射到給定的存儲中去type Reflector struct {// name 標識這個反射器的名稱,默認為 文件:行數(比如reflector.go:125)// 默認名字通過 k8s.io/apimachinery/pkg/util/naming/from_stack.go 下面的 GetNameFromCallsite 函數生成name string// 期望放到 Store 中的類型名稱,如果提供,則是 expectedGVK 的字符串形式// 否則就是 expectedType 的字符串,它僅僅用于顯示,不用于解析或者比較。expectedTypeName string// 我們放到 Store 中的對象類型expectedType reflect.Type// 如果是非結構化的,我們期望放在 Sotre 中的對象的 GVKexpectedGVK *schema.GroupVersionKind// 與 watch 源同步的目標 Storestore Store// 用來執(zhí)行 lists 和 watches 操作的 listerWatcher 接口(最重要的)listerWatcher ListerWatcher// backoff manages backoff of ListWatchbackoffManager wait.BackoffManagerresyncPeriod time.Duration// ShouldResync 會周期性的被調用,當返回 true 的時候,就會調用 Store 的 Resync 操作ShouldResync func() boolclock clock.ClockpaginatedResult bool// Kubernetes 資源在 APIServer 中都是有版本的,對象的任何修改(添加、刪除、更新)都會造成資源版本更新,lastSyncResourceVersion 就是指的這個版本lastSyncResourceVersion string// 如果之前的 list 或 watch 帶有 lastSyncResourceVersion 的請求中是一個 HTTP 410(Gone)的失敗請求,則 isLastSyncResourceVersionGone 為 trueisLastSyncResourceVersionGone bool// lastSyncResourceVersionMutex 用于保證對 lastSyncResourceVersion 的讀/寫訪問。lastSyncResourceVersionMutex sync.RWMutexWatchListPageSize int64}// NewReflector 創(chuàng)建一個新的反射器對象,將使給定的 Store 保持與服務器中指定的資源對象的內容同步。// 反射器只把具有 expectedType 類型的對象放到 Store 中,除非 expectedType 是 nil。// 如果 resyncPeriod 是非0,那么反射器會周期性地檢查 ShouldResync 函數來決定是否調用 Store 的 Resync 操作// `ShouldResync==nil` 意味著總是要執(zhí)行 Resync 操作。// 這使得你可以使用反射器周期性地處理所有的全量和增量的對象。func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {// 默認的反射器名稱為 file:linereturn NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)}// NewNamedReflector 與 NewReflector 一樣,只是指定了一個 name 用于日志記錄func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {realClock := &clock.RealClock{}r := &Reflector{name: name,listerWatcher: lw,store: store,backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),resyncPeriod: resyncPeriod,clock: realClock,}r.setExpectedType(expectedType)return r}
從源碼中我們可以看出來通過 NewReflector 實例化反射器的時候,必須傳入一個 ListerWatcher 接口對象,這個也是反射器最核心的功能,該接口擁有 List 和 Watch 方法,用于獲取和監(jiān)控資源對象。
// k8s.io/client-go/tools/cache/listwatch.go// Lister 是知道如何執(zhí)行初始化List列表的對象type Lister interface {// List 應該返回一個列表類型的對象;// Items 字段將被解析,ResourceVersion 字段將被用于正確啟動 watch 的地方List(options metav1.ListOptions) (runtime.Object, error)}// Watcher 是知道如何執(zhí)行 watch 操作的任何對象type Watcher interface {// Watch 在指定的版本開始執(zhí)行 watch 操作Watch(options metav1.ListOptions) (watch.Interface, error)}// ListerWatcher 是任何知道如何對一個資源執(zhí)行初始化List列表和啟動Watch監(jiān)控操作的對象type ListerWatcher interface {ListerWatcher}
而 Reflector 對象通過 Run 函數來啟動監(jiān)控并處理監(jiān)控事件的:
// k8s.io/client-go/tools/cache/reflector.go// Run 函數反復使用反射器的 ListAndWatch 函數來獲取所有對象和后續(xù)的 deltas。// 當 stopCh 被關閉的時候,Run函數才會退出。func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {utilruntime.HandleError(err)}}, r.backoffManager, true, stopCh)klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)}
所以不管我們傳入的 ListWatcher 對象是如何實現的 List 和 Watch 操作,只要實現了就可以,最主要的還是看 ListAndWatch 函數是如何去實現的,如何去調用 List 和 Watch 的:
// k8s.io/client-go/tools/cache/reflector.go// ListAndWatch 函數首先列出所有的對象,并在調用的時候獲得資源版本,然后使用該資源版本來進行 watch 操作。// 如果 ListAndWatch 沒有初始化 watch 成功就會返回錯誤。func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)var resourceVersion stringoptions := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}if err := func() error {initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})defer initTrace.LogIfLong(10 * time.Second)var list runtime.Objectvar paginatedResult boolvar err errorlistCh := make(chan struct{}, 1)panicCh := make(chan interface{}, 1)go func() {defer func() {if r := recover(); r != nil {panicCh <- r}}()// 如果 listWatcher 支持,會嘗試 chunks(分塊)收集 List 列表數據// 如果不支持,第一個 List 列表請求將返回完整的響應數據。pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {return r.listerWatcher.List(opts)}))switch {case r.WatchListPageSize != 0:pager.PageSize = r.WatchListPageSizecase r.paginatedResult:// 獲得一個初始的分頁結果。假定此資源和服務器符合分頁請求,并保留默認的分頁器大小設置case options.ResourceVersion != "" && options.ResourceVersion != "0":pager.PageSize = 0}list, paginatedResult, err = pager.List(context.Background(), options)if isExpiredError(err) {r.setIsLastSyncResourceVersionExpired(true)list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})}close(listCh)}()select {case <-stopCh:return nilcase r := <-panicCh:panic(r)case <-listCh:}if err != nil {return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)}if options.ResourceVersion == "0" && paginatedResult {r.paginatedResult = true}r.setIsLastSyncResourceVersionExpired(false) // list 成功initTrace.Step("Objects listed")listMetaInterface, err := meta.ListAccessor(list)if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)}// 獲取資源版本號resourceVersion = listMetaInterface.GetResourceVersion()initTrace.Step("Resource version extracted")// 將資源數據轉換成資源對象列表,將 runtime.Object 對象轉換成 []runtime.Object 對象items, err := meta.ExtractList(list)if err != nil {return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)}initTrace.Step("Objects extracted")// 將資源對象列表中的資源對象和資源版本號存儲在 Store 中if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)}initTrace.Step("SyncWith done")r.setLastSyncResourceVersion(resourceVersion)initTrace.Step("Resource version updated")return nil}(); err != nil {return err}resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)go func() {resyncCh, cleanup := r.resyncChan()defer func() {cleanup()}()for {select {case <-resyncCh:case <-stopCh:returncase <-cancelCh:return}// 如果 ShouldResync 為 nil 或者調用返回true,則執(zhí)行 Store 的 Resync 操作if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)if err := r.store.Resync(); err != nil {resyncerrc <- errreturn}}cleanup()resyncCh, cleanup = r.resyncChan()}}()for {// stopCh 一個停止循環(huán)的機會select {case <-stopCh:return nildefault:}timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))// 設置watch的選項,因為前期列舉了全量對象,從這里只要監(jiān)聽最新版本以后的資源就可以了// 如果沒有資源變化總不能一直掛著吧?也不知道是卡死了還是怎么了,所以設置一個超時會好一點options = metav1.ListOptions{ResourceVersion: resourceVersion,TimeoutSeconds: &timeoutSeconds,AllowWatchBookmarks: true,}start := r.clock.Now()// 執(zhí)行 Watch 操作w, err := r.listerWatcher.Watch(options)if err != nil {switch {case isExpiredError(err):klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)case err == io.EOF:// watch closed normallycase err == io.ErrUnexpectedEOF:klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)default:utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))}if utilnet.IsConnectionRefused(err) {time.Sleep(time.Second)continue}return nil}// 調用 watchHandler 來處理分發(fā) watch 到的事件對象if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {if err != errorStopRequested {switch {case isExpiredError(err):klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)default:klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)}}return nil}}}
首先通過反射器的 relistResourceVersion 函數獲得反射器 relist 的資源版本,如果資源版本非 0,則表示根據資源版本號繼續(xù)獲取,當傳輸過程中遇到網絡故障或者其他原因導致中斷,下次再連接時,會根據資源版本號繼續(xù)傳輸未完成的部分。可以使本地緩存中的數據與Etcd集群中的數據保持一致,該函數實現如下所示:
// k8s.io/client-go/tools/cache/reflector.go// relistResourceVersion 決定了反射器應該list或者relist的資源版本。// 如果最后一次relist的結果是HTTP 410(Gone)狀態(tài)碼,則返回"",這樣relist將通過quorum讀取etcd中可用的最新資源版本。// 返回使用 lastSyncResourceVersion,這樣反射器就不會使用在relist結果或watch事件中watch到的資源版本更老的資源版本進行relist了func (r *Reflector) relistResourceVersion() string {r.lastSyncResourceVersionMutex.RLock()defer r.lastSyncResourceVersionMutex.RUnlock()if r.isLastSyncResourceVersionGone {// 因為反射器會進行分頁List請求,如果 lastSyncResourceVersion 過期了,所有的分頁列表請求就都會跳過 watch 緩存// 所以設置 ResourceVersion="",然后再次 List,重新建立反射器到最新的可用資源版本,從 etcd 中讀取,保持一致性。return ""}if r.lastSyncResourceVersion == "" {// 反射器執(zhí)行的初始 List 操作的時候使用0作為資源版本。return "0"}return r.lastSyncResourceVersion}
上面的 ListAndWatch 函數實現看上去雖然非常復雜,但其實大部分是對分頁的各種情況進行處理,最核心的還是調用 r.listerWatcher.List(opts) 獲取全量的資源對象,而這個 List 其實 ListerWatcher 實現的 List 方法,這個 ListerWatcher 接口實際上在該接口定義的同一個文件中就有一個 ListWatch 結構體實現了:
// k8s.io/client-go/tools/cache/listwatch.go// ListFunc 知道如何 List 資源type ListFunc func(options metav1.ListOptions) (runtime.Object, error)// WatchFunc 知道如何 watch 資源type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)// ListWatch 結構體知道如何 list 和 watch 資源對象,它實現了 ListerWatcher 接口。// 它為 NewReflector 使用者提供了方便的函數。其中 ListFunc 和 WatchFunc 不能為 nil。type ListWatch struct {ListFunc ListFuncWatchFunc WatchFunc// DisableChunking 對 list watcher 請求不分塊。DisableChunking bool}// 列出一組 APIServer 資源func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {return lw.ListFunc(options)}// Watch 一組 APIServer 資源func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)}
當我們真正使用一個 Informer 對象的時候,實例化的時候就會調用這里的 ListWatch 來進行初始化,比如前面我們實例中使用的 Deployment Informer,
// k8s.io/client-go/informers/apps/v1/deployment.go// NewFilteredDeploymentInformer 為 Deployment 構造一個新的 Informer。// 總是傾向于使用一個 informer 工廠來獲取一個 shared informer,而不是獲取一個獨立的 informer,這樣可以減少內存占用和服務器的連接數。func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)},},&appsv1.Deployment{},resyncPeriod,indexers,)}func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)}func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)}
從上面代碼我們就可以看出來當我們去調用一個資源對象的 Informer() 的時候,就會去調用上面的 NewFilteredDeploymentInformer 函數進行初始化,而在初始化的使用就傳入了 cache.ListWatch 對象,其中就有 List 和 Watch 的實現操作,也就是說前面反射器在 ListAndWatch 里面調用的 ListWatcher 的 List 操作是在一個具體的資源對象的 Informer 中實現的,比如我們這里就是通過的 ClientSet 客戶端與 APIServer 交互獲取到 Deployment 的資源列表數據的,通過在 ListFunc 中的 client.AppsV1().Deployments(namespace).List(context.TODO(), options) 實現的,這下應該好理解了吧。
獲取到了全量的 List 數據過后,通過 listMetaInterface.GetResourceVersion() 來獲取資源的版本號,ResourceVersion(資源版本號)非常重要,Kubernetes 中所有的資源都擁有該字段,它標識當前資源對象的版本號,每次修改(CUD)當前資源對象時,Kubernetes API Server 都會更改 ResourceVersion,這樣 client-go 執(zhí)行 Watch 操作時可以根據ResourceVersion 來確定當前資源對象是否發(fā)生了變化。
然后通過 meta.ExtractList 函數將資源數據轉換成資源對象列表,將 runtime.Object 對象轉換成 []runtime.Object 對象,因為全量獲取的是一個資源列表。
接下來是通過反射器的 syncWith 函數將資源對象列表中的資源對象和資源版本號存儲在 Store 中,這個會在后面的章節(jié)中詳細說明。
最后處理完成后通過 r.setLastSyncResourceVersion(resourceVersion) 操作來設置最新的資源版本號,其他的就是啟動一個 goroutine 去定期檢查是否需要執(zhí)行 Resync 操作,調用存儲中的 r.store.Resync() 來執(zhí)行,關于存儲的實現在后面和大家進行講解。
緊接著就是 Watch 操作了,Watch 操作通過 HTTP 協議與 APIServer 建立長連接,接收Kubernetes API Server 發(fā)來的資源變更事件,和 List 操作一樣,Watch 的真正實現也是具體的 Informer 初始化的時候傳入的,比如上面的 Deployment Informer 中初始化的時候傳入的 WatchFunc,底層也是通過 ClientSet 客戶端對 Deployment 執(zhí)行 Watch 操作 client.AppsV1().Deployments(namespace).Watch(context.TODO(), options) 實現的。
獲得 watch 的資源數據后,通過調用 r.watchHandler 來處理資源的變更事件,當觸發(fā)Add 事件、Update 事件、Delete 事件時,將對應的資源對象更新到本地緩存(DeltaFIFO)中并更新 ResourceVersion 資源版本號。
// k8s.io/client-go/tools/cache/reflector.go// watchHandler 監(jiān)聽 w 保持資源版本最新func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {eventCount := 0defer w.Stop()loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return errcase event, ok := <-w.ResultChan(): // 從 watch 中獲取事件對象if !ok {break loop}if event.Type == watch.Error {return apierrors.FromObject(event.Object)}if r.expectedType != nil {if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue}}if r.expectedGVK != nil {if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))continue}}meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))continue}// 獲得當前 watch 到資源的資源版本號newResourceVersion := meta.GetResourceVersion()switch event.Type { // 分發(fā)事件case watch.Added: // Add 事件err := r.store.Add(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Modified: // Update 事件err := r.store.Update(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))}case watch.Deleted: // Delete 事件err := r.store.Delete(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))}case watch.Bookmark:// `Bookmark` 意味著 watch 已經同步到這里了,只要更新資源版本即可。default:utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))}// 更新資源版本號*resourceVersion = newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}watchDuration := r.clock.Since(start)if watchDuration < 1*time.Second && eventCount == 0 {return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)}klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)return nil}
這就是 Reflector 反射器中最核心的 ListAndWatch 實現,從上面的實現我們可以看出獲取到的數據最終都流向了本地的 Store,也就是 DeltaFIFO,所以接下來我們需要來分析 DeltaFIFO 的實現。
K8S進階訓練營,點擊下方圖片了解詳情

