<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          client-go 之 DeltaFIFO 實(shí)現(xiàn)原理

          共 18650字,需瀏覽 38分鐘

           ·

          2020-09-01 11:28

          前文我們講到 Reflector 中通過(guò) ListAndWatch 獲取到數(shù)據(jù)后傳入到了本地的存儲(chǔ)中,也就是 DeltaFIFO 中。從 DeltaFIFO 的名字可以看出它是一個(gè) FIFO,也就是一個(gè)先進(jìn)先出的隊(duì)列,而 Delta 表示的是變化的資源對(duì)象存儲(chǔ),包含操作資源對(duì)象的類型和數(shù)據(jù),Reflector 就是這個(gè)隊(duì)列的生產(chǎn)者。

          Delta

          在了解 DeltaFIFO 之前我們需要先具體了解下什么是 Delta,我們先來(lái)看看 client-go 中是如何定義的,Delta 的數(shù)據(jù)結(jié)構(gòu)定義位于staging/src/k8s.io/client-go/tools/cache/delta_fifo.go文件中。

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?DeltaType?是變化的類型(添加、刪除等)
          type?DeltaType?string

          //?變化的類型定義
          const?(
          ?Added???DeltaType?=?"Added"?????//?增加
          ?Updated?DeltaType?=?"Updated"???//?更新
          ?Deleted?DeltaType?=?"Deleted"???//?刪除
          ?//?當(dāng)遇到 watch 錯(cuò)誤,不得不進(jìn)行重新list時(shí),就會(huì)觸發(fā) Replaced。
          ??//?我們不知道被替換的對(duì)象是否發(fā)生了變化。
          ?//
          ??//?注意:以前版本的 DeltaFIFO 也會(huì)對(duì) Replace 事件使用 Sync。
          ??//?所以只有當(dāng)選項(xiàng) EmitDeltaTypeReplaced 為真時(shí)才會(huì)觸發(fā) Replaced。
          ?Replaced?DeltaType?=?"Replaced"
          ?//?Sync?是針對(duì)周期性重新同步期間的合成事件
          ?Sync?DeltaType?=?"Sync"??????????//?同步
          )

          // Delta 是 DeltaFIFO 存儲(chǔ)的類型。
          //?它告訴你發(fā)生了什么變化,以及變化后對(duì)象的狀態(tài)。
          //
          //?[*]?除非變化是刪除操作,否則你將得到對(duì)象被刪除前的最終狀態(tài)。
          type?Delta?struct?{
          ?Type???DeltaType
          ?Object?interface{}
          }

          Delta 其實(shí)就是 Kubernetes 系統(tǒng)中帶有變化類型的資源對(duì)象,如下圖所示:

          其實(shí)也非常好理解,比如我們現(xiàn)在添加了一個(gè) Pod,那么這個(gè) Delta 就是帶有 Added 這個(gè)類型的 Pod,如果是刪除了一個(gè) Deployment,那么這個(gè) Delta 就是帶有 Deleted 類型的 Deployment,為什么要帶上類型呢?因?yàn)槲覀冃枰鶕?jù)不同的類型去執(zhí)行不同的操作,增加、更新、刪除的動(dòng)作顯然是不一樣的。

          FIFO

          上面我們解釋了什么是 Delta,接下來(lái)需要說(shuō)下 FIFO,我們說(shuō) FIFO 很好理解,就是一個(gè)先進(jìn)先出的隊(duì)列,Reflector 是其生產(chǎn)者,其數(shù)據(jù)結(jié)構(gòu)定義位于 ?staging/src/k8s.io/client-go/tools/cache/fifo.go 文件中:

          //?k8s.io/client-go/tools/cache/fifo.go

          type?FIFO?struct?{
          ?lock?sync.RWMutex
          ?cond?sync.Cond

          ??//?items?中的每一個(gè)?key?也在?queue?中
          ?items?map[string]interface{}
          ?queue?[]string

          ?//?如果第一批?items?被?Replace()?插入或者先調(diào)用了?Deleta/Add/Update
          ??//?則 populated 為 true。
          ?populated?bool
          ?//?第一次調(diào)用?Replace()?時(shí)插入的?items?數(shù)
          ?initialPopulationCount?int

          ??// keyFunc 用于生成排隊(duì)的 item 插入和檢索的 key。
          ?keyFunc?KeyFunc

          ??//?標(biāo)識(shí)隊(duì)列已關(guān)閉,以便在隊(duì)列清空時(shí)控制循環(huán)可以退出。
          ?closed?????bool
          ?closedLock?sync.Mutex
          }

          var?(
          ?_?=?Queue(&FIFO{})?//?FIFO?是一個(gè)?Queue
          )

          上面的 FIFO 數(shù)據(jù)結(jié)構(gòu)中定義了 items 和 queue 兩個(gè)屬性來(lái)保存隊(duì)列中的數(shù)據(jù),其中 queue 中存的是資源對(duì)象的 key 列表,而 items 是一個(gè) map 類型,其 key 就是 queue 中保存的 key,value 值是真正的資源對(duì)象數(shù)據(jù)。既然是先進(jìn)進(jìn)去的隊(duì)列,那么就要具有隊(duì)列的基本功能,結(jié)構(gòu)體下面其實(shí)就有一個(gè)類型斷言,表示當(dāng)前的 FIFO 實(shí)現(xiàn)了 Queue 這個(gè)接口,所以 FIFO 要實(shí)現(xiàn)的功能都是在 Queue 中定義的,Queue 接口和 FIFO 位于同一文件中:

          //?k8s.io/client-go/tools/cache/fifo.go

          //?Queue?擴(kuò)展了?Store??//?with?a?collection?of?Store?keys?to?"process".
          //?每一次添加、更新或刪除都可以將對(duì)象的key放入到該集合中。
          //?Queue?具有使用給定的?accumulator?來(lái)推導(dǎo)出相應(yīng)的?key?的方法
          //?Queue?可以從多個(gè)?goroutine?中并發(fā)訪問(wèn)
          //?Queue?可以被關(guān)閉,之后?Pop?操作會(huì)返回一個(gè)錯(cuò)誤
          type?Queue?interface?{
          ?Store

          ?// Pop 一直阻塞,直到至少有一個(gè)key要處理或隊(duì)列被關(guān)閉,隊(duì)列被關(guān)閉會(huì)返回一個(gè)錯(cuò)誤。
          ??//?在前面的情況下?Pop?原子性地選擇一個(gè)?key?進(jìn)行處理,從?Store?中刪除關(guān)聯(lián)(key、accumulator)的數(shù)據(jù),
          ??//?并處理 accumulator。Pop 會(huì)返回被處理的 accumulator 和處理的結(jié)果。
          ?
          ?//?PopProcessFunc?函數(shù)可以返回一個(gè)?ErrRequeue{inner},在這種情況下,Pop?將
          ??//(a)把那個(gè)(key,accumulator)關(guān)聯(lián)作為原子處理的一部分返回到?Queue?中
          ??//?(b)?從 Pop 返回內(nèi)部錯(cuò)誤。
          ?Pop(PopProcessFunc)?(interface{},?error)

          ?//?僅當(dāng)該?key?尚未與一個(gè)非空的?accumulator?相關(guān)聯(lián)的時(shí)候,AddIfNotPresent?將給定的?accumulator?放入?Queue(與?accumulator?的?key?相關(guān)聯(lián)的)
          ?AddIfNotPresent(interface{})?error
          ?
          ??//?如果第一批 keys 都已經(jīng) Popped,則 HasSynced 返回 true。
          ??//?如果在添加、更新、刪除之前發(fā)生了第一次?Replace?操作,則第一批?keys?為?true
          ??//?否則為空。
          ?HasSynced()?bool

          ?//?關(guān)閉該隊(duì)列
          ?Close()
          }

          從上面的定義中可以看出 Queue 這個(gè)接口擴(kuò)展了 Store 這個(gè)接口,這個(gè)就是前面我們說(shuō)的本地存儲(chǔ),隊(duì)列實(shí)際上也是一種存儲(chǔ),然后在 Store 的基礎(chǔ)上增加 Pop、AddIfNotPresent、HasSynced、Close 4個(gè)函數(shù)就變成了 Queue 隊(duì)列了,所以我們優(yōu)先來(lái)看下 Store 這個(gè)接口的定義,該數(shù)據(jù)結(jié)構(gòu)定義位于文件 k8s.io/client-go/tools/cache/store.go 中:

          //?k8s.io/client-go/tools/cache/store.go

          // Store 是一個(gè)通用的對(duì)象存儲(chǔ)和處理的接口。
          //?Store?包含一個(gè)從字符串?keys?到?accumulators?的映射,并具有?to/from?當(dāng)前
          //?給定 key 關(guān)聯(lián)的 accumulators 添加、更新和刪除給定對(duì)象的操作。
          //?一個(gè) Store 還知道如何從給定的對(duì)象中獲取 key,所以很多操作只提供對(duì)象。
          //
          //?在最簡(jiǎn)單的?Store?實(shí)現(xiàn)中,每個(gè)?accumulator?只是最后指定的對(duì)象,或者刪除后為空,
          //?所以 Store 只是簡(jiǎn)單的存儲(chǔ)。
          //
          // Reflector 反射器知道如何 watch 一個(gè)服務(wù)并更新一個(gè) Store 存儲(chǔ),這個(gè)包提供了 Store 的各種實(shí)現(xiàn)。
          type?Store?interface?{

          ?// Add 將指定對(duì)象添加到與指定對(duì)象的 key 相關(guān)的 accumulator(累加器)中。
          ?Add(obj?interface{})?error

          ?//?Update?與指定對(duì)象的?key?相關(guān)的?accumulator?中更新指定的對(duì)象
          ?Update(obj?interface{})?error

          ?//?Delete?根據(jù)指定的對(duì)象?key?刪除指定的對(duì)象
          ?Delete(obj?interface{})?error

          ?//?List?返回當(dāng)前所有非空的?accumulators?的列表
          ?List()?[]interface{}

          ?//?ListKeys?返回當(dāng)前與非空?accumulators?關(guān)聯(lián)的所有?key?的列表
          ?ListKeys()?[]string

          ?//?Get?根據(jù)指定的對(duì)象獲取關(guān)聯(lián)的?accumulator
          ?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)

          ?//?GetByKey?根據(jù)指定的對(duì)象?key?獲取關(guān)聯(lián)的?accumulator
          ?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)

          ?//?Replace?會(huì)刪除原來(lái)Store中的內(nèi)容,并將新增的list的內(nèi)容存入Store中,即完全替換數(shù)據(jù)
          ??// Store 擁有 list 列表的所有權(quán),在調(diào)用此函數(shù)后,不應(yīng)該引用它了。
          ?Replace([]interface{},?string)?error

          ?// Resync 在 Store 中沒(méi)有意義,但是在 DeltaFIFO 中有意義。
          ?Resync()?error
          }

          //?KeyFunc?就是從一個(gè)對(duì)象中生成一個(gè)唯一的?Key?的函數(shù),上面的?FIFO?中就有用到
          type?KeyFunc?func(obj?interface{})?(string,?error)

          //?MetaNamespaceKeyFunc?是默認(rèn)的?KeyFunc,生成的?key?格式為:
          //?<namespace>/<name>
          //?如果是全局的,則namespace為空,那么生成的?key?就是?<name>
          //?當(dāng)然要從?key?拆分出?namespace?和?name?也非常簡(jiǎn)單
          func?MetaNamespaceKeyFunc(obj?interface{})?(string,?error)
          ?{
          ?if?key,?ok?:=?obj.(ExplicitKey);?ok?{
          ??return?string(key),?nil
          ?}
          ?meta,?err?:=?meta.Accessor(obj)
          ?if?err?!=?nil?{
          ??return?"",?fmt.Errorf("object?has?no?meta:?%v",?err)
          ?}
          ?if?len(meta.GetNamespace())?>?0?{
          ??return?meta.GetNamespace()?+?"/"?+?meta.GetName(),?nil
          ?}
          ?return?meta.GetName(),?nil
          }

          Store 就是一個(gè)通用的對(duì)象存儲(chǔ)和處理的接口,可以用來(lái)寫(xiě)入對(duì)象和獲取對(duì)象。其中 cache 數(shù)據(jù)結(jié)構(gòu)就實(shí)現(xiàn)了上面的 Store 接口,但是這個(gè)屬于后面的 Indexer 部分的知識(shí)點(diǎn),這里我們就不展開(kāi)說(shuō)明了。

          我們說(shuō) Queue 擴(kuò)展了 Store 接口,所以 Queue 本身也是一個(gè)存儲(chǔ),只是在存儲(chǔ)的基礎(chǔ)上增加了 Pop 這樣的函數(shù)來(lái)實(shí)現(xiàn)彈出對(duì)象,是不是就變成了一個(gè)隊(duì)列了。

          FIFO 就是一個(gè)具體的 Queue 實(shí)現(xiàn),按照順序彈出對(duì)象是不是就是一個(gè)先進(jìn)先出的隊(duì)列了?如下圖所示:

          所以我們接下來(lái)看看 FIFO 是如何實(shí)現(xiàn)存儲(chǔ)和 Pop 的功能的。首先是實(shí)現(xiàn) Store 存儲(chǔ)中最基本的方法,第一個(gè)就是添加對(duì)象:

          //?k8s.io/client-go/tools/cache/fifo.go

          // Add 插入一個(gè)對(duì)象,將其放入隊(duì)列中,只有當(dāng)元素不在集合中時(shí)才會(huì)插入隊(duì)列。
          func?(f?*FIFO)?Add(obj?interface{})?error?{
          ??//?獲取對(duì)象的?key
          ?id,?err?:=?f.keyFunc(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true
          ??//?元素不在隊(duì)列中的時(shí)候才插入隊(duì)列
          ?if?_,?exists?:=?f.items[id];?!exists?{
          ??f.queue?=?append(f.queue,?id)
          ?}
          ??//?items?是一個(gè)?map,所以直接賦值給這個(gè)?key,這樣對(duì)更新元素也同樣適用
          ?f.items[id]?=?obj
          ?f.cond.Broadcast()
          ?return?nil
          }

          更新對(duì)象,實(shí)現(xiàn)非常簡(jiǎn)單,因?yàn)樯厦娴?Add 方法就包含了 Update 的實(shí)現(xiàn),因?yàn)?items 屬性是一個(gè) Map,對(duì)象有更新直接將對(duì)應(yīng) key 的 value 值替換成新的對(duì)象即可:

          //?k8s.io/client-go/tools/cache/fifo.go

          //?Update?和?Add?相同的實(shí)現(xiàn)
          func?(f?*FIFO)?Update(obj?interface{})?error?{
          ?return?f.Add(obj)
          }

          接著就是刪除 Delete 方法的實(shí)現(xiàn),這里可能大家會(huì)有一個(gè)疑問(wèn),下面的刪除實(shí)現(xiàn)只刪除了 items 中的元素,那這樣豈不是 queue 和 items 中的 key 會(huì)不一致嗎?的確會(huì)這樣,但是這是一個(gè)隊(duì)列,下面的 Pop() 函數(shù)會(huì)根據(jù) queue 里面的元素一個(gè)一個(gè)的彈出 key,沒(méi)有對(duì)象就不處理了,相當(dāng)于下面的 Pop() 函數(shù)中實(shí)現(xiàn)了 queue 的 key 的刪除:

          //?k8s.io/client-go/tools/cache/fifo.go

          // Delete 從隊(duì)列中移除一個(gè)對(duì)象。
          //?不會(huì)添加到?queue?中去,這個(gè)實(shí)現(xiàn)是假設(shè)消費(fèi)者只關(guān)心對(duì)象
          //?不關(guān)心它們被創(chuàng)建或添加的順序。
          func?(f?*FIFO)?Delete(obj?interface{})?error?{
          ??//?獲取對(duì)象的?key
          ?id,?err?:=?f.keyFunc(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true
          ??//?刪除?items?中?key?為?id?的元素,就是刪除隊(duì)列中的對(duì)象
          ?delete(f.items,?id)
          ??//?為什么不直接處理 queue 這個(gè) slice 呢?
          ?return?err
          }

          然后是獲取隊(duì)列中所有對(duì)象的 List 方法的實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          //?List?獲取隊(duì)列中的所有對(duì)象
          func?(f?*FIFO)?List()?[]interface{}?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?list?:=?make([]interface{},?0,?len(f.items))
          ??//?獲取所有的items的values值(items是一個(gè)Map)
          ?for?_,?item?:=?range?f.items?{
          ??list?=?append(list,?item)
          ?}
          ?return?list
          }

          接著是獲取隊(duì)列中所有元素的 key 的 ListKeys 方法實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // ListKeys 返回現(xiàn)在 FIFO 隊(duì)列中所有對(duì)象的 keys 列表。
          func?(f?*FIFO)?ListKeys()?[]string?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?list?:=?make([]string,?0,?len(f.items))
          ??//?獲取所有items的key值(items是一個(gè)Map)
          ?for?key?:=?range?f.items?{
          ??list?=?append(list,?key)
          ?}
          ?return?list
          }

          至于根據(jù)對(duì)象或者對(duì)象的 key 獲取隊(duì)列中的元素,就更簡(jiǎn)單了:

          //?k8s.io/client-go/tools/cache/fifo.go

          //?Get?獲取指定對(duì)象在隊(duì)列中的元素
          func?(f?*FIFO)?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)?{
          ?key,?err?:=?f.keyFunc(obj)
          ?if?err?!=?nil?{
          ??return?nil,?false,?KeyError{obj,?err}
          ?}
          ??//?調(diào)用?GetByKey?實(shí)現(xiàn)
          ?return?f.GetByKey(key)
          }

          //?GetByKey?根據(jù)?key?獲取隊(duì)列中的元素
          func?(f?*FIFO)?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ??//?因?yàn)?items?是一個(gè)?Map,所以直接根據(jù)?key?獲取即可
          ?item,?exists?=?f.items[key]
          ?return?item,?exists,?nil
          }

          然后是一個(gè) Replace 替換函數(shù)的實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // Replace 將刪除隊(duì)列中的內(nèi)容,'f'?擁有 map 的所有權(quán),調(diào)用該函數(shù)過(guò)后,不應(yīng)該再引用 map。
          //?'f'?的隊(duì)列也會(huì)被重置,返回時(shí),隊(duì)列將包含 map 中的元素,沒(méi)有特定的順序。
          func?(f?*FIFO)?Replace(list?[]interface{},?resourceVersion?string)?error?{
          ?//?從?list?中提取出?key?然后和里面的元素重新進(jìn)行映射
          ??items?:=?make(map[string]interface{},?len(list))
          ?for?_,?item?:=?range?list?{
          ??key,?err?:=?f.keyFunc(item)
          ??if?err?!=?nil?{
          ???return?KeyError{item,?err}
          ??}
          ??items[key]?=?item
          ?}

          ?f.lock.Lock()
          ?defer?f.lock.Unlock()

          ?if?!f.populated?{
          ??f.populated?=?true
          ??f.initialPopulationCount?=?len(items)
          ?}
          ?//?重新設(shè)置?items?和?queue?的值
          ?f.items?=?items
          ?f.queue?=?f.queue[:0]
          ?for?id?:=?range?items?{
          ??f.queue?=?append(f.queue,?id)
          ?}
          ?if?len(f.queue)?>?0?{
          ??f.cond.Broadcast()
          ?}
          ?return?nil
          }

          還有 Store 存儲(chǔ)中的最后一個(gè)方法 Resync 的實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // Resync 會(huì)保證 Store 中的每個(gè)對(duì)象在 queue 中都有它的 key。
          //?
          //?這應(yīng)該是禁止操作的,因?yàn)樵搶傩杂伤胁僮骶S護(hù)
          func?(f?*FIFO)?Resync()?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?將所有?queue?中的元素放到一個(gè)?set?中
          ?inQueue?:=?sets.NewString()
          ?for?_,?id?:=?range?f.queue?{
          ??inQueue.Insert(id)
          ?}
          ??//?然后將所有?items?中的?key?加回到?queue?中去
          ?for?id?:=?range?f.items?{
          ??if?!inQueue.Has(id)?{
          ???f.queue?=?append(f.queue,?id)
          ??}
          ?}
          ?if?len(f.queue)?>?0?{
          ??f.cond.Broadcast()
          ?}
          ?return?nil
          }

          上面的所有方法就實(shí)現(xiàn)了一個(gè)普通的 Store,然后要想變成一個(gè) Queue,還需要實(shí)現(xiàn)額外的 Pop 方法:

          //?k8s.io/client-go/tools/cache/fifo.go

          // Pop 會(huì)等到一個(gè)元素準(zhǔn)備好后再進(jìn)行處理,如果有多個(gè)元素準(zhǔn)備好了,則按照它們被添加或更新的順序返回。
          //
          //?在處理之前,元素會(huì)從隊(duì)列(和存儲(chǔ))中移除,所以如果沒(méi)有成功處理,應(yīng)該用 AddIfNotPresent()?函數(shù)把它添加回來(lái)。
          //?處理函數(shù)是在有鎖的情況下調(diào)用的,所以更新其中需要和隊(duì)列同步的數(shù)據(jù)結(jié)構(gòu)是安全的。
          func?(f?*FIFO)?Pop(process?PopProcessFunc)?(interface{},?error)?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?for?{
          ??//?當(dāng)隊(duì)列為空時(shí),Pop()?的調(diào)用會(huì)被阻塞住,直到新的元素插入隊(duì)列后
          ??for?len(f.queue)?==?0?{
          ???if?f.IsClosed()?{
          ????return?nil,?ErrFIFOClosed
          ???}
          ???//?等待?condition?被廣播
          ???f.cond.Wait()
          ??}
          ????//?取出?queue?隊(duì)列中的第一個(gè)元素(key)
          ??id?:=?f.queue[0]
          ????//?刪除第一個(gè)元素
          ??f.queue?=?f.queue[1:]
          ??if?f.initialPopulationCount?>?0?{
          ???f.initialPopulationCount--
          ??}
          ????//?獲取被彈出的元素
          ??item,?ok?:=?f.items[id]
          ??if?!ok?{
          ???//?因?yàn)?item 可能已經(jīng)被刪除了。
          ???continue
          ??}
          ????//?刪除彈出的元素
          ??delete(f.items,?id)
          ????//?處理彈出的元素
          ??err?:=?process(item)
          ??if?e,?ok?:=?err.(ErrRequeue);?ok?{
          ??????//?如果處理沒(méi)成功,需要調(diào)用?addIfNotPresent?加回隊(duì)列
          ???f.addIfNotPresent(id,?item)
          ???err?=?e.Err
          ??}
          ??return?item,?err
          ?}
          }

          另外的一個(gè)就是上面提到的 AddIfNotPresent、HasSynced、Close 幾個(gè)函數(shù)的實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/fifo.go

          // AddIfNotPresent 插入一個(gè)元素,將其放入隊(duì)列中。
          //?如果元素已經(jīng)在集合中了,則會(huì)被忽略。
          //
          //?這在單個(gè)的?生產(chǎn)者/消費(fèi)者?的場(chǎng)景下非常有用,這樣消費(fèi)者可以安全地重試
          //?而不需要與生產(chǎn)者爭(zhēng)奪,也不需要排隊(duì)等待過(guò)時(shí)的元素。
          func?(f?*FIFO)?AddIfNotPresent(obj?interface{})?error?{
          ?id,?err?:=?f.keyFunc(obj)??//?獲取對(duì)象的?key
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.addIfNotPresent(id,?obj)??//?調(diào)用?addIfNotPresent?真正的實(shí)現(xiàn)
          ?return?nil
          }

          // addIfNotPresent 會(huì)假設(shè)已經(jīng)持有 fifo 鎖了,如果不存在,則將其添加到隊(duì)列中去。
          func?(f?*FIFO)?addIfNotPresent(id?string,?obj?interface{})?{
          ?f.populated?=?true
          ??//?存在則忽略
          ?if?_,?exists?:=?f.items[id];?exists?{
          ??return
          ?}
          ??//?添加到?queue?和?items?中去
          ?f.queue?=?append(f.queue,?id)
          ?f.items[id]?=?obj
          ??//?廣播?condition
          ?f.cond.Broadcast()
          }

          //?關(guān)閉隊(duì)列
          func?(f?*FIFO)?Close()?{
          ?f.closedLock.Lock()
          ?defer?f.closedLock.Unlock()
          ??//?標(biāo)記為關(guān)閉
          ?f.closed?=?true
          ?f.cond.Broadcast()
          }

          //?如果先調(diào)用了 Add/Update/Delete/AddIfNotPresent,或者先調(diào)用了Update,但被 Replace()?插入的第一批元素已經(jīng)被彈出,則 HasSynced 返回true。
          func?(f?*FIFO)?HasSynced()?bool?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?return?f.populated?&&?f.initialPopulationCount?==?0
          }

          到這里我們就實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 FIFO 隊(duì)列,其實(shí)這里就是對(duì) FIFO 這個(gè)數(shù)據(jù)結(jié)構(gòu)的理解,沒(méi)有特別的地方,只是在隊(duì)列里面有 items 和 queue 兩個(gè)屬性來(lái)維護(hù)隊(duì)列而已。

          DeltaFIFO

          上面我們已經(jīng)實(shí)現(xiàn)了 FIFO,接下來(lái)就看下一個(gè) DeltaFIFO 是如何實(shí)現(xiàn)的,DeltaFIFO 和 FIFO 一樣也是一個(gè)隊(duì)列,但是也有不同的地方,里面的元素是一個(gè) Delta,Delta 上面我們已經(jīng)提到表示的是帶有變化類型的資源對(duì)象。

          DeltaFIFO 的數(shù)據(jù)結(jié)構(gòu)定義位于 staging/src/k8s.io/client-go/tools/cache/delta_fifo.go 文件中:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          type?DeltaFIFO?struct?{
          ?//?lock/cond?保護(hù)訪問(wèn)的?items?和?queue
          ?lock?sync.RWMutex
          ?cond?sync.Cond

          ??//?用來(lái)存儲(chǔ)?Delta?數(shù)據(jù)?->?對(duì)象key:?Delta數(shù)組

          ?items?map[string]Deltas
          ??//?用來(lái)存儲(chǔ)資源對(duì)象的key
          ?queue?[]string

          ?//?通過(guò)?Replace()?接口將第一批對(duì)象放入隊(duì)列,或者第一次調(diào)用增、刪、改接口時(shí)標(biāo)記為true
          ?populated?bool
          ?//?通過(guò)?Replace()?接口(全量)將第一批對(duì)象放入隊(duì)列的對(duì)象數(shù)量
          ?initialPopulationCount?int

          ?//?對(duì)象鍵的計(jì)算函數(shù)
          ?keyFunc?KeyFunc

          ?//?knownObjects?列出?"known"?的鍵?--?影響到?Delete(),Replace()?和?Resync()
          ??//?knownObjects?其實(shí)就是?Indexer,里面存有已知全部的對(duì)象
          ?knownObjects?KeyListerGetter

          ?//?標(biāo)記?queue?被關(guān)閉了
          ??closed?????bool
          ?closedLock?sync.Mutex

          ?//?emitDeltaTypeReplaced?當(dāng)?Replace()?被調(diào)用的時(shí)候,是否要?emit?Replaced?或者?Sync
          ?// DeltaType(保留向后兼容)。
          ?emitDeltaTypeReplaced?bool
          }

          //?KeyListerGetter?任何知道如何列出鍵和按鍵獲取對(duì)象的東西
          type?KeyListerGetter?interface?{
          ?KeyLister
          ?KeyGetter
          }

          //?獲取所有的鍵
          type?KeyLister?interface?{
          ?ListKeys()?[]string
          }

          //?根據(jù)鍵獲取對(duì)象
          type?KeyGetter?interface?{
          ?GetByKey(key?string)?(interface{},?bool,?error)
          }

          DeltaFIFO 與 FIFO 一樣都是一個(gè) Queue,所以他們都實(shí)現(xiàn)了 Queue,所以我們這里來(lái)看下 DeltaFIFO 是如何實(shí)現(xiàn) Queue 功能的,當(dāng)然和 FIFO 一樣都是實(shí)現(xiàn) Queue 接口里面的所有方法。

          雖然實(shí)現(xiàn)流程和 FIFO 是一樣的,但是具體的實(shí)現(xiàn)是不一樣的,比如 DeltaFIFO 的對(duì)象鍵計(jì)算函數(shù)就不同:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?DeltaFIFO?的對(duì)象鍵計(jì)算函數(shù)
          func?(f?*DeltaFIFO)?KeyOf(obj?interface{})?(string,?error)?{
          ?//?用?Deltas?做一次轉(zhuǎn)換,判斷是否是?Delta?切片
          ??if?d,?ok?:=?obj.(Deltas);?ok?{
          ??if?len(d)?==?0?{
          ???return?"",?KeyError{obj,?ErrZeroLengthDeltasObject}
          ??}
          ????//?使用最新版本的對(duì)象進(jìn)行計(jì)算
          ??obj?=?d.Newest().Object
          ?}
          ?if?d,?ok?:=?obj.(DeletedFinalStateUnknown);?ok?{
          ??return?d.Key,?nil
          ?}
          ??//?具體計(jì)算還是要看初始化?DeltaFIFO?傳入的?KeyFunc?函數(shù)
          ?return?f.keyFunc(obj)
          }

          // Newest 返回最新的 Delta,如果沒(méi)有則返回 nil。
          func?(d?Deltas)?Newest()?*Delta?{
          ?if?n?:=?len(d);?n?>?0?{
          ??return?&d[n-1]
          ?}
          ?return?nil
          }

          DeltaFIFO 的計(jì)算對(duì)象鍵的函數(shù)為什么要先做一次 Deltas 的類型轉(zhuǎn)換呢?那是因?yàn)?Pop() 出去的對(duì)象很可能還要再添加進(jìn)來(lái)(比如處理失敗需要再放進(jìn)來(lái)),此時(shí)添加的對(duì)象就是已經(jīng)封裝好的 Deltas 對(duì)象了。

          然后同樣按照上面的方式來(lái)分析 DeltaFIFO 的實(shí)現(xiàn),首先查看 Store 存儲(chǔ)部分的實(shí)現(xiàn),也就是增、刪、改、查功能。

          同樣的 Add、Update 和 Delete 的實(shí)現(xiàn)方法基本上是一致的:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?Add?插入一個(gè)元素放入到隊(duì)列中
          func?(f?*DeltaFIFO)?Add(obj?interface{})?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true??//?隊(duì)列第一次寫(xiě)入操作都要設(shè)置標(biāo)記
          ?return?f.queueActionLocked(Added,?obj)
          }

          //?Update?和?Add?一樣,只是是?Updated?一個(gè)?Delta
          func?(f?*DeltaFIFO)?Update(obj?interface{})?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?f.populated?=?true??//?隊(duì)列第一次寫(xiě)入操作都要設(shè)置標(biāo)記
          ?return?f.queueActionLocked(Updated,?obj)
          }

          //?刪除和添加一樣,但會(huì)產(chǎn)生一個(gè)刪除的 Delta。如果給定的對(duì)象還不存在,它將被忽略。
          //?例如,它可能已經(jīng)被替換(重新list)刪除了。
          //?在這個(gè)方法中,`f.knownObjects`?如果不為nil,則提供(通過(guò)GetByKey)被認(rèn)為已經(jīng)存在的?_additional_?對(duì)象。
          func?(f?*DeltaFIFO)?Delete(obj?interface{})?error?{
          ?id,?err?:=?f.KeyOf(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?隊(duì)列第一次寫(xiě)入操作都要設(shè)置這個(gè)標(biāo)記
          ?f.populated?=?true
          ??//?相當(dāng)于沒(méi)有?Indexer?的時(shí)候,就通過(guò)自己的存儲(chǔ)對(duì)象檢查下
          ?if?f.knownObjects?==?nil?{
          ??if?_,?exists?:=?f.items[id];?!exists?{
          ???//?自己的存儲(chǔ)里面都沒(méi)有,那也就不用處理了
          ???return?nil
          ??}
          ?}?else?
          ??//?相當(dāng)于 Indexer 里面和自己的存儲(chǔ)里面都沒(méi)有這個(gè)對(duì)象,那么也就相當(dāng)于不存在了,就不處理了。
          ????_,?exists,?err?:=?f.knownObjects.GetByKey(id)
          ??_,?itemsExist?:=?f.items[id]
          ??if?err?==?nil?&&?!exists?&&?!itemsExist?{
          ???return?nil
          ??}
          ?}
          ??//?同樣調(diào)用?queueActionLocked?將數(shù)據(jù)放入隊(duì)列
          ?return?f.queueActionLocked(Deleted,?obj)
          }

          可以看出 Add 、Update、Delete 方法最終都是調(diào)用的 queueActionLocked 函數(shù)來(lái)實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          // queueActionLocked 追加到對(duì)象的 delta 列表中。
          //?調(diào)用者必須先 lock。
          func?(f?*DeltaFIFO)?queueActionLocked(actionType?DeltaType,?obj?interface{})?error?{
          ?id,?err?:=?f.KeyOf(obj)??//?獲取對(duì)象鍵
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ??
          ??//?將?actionType?和資源對(duì)象?obj?構(gòu)造成?Delta,添加到?items?中
          ?newDeltas?:=?append(f.items[id],?Delta{actionType,?obj})
          ??//?去重
          ?newDeltas?=?dedupDeltas(newDeltas)

          ?if?len(newDeltas)?>?0?{
          ????//?新對(duì)象的?key?不在隊(duì)列中則插入?queue?隊(duì)列
          ??if?_,?exists?:=?f.items[id];?!exists?{
          ???f.queue?=?append(f.queue,?id)
          ??}
          ????//?重新更新?items
          ??f.items[id]?=?newDeltas
          ????//?通知所有的消費(fèi)者解除阻塞
          ??f.cond.Broadcast()
          ?}?else?{
          ????//?這種情況不會(huì)發(fā)生,因?yàn)榻o定一個(gè)非空列表時(shí),dedupDeltas 永遠(yuǎn)不會(huì)返回一個(gè)空列表。
          ????//?但如果真的返回了一個(gè)空列表,那么我們就需要從 map 中刪除這個(gè)元素。
          ??delete(f.items,?id)
          ?}
          ?return?nil
          }

          //?==============排重==============
          //?重新list和watch可以以任何順序多次提供相同的更新。
          //?如果最近的兩個(gè) Delta 相同,則將它們合并。
          func?dedupDeltas(deltas?Deltas)?Deltas?{
          ?n?:=?len(deltas)??
          ?if?n?2?{??//?小于兩個(gè)?delta?沒(méi)必要合并了
          ??return?deltas
          ?}
          ??//?Deltas是[]Delta,新的對(duì)象是追加到Slice后面
          ??//?所以取最后兩個(gè)元素來(lái)判斷是否相同
          ?a?:=?&deltas[n-1]
          ?b?:=?&deltas[n-2]
          ??//?執(zhí)行去重操作
          ?if?out?:=?isDup(a,?b);?out?!=?nil?{
          ????//?將去重保留下來(lái)的delta追加到前面n-2個(gè)delta中去
          ??d?:=?append(Deltas{},?deltas[:n-2]...)
          ??return?append(d,?*out)
          ?}
          ?return?deltas
          }

          //?判斷兩個(gè)?Delta?是否是重復(fù)的
          func?isDup(a,?b?*Delta)?*Delta?{
          ??//?這個(gè)函數(shù)應(yīng)該應(yīng)該可以判斷多種類型的重復(fù),目前只有刪除這一種能夠合并
          ?if?out?:=?isDeletionDup(a,?b);?out?!=?nil?{
          ??return?out
          ?}
          ?return?nil
          }

          //?判斷是否為刪除類型的重復(fù)
          func?isDeletionDup(a,?b?*Delta)?*Delta?{
          ??//?二者類型都是刪除那肯定有一個(gè)是重復(fù)的,則返回一個(gè)即可
          ?if?b.Type?!=?Deleted?||?a.Type?!=?Deleted?{
          ??return?nil
          ?}
          ?//?更復(fù)雜的檢查還是這樣就夠了?
          ?if?_,?ok?:=?b.Object.(DeletedFinalStateUnknown);?ok?{
          ??return?a
          ?}
          ?return?b
          }

          因?yàn)橄到y(tǒng)對(duì)于刪除的對(duì)象有 DeletedFinalStateUnknown 這個(gè)狀態(tài),所以會(huì)存在兩次刪除的情況,但是兩次添加同一個(gè)對(duì)象由于 APIServer 可以保證對(duì)象的唯一性,所以這里沒(méi)有考慮合并兩次添加操作的情況。

          然后看看其他幾個(gè)主要方法的實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?列舉接口實(shí)現(xiàn)
          func?(f?*DeltaFIFO)?List()?[]interface{}?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?return?f.listLocked()
          }
          //?真正的列舉實(shí)現(xiàn)
          func?(f?*DeltaFIFO)?listLocked()?[]interface{}?{
          ?list?:=?make([]interface{},?0,?len(f.items))
          ?for?_,?item?:=?range?f.items?{
          ??list?=?append(list,?item.Newest().Object)
          ?}
          ?return?list
          }

          //?返回現(xiàn)在 FIFO 中所有的對(duì)象鍵。
          func?(f?*DeltaFIFO)?ListKeys()?[]string?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?list?:=?make([]string,?0,?len(f.items))
          ?for?key?:=?range?f.items?{
          ??list?=?append(list,?key)
          ?}
          ?return?list
          }

          //?根據(jù)對(duì)象獲取FIFO中對(duì)應(yīng)的元素
          func?(f?*DeltaFIFO)?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)?{
          ?key,?err?:=?f.KeyOf(obj)
          ?if?err?!=?nil?{
          ??return?nil,?false,?KeyError{obj,?err}
          ?}
          ?return?f.GetByKey(key)
          }

          //?通過(guò)對(duì)象鍵獲取FIFO中的元素(獲取到的是?Delta?數(shù)組)
          func?(f?*DeltaFIFO)?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)?{
          ?f.lock.RLock()
          ?defer?f.lock.RUnlock()
          ?d,?exists?:=?f.items[key]
          ?if?exists?{
          ??//?復(fù)制元素的slice,這樣對(duì)這個(gè)切片的操作就不會(huì)影響返回的對(duì)象了。
          ??d?=?copyDeltas(d)
          ?}
          ?return?d,?exists,?nil
          }

          // copyDeltas 返回 d 的淺拷貝,也就是說(shuō)它拷貝的是切片,而不是切片中的對(duì)象。
          // Get/List 可以返回一個(gè)不會(huì)被后續(xù)修改影響的對(duì)象。
          func?copyDeltas(d?Deltas)?Deltas?{
          ?d2?:=?make(Deltas,?len(d))
          ?copy(d2,?d)
          ?return?d2
          }

          //?判斷隊(duì)列是否關(guān)閉了
          func?(f?*DeltaFIFO)?IsClosed()?bool?{
          ?f.closedLock.Lock()
          ?defer?f.closedLock.Unlock()
          ?return?f.closed
          }

          接下來(lái)我們來(lái)看看 Replace 函數(shù)的時(shí)候,這個(gè)也是 Store 里面的定義的接口:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          func?(f?*DeltaFIFO)?Replace(list?[]interface{},?resourceVersion?string)?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?keys?:=?make(sets.String,?len(list))

          ?//?keep?對(duì)老客戶端的向后兼容
          ?action?:=?Sync
          ?if?f.emitDeltaTypeReplaced?{
          ??action?=?Replaced
          ?}
          ??//?遍歷?list
          ?for?_,?item?:=?range?list?{
          ????//?計(jì)算對(duì)象鍵
          ??key,?err?:=?f.KeyOf(item)
          ??if?err?!=?nil?{
          ???return?KeyError{item,?err}
          ??}
          ????//?記錄處理過(guò)的對(duì)象鍵,使用?set?集合存儲(chǔ)
          ??keys.Insert(key)
          ????//?重新同步一次對(duì)象
          ??if?err?:=?f.queueActionLocked(action,?item);?err?!=?nil?{
          ???return?fmt.Errorf("couldn't?enqueue?object:?%v",?err)
          ??}
          ?}
          ??//?如果沒(méi)有?Indexer?存儲(chǔ)的話,自己存儲(chǔ)的就是所有的老對(duì)象
          ??//?目的要看看那些老對(duì)象不在全量集合中,那么就是刪除的對(duì)象了
          ?if?f.knownObjects?==?nil?{
          ??//?針對(duì)自己的列表進(jìn)行刪除檢測(cè)。
          ??queuedDeletions?:=?0
          ????//?遍歷所有元素
          ??for?k,?oldItem?:=?range?f.items?{
          ??????//?如果元素在輸入的對(duì)象中存在就忽略了。
          ???if?keys.Has(k)?{
          ????continue
          ???}
          ??????//?到這里證明當(dāng)前的?oldItem?元素不在輸入的列表中,證明對(duì)象已經(jīng)被刪除了
          ???var?deletedObj?interface{}
          ???if?n?:=?oldItem.Newest();?n?!=?nil?{
          ????deletedObj?=?n.Object
          ???}
          ???queuedDeletions++
          ??????//?因?yàn)榭赡荜?duì)列中已經(jīng)存在?Deleted?類型的元素了,避免重復(fù),所以采用?DeletedFinalStateUnknown?來(lái)包裝下對(duì)象
          ???if?err?:=?f.queueActionLocked(Deleted,?DeletedFinalStateUnknown{k,?deletedObj});?err?!=?nil?{
          ????return?err
          ???}
          ??}
          ????//?如果?populated?沒(méi)有設(shè)置,說(shuō)明是第一次并且還沒(méi)有任何修改操作執(zhí)行過(guò)
          ??if?!f.populated?{
          ??????//?這個(gè)時(shí)候需要標(biāo)記下
          ???f.populated?=?true
          ???//?記錄第一次設(shè)置的對(duì)象數(shù)量
          ???f.initialPopulationCount?=?len(list)?+?queuedDeletions
          ??}

          ??return?nil
          ?}
          ?
          ??//?檢測(cè)已經(jīng)刪除但是沒(méi)有在隊(duì)列中的元素。
          ??//?從?Indexer?中獲取所有的對(duì)象鍵
          ?knownKeys?:=?f.knownObjects.ListKeys()
          ?queuedDeletions?:=?0
          ?for?_,?k?:=?range?knownKeys?{
          ????//?對(duì)象存在就忽略
          ??if?keys.Has(k)?{
          ???continue
          ??}
          ????//?到這里同樣證明當(dāng)前的對(duì)象鍵對(duì)應(yīng)的對(duì)象被刪除了
          ????//?獲取被刪除的對(duì)象鍵對(duì)應(yīng)的對(duì)象
          ??deletedObj,?exists,?err?:=?f.knownObjects.GetByKey(k)
          ??if?err?!=?nil?{
          ???deletedObj?=?nil
          ???klog.Errorf("Unexpected?error?%v?during?lookup?of?key?%v,?placing?DeleteFinalStateUnknown?marker?without?object",?err,?k)
          ??}?else?if?!exists?{
          ???deletedObj?=?nil
          ???klog.Infof("Key?%v?does?not?exist?in?known?objects?store,?placing?DeleteFinalStateUnknown?marker?without?object",?k)
          ??}
          ????//?累加刪除的對(duì)象數(shù)量
          ??queuedDeletions++
          ????//?把對(duì)象刪除的?Delta?放入隊(duì)列,和上面一樣避免重復(fù),使用?DeletedFinalStateUnknown?包裝下對(duì)象
          ??if?err?:=?f.queueActionLocked(Deleted,?DeletedFinalStateUnknown{k,?deletedObj});?err?!=?nil?{
          ???return?err
          ??}
          ?}
          ??//?和上面一致
          ?if?!f.populated?{
          ??f.populated?=?true
          ??f.initialPopulationCount?=?len(list)?+?queuedDeletions
          ?}

          ?return?nil
          }

          Replace() 主要用于實(shí)現(xiàn)對(duì)象的全量更新,由于 DeltaFIFO 對(duì)外輸出的就是所有目標(biāo)的增量變化,所以每次全量更新都要判斷對(duì)象是否已經(jīng)刪除,因?yàn)樵谌扛虑翱赡軟](méi)有收到目標(biāo)刪除的請(qǐng)求。這一點(diǎn)與 cache 不同,cache 的Replace() 相當(dāng)于重建,因?yàn)?cache 就是對(duì)象全量的一種內(nèi)存映射,所以Replace() 就等于重建。

          接下來(lái)就是實(shí)現(xiàn) DeltaFIFO ?特性的 Pop 函數(shù)的實(shí)現(xiàn)了:

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          func?(f?*DeltaFIFO)?Pop(process?PopProcessFunc)?(interface{},?error)?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?for?{
          ????//?隊(duì)列中是否有數(shù)據(jù)
          ??for?len(f.queue)?==?0?{
          ???//?如果隊(duì)列關(guān)閉了這直接返回錯(cuò)誤
          ???if?f.IsClosed()?{
          ????return?nil,?ErrFIFOClosed
          ???}
          ??????//?沒(méi)有數(shù)據(jù)就一直等待
          ???f.cond.Wait()
          ??}
          ????//?取出第一個(gè)對(duì)象鍵
          ??id?:=?f.queue[0]
          ????//?更新下queue,相當(dāng)于把第一個(gè)元素彈出去了
          ??f.queue?=?f.queue[1:]
          ????//?對(duì)象計(jì)數(shù)減一,當(dāng)減到0就說(shuō)明外部已經(jīng)全部同步完畢了
          ??if?f.initialPopulationCount?>?0?{
          ???f.initialPopulationCount--
          ??}
          ????//?取出真正的對(duì)象,queue里面是對(duì)象鍵
          ??item,?ok?:=?f.items[id]
          ??if?!ok?{
          ???// Item 可能后來(lái)被刪除了。
          ???continue
          ??}
          ????//?刪除對(duì)象
          ??delete(f.items,?id)
          ????//?調(diào)用處理對(duì)象的函數(shù)
          ??err?:=?process(item)
          ????//?如果處理出錯(cuò),那就重新入隊(duì)列
          ??if?e,?ok?:=?err.(ErrRequeue);?ok?{
          ???f.addIfNotPresent(id,?item)
          ???err?=?e.Err
          ??}
          ??//?這里不需要 copyDeltas,因?yàn)槲覀円阉袡?quán)轉(zhuǎn)移給調(diào)用者。
          ??return?item,?err
          ?}
          }

          然后再簡(jiǎn)單看下其他幾個(gè)函數(shù)的實(shí)現(xiàn):

          //?k8s.io/client-go/tools/cache/delta_fifo.go

          //?AddIfNotPresent?插入不存在的對(duì)象到隊(duì)列中
          func?(f?*DeltaFIFO)?AddIfNotPresent(obj?interface{})?error?{
          ??//?放入的必須是?Delta?數(shù)組,就是通過(guò)?Pop?彈出的對(duì)象
          ?deltas,?ok?:=?obj.(Deltas)
          ?if?!ok?{
          ??return?fmt.Errorf("object?must?be?of?type?deltas,?but?got:?%#v",?obj)
          ?}
          ??//?多個(gè)?Delta?都是同一個(gè)對(duì)象,所以用最新的來(lái)獲取對(duì)象鍵即可
          ?id,?err?:=?f.KeyOf(deltas.Newest().Object)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?調(diào)用真正的插入實(shí)現(xiàn)
          ?f.addIfNotPresent(id,?deltas)
          ?return?nil
          }

          //?插入對(duì)象的真正實(shí)現(xiàn)
          func?(f?*DeltaFIFO)?addIfNotPresent(id?string,?deltas?Deltas)?{
          ?f.populated?=?true
          ??//?如果對(duì)象已經(jīng)存在了,則忽略
          ?if?_,?exists?:=?f.items[id];?exists?{
          ??return
          ?}
          ??//?不在隊(duì)列中,則插入隊(duì)列
          ?f.queue?=?append(f.queue,?id)
          ?f.items[id]?=?deltas
          ??//?通知消費(fèi)者解除阻塞
          ?f.cond.Broadcast()
          }

          // Resync 重新同步,帶有 Sync 類型的 Delta 對(duì)象。
          func?(f?*DeltaFIFO)?Resync()?error?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ??//?Indexer?為空,重新同步無(wú)意義
          ?if?f.knownObjects?==?nil?{
          ??return?nil
          ?}
          ??//?獲取?Indexer?中所有的對(duì)象鍵
          ?keys?:=?f.knownObjects.ListKeys()
          ??//?循環(huán)對(duì)象鍵,為每個(gè)對(duì)象產(chǎn)生一個(gè)同步的?Delta
          ?for?_,?k?:=?range?keys?{
          ??if?err?:=?f.syncKeyLocked(k);?err?!=?nil?{
          ???return?err
          ??}
          ?}
          ?return?nil
          }
          //?對(duì)象同步接口的真正實(shí)現(xiàn)
          func?(f?*DeltaFIFO)?syncKeyLocked(key?string)?error?{
          ??//?獲取?Indexer?中的對(duì)象
          ?obj,?exists,?err?:=?f.knownObjects.GetByKey(key)
          ?if?err?!=?nil?{
          ??klog.Errorf("Unexpected?error?%v?during?lookup?of?key?%v,?unable?to?queue?object?for?sync",?err,?key)
          ??return?nil
          ?}?else?if?!exists?{
          ??klog.Infof("Key?%v?does?not?exist?in?known?objects?store,?unable?to?queue?object?for?sync",?key)
          ??return?nil
          ?}

          ?//?計(jì)算對(duì)象的鍵值,對(duì)象鍵不是已經(jīng)傳入了么?
          ??//?其實(shí)傳入的是存在?Indexer?里面的對(duì)象鍵,可能與這里的計(jì)算方式不同
          ??id,?err?:=?f.KeyOf(obj)
          ?if?err?!=?nil?{
          ??return?KeyError{obj,?err}
          ?}
          ??//?對(duì)象已經(jīng)在存在,說(shuō)明后續(xù)會(huì)通知對(duì)象的新變化,所以再加更新也沒(méi)意義
          ?if?len(f.items[id])?>?0?{
          ??return?nil
          ?}
          ??//?添加對(duì)象同步的這個(gè)?Delta
          ?if?err?:=?f.queueActionLocked(Sync,?obj);?err?!=?nil?{
          ??return?fmt.Errorf("couldn't?queue?object:?%v",?err)
          ?}
          ?return?nil
          }

          // HasSynced 如果 Add/Update/Delete/AddIfNotPresent 第一次被調(diào)用則會(huì)返回 true。
          //?或者通過(guò) Replace 插入的元素都已經(jīng) Pop 完成了,則也會(huì)返回 true。
          func?(f?*DeltaFIFO)?HasSynced()?bool?{
          ?f.lock.Lock()
          ?defer?f.lock.Unlock()
          ?//?同步就是全量?jī)?nèi)容已經(jīng)進(jìn)入?Indexer,Indexer?已經(jīng)是系統(tǒng)中對(duì)象的全量快照了
          ??//?相當(dāng)于就是全量對(duì)象從隊(duì)列中全部彈出進(jìn)入?Indexer,證明已經(jīng)同步完成了
          ?return?f.populated?&&?f.initialPopulationCount?==?0
          }

          //?關(guān)閉隊(duì)列
          func?(f?*DeltaFIFO)?Close()?{
          ?f.closedLock.Lock()
          ?defer?f.closedLock.Unlock()
          ?f.closed?=?true
          ?f.cond.Broadcast()
          }

          這里是否已同步是根據(jù) populatedinitialPopulationCount 這兩個(gè)變量來(lái)判斷的,是否同步指的是第一次從 APIServer 中獲取全量的對(duì)象是否全部 Pop 完成,全局同步到了緩存中,也就是 Indexer 中去了,因?yàn)?Pop 一次 initialPopulationCount 就會(huì)減1,當(dāng)為0的時(shí)候就表示 Pop 完成了。

          總結(jié)

          到這里我們就將完整實(shí)現(xiàn)了 DeltaFIFO,然后加上前面的 Reflector 反射器,就可以結(jié)合起來(lái)了。

          Reflector 通過(guò) ListAndWatch 首先獲取全量的資源對(duì)象數(shù)據(jù),然后調(diào)用 DeltaFIFO 的 Replace() 方法全量插入隊(duì)列,然后后續(xù)通過(guò) Watch 操作根據(jù)資源對(duì)象的操作類型調(diào)用 DeltaFIFO 的 Add、Update、Delete 方法,將數(shù)據(jù)更新到隊(duì)列中。我們可以用下圖來(lái)總結(jié)這兩個(gè)組件之間的關(guān)系:

          至于 Pop 出來(lái)的元素如何處理,就要看 Pop 的回調(diào)函數(shù) PopProcessFunc 了。我們可以回到最初的 SharedInformer 中,在 sharedIndexInformer 的 Run 函數(shù)中就初始化了 DeltaFIFO,也配置了用于 Pop 回調(diào)處理的函數(shù):

          //?k8s.io/client-go/tools/cache/shared_informer.go

          func?(s?*sharedIndexInformer)?Run(stopCh?<-chan?struct{})?{
          ?defer?utilruntime.HandleCrash()
          ??//?初始化?DeltaFIFO,這里就可以看出來(lái)?KnownObjects?就是一個(gè)?Indexer
          ?fifo?:=?NewDeltaFIFOWithOptions(DeltaFIFOOptions{
          ??KnownObjects:??????????s.indexer,
          ??EmitDeltaTypeReplaced:?true,
          ?})

          ?cfg?:=?&Config{
          ??Queue:????????????fifo,
          ??ListerWatcher:????s.listerWatcher,
          ??ObjectType:???????s.objectType,
          ??FullResyncPeriod:?s.resyncCheckPeriod,
          ??RetryOnError:?????false,
          ??ShouldResync:?????s.processor.shouldResync,

          ??Process:?s.HandleDeltas,??//?指定?Pop?函數(shù)的回調(diào)處理函數(shù)
          ?}
          ?......
          }

          //?真正的?Pop?回調(diào)處理函數(shù)
          func?(s?*sharedIndexInformer)?HandleDeltas(obj?interface{})?error?{
          ?s.blockDeltas.Lock()
          ?defer?s.blockDeltas.Unlock()

          ?//?from?oldest?to?newest
          ?for?_,?d?:=?range?obj.(Deltas)?{
          ??switch?d.Type?{
          ??case?Sync,?Replaced,?Added,?Updated:
          ???s.cacheMutationDetector.AddObject(d.Object)
          ???if?old,?exists,?err?:=?s.indexer.Get(d.Object);?err?==?nil?&&?exists?{
          ????......
          ???}?else?{
          ????????//?將對(duì)象添加到?Indexer?中
          ????if?err?:=?s.indexer.Add(d.Object);?err?!=?nil?{
          ?????return?err
          ????}
          ????......
          ???}
          ??case?Deleted:
          ??????//?刪除?Indexer?中的對(duì)象
          ???if?err?:=?s.indexer.Delete(d.Object);?err?!=?nil?{
          ????return?err
          ???}
          ???......
          ??}
          ?}
          ?return?nil
          }

          從上面可以看出 DeltaFIFO 中的元素被彈出來(lái)后被同步到了 Indexer 存儲(chǔ)中,而在 DeltaFIFO 中的 KnownObjects 也就是這個(gè)指定的 Indexer,所以接下來(lái)我們就需要重點(diǎn)分析 Indexer 組件的實(shí)現(xiàn)了。




          K8S進(jìn)階訓(xùn)練營(yíng),點(diǎn)擊下方圖片了解詳情

          瀏覽 72
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  97AV电影 | 天天爱夜夜操 | 国产精品福利免费在线观看 | 天天干天天撸 | 婷婷五月天综合网 |