<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          client-go 之 DeltaFIFO 實現(xiàn)原理

          共 18583字,需瀏覽 38分鐘

           ·

          2020-09-02 10:32

          從 DeltaFIFO 的名字可以看出它是一個 FIFO,也就是一個先進(jìn)先出的隊列,而 Delta 表示的是變化的資源對象存儲,包含操作資源對象的類型和數(shù)據(jù),Reflector 就是這個隊列的生產(chǎn)者。

          Delta

          在了解 DeltaFIFO 之前我們需要先具體了解下什么是 Delta,我們先來看看 client-go 中是如何定義的,Delta 的數(shù)據(jù)結(jié)構(gòu)定義位于staging/src/k8s.io/client-go/tools/cache/delta_fifo.go文件中。

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?DeltaType?是變化的類型(添加、刪除等)
          type?DeltaType?string

          //?變化的類型定義
          const?(
          ?Added???DeltaType?=?"Added"?????//?增加
          ?Updated?DeltaType?=?"Updated"???//?更新
          ?Deleted?DeltaType?=?"Deleted"???//?刪除
          ?//?當(dāng)遇到 watch 錯誤,不得不進(jìn)行重新list時,就會觸發(fā) Replaced。
          ??//?我們不知道被替換的對象是否發(fā)生了變化。
          ?//
          ??//?注意:以前版本的 DeltaFIFO 也會對 Replace 事件使用 Sync。
          ??//?所以只有當(dāng)選項 EmitDeltaTypeReplaced 為真時才會觸發(fā) Replaced。
          ?Replaced?DeltaType?=?"Replaced"
          ?//?Sync?是針對周期性重新同步期間的合成事件
          ?Sync?DeltaType?=?"Sync"??????????//?同步
          )

          // Delta 是 DeltaFIFO 存儲的類型。
          //?它告訴你發(fā)生了什么變化,以及變化后對象的狀態(tài)。
          //
          //?[*]?除非變化是刪除操作,否則你將得到對象被刪除前的最終狀態(tài)。
          type?Delta?struct?{
          ?Type???DeltaType
          ?Object?interface{}
          }

          Delta 其實就是 Kubernetes 系統(tǒng)中帶有變化類型的資源對象,如下圖所示:

          其實也非常好理解,比如我們現(xiàn)在添加了一個 Pod,那么這個 Delta 就是帶有 Added 這個類型的 Pod,如果是刪除了一個 Deployment,那么這個 Delta 就是帶有 Deleted 類型的 Deployment,為什么要帶上類型呢?因為我們需要根據(jù)不同的類型去執(zhí)行不同的操作,增加、更新、刪除的動作顯然是不一樣的。

          FIFO

          上面我們解釋了什么是 Delta,接下來需要說下 FIFO,我們說 FIFO 很好理解,就是一個先進(jìn)先出的隊列,Reflector 是其生產(chǎn)者,其數(shù)據(jù)結(jié)構(gòu)定義位于 ?staging/src/k8s.io/client-go/tools/cache/fifo.go 文件中:

          //?k8s.io/client-go/tools/cache/fifo.go

          type?FIFO?struct?{
          ?lock?sync.RWMutex
          ?cond?sync.Cond

          ??//?items?中的每一個?key?也在?queue?中
          ?items?map[string]interface{}
          ?queue?[]string

          ?//?如果第一批?items?被?Replace()?插入或者先調(diào)用了?Deleta/Add/Update
          ??//?則 populated 為 true。
          ?populated?bool
          ?//?第一次調(diào)用?Replace()?時插入的?items?數(shù)
          ?initialPopulationCount?int

          ??// keyFunc 用于生成排隊的 item 插入和檢索的 key。
          ?keyFunc?KeyFunc

          ??//?標(biāo)識隊列已關(guān)閉,以便在隊列清空時控制循環(huán)可以退出。
          ?closed?????bool
          ?closedLock?sync.Mutex
          }

          var?(
          ?_?=?Queue(&FIFO{})?//?FIFO?是一個?Queue
          )

          上面的 FIFO 數(shù)據(jù)結(jié)構(gòu)中定義了 items 和 queue 兩個屬性來保存隊列中的數(shù)據(jù),其中 queue 中存的是資源對象的 key 列表,而 items 是一個 map 類型,其 key 就是 queue 中保存的 key,value 值是真正的資源對象數(shù)據(jù)。既然是先進(jìn)進(jìn)去的隊列,那么就要具有隊列的基本功能,結(jié)構(gòu)體下面其實就有一個類型斷言,表示當(dāng)前的 FIFO 實現(xiàn)了 Queue 這個接口,所以 FIFO 要實現(xiàn)的功能都是在 Queue 中定義的,Queue 接口和 FIFO 位于同一文件中:

          //?k8s.io/client-go/tools/cache/fifo.go

          //?Queue?擴(kuò)展了?Store??//?with?a?collection?of?Store?keys?to?"process".
          //?每一次添加、更新或刪除都可以將對象的key放入到該集合中。
          //?Queue?具有使用給定的?accumulator?來推導(dǎo)出相應(yīng)的?key?的方法
          //?Queue?可以從多個?goroutine?中并發(fā)訪問
          //?Queue?可以被關(guān)閉,之后?Pop?操作會返回一個錯誤
          type?Queue?interface?{
          ?Store

          ?// Pop 一直阻塞,直到至少有一個key要處理或隊列被關(guān)閉,隊列被關(guān)閉會返回一個錯誤。
          ??//?在前面的情況下?Pop?原子性地選擇一個?key?進(jìn)行處理,從?Store?中刪除關(guān)聯(lián)(key、accumulator)的數(shù)據(jù),
          ??//?并處理 accumulator。Pop 會返回被處理的 accumulator 和處理的結(jié)果。
          ?
          ?//?PopProcessFunc?函數(shù)可以返回一個?ErrRequeue{inner},在這種情況下,Pop?將
          ??//(a)把那個(key,accumulator)關(guān)聯(lián)作為原子處理的一部分返回到?Queue?中
          ??//?(b)?從 Pop 返回內(nèi)部錯誤。
          ?Pop(PopProcessFunc)?(interface{},?error)

          ?//?僅當(dāng)該?key?尚未與一個非空的?accumulator?相關(guān)聯(lián)的時候,AddIfNotPresent?將給定的?accumulator?放入?Queue(與?accumulator?的?key?相關(guān)聯(lián)的)
          ?AddIfNotPresent(interface{})?error
          ?
          ??//?如果第一批 keys 都已經(jīng) Popped,則 HasSynced 返回 true。
          ??//?如果在添加、更新、刪除之前發(fā)生了第一次?Replace?操作,則第一批?keys?為?true
          ??//?否則為空。
          ?HasSynced()?bool

          ?//?關(guān)閉該隊列
          ?Close()
          }

          從上面的定義中可以看出 Queue 這個接口擴(kuò)展了 Store 這個接口,這個就是前面我們說的本地存儲,隊列實際上也是一種存儲,然后在 Store 的基礎(chǔ)上增加 Pop、AddIfNotPresent、HasSynced、Close 4個函數(shù)就變成了 Queue 隊列了,所以我們優(yōu)先來看下 Store 這個接口的定義,該數(shù)據(jù)結(jié)構(gòu)定義位于文件 k8s.io/client-go/tools/cache/store.go 中:

          //?k8s.io/client-go/tools/cache/store.go

          // Store 是一個通用的對象存儲和處理的接口。
          //?Store?包含一個從字符串?keys?到?accumulators?的映射,并具有?to/from?當(dāng)前
          //?給定 key 關(guān)聯(lián)的 accumulators 添加、更新和刪除給定對象的操作。
          //?一個 Store 還知道如何從給定的對象中獲取 key,所以很多操作只提供對象。
          //
          //?在最簡單的?Store?實現(xiàn)中,每個?accumulator?只是最后指定的對象,或者刪除后為空,
          //?所以 Store 只是簡單的存儲。
          //
          // Reflector 反射器知道如何 watch 一個服務(wù)并更新一個 Store 存儲,這個包提供了 Store 的各種實現(xiàn)。
          type?Store?interface?{

          ?// Add 將指定對象添加到與指定對象的 key 相關(guān)的 accumulator(累加器)中。
          ?Add(obj?interface{})?error

          ?//?Update?與指定對象的?key?相關(guān)的?accumulator?中更新指定的對象
          ?Update(obj?interface{})?error

          ?//?Delete?根據(jù)指定的對象?key?刪除指定的對象
          ?Delete(obj?interface{})?error

          ?//?List?返回當(dāng)前所有非空的?accumulators?的列表
          ?List()?[]interface{}

          ?//?ListKeys?返回當(dāng)前與非空?accumulators?關(guān)聯(lián)的所有?key?的列表
          ?ListKeys()?[]string

          ?//?Get?根據(jù)指定的對象獲取關(guān)聯(lián)的?accumulator
          ?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)

          ?//?GetByKey?根據(jù)指定的對象?key?獲取關(guān)聯(lián)的?accumulator
          ?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)

          ?//?Replace?會刪除原來Store中的內(nèi)容,并將新增的list的內(nèi)容存入Store中,即完全替換數(shù)據(jù)
          ??// Store 擁有 list 列表的所有權(quán),在調(diào)用此函數(shù)后,不應(yīng)該引用它了。
          ?Replace([]interface{},?string)?error

          ?// Resync 在 Store 中沒有意義,但是在 DeltaFIFO 中有意義。
          ?Resync()?error
          }

          //?KeyFunc?就是從一個對象中生成一個唯一的?Key?的函數(shù),上面的?FIFO?中就有用到
          type?KeyFunc?func(obj?interface{})?(string,?error)

          //?MetaNamespaceKeyFunc?是默認(rèn)的?KeyFunc,生成的?key?格式為:
          //?<namespace>/<name>
          //?如果是全局的,則namespace為空,那么生成的?key?就是?<name>
          //?當(dāng)然要從?key?拆分出?namespace?和?name?也非常簡單
          func?MetaNamespaceKeyFunc(obj?interface{})?(string,?error)
          ?{
          ?if?key,?ok?:=?obj.(ExplicitKey);?ok?{
          ??return?string(key),?nil
          ?}
          ?meta,?err?:=?meta.Accessor(obj)
          ?if?err?!=?nil?{
          ??return?"",?fmt.Errorf("object?has?no?meta:?%v",?err)
          ?}
          ?if?len(meta.GetNamespace())?>?0?{
          ??return?meta.GetNamespace()?+?"/"?+?meta.GetName(),?nil
          ?}
          ?return?meta.GetName(),?nil
          }

          Store 就是一個通用的對象存儲和處理的接口,可以用來寫入對象和獲取對象。其中 cache 數(shù)據(jù)結(jié)構(gòu)就實現(xiàn)了上面的 Store 接口,但是這個屬于后面的 Indexer 部分的知識點(diǎn),這里我們就不展開說明了。

          我們說 Queue 擴(kuò)展了 Store 接口,所以 Queue 本身也是一個存儲,只是在存儲的基礎(chǔ)上增加了 Pop 這樣的函數(shù)來實現(xiàn)彈出對象,是不是就變成了一個隊列了。

          FIFO 就是一個具體的 Queue 實現(xiàn),按照順序彈出對象是不是就是一個先進(jìn)先出的隊列了?如下圖所示:

          所以我們接下來看看 FIFO 是如何實現(xiàn)存儲和 Pop 的功能的。首先是實現(xiàn) Store 存儲中最基本的方法,第一個就是添加對象:

          //?k8s.io/client-go/tools/cache/fifo.go

          // Add 插入一個對象,將其放入隊列中,只有當(dāng)元素不在集合中時才會插入隊列。
          func?(f?*FIFO)?Add(obj?interface{})?error?{
          ??//?獲取對象的?key
          ?id,?err?:=?f.keyFunc(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true
          ??//?元素不在隊列中的時候才插入隊列
          ?if?_,?exists?:=?f.items[id];?!exists?{
          ??f.queue?=?append(f.queue,?id)
          ?}
          ??//?items?是一個?map,所以直接賦值給這個?key,這樣對更新元素也同樣適用
          ?f.items[id]?=?obj
          ?f.cond.Broadcast()
          ?return?nil
          }

          更新對象,實現(xiàn)非常簡單,因為上面的 Add 方法就包含了 Update 的實現(xiàn),因為 items 屬性是一個 Map,對象有更新直接將對應(yīng) key 的 value 值替換成新的對象即可:

          //?k8s.io/client-go/tools/cache/fifo.go

          //?Update?和?Add?相同的實現(xiàn)
          func?(f?*FIFO)?Update(obj?interface{})?error?{
          ?return?f.Add(obj)
          }

          接著就是刪除 Delete 方法的實現(xiàn),這里可能大家會有一個疑問,下面的刪除實現(xiàn)只刪除了 items 中的元素,那這樣豈不是 queue 和 items 中的 key 會不一致嗎?的確會這樣,但是這是一個隊列,下面的 Pop() 函數(shù)會根據(jù) queue 里面的元素一個一個的彈出 key,沒有對象就不處理了,相當(dāng)于下面的 Pop() 函數(shù)中實現(xiàn)了 queue 的 key 的刪除:

          //?k8s.io/client-go/tools/cache/fifo.go

          // Delete 從隊列中移除一個對象。
          //?不會添加到?queue?中去,這個實現(xiàn)是假設(shè)消費(fèi)者只關(guān)心對象
          //?不關(guān)心它們被創(chuàng)建或添加的順序。
          func?(f?*FIFO)?Delete(obj?interface{})?error?{
          ??//?獲取對象的?key
          ?id,?err?:=?f.keyFunc(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true
          ??//?刪除?items?中?key?為?id?的元素,就是刪除隊列中的對象
          ?delete(f.items,?id)
          ??//?為什么不直接處理 queue 這個 slice 呢?
          ?return?err
          }

          然后是獲取隊列中所有對象的 List 方法的實現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          //?List?獲取隊列中的所有對象
          func?(f?*FIFO)?List()?[]interface{}?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?list?:=?make([]interface{},?0,?len(f.items))
          ??//?獲取所有的items的values值(items是一個Map)
          ?for?_,?item?:=?range?f.items?{
          ??list?=?append(list,?item)
          ?}
          ?return?list
          }

          接著是獲取隊列中所有元素的 key 的 ListKeys 方法實現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // ListKeys 返回現(xiàn)在 FIFO 隊列中所有對象的 keys 列表。
          func?(f?*FIFO)?ListKeys()?[]string?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?list?:=?make([]string,?0,?len(f.items))
          ??//?獲取所有items的key值(items是一個Map)
          ?for?key?:=?range?f.items?{
          ??list?=?append(list,?key)
          ?}
          ?return?list
          }

          至于根據(jù)對象或者對象的 key 獲取隊列中的元素,就更簡單了:

          //?k8s.io/client-go/tools/cache/fifo.go

          //?Get?獲取指定對象在隊列中的元素
          func?(f?*FIFO)?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)?{
          ?key,?err?:=?f.keyFunc(obj)
          ?if?err?!=?nil?{
          ??return?nil,?false,?KeyError{obj,?err}
          ?}
          ??//?調(diào)用?GetByKey?實現(xiàn)
          ?return?f.GetByKey(key)
          }

          //?GetByKey?根據(jù)?key?獲取隊列中的元素
          func?(f?*FIFO)?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ??//?因為?items?是一個?Map,所以直接根據(jù)?key?獲取即可
          ?item,?exists?=?f.items[key]
          ?return?item,?exists,?nil
          }

          然后是一個 Replace 替換函數(shù)的實現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // Replace 將刪除隊列中的內(nèi)容,'f'?擁有 map 的所有權(quán),調(diào)用該函數(shù)過后,不應(yīng)該再引用 map。
          //?'f'?的隊列也會被重置,返回時,隊列將包含 map 中的元素,沒有特定的順序。
          func?(f?*FIFO)?Replace(list?[]interface{},?resourceVersion?string)?error?{
          ?//?從?list?中提取出?key?然后和里面的元素重新進(jìn)行映射
          ??items?:=?make(map[string]interface{},?len(list))
          ?for?_,?item?:=?range?list?{
          ??key,?err?:=?f.keyFunc(item)
          ??if?err?!=?nil?{
          ???return?KeyError{item,?err}
          ??}
          ??items[key]?=?item
          ?}

          ?f.lock.Lock()
          ?defer?f.lock.Unlock()

          ?if?!f.populated?{
          ??f.populated?=?true
          ??f.initialPopulationCount?=?len(items)
          ?}
          ?//?重新設(shè)置?items?和?queue?的值
          ?f.items?=?items
          ?f.queue?=?f.queue[:0]
          ?for?id?:=?range?items?{
          ??f.queue?=?append(f.queue,?id)
          ?}
          ?if?len(f.queue)?>?0?{
          ??f.cond.Broadcast()
          ?}
          ?return?nil
          }

          還有 Store 存儲中的最后一個方法 Resync 的實現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // Resync 會保證 Store 中的每個對象在 queue 中都有它的 key。
          //?
          //?這應(yīng)該是禁止操作的,因為該屬性由所有操作維護(hù)
          func?(f?*FIFO)?Resync()?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?將所有?queue?中的元素放到一個?set?中
          ?inQueue?:=?sets.NewString()
          ?for?_,?id?:=?range?f.queue?{
          ??inQueue.Insert(id)
          ?}
          ??//?然后將所有?items?中的?key?加回到?queue?中去
          ?for?id?:=?range?f.items?{
          ??if?!inQueue.Has(id)?{
          ???f.queue?=?append(f.queue,?id)
          ??}
          ?}
          ?if?len(f.queue)?>?0?{
          ??f.cond.Broadcast()
          ?}
          ?return?nil
          }

          上面的所有方法就實現(xiàn)了一個普通的 Store,然后要想變成一個 Queue,還需要實現(xiàn)額外的 Pop 方法:

          //?k8s.io/client-go/tools/cache/fifo.go

          // Pop 會等到一個元素準(zhǔn)備好后再進(jìn)行處理,如果有多個元素準(zhǔn)備好了,則按照它們被添加或更新的順序返回。
          //
          //?在處理之前,元素會從隊列(和存儲)中移除,所以如果沒有成功處理,應(yīng)該用 AddIfNotPresent()?函數(shù)把它添加回來。
          //?處理函數(shù)是在有鎖的情況下調(diào)用的,所以更新其中需要和隊列同步的數(shù)據(jù)結(jié)構(gòu)是安全的。
          func?(f?*FIFO)?Pop(process?PopProcessFunc)?(interface{},?error)?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?for?{
          ??//?當(dāng)隊列為空時,Pop()?的調(diào)用會被阻塞住,直到新的元素插入隊列后
          ??for?len(f.queue)?==?0?{
          ???if?f.IsClosed()?{
          ????return?nil,?ErrFIFOClosed
          ???}
          ???//?等待?condition?被廣播
          ???f.cond.Wait()
          ??}
          ????//?取出?queue?隊列中的第一個元素(key)
          ??id?:=?f.queue[0]
          ????//?刪除第一個元素
          ??f.queue?=?f.queue[1:]
          ??if?f.initialPopulationCount?>?0?{
          ???f.initialPopulationCount--
          ??}
          ????//?獲取被彈出的元素
          ??item,?ok?:=?f.items[id]
          ??if?!ok?{
          ???//?因為 item 可能已經(jīng)被刪除了。
          ???continue
          ??}
          ????//?刪除彈出的元素
          ??delete(f.items,?id)
          ????//?處理彈出的元素
          ??err?:=?process(item)
          ??if?e,?ok?:=?err.(ErrRequeue);?ok?{
          ??????//?如果處理沒成功,需要調(diào)用?addIfNotPresent?加回隊列
          ???f.addIfNotPresent(id,?item)
          ???err?=?e.Err
          ??}
          ??return?item,?err
          ?}
          }

          另外的一個就是上面提到的 AddIfNotPresent、HasSynced、Close 幾個函數(shù)的實現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // AddIfNotPresent 插入一個元素,將其放入隊列中。
          //?如果元素已經(jīng)在集合中了,則會被忽略。
          //
          //?這在單個的?生產(chǎn)者/消費(fèi)者?的場景下非常有用,這樣消費(fèi)者可以安全地重試
          //?而不需要與生產(chǎn)者爭奪,也不需要排隊等待過時的元素。
          func?(f?*FIFO)?AddIfNotPresent(obj?interface{})?error?{
          ?id,?err?:=?f.keyFunc(obj)??//?獲取對象的?key
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.addIfNotPresent(id,?obj)??//?調(diào)用?addIfNotPresent?真正的實現(xiàn)
          ?return?nil
          }

          // addIfNotPresent 會假設(shè)已經(jīng)持有 fifo 鎖了,如果不存在,則將其添加到隊列中去。
          func?(f?*FIFO)?addIfNotPresent(id?string,?obj?interface{})?{
          ?f.populated?=?true
          ??//?存在則忽略
          ?if?_,?exists?:=?f.items[id];?exists?{
          ??return
          ?}
          ??//?添加到?queue?和?items?中去
          ?f.queue?=?append(f.queue,?id)
          ?f.items[id]?=?obj
          ??//?廣播?condition
          ?f.cond.Broadcast()
          }

          //?關(guān)閉隊列
          func?(f?*FIFO)?Close()?{
          ?f.closedLock.Lock()
          ?defer?f.closedLock.Unlock()
          ??//?標(biāo)記為關(guān)閉
          ?f.closed?=?true
          ?f.cond.Broadcast()
          }

          //?如果先調(diào)用了 Add/Update/Delete/AddIfNotPresent,或者先調(diào)用了Update,但被 Replace()?插入的第一批元素已經(jīng)被彈出,則 HasSynced 返回true。
          func?(f?*FIFO)?HasSynced()?bool?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?return?f.populated?&&?f.initialPopulationCount?==?0
          }

          到這里我們就實現(xiàn)了一個簡單的 FIFO 隊列,其實這里就是對 FIFO 這個數(shù)據(jù)結(jié)構(gòu)的理解,沒有特別的地方,只是在隊列里面有 items 和 queue 兩個屬性來維護(hù)隊列而已。

          DeltaFIFO

          上面我們已經(jīng)實現(xiàn)了 FIFO,接下來就看下一個 DeltaFIFO 是如何實現(xiàn)的,DeltaFIFO 和 FIFO 一樣也是一個隊列,但是也有不同的地方,里面的元素是一個 Delta,Delta 上面我們已經(jīng)提到表示的是帶有變化類型的資源對象。

          DeltaFIFO 的數(shù)據(jù)結(jié)構(gòu)定義位于 staging/src/k8s.io/client-go/tools/cache/delta_fifo.go 文件中:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          type?DeltaFIFO?struct?{
          ?//?lock/cond?保護(hù)訪問的?items?和?queue
          ?lock?sync.RWMutex
          ?cond?sync.Cond

          ??//?用來存儲?Delta?數(shù)據(jù)?->?對象key:?Delta數(shù)組

          ?items?map[string]Deltas
          ??//?用來存儲資源對象的key
          ?queue?[]string

          ?//?通過?Replace()?接口將第一批對象放入隊列,或者第一次調(diào)用增、刪、改接口時標(biāo)記為true
          ?populated?bool
          ?//?通過?Replace()?接口(全量)將第一批對象放入隊列的對象數(shù)量
          ?initialPopulationCount?int

          ?//?對象鍵的計算函數(shù)
          ?keyFunc?KeyFunc

          ?//?knownObjects?列出?"known"?的鍵?--?影響到?Delete(),Replace()?和?Resync()
          ??//?knownObjects?其實就是?Indexer,里面存有已知全部的對象
          ?knownObjects?KeyListerGetter

          ?//?標(biāo)記?queue?被關(guān)閉了
          ??closed?????bool
          ?closedLock?sync.Mutex

          ?//?emitDeltaTypeReplaced?當(dāng)?Replace()?被調(diào)用的時候,是否要?emit?Replaced?或者?Sync
          ?// DeltaType(保留向后兼容)。
          ?emitDeltaTypeReplaced?bool
          }

          //?KeyListerGetter?任何知道如何列出鍵和按鍵獲取對象的東西
          type?KeyListerGetter?interface?{
          ?KeyLister
          ?KeyGetter
          }

          //?獲取所有的鍵
          type?KeyLister?interface?{
          ?ListKeys()?[]string
          }

          //?根據(jù)鍵獲取對象
          type?KeyGetter?interface?{
          ?GetByKey(key?string)?(interface{},?bool,?error)
          }

          DeltaFIFO 與 FIFO 一樣都是一個 Queue,所以他們都實現(xiàn)了 Queue,所以我們這里來看下 DeltaFIFO 是如何實現(xiàn) Queue 功能的,當(dāng)然和 FIFO 一樣都是實現(xiàn) Queue 接口里面的所有方法。

          雖然實現(xiàn)流程和 FIFO 是一樣的,但是具體的實現(xiàn)是不一樣的,比如 DeltaFIFO 的對象鍵計算函數(shù)就不同:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?DeltaFIFO?的對象鍵計算函數(shù)
          func?(f?*DeltaFIFO)?KeyOf(obj?interface{})?(string,?error)?{
          ?//?用?Deltas?做一次轉(zhuǎn)換,判斷是否是?Delta?切片
          ??if?d,?ok?:=?obj.(Deltas);?ok?{
          ??if?len(d)?==?0?{
          ???return?"",?KeyError{obj,?ErrZeroLengthDeltasObject}
          ??}
          ????//?使用最新版本的對象進(jìn)行計算
          ??obj?=?d.Newest().Object
          ?}
          ?if?d,?ok?:=?obj.(DeletedFinalStateUnknown);?ok?{
          ??return?d.Key,?nil
          ?}
          ??//?具體計算還是要看初始化?DeltaFIFO?傳入的?KeyFunc?函數(shù)
          ?return?f.keyFunc(obj)
          }

          // Newest 返回最新的 Delta,如果沒有則返回 nil。
          func?(d?Deltas)?Newest()?*Delta?{
          ?if?n?:=?len(d);?n?>?0?{
          ??return?&d[n-1]
          ?}
          ?return?nil
          }

          DeltaFIFO 的計算對象鍵的函數(shù)為什么要先做一次 Deltas 的類型轉(zhuǎn)換呢?那是因為 Pop() 出去的對象很可能還要再添加進(jìn)來(比如處理失敗需要再放進(jìn)來),此時添加的對象就是已經(jīng)封裝好的 Deltas 對象了。

          然后同樣按照上面的方式來分析 DeltaFIFO 的實現(xiàn),首先查看 Store 存儲部分的實現(xiàn),也就是增、刪、改、查功能。

          同樣的 Add、Update 和 Delete 的實現(xiàn)方法基本上是一致的:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?Add?插入一個元素放入到隊列中
          func?(f?*DeltaFIFO)?Add(obj?interface{})?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true??//?隊列第一次寫入操作都要設(shè)置標(biāo)記
          ?return?f.queueActionLocked(Added,?obj)
          }

          //?Update?和?Add?一樣,只是是?Updated?一個?Delta
          func?(f?*DeltaFIFO)?Update(obj?interface{})?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true??//?隊列第一次寫入操作都要設(shè)置標(biāo)記
          ?return?f.queueActionLocked(Updated,?obj)
          }

          //?刪除和添加一樣,但會產(chǎn)生一個刪除的 Delta。如果給定的對象還不存在,它將被忽略。
          //?例如,它可能已經(jīng)被替換(重新list)刪除了。
          //?在這個方法中,`f.knownObjects`?如果不為nil,則提供(通過GetByKey)被認(rèn)為已經(jīng)存在的?_additional_?對象。
          func?(f?*DeltaFIFO)?Delete(obj?interface{})?error?{
          ?id,?err?:=?f.KeyOf(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?隊列第一次寫入操作都要設(shè)置這個標(biāo)記
          ?f.populated?=?true
          ??//?相當(dāng)于沒有?Indexer?的時候,就通過自己的存儲對象檢查下
          ?if?f.knownObjects?==?nil?{
          ??if?_,?exists?:=?f.items[id];?!exists?{
          ???//?自己的存儲里面都沒有,那也就不用處理了
          ???return?nil
          ??}
          ?}?else?
          ??//?相當(dāng)于 Indexer 里面和自己的存儲里面都沒有這個對象,那么也就相當(dāng)于不存在了,就不處理了。
          ????_,?exists,?err?:=?f.knownObjects.GetByKey(id)
          ??_,?itemsExist?:=?f.items[id]
          ??if?err?==?nil?&&?!exists?&&?!itemsExist?{
          ???return?nil
          ??}
          ?}
          ??//?同樣調(diào)用?queueActionLocked?將數(shù)據(jù)放入隊列
          ?return?f.queueActionLocked(Deleted,?obj)
          }

          可以看出 Add 、Update、Delete 方法最終都是調(diào)用的 queueActionLocked 函數(shù)來實現(xiàn):

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          // queueActionLocked 追加到對象的 delta 列表中。
          //?調(diào)用者必須先 lock。
          func?(f?*DeltaFIFO)?queueActionLocked(actionType?DeltaType,?obj?interface{})?error?{
          ?id,?err?:=?f.KeyOf(obj)??//?獲取對象鍵
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ??
          ??//?將?actionType?和資源對象?obj?構(gòu)造成?Delta,添加到?items?中
          ?newDeltas?:=?append(f.items[id],?Delta{actionType,?obj})
          ??//?去重
          ?newDeltas?=?dedupDeltas(newDeltas)

          ?if?len(newDeltas)?>?0?{
          ????//?新對象的?key?不在隊列中則插入?queue?隊列
          ??if?_,?exists?:=?f.items[id];?!exists?{
          ???f.queue?=?append(f.queue,?id)
          ??}
          ????//?重新更新?items
          ??f.items[id]?=?newDeltas
          ????//?通知所有的消費(fèi)者解除阻塞
          ??f.cond.Broadcast()
          ?}?else?{
          ????//?這種情況不會發(fā)生,因為給定一個非空列表時,dedupDeltas 永遠(yuǎn)不會返回一個空列表。
          ????//?但如果真的返回了一個空列表,那么我們就需要從 map 中刪除這個元素。
          ??delete(f.items,?id)
          ?}
          ?return?nil
          }

          //?==============排重==============
          //?重新list和watch可以以任何順序多次提供相同的更新。
          //?如果最近的兩個 Delta 相同,則將它們合并。
          func?dedupDeltas(deltas?Deltas)?Deltas?{
          ?n?:=?len(deltas)??
          ?if?n?2?{??//?小于兩個?delta?沒必要合并了
          ??return?deltas
          ?}
          ??//?Deltas是[]Delta,新的對象是追加到Slice后面
          ??//?所以取最后兩個元素來判斷是否相同
          ?a?:=?&deltas[n-1]
          ?b?:=?&deltas[n-2]
          ??//?執(zhí)行去重操作
          ?if?out?:=?isDup(a,?b);?out?!=?nil?{
          ????//?將去重保留下來的delta追加到前面n-2個delta中去
          ??d?:=?append(Deltas{},?deltas[:n-2]...)
          ??return?append(d,?*out)
          ?}
          ?return?deltas
          }

          //?判斷兩個?Delta?是否是重復(fù)的
          func?isDup(a,?b?*Delta)?*Delta?{
          ??//?這個函數(shù)應(yīng)該應(yīng)該可以判斷多種類型的重復(fù),目前只有刪除這一種能夠合并
          ?if?out?:=?isDeletionDup(a,?b);?out?!=?nil?{
          ??return?out
          ?}
          ?return?nil
          }

          //?判斷是否為刪除類型的重復(fù)
          func?isDeletionDup(a,?b?*Delta)?*Delta?{
          ??//?二者類型都是刪除那肯定有一個是重復(fù)的,則返回一個即可
          ?if?b.Type?!=?Deleted?||?a.Type?!=?Deleted?{
          ??return?nil
          ?}
          ?//?更復(fù)雜的檢查還是這樣就夠了?
          ?if?_,?ok?:=?b.Object.(DeletedFinalStateUnknown);?ok?{
          ??return?a
          ?}
          ?return?b
          }

          因為系統(tǒng)對于刪除的對象有 DeletedFinalStateUnknown 這個狀態(tài),所以會存在兩次刪除的情況,但是兩次添加同一個對象由于 APIServer 可以保證對象的唯一性,所以這里沒有考慮合并兩次添加操作的情況。

          然后看看其他幾個主要方法的實現(xiàn):

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?列舉接口實現(xiàn)
          func?(f?*DeltaFIFO)?List()?[]interface{}?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?return?f.listLocked()
          }
          //?真正的列舉實現(xiàn)
          func?(f?*DeltaFIFO)?listLocked()?[]interface{}?{
          ?list?:=?make([]interface{},?0,?len(f.items))
          ?for?_,?item?:=?range?f.items?{
          ??list?=?append(list,?item.Newest().Object)
          ?}
          ?return?list
          }

          //?返回現(xiàn)在 FIFO 中所有的對象鍵。
          func?(f?*DeltaFIFO)?ListKeys()?[]string?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?list?:=?make([]string,?0,?len(f.items))
          ?for?key?:=?range?f.items?{
          ??list?=?append(list,?key)
          ?}
          ?return?list
          }

          //?根據(jù)對象獲取FIFO中對應(yīng)的元素
          func?(f?*DeltaFIFO)?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)?{
          ?key,?err?:=?f.KeyOf(obj)
          ?if?err?!=?nil?{
          ??return?nil,?false,?KeyError{obj,?err}
          ?}
          ?return?f.GetByKey(key)
          }

          //?通過對象鍵獲取FIFO中的元素(獲取到的是?Delta?數(shù)組)
          func?(f?*DeltaFIFO)?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?d,?exists?:=?f.items[key]
          ?if?exists?{
          ??//?復(fù)制元素的slice,這樣對這個切片的操作就不會影響返回的對象了。
          ??d?=?copyDeltas(d)
          ?}
          ?return?d,?exists,?nil
          }

          // copyDeltas 返回 d 的淺拷貝,也就是說它拷貝的是切片,而不是切片中的對象。
          // Get/List 可以返回一個不會被后續(xù)修改影響的對象。
          func?copyDeltas(d?Deltas)?Deltas?{
          ?d2?:=?make(Deltas,?len(d))
          ?copy(d2,?d)
          ?return?d2
          }

          //?判斷隊列是否關(guān)閉了
          func?(f?*DeltaFIFO)?IsClosed()?bool?{
          ?f.closedLock.Lock()
          ?defer?f.closedLock.Unlock()
          ?return?f.closed
          }

          接下來我們來看看 Replace 函數(shù)的時候,這個也是 Store 里面的定義的接口:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          func?(f?*DeltaFIFO)?Replace(list?[]interface{},?resourceVersion?string)?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?keys?:=?make(sets.String,?len(list))

          ?//?keep?對老客戶端的向后兼容
          ?action?:=?Sync
          ?if?f.emitDeltaTypeReplaced?{
          ??action?=?Replaced
          ?}
          ??//?遍歷?list
          ?for?_,?item?:=?range?list?{
          ????//?計算對象鍵
          ??key,?err?:=?f.KeyOf(item)
          ??if?err?!=?nil?{
          ???return?KeyError{item,?err}
          ??}
          ????//?記錄處理過的對象鍵,使用?set?集合存儲
          ??keys.Insert(key)
          ????//?重新同步一次對象
          ??if?err?:=?f.queueActionLocked(action,?item);?err?!=?nil?{
          ???return?fmt.Errorf("couldn't?enqueue?object:?%v",?err)
          ??}
          ?}
          ??//?如果沒有?Indexer?存儲的話,自己存儲的就是所有的老對象
          ??//?目的要看看那些老對象不在全量集合中,那么就是刪除的對象了
          ?if?f.knownObjects?==?nil?{
          ??//?針對自己的列表進(jìn)行刪除檢測。
          ??queuedDeletions?:=?0
          ????//?遍歷所有元素
          ??for?k,?oldItem?:=?range?f.items?{
          ??????//?如果元素在輸入的對象中存在就忽略了。
          ???if?keys.Has(k)?{
          ????continue
          ???}
          ??????//?到這里證明當(dāng)前的?oldItem?元素不在輸入的列表中,證明對象已經(jīng)被刪除了
          ???var?deletedObj?interface{}
          ???if?n?:=?oldItem.Newest();?n?!=?nil?{
          ????deletedObj?=?n.Object
          ???}
          ???queuedDeletions++
          ??????//?因為可能隊列中已經(jīng)存在?Deleted?類型的元素了,避免重復(fù),所以采用?DeletedFinalStateUnknown?來包裝下對象
          ???if?err?:=?f.queueActionLocked(Deleted,?DeletedFinalStateUnknown{k,?deletedObj});?err?!=?nil?{
          ????return?err
          ???}
          ??}
          ????//?如果?populated?沒有設(shè)置,說明是第一次并且還沒有任何修改操作執(zhí)行過
          ??if?!f.populated?{
          ??????//?這個時候需要標(biāo)記下
          ???f.populated?=?true
          ???//?記錄第一次設(shè)置的對象數(shù)量
          ???f.initialPopulationCount?=?len(list)?+?queuedDeletions
          ??}

          ??return?nil
          ?}
          ?
          ??//?檢測已經(jīng)刪除但是沒有在隊列中的元素。
          ??//?從?Indexer?中獲取所有的對象鍵
          ?knownKeys?:=?f.knownObjects.ListKeys()
          ?queuedDeletions?:=?0
          ?for?_,?k?:=?range?knownKeys?{
          ????//?對象存在就忽略
          ??if?keys.Has(k)?{
          ???continue
          ??}
          ????//?到這里同樣證明當(dāng)前的對象鍵對應(yīng)的對象被刪除了
          ????//?獲取被刪除的對象鍵對應(yīng)的對象
          ??deletedObj,?exists,?err?:=?f.knownObjects.GetByKey(k)
          ??if?err?!=?nil?{
          ???deletedObj?=?nil
          ???klog.Errorf("Unexpected?error?%v?during?lookup?of?key?%v,?placing?DeleteFinalStateUnknown?marker?without?object",?err,?k)
          ??}?else?if?!exists?{
          ???deletedObj?=?nil
          ???klog.Infof("Key?%v?does?not?exist?in?known?objects?store,?placing?DeleteFinalStateUnknown?marker?without?object",?k)
          ??}
          ????//?累加刪除的對象數(shù)量
          ??queuedDeletions++
          ????//?把對象刪除的?Delta?放入隊列,和上面一樣避免重復(fù),使用?DeletedFinalStateUnknown?包裝下對象
          ??if?err?:=?f.queueActionLocked(Deleted,?DeletedFinalStateUnknown{k,?deletedObj});?err?!=?nil?{
          ???return?err
          ??}
          ?}
          ??//?和上面一致
          ?if?!f.populated?{
          ??f.populated?=?true
          ??f.initialPopulationCount?=?len(list)?+?queuedDeletions
          ?}

          ?return?nil
          }

          Replace() 主要用于實現(xiàn)對象的全量更新,由于 DeltaFIFO 對外輸出的就是所有目標(biāo)的增量變化,所以每次全量更新都要判斷對象是否已經(jīng)刪除,因為在全量更新前可能沒有收到目標(biāo)刪除的請求。這一點(diǎn)與 cache 不同,cache 的Replace() 相當(dāng)于重建,因為 cache 就是對象全量的一種內(nèi)存映射,所以Replace() 就等于重建。

          接下來就是實現(xiàn) DeltaFIFO ?特性的 Pop 函數(shù)的實現(xiàn)了:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          func?(f?*DeltaFIFO)?Pop(process?PopProcessFunc)?(interface{},?error)?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?for?{
          ????//?隊列中是否有數(shù)據(jù)
          ??for?len(f.queue)?==?0?{
          ???//?如果隊列關(guān)閉了這直接返回錯誤
          ???if?f.IsClosed()?{
          ????return?nil,?ErrFIFOClosed
          ???}
          ??????//?沒有數(shù)據(jù)就一直等待
          ???f.cond.Wait()
          ??}
          ????//?取出第一個對象鍵
          ??id?:=?f.queue[0]
          ????//?更新下queue,相當(dāng)于把第一個元素彈出去了
          ??f.queue?=?f.queue[1:]
          ????//?對象計數(shù)減一,當(dāng)減到0就說明外部已經(jīng)全部同步完畢了
          ??if?f.initialPopulationCount?>?0?{
          ???f.initialPopulationCount--
          ??}
          ????//?取出真正的對象,queue里面是對象鍵
          ??item,?ok?:=?f.items[id]
          ??if?!ok?{
          ???// Item 可能后來被刪除了。
          ???continue
          ??}
          ????//?刪除對象
          ??delete(f.items,?id)
          ????//?調(diào)用處理對象的函數(shù)
          ??err?:=?process(item)
          ????//?如果處理出錯,那就重新入隊列
          ??if?e,?ok?:=?err.(ErrRequeue);?ok?{
          ???f.addIfNotPresent(id,?item)
          ???err?=?e.Err
          ??}
          ??//?這里不需要 copyDeltas,因為我們要把所有權(quán)轉(zhuǎn)移給調(diào)用者。
          ??return?item,?err
          ?}
          }

          然后再簡單看下其他幾個函數(shù)的實現(xiàn):

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?AddIfNotPresent?插入不存在的對象到隊列中
          func?(f?*DeltaFIFO)?AddIfNotPresent(obj?interface{})?error?{
          ??//?放入的必須是?Delta?數(shù)組,就是通過?Pop?彈出的對象
          ?deltas,?ok?:=?obj.(Deltas)
          ?if?!ok?{
          ??return?fmt.Errorf("object?must?be?of?type?deltas,?but?got:?%#v",?obj)
          ?}
          ??//?多個?Delta?都是同一個對象,所以用最新的來獲取對象鍵即可
          ?id,?err?:=?f.KeyOf(deltas.Newest().Object)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?調(diào)用真正的插入實現(xiàn)
          ?f.addIfNotPresent(id,?deltas)
          ?return?nil
          }

          //?插入對象的真正實現(xiàn)
          func?(f?*DeltaFIFO)?addIfNotPresent(id?string,?deltas?Deltas)?{
          ?f.populated?=?true
          ??//?如果對象已經(jīng)存在了,則忽略
          ?if?_,?exists?:=?f.items[id];?exists?{
          ??return
          ?}
          ??//?不在隊列中,則插入隊列
          ?f.queue?=?append(f.queue,?id)
          ?f.items[id]?=?deltas
          ??//?通知消費(fèi)者解除阻塞
          ?f.cond.Broadcast()
          }

          // Resync 重新同步,帶有 Sync 類型的 Delta 對象。
          func?(f?*DeltaFIFO)?Resync()?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?Indexer?為空,重新同步無意義
          ?if?f.knownObjects?==?nil?{
          ??return?nil
          ?}
          ??//?獲取?Indexer?中所有的對象鍵
          ?keys?:=?f.knownObjects.ListKeys()
          ??//?循環(huán)對象鍵,為每個對象產(chǎn)生一個同步的?Delta
          ?for?_,?k?:=?range?keys?{
          ??if?err?:=?f.syncKeyLocked(k);?err?!=?nil?{
          ???return?err
          ??}
          ?}
          ?return?nil
          }
          //?對象同步接口的真正實現(xiàn)
          func?(f?*DeltaFIFO)?syncKeyLocked(key?string)?error?{
          ??//?獲取?Indexer?中的對象
          ?obj,?exists,?err?:=?f.knownObjects.GetByKey(key)
          ?if?err?!=?nil?{
          ??klog.Errorf("Unexpected?error?%v?during?lookup?of?key?%v,?unable?to?queue?object?for?sync",?err,?key)
          ??return?nil
          ?}?else?if?!exists?{
          ??klog.Infof("Key?%v?does?not?exist?in?known?objects?store,?unable?to?queue?object?for?sync",?key)
          ??return?nil
          ?}

          ?//?計算對象的鍵值,對象鍵不是已經(jīng)傳入了么?
          ??//?其實傳入的是存在?Indexer?里面的對象鍵,可能與這里的計算方式不同
          ??id,?err?:=?f.KeyOf(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ??//?對象已經(jīng)在存在,說明后續(xù)會通知對象的新變化,所以再加更新也沒意義
          ?if?len(f.items[id])?>?0?{
          ??return?nil
          ?}
          ??//?添加對象同步的這個?Delta
          ?if?err?:=?f.queueActionLocked(Sync,?obj);?err?!=?nil?{
          ??return?fmt.Errorf("couldn't?queue?object:?%v",?err)
          ?}
          ?return?nil
          }

          // HasSynced 如果 Add/Update/Delete/AddIfNotPresent 第一次被調(diào)用則會返回 true。
          //?或者通過 Replace 插入的元素都已經(jīng) Pop 完成了,則也會返回 true。
          func?(f?*DeltaFIFO)?HasSynced()?bool?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?//?同步就是全量內(nèi)容已經(jīng)進(jìn)入?Indexer,Indexer?已經(jīng)是系統(tǒng)中對象的全量快照了
          ??//?相當(dāng)于就是全量對象從隊列中全部彈出進(jìn)入?Indexer,證明已經(jīng)同步完成了
          ?return?f.populated?&&?f.initialPopulationCount?==?0
          }

          //?關(guān)閉隊列
          func?(f?*DeltaFIFO)?Close()?{
          ?f.closedLock.Lock()
          ?defer?f.closedLock.Unlock()
          ?f.closed?=?true
          ?f.cond.Broadcast()
          }

          這里是否已同步是根據(jù) populatedinitialPopulationCount 這兩個變量來判斷的,是否同步指的是第一次從 APIServer 中獲取全量的對象是否全部 Pop 完成,全局同步到了緩存中,也就是 Indexer 中去了,因為 Pop 一次 initialPopulationCount 就會減1,當(dāng)為0的時候就表示 Pop 完成了。

          總結(jié)

          到這里我們就將完整實現(xiàn)了 DeltaFIFO,然后加上前面的 Reflector 反射器,就可以結(jié)合起來了。

          Reflector 通過 ListAndWatch 首先獲取全量的資源對象數(shù)據(jù),然后調(diào)用 DeltaFIFO 的 Replace() 方法全量插入隊列,然后后續(xù)通過 Watch 操作根據(jù)資源對象的操作類型調(diào)用 DeltaFIFO 的 Add、Update、Delete 方法,將數(shù)據(jù)更新到隊列中。我們可以用下圖來總結(jié)這兩個組件之間的關(guān)系:

          至于 Pop 出來的元素如何處理,就要看 Pop 的回調(diào)函數(shù) PopProcessFunc 了。我們可以回到最初的 SharedInformer 中,在 sharedIndexInformer 的 Run 函數(shù)中就初始化了 DeltaFIFO,也配置了用于 Pop 回調(diào)處理的函數(shù):

          //?k8s.io/client-go/tools/cache/shared_informer.go

          func?(s?*sharedIndexInformer)?Run(stopCh?<-chan?struct{})?{
          ?defer?utilruntime.HandleCrash()
          ??//?初始化?DeltaFIFO,這里就可以看出來?KnownObjects?就是一個?Indexer
          ?fifo?:=?NewDeltaFIFOWithOptions(DeltaFIFOOptions{
          ??KnownObjects:??????????s.indexer,
          ??EmitDeltaTypeReplaced:?true,
          ?})

          ?cfg?:=?&Config{
          ??Queue:????????????fifo,
          ??ListerWatcher:????s.listerWatcher,
          ??ObjectType:???????s.objectType,
          ??FullResyncPeriod:?s.resyncCheckPeriod,
          ??RetryOnError:?????false,
          ??ShouldResync:?????s.processor.shouldResync,

          ??Process:?s.HandleDeltas,??//?指定?Pop?函數(shù)的回調(diào)處理函數(shù)
          ?}
          ?......
          }

          //?真正的?Pop?回調(diào)處理函數(shù)
          func?(s?*sharedIndexInformer)?HandleDeltas(obj?interface{})?error?{
          ?s.blockDeltas.Lock()
          ?defer?s.blockDeltas.Unlock()

          ?//?from?oldest?to?newest
          ?for?_,?d?:=?range?obj.(Deltas)?{
          ??switch?d.Type?{
          ??case?Sync,?Replaced,?Added,?Updated:
          ???s.cacheMutationDetector.AddObject(d.Object)
          ???if?old,?exists,?err?:=?s.indexer.Get(d.Object);?err?==?nil?&&?exists?{
          ????......
          ???}?else?{
          ????????//?將對象添加到?Indexer?中
          ????if?err?:=?s.indexer.Add(d.Object);?err?!=?nil?{
          ?????return?err
          ????}
          ????......
          ???}
          ??case?Deleted:
          ??????//?刪除?Indexer?中的對象
          ???if?err?:=?s.indexer.Delete(d.Object);?err?!=?nil?{
          ????return?err
          ???}
          ???......
          ??}
          ?}
          ?return?nil
          }

          從上面可以看出 DeltaFIFO 中的元素被彈出來后被同步到了 Indexer 存儲中,而在 DeltaFIFO 中的 KnownObjects 也就是這個指定的 Indexer,所以接下來我們就需要重點(diǎn)分析 Indexer 組件的實現(xiàn)了。

          瀏覽 46
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  波多野成人无码精品69 | 色黄视频免费观看 | 三级精品 | 图片区偷拍区小说区 | 在线国产中文字幕 |