client-go 之 DeltaFIFO 實現(xiàn)原理
從 DeltaFIFO 的名字可以看出它是一個 FIFO,也就是一個先進(jìn)先出的隊列,而 Delta 表示的是變化的資源對象存儲,包含操作資源對象的類型和數(shù)據(jù),Reflector 就是這個隊列的生產(chǎn)者。
Delta
在了解 DeltaFIFO 之前我們需要先具體了解下什么是 Delta,我們先來看看 client-go 中是如何定義的,Delta 的數(shù)據(jù)結(jié)構(gòu)定義位于staging/src/k8s.io/client-go/tools/cache/delta_fifo.go文件中。
//?k8s.io/client-go/tools/cache/delta_fifo.go
//?DeltaType?是變化的類型(添加、刪除等)
type?DeltaType?string
//?變化的類型定義
const?(
?Added???DeltaType?=?"Added"?????//?增加
?Updated?DeltaType?=?"Updated"???//?更新
?Deleted?DeltaType?=?"Deleted"???//?刪除
?//?當(dāng)遇到 watch 錯誤,不得不進(jìn)行重新list時,就會觸發(fā) Replaced。
??//?我們不知道被替換的對象是否發(fā)生了變化。
?//
??//?注意:以前版本的 DeltaFIFO 也會對 Replace 事件使用 Sync。
??//?所以只有當(dāng)選項 EmitDeltaTypeReplaced 為真時才會觸發(fā) Replaced。
?Replaced?DeltaType?=?"Replaced"
?//?Sync?是針對周期性重新同步期間的合成事件
?Sync?DeltaType?=?"Sync"??????????//?同步
)
// Delta 是 DeltaFIFO 存儲的類型。
//?它告訴你發(fā)生了什么變化,以及變化后對象的狀態(tài)。
//
//?[*]?除非變化是刪除操作,否則你將得到對象被刪除前的最終狀態(tài)。
type?Delta?struct?{
?Type???DeltaType
?Object?interface{}
}
Delta 其實就是 Kubernetes 系統(tǒng)中帶有變化類型的資源對象,如下圖所示:

其實也非常好理解,比如我們現(xiàn)在添加了一個 Pod,那么這個 Delta 就是帶有 Added 這個類型的 Pod,如果是刪除了一個 Deployment,那么這個 Delta 就是帶有 Deleted 類型的 Deployment,為什么要帶上類型呢?因為我們需要根據(jù)不同的類型去執(zhí)行不同的操作,增加、更新、刪除的動作顯然是不一樣的。
FIFO
上面我們解釋了什么是 Delta,接下來需要說下 FIFO,我們說 FIFO 很好理解,就是一個先進(jìn)先出的隊列,Reflector 是其生產(chǎn)者,其數(shù)據(jù)結(jié)構(gòu)定義位于 ?staging/src/k8s.io/client-go/tools/cache/fifo.go 文件中:
//?k8s.io/client-go/tools/cache/fifo.go
type?FIFO?struct?{
?lock?sync.RWMutex
?cond?sync.Cond
??//?items?中的每一個?key?也在?queue?中
?items?map[string]interface{}
?queue?[]string
?//?如果第一批?items?被?Replace()?插入或者先調(diào)用了?Deleta/Add/Update
??//?則 populated 為 true。
?populated?bool
?//?第一次調(diào)用?Replace()?時插入的?items?數(shù)
?initialPopulationCount?int
??// keyFunc 用于生成排隊的 item 插入和檢索的 key。
?keyFunc?KeyFunc
??//?標(biāo)識隊列已關(guān)閉,以便在隊列清空時控制循環(huán)可以退出。
?closed?????bool
?closedLock?sync.Mutex
}
var?(
?_?=?Queue(&FIFO{})?//?FIFO?是一個?Queue
)
上面的 FIFO 數(shù)據(jù)結(jié)構(gòu)中定義了 items 和 queue 兩個屬性來保存隊列中的數(shù)據(jù),其中 queue 中存的是資源對象的 key 列表,而 items 是一個 map 類型,其 key 就是 queue 中保存的 key,value 值是真正的資源對象數(shù)據(jù)。既然是先進(jìn)進(jìn)去的隊列,那么就要具有隊列的基本功能,結(jié)構(gòu)體下面其實就有一個類型斷言,表示當(dāng)前的 FIFO 實現(xiàn)了 Queue 這個接口,所以 FIFO 要實現(xiàn)的功能都是在 Queue 中定義的,Queue 接口和 FIFO 位于同一文件中:
//?k8s.io/client-go/tools/cache/fifo.go
//?Queue?擴(kuò)展了?Store??//?with?a?collection?of?Store?keys?to?"process".
//?每一次添加、更新或刪除都可以將對象的key放入到該集合中。
//?Queue?具有使用給定的?accumulator?來推導(dǎo)出相應(yīng)的?key?的方法
//?Queue?可以從多個?goroutine?中并發(fā)訪問
//?Queue?可以被關(guān)閉,之后?Pop?操作會返回一個錯誤
type?Queue?interface?{
?Store
?// Pop 一直阻塞,直到至少有一個key要處理或隊列被關(guān)閉,隊列被關(guān)閉會返回一個錯誤。
??//?在前面的情況下?Pop?原子性地選擇一個?key?進(jìn)行處理,從?Store?中刪除關(guān)聯(lián)(key、accumulator)的數(shù)據(jù),
??//?并處理 accumulator。Pop 會返回被處理的 accumulator 和處理的結(jié)果。
?
?//?PopProcessFunc?函數(shù)可以返回一個?ErrRequeue{inner},在這種情況下,Pop?將
??//(a)把那個(key,accumulator)關(guān)聯(lián)作為原子處理的一部分返回到?Queue?中
??//?(b)?從 Pop 返回內(nèi)部錯誤。
?Pop(PopProcessFunc)?(interface{},?error)
?//?僅當(dāng)該?key?尚未與一個非空的?accumulator?相關(guān)聯(lián)的時候,AddIfNotPresent?將給定的?accumulator?放入?Queue(與?accumulator?的?key?相關(guān)聯(lián)的)
?AddIfNotPresent(interface{})?error
?
??//?如果第一批 keys 都已經(jīng) Popped,則 HasSynced 返回 true。
??//?如果在添加、更新、刪除之前發(fā)生了第一次?Replace?操作,則第一批?keys?為?true
??//?否則為空。
?HasSynced()?bool
?//?關(guān)閉該隊列
?Close()
}
從上面的定義中可以看出 Queue 這個接口擴(kuò)展了 Store 這個接口,這個就是前面我們說的本地存儲,隊列實際上也是一種存儲,然后在 Store 的基礎(chǔ)上增加 Pop、AddIfNotPresent、HasSynced、Close 4個函數(shù)就變成了 Queue 隊列了,所以我們優(yōu)先來看下 Store 這個接口的定義,該數(shù)據(jù)結(jié)構(gòu)定義位于文件 k8s.io/client-go/tools/cache/store.go 中:
//?k8s.io/client-go/tools/cache/store.go
// Store 是一個通用的對象存儲和處理的接口。
//?Store?包含一個從字符串?keys?到?accumulators?的映射,并具有?to/from?當(dāng)前
//?給定 key 關(guān)聯(lián)的 accumulators 添加、更新和刪除給定對象的操作。
//?一個 Store 還知道如何從給定的對象中獲取 key,所以很多操作只提供對象。
//
//?在最簡單的?Store?實現(xiàn)中,每個?accumulator?只是最后指定的對象,或者刪除后為空,
//?所以 Store 只是簡單的存儲。
//
// Reflector 反射器知道如何 watch 一個服務(wù)并更新一個 Store 存儲,這個包提供了 Store 的各種實現(xiàn)。
type?Store?interface?{
?// Add 將指定對象添加到與指定對象的 key 相關(guān)的 accumulator(累加器)中。
?Add(obj?interface{})?error
?//?Update?與指定對象的?key?相關(guān)的?accumulator?中更新指定的對象
?Update(obj?interface{})?error
?//?Delete?根據(jù)指定的對象?key?刪除指定的對象
?Delete(obj?interface{})?error
?//?List?返回當(dāng)前所有非空的?accumulators?的列表
?List()?[]interface{}
?//?ListKeys?返回當(dāng)前與非空?accumulators?關(guān)聯(lián)的所有?key?的列表
?ListKeys()?[]string
?//?Get?根據(jù)指定的對象獲取關(guān)聯(lián)的?accumulator
?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)
?//?GetByKey?根據(jù)指定的對象?key?獲取關(guān)聯(lián)的?accumulator
?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)
?//?Replace?會刪除原來Store中的內(nèi)容,并將新增的list的內(nèi)容存入Store中,即完全替換數(shù)據(jù)
??// Store 擁有 list 列表的所有權(quán),在調(diào)用此函數(shù)后,不應(yīng)該引用它了。
?Replace([]interface{},?string)?error
?// Resync 在 Store 中沒有意義,但是在 DeltaFIFO 中有意義。
?Resync()?error
}
//?KeyFunc?就是從一個對象中生成一個唯一的?Key?的函數(shù),上面的?FIFO?中就有用到
type?KeyFunc?func(obj?interface{})?(string,?error)
//?MetaNamespaceKeyFunc?是默認(rèn)的?KeyFunc,生成的?key?格式為:
//?<namespace>/<name>
//?如果是全局的,則namespace為空,那么生成的?key?就是?<name>
//?當(dāng)然要從?key?拆分出?namespace?和?name?也非常簡單
func?MetaNamespaceKeyFunc(obj?interface{})?(string,?error)?{
?if?key,?ok?:=?obj.(ExplicitKey);?ok?{
??return?string(key),?nil
?}
?meta,?err?:=?meta.Accessor(obj)
?if?err?!=?nil?{
??return?"",?fmt.Errorf("object?has?no?meta:?%v",?err)
?}
?if?len(meta.GetNamespace())?>?0?{
??return?meta.GetNamespace()?+?"/"?+?meta.GetName(),?nil
?}
?return?meta.GetName(),?nil
}
Store 就是一個通用的對象存儲和處理的接口,可以用來寫入對象和獲取對象。其中 cache 數(shù)據(jù)結(jié)構(gòu)就實現(xiàn)了上面的 Store 接口,但是這個屬于后面的 Indexer 部分的知識點(diǎn),這里我們就不展開說明了。
我們說 Queue 擴(kuò)展了 Store 接口,所以 Queue 本身也是一個存儲,只是在存儲的基礎(chǔ)上增加了 Pop 這樣的函數(shù)來實現(xiàn)彈出對象,是不是就變成了一個隊列了。
FIFO 就是一個具體的 Queue 實現(xiàn),按照順序彈出對象是不是就是一個先進(jìn)先出的隊列了?如下圖所示:

所以我們接下來看看 FIFO 是如何實現(xiàn)存儲和 Pop 的功能的。首先是實現(xiàn) Store 存儲中最基本的方法,第一個就是添加對象:
//?k8s.io/client-go/tools/cache/fifo.go
// Add 插入一個對象,將其放入隊列中,只有當(dāng)元素不在集合中時才會插入隊列。
func?(f?*FIFO)?Add(obj?interface{})?error?{
??//?獲取對象的?key
?id,?err?:=?f.keyFunc(obj)
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
?f.lock.Lock()
?defer?f.lock.Unlock()
?f.populated?=?true
??//?元素不在隊列中的時候才插入隊列
?if?_,?exists?:=?f.items[id];?!exists?{
??f.queue?=?append(f.queue,?id)
?}
??//?items?是一個?map,所以直接賦值給這個?key,這樣對更新元素也同樣適用
?f.items[id]?=?obj
?f.cond.Broadcast()
?return?nil
}
更新對象,實現(xiàn)非常簡單,因為上面的 Add 方法就包含了 Update 的實現(xiàn),因為 items 屬性是一個 Map,對象有更新直接將對應(yīng) key 的 value 值替換成新的對象即可:
//?k8s.io/client-go/tools/cache/fifo.go
//?Update?和?Add?相同的實現(xiàn)
func?(f?*FIFO)?Update(obj?interface{})?error?{
?return?f.Add(obj)
}
接著就是刪除 Delete 方法的實現(xiàn),這里可能大家會有一個疑問,下面的刪除實現(xiàn)只刪除了 items 中的元素,那這樣豈不是 queue 和 items 中的 key 會不一致嗎?的確會這樣,但是這是一個隊列,下面的 Pop() 函數(shù)會根據(jù) queue 里面的元素一個一個的彈出 key,沒有對象就不處理了,相當(dāng)于下面的 Pop() 函數(shù)中實現(xiàn)了 queue 的 key 的刪除:
//?k8s.io/client-go/tools/cache/fifo.go
// Delete 從隊列中移除一個對象。
//?不會添加到?queue?中去,這個實現(xiàn)是假設(shè)消費(fèi)者只關(guān)心對象
//?不關(guān)心它們被創(chuàng)建或添加的順序。
func?(f?*FIFO)?Delete(obj?interface{})?error?{
??//?獲取對象的?key
?id,?err?:=?f.keyFunc(obj)
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
?f.lock.Lock()
?defer?f.lock.Unlock()
?f.populated?=?true
??//?刪除?items?中?key?為?id?的元素,就是刪除隊列中的對象
?delete(f.items,?id)
??//?為什么不直接處理 queue 這個 slice 呢?
?return?err
}
然后是獲取隊列中所有對象的 List 方法的實現(xiàn):
//?k8s.io/client-go/tools/cache/fifo.go
//?List?獲取隊列中的所有對象
func?(f?*FIFO)?List()?[]interface{}?{
?f.lock.RLock()
?defer?f.lock.RUnlock()
?list?:=?make([]interface{},?0,?len(f.items))
??//?獲取所有的items的values值(items是一個Map)
?for?_,?item?:=?range?f.items?{
??list?=?append(list,?item)
?}
?return?list
}
接著是獲取隊列中所有元素的 key 的 ListKeys 方法實現(xiàn):
//?k8s.io/client-go/tools/cache/fifo.go
// ListKeys 返回現(xiàn)在 FIFO 隊列中所有對象的 keys 列表。
func?(f?*FIFO)?ListKeys()?[]string?{
?f.lock.RLock()
?defer?f.lock.RUnlock()
?list?:=?make([]string,?0,?len(f.items))
??//?獲取所有items的key值(items是一個Map)
?for?key?:=?range?f.items?{
??list?=?append(list,?key)
?}
?return?list
}
至于根據(jù)對象或者對象的 key 獲取隊列中的元素,就更簡單了:
//?k8s.io/client-go/tools/cache/fifo.go
//?Get?獲取指定對象在隊列中的元素
func?(f?*FIFO)?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)?{
?key,?err?:=?f.keyFunc(obj)
?if?err?!=?nil?{
??return?nil,?false,?KeyError{obj,?err}
?}
??//?調(diào)用?GetByKey?實現(xiàn)
?return?f.GetByKey(key)
}
//?GetByKey?根據(jù)?key?獲取隊列中的元素
func?(f?*FIFO)?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)?{
?f.lock.RLock()
?defer?f.lock.RUnlock()
??//?因為?items?是一個?Map,所以直接根據(jù)?key?獲取即可
?item,?exists?=?f.items[key]
?return?item,?exists,?nil
}
然后是一個 Replace 替換函數(shù)的實現(xiàn):
//?k8s.io/client-go/tools/cache/fifo.go
// Replace 將刪除隊列中的內(nèi)容,'f'?擁有 map 的所有權(quán),調(diào)用該函數(shù)過后,不應(yīng)該再引用 map。
//?'f'?的隊列也會被重置,返回時,隊列將包含 map 中的元素,沒有特定的順序。
func?(f?*FIFO)?Replace(list?[]interface{},?resourceVersion?string)?error?{
?//?從?list?中提取出?key?然后和里面的元素重新進(jìn)行映射
??items?:=?make(map[string]interface{},?len(list))
?for?_,?item?:=?range?list?{
??key,?err?:=?f.keyFunc(item)
??if?err?!=?nil?{
???return?KeyError{item,?err}
??}
??items[key]?=?item
?}
?f.lock.Lock()
?defer?f.lock.Unlock()
?if?!f.populated?{
??f.populated?=?true
??f.initialPopulationCount?=?len(items)
?}
?//?重新設(shè)置?items?和?queue?的值
?f.items?=?items
?f.queue?=?f.queue[:0]
?for?id?:=?range?items?{
??f.queue?=?append(f.queue,?id)
?}
?if?len(f.queue)?>?0?{
??f.cond.Broadcast()
?}
?return?nil
}
還有 Store 存儲中的最后一個方法 Resync 的實現(xiàn):
//?k8s.io/client-go/tools/cache/fifo.go
// Resync 會保證 Store 中的每個對象在 queue 中都有它的 key。
//?
//?這應(yīng)該是禁止操作的,因為該屬性由所有操作維護(hù)
func?(f?*FIFO)?Resync()?error?{
?f.lock.Lock()
?defer?f.lock.Unlock()
??//?將所有?queue?中的元素放到一個?set?中
?inQueue?:=?sets.NewString()
?for?_,?id?:=?range?f.queue?{
??inQueue.Insert(id)
?}
??//?然后將所有?items?中的?key?加回到?queue?中去
?for?id?:=?range?f.items?{
??if?!inQueue.Has(id)?{
???f.queue?=?append(f.queue,?id)
??}
?}
?if?len(f.queue)?>?0?{
??f.cond.Broadcast()
?}
?return?nil
}
上面的所有方法就實現(xiàn)了一個普通的 Store,然后要想變成一個 Queue,還需要實現(xiàn)額外的 Pop 方法:
//?k8s.io/client-go/tools/cache/fifo.go
// Pop 會等到一個元素準(zhǔn)備好后再進(jìn)行處理,如果有多個元素準(zhǔn)備好了,則按照它們被添加或更新的順序返回。
//
//?在處理之前,元素會從隊列(和存儲)中移除,所以如果沒有成功處理,應(yīng)該用 AddIfNotPresent()?函數(shù)把它添加回來。
//?處理函數(shù)是在有鎖的情況下調(diào)用的,所以更新其中需要和隊列同步的數(shù)據(jù)結(jié)構(gòu)是安全的。
func?(f?*FIFO)?Pop(process?PopProcessFunc)?(interface{},?error)?{
?f.lock.Lock()
?defer?f.lock.Unlock()
?for?{
??//?當(dāng)隊列為空時,Pop()?的調(diào)用會被阻塞住,直到新的元素插入隊列后
??for?len(f.queue)?==?0?{
???if?f.IsClosed()?{
????return?nil,?ErrFIFOClosed
???}
???//?等待?condition?被廣播
???f.cond.Wait()
??}
????//?取出?queue?隊列中的第一個元素(key)
??id?:=?f.queue[0]
????//?刪除第一個元素
??f.queue?=?f.queue[1:]
??if?f.initialPopulationCount?>?0?{
???f.initialPopulationCount--
??}
????//?獲取被彈出的元素
??item,?ok?:=?f.items[id]
??if?!ok?{
???//?因為 item 可能已經(jīng)被刪除了。
???continue
??}
????//?刪除彈出的元素
??delete(f.items,?id)
????//?處理彈出的元素
??err?:=?process(item)
??if?e,?ok?:=?err.(ErrRequeue);?ok?{
??????//?如果處理沒成功,需要調(diào)用?addIfNotPresent?加回隊列
???f.addIfNotPresent(id,?item)
???err?=?e.Err
??}
??return?item,?err
?}
}
另外的一個就是上面提到的 AddIfNotPresent、HasSynced、Close 幾個函數(shù)的實現(xiàn):
//?k8s.io/client-go/tools/cache/fifo.go
// AddIfNotPresent 插入一個元素,將其放入隊列中。
//?如果元素已經(jīng)在集合中了,則會被忽略。
//
//?這在單個的?生產(chǎn)者/消費(fèi)者?的場景下非常有用,這樣消費(fèi)者可以安全地重試
//?而不需要與生產(chǎn)者爭奪,也不需要排隊等待過時的元素。
func?(f?*FIFO)?AddIfNotPresent(obj?interface{})?error?{
?id,?err?:=?f.keyFunc(obj)??//?獲取對象的?key
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
?f.lock.Lock()
?defer?f.lock.Unlock()
?f.addIfNotPresent(id,?obj)??//?調(diào)用?addIfNotPresent?真正的實現(xiàn)
?return?nil
}
// addIfNotPresent 會假設(shè)已經(jīng)持有 fifo 鎖了,如果不存在,則將其添加到隊列中去。
func?(f?*FIFO)?addIfNotPresent(id?string,?obj?interface{})?{
?f.populated?=?true
??//?存在則忽略
?if?_,?exists?:=?f.items[id];?exists?{
??return
?}
??//?添加到?queue?和?items?中去
?f.queue?=?append(f.queue,?id)
?f.items[id]?=?obj
??//?廣播?condition
?f.cond.Broadcast()
}
//?關(guān)閉隊列
func?(f?*FIFO)?Close()?{
?f.closedLock.Lock()
?defer?f.closedLock.Unlock()
??//?標(biāo)記為關(guān)閉
?f.closed?=?true
?f.cond.Broadcast()
}
//?如果先調(diào)用了 Add/Update/Delete/AddIfNotPresent,或者先調(diào)用了Update,但被 Replace()?插入的第一批元素已經(jīng)被彈出,則 HasSynced 返回true。
func?(f?*FIFO)?HasSynced()?bool?{
?f.lock.Lock()
?defer?f.lock.Unlock()
?return?f.populated?&&?f.initialPopulationCount?==?0
}
到這里我們就實現(xiàn)了一個簡單的 FIFO 隊列,其實這里就是對 FIFO 這個數(shù)據(jù)結(jié)構(gòu)的理解,沒有特別的地方,只是在隊列里面有 items 和 queue 兩個屬性來維護(hù)隊列而已。
DeltaFIFO
上面我們已經(jīng)實現(xiàn)了 FIFO,接下來就看下一個 DeltaFIFO 是如何實現(xiàn)的,DeltaFIFO 和 FIFO 一樣也是一個隊列,但是也有不同的地方,里面的元素是一個 Delta,Delta 上面我們已經(jīng)提到表示的是帶有變化類型的資源對象。
DeltaFIFO 的數(shù)據(jù)結(jié)構(gòu)定義位于 staging/src/k8s.io/client-go/tools/cache/delta_fifo.go 文件中:
//?k8s.io/client-go/tools/cache/delta_fifo.go
type?DeltaFIFO?struct?{
?//?lock/cond?保護(hù)訪問的?items?和?queue
?lock?sync.RWMutex
?cond?sync.Cond
??//?用來存儲?Delta?數(shù)據(jù)?->?對象key:?Delta數(shù)組
?items?map[string]Deltas
??//?用來存儲資源對象的key
?queue?[]string
?//?通過?Replace()?接口將第一批對象放入隊列,或者第一次調(diào)用增、刪、改接口時標(biāo)記為true
?populated?bool
?//?通過?Replace()?接口(全量)將第一批對象放入隊列的對象數(shù)量
?initialPopulationCount?int
?//?對象鍵的計算函數(shù)
?keyFunc?KeyFunc
?//?knownObjects?列出?"known"?的鍵?--?影響到?Delete(),Replace()?和?Resync()
??//?knownObjects?其實就是?Indexer,里面存有已知全部的對象
?knownObjects?KeyListerGetter
?//?標(biāo)記?queue?被關(guān)閉了
??closed?????bool
?closedLock?sync.Mutex
?//?emitDeltaTypeReplaced?當(dāng)?Replace()?被調(diào)用的時候,是否要?emit?Replaced?或者?Sync
?// DeltaType(保留向后兼容)。
?emitDeltaTypeReplaced?bool
}
//?KeyListerGetter?任何知道如何列出鍵和按鍵獲取對象的東西
type?KeyListerGetter?interface?{
?KeyLister
?KeyGetter
}
//?獲取所有的鍵
type?KeyLister?interface?{
?ListKeys()?[]string
}
//?根據(jù)鍵獲取對象
type?KeyGetter?interface?{
?GetByKey(key?string)?(interface{},?bool,?error)
}
DeltaFIFO 與 FIFO 一樣都是一個 Queue,所以他們都實現(xiàn)了 Queue,所以我們這里來看下 DeltaFIFO 是如何實現(xiàn) Queue 功能的,當(dāng)然和 FIFO 一樣都是實現(xiàn) Queue 接口里面的所有方法。
雖然實現(xiàn)流程和 FIFO 是一樣的,但是具體的實現(xiàn)是不一樣的,比如 DeltaFIFO 的對象鍵計算函數(shù)就不同:
//?k8s.io/client-go/tools/cache/delta_fifo.go
//?DeltaFIFO?的對象鍵計算函數(shù)
func?(f?*DeltaFIFO)?KeyOf(obj?interface{})?(string,?error)?{
?//?用?Deltas?做一次轉(zhuǎn)換,判斷是否是?Delta?切片
??if?d,?ok?:=?obj.(Deltas);?ok?{
??if?len(d)?==?0?{
???return?"",?KeyError{obj,?ErrZeroLengthDeltasObject}
??}
????//?使用最新版本的對象進(jìn)行計算
??obj?=?d.Newest().Object
?}
?if?d,?ok?:=?obj.(DeletedFinalStateUnknown);?ok?{
??return?d.Key,?nil
?}
??//?具體計算還是要看初始化?DeltaFIFO?傳入的?KeyFunc?函數(shù)
?return?f.keyFunc(obj)
}
// Newest 返回最新的 Delta,如果沒有則返回 nil。
func?(d?Deltas)?Newest()?*Delta?{
?if?n?:=?len(d);?n?>?0?{
??return?&d[n-1]
?}
?return?nil
}
DeltaFIFO 的計算對象鍵的函數(shù)為什么要先做一次 Deltas 的類型轉(zhuǎn)換呢?那是因為 Pop() 出去的對象很可能還要再添加進(jìn)來(比如處理失敗需要再放進(jìn)來),此時添加的對象就是已經(jīng)封裝好的 Deltas 對象了。
然后同樣按照上面的方式來分析 DeltaFIFO 的實現(xiàn),首先查看 Store 存儲部分的實現(xiàn),也就是增、刪、改、查功能。
同樣的 Add、Update 和 Delete 的實現(xiàn)方法基本上是一致的:
//?k8s.io/client-go/tools/cache/delta_fifo.go
//?Add?插入一個元素放入到隊列中
func?(f?*DeltaFIFO)?Add(obj?interface{})?error?{
?f.lock.Lock()
?defer?f.lock.Unlock()
?f.populated?=?true??//?隊列第一次寫入操作都要設(shè)置標(biāo)記
?return?f.queueActionLocked(Added,?obj)
}
//?Update?和?Add?一樣,只是是?Updated?一個?Delta
func?(f?*DeltaFIFO)?Update(obj?interface{})?error?{
?f.lock.Lock()
?defer?f.lock.Unlock()
?f.populated?=?true??//?隊列第一次寫入操作都要設(shè)置標(biāo)記
?return?f.queueActionLocked(Updated,?obj)
}
//?刪除和添加一樣,但會產(chǎn)生一個刪除的 Delta。如果給定的對象還不存在,它將被忽略。
//?例如,它可能已經(jīng)被替換(重新list)刪除了。
//?在這個方法中,`f.knownObjects`?如果不為nil,則提供(通過GetByKey)被認(rèn)為已經(jīng)存在的?_additional_?對象。
func?(f?*DeltaFIFO)?Delete(obj?interface{})?error?{
?id,?err?:=?f.KeyOf(obj)
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
?f.lock.Lock()
?defer?f.lock.Unlock()
??//?隊列第一次寫入操作都要設(shè)置這個標(biāo)記
?f.populated?=?true
??//?相當(dāng)于沒有?Indexer?的時候,就通過自己的存儲對象檢查下
?if?f.knownObjects?==?nil?{
??if?_,?exists?:=?f.items[id];?!exists?{
???//?自己的存儲里面都沒有,那也就不用處理了
???return?nil
??}
?}?else?
??//?相當(dāng)于 Indexer 里面和自己的存儲里面都沒有這個對象,那么也就相當(dāng)于不存在了,就不處理了。
????_,?exists,?err?:=?f.knownObjects.GetByKey(id)
??_,?itemsExist?:=?f.items[id]
??if?err?==?nil?&&?!exists?&&?!itemsExist?{
???return?nil
??}
?}
??//?同樣調(diào)用?queueActionLocked?將數(shù)據(jù)放入隊列
?return?f.queueActionLocked(Deleted,?obj)
}
可以看出 Add 、Update、Delete 方法最終都是調(diào)用的 queueActionLocked 函數(shù)來實現(xiàn):
//?k8s.io/client-go/tools/cache/delta_fifo.go
// queueActionLocked 追加到對象的 delta 列表中。
//?調(diào)用者必須先 lock。
func?(f?*DeltaFIFO)?queueActionLocked(actionType?DeltaType,?obj?interface{})?error?{
?id,?err?:=?f.KeyOf(obj)??//?獲取對象鍵
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
??
??//?將?actionType?和資源對象?obj?構(gòu)造成?Delta,添加到?items?中
?newDeltas?:=?append(f.items[id],?Delta{actionType,?obj})
??//?去重
?newDeltas?=?dedupDeltas(newDeltas)
?if?len(newDeltas)?>?0?{
????//?新對象的?key?不在隊列中則插入?queue?隊列
??if?_,?exists?:=?f.items[id];?!exists?{
???f.queue?=?append(f.queue,?id)
??}
????//?重新更新?items
??f.items[id]?=?newDeltas
????//?通知所有的消費(fèi)者解除阻塞
??f.cond.Broadcast()
?}?else?{
????//?這種情況不會發(fā)生,因為給定一個非空列表時,dedupDeltas 永遠(yuǎn)不會返回一個空列表。
????//?但如果真的返回了一個空列表,那么我們就需要從 map 中刪除這個元素。
??delete(f.items,?id)
?}
?return?nil
}
//?==============排重==============
//?重新list和watch可以以任何順序多次提供相同的更新。
//?如果最近的兩個 Delta 相同,則將它們合并。
func?dedupDeltas(deltas?Deltas)?Deltas?{
?n?:=?len(deltas)??
?if?n?2?{??//?小于兩個?delta?沒必要合并了
??return?deltas
?}
??//?Deltas是[]Delta,新的對象是追加到Slice后面
??//?所以取最后兩個元素來判斷是否相同
?a?:=?&deltas[n-1]
?b?:=?&deltas[n-2]
??//?執(zhí)行去重操作
?if?out?:=?isDup(a,?b);?out?!=?nil?{
????//?將去重保留下來的delta追加到前面n-2個delta中去
??d?:=?append(Deltas{},?deltas[:n-2]...)
??return?append(d,?*out)
?}
?return?deltas
}
//?判斷兩個?Delta?是否是重復(fù)的
func?isDup(a,?b?*Delta)?*Delta?{
??//?這個函數(shù)應(yīng)該應(yīng)該可以判斷多種類型的重復(fù),目前只有刪除這一種能夠合并
?if?out?:=?isDeletionDup(a,?b);?out?!=?nil?{
??return?out
?}
?return?nil
}
//?判斷是否為刪除類型的重復(fù)
func?isDeletionDup(a,?b?*Delta)?*Delta?{
??//?二者類型都是刪除那肯定有一個是重復(fù)的,則返回一個即可
?if?b.Type?!=?Deleted?||?a.Type?!=?Deleted?{
??return?nil
?}
?//?更復(fù)雜的檢查還是這樣就夠了?
?if?_,?ok?:=?b.Object.(DeletedFinalStateUnknown);?ok?{
??return?a
?}
?return?b
}
因為系統(tǒng)對于刪除的對象有 DeletedFinalStateUnknown 這個狀態(tài),所以會存在兩次刪除的情況,但是兩次添加同一個對象由于 APIServer 可以保證對象的唯一性,所以這里沒有考慮合并兩次添加操作的情況。
然后看看其他幾個主要方法的實現(xiàn):
//?k8s.io/client-go/tools/cache/delta_fifo.go
//?列舉接口實現(xiàn)
func?(f?*DeltaFIFO)?List()?[]interface{}?{
?f.lock.RLock()
?defer?f.lock.RUnlock()
?return?f.listLocked()
}
//?真正的列舉實現(xiàn)
func?(f?*DeltaFIFO)?listLocked()?[]interface{}?{
?list?:=?make([]interface{},?0,?len(f.items))
?for?_,?item?:=?range?f.items?{
??list?=?append(list,?item.Newest().Object)
?}
?return?list
}
//?返回現(xiàn)在 FIFO 中所有的對象鍵。
func?(f?*DeltaFIFO)?ListKeys()?[]string?{
?f.lock.RLock()
?defer?f.lock.RUnlock()
?list?:=?make([]string,?0,?len(f.items))
?for?key?:=?range?f.items?{
??list?=?append(list,?key)
?}
?return?list
}
//?根據(jù)對象獲取FIFO中對應(yīng)的元素
func?(f?*DeltaFIFO)?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)?{
?key,?err?:=?f.KeyOf(obj)
?if?err?!=?nil?{
??return?nil,?false,?KeyError{obj,?err}
?}
?return?f.GetByKey(key)
}
//?通過對象鍵獲取FIFO中的元素(獲取到的是?Delta?數(shù)組)
func?(f?*DeltaFIFO)?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)?{
?f.lock.RLock()
?defer?f.lock.RUnlock()
?d,?exists?:=?f.items[key]
?if?exists?{
??//?復(fù)制元素的slice,這樣對這個切片的操作就不會影響返回的對象了。
??d?=?copyDeltas(d)
?}
?return?d,?exists,?nil
}
// copyDeltas 返回 d 的淺拷貝,也就是說它拷貝的是切片,而不是切片中的對象。
// Get/List 可以返回一個不會被后續(xù)修改影響的對象。
func?copyDeltas(d?Deltas)?Deltas?{
?d2?:=?make(Deltas,?len(d))
?copy(d2,?d)
?return?d2
}
//?判斷隊列是否關(guān)閉了
func?(f?*DeltaFIFO)?IsClosed()?bool?{
?f.closedLock.Lock()
?defer?f.closedLock.Unlock()
?return?f.closed
}
接下來我們來看看 Replace 函數(shù)的時候,這個也是 Store 里面的定義的接口:
//?k8s.io/client-go/tools/cache/delta_fifo.go
func?(f?*DeltaFIFO)?Replace(list?[]interface{},?resourceVersion?string)?error?{
?f.lock.Lock()
?defer?f.lock.Unlock()
?keys?:=?make(sets.String,?len(list))
?//?keep?對老客戶端的向后兼容
?action?:=?Sync
?if?f.emitDeltaTypeReplaced?{
??action?=?Replaced
?}
??//?遍歷?list
?for?_,?item?:=?range?list?{
????//?計算對象鍵
??key,?err?:=?f.KeyOf(item)
??if?err?!=?nil?{
???return?KeyError{item,?err}
??}
????//?記錄處理過的對象鍵,使用?set?集合存儲
??keys.Insert(key)
????//?重新同步一次對象
??if?err?:=?f.queueActionLocked(action,?item);?err?!=?nil?{
???return?fmt.Errorf("couldn't?enqueue?object:?%v",?err)
??}
?}
??//?如果沒有?Indexer?存儲的話,自己存儲的就是所有的老對象
??//?目的要看看那些老對象不在全量集合中,那么就是刪除的對象了
?if?f.knownObjects?==?nil?{
??//?針對自己的列表進(jìn)行刪除檢測。
??queuedDeletions?:=?0
????//?遍歷所有元素
??for?k,?oldItem?:=?range?f.items?{
??????//?如果元素在輸入的對象中存在就忽略了。
???if?keys.Has(k)?{
????continue
???}
??????//?到這里證明當(dāng)前的?oldItem?元素不在輸入的列表中,證明對象已經(jīng)被刪除了
???var?deletedObj?interface{}
???if?n?:=?oldItem.Newest();?n?!=?nil?{
????deletedObj?=?n.Object
???}
???queuedDeletions++
??????//?因為可能隊列中已經(jīng)存在?Deleted?類型的元素了,避免重復(fù),所以采用?DeletedFinalStateUnknown?來包裝下對象
???if?err?:=?f.queueActionLocked(Deleted,?DeletedFinalStateUnknown{k,?deletedObj});?err?!=?nil?{
????return?err
???}
??}
????//?如果?populated?沒有設(shè)置,說明是第一次并且還沒有任何修改操作執(zhí)行過
??if?!f.populated?{
??????//?這個時候需要標(biāo)記下
???f.populated?=?true
???//?記錄第一次設(shè)置的對象數(shù)量
???f.initialPopulationCount?=?len(list)?+?queuedDeletions
??}
??return?nil
?}
?
??//?檢測已經(jīng)刪除但是沒有在隊列中的元素。
??//?從?Indexer?中獲取所有的對象鍵
?knownKeys?:=?f.knownObjects.ListKeys()
?queuedDeletions?:=?0
?for?_,?k?:=?range?knownKeys?{
????//?對象存在就忽略
??if?keys.Has(k)?{
???continue
??}
????//?到這里同樣證明當(dāng)前的對象鍵對應(yīng)的對象被刪除了
????//?獲取被刪除的對象鍵對應(yīng)的對象
??deletedObj,?exists,?err?:=?f.knownObjects.GetByKey(k)
??if?err?!=?nil?{
???deletedObj?=?nil
???klog.Errorf("Unexpected?error?%v?during?lookup?of?key?%v,?placing?DeleteFinalStateUnknown?marker?without?object",?err,?k)
??}?else?if?!exists?{
???deletedObj?=?nil
???klog.Infof("Key?%v?does?not?exist?in?known?objects?store,?placing?DeleteFinalStateUnknown?marker?without?object",?k)
??}
????//?累加刪除的對象數(shù)量
??queuedDeletions++
????//?把對象刪除的?Delta?放入隊列,和上面一樣避免重復(fù),使用?DeletedFinalStateUnknown?包裝下對象
??if?err?:=?f.queueActionLocked(Deleted,?DeletedFinalStateUnknown{k,?deletedObj});?err?!=?nil?{
???return?err
??}
?}
??//?和上面一致
?if?!f.populated?{
??f.populated?=?true
??f.initialPopulationCount?=?len(list)?+?queuedDeletions
?}
?return?nil
}
Replace() 主要用于實現(xiàn)對象的全量更新,由于 DeltaFIFO 對外輸出的就是所有目標(biāo)的增量變化,所以每次全量更新都要判斷對象是否已經(jīng)刪除,因為在全量更新前可能沒有收到目標(biāo)刪除的請求。這一點(diǎn)與 cache 不同,cache 的Replace() 相當(dāng)于重建,因為 cache 就是對象全量的一種內(nèi)存映射,所以Replace() 就等于重建。
接下來就是實現(xiàn) DeltaFIFO ?特性的 Pop 函數(shù)的實現(xiàn)了:
//?k8s.io/client-go/tools/cache/delta_fifo.go
func?(f?*DeltaFIFO)?Pop(process?PopProcessFunc)?(interface{},?error)?{
?f.lock.Lock()
?defer?f.lock.Unlock()
?for?{
????//?隊列中是否有數(shù)據(jù)
??for?len(f.queue)?==?0?{
???//?如果隊列關(guān)閉了這直接返回錯誤
???if?f.IsClosed()?{
????return?nil,?ErrFIFOClosed
???}
??????//?沒有數(shù)據(jù)就一直等待
???f.cond.Wait()
??}
????//?取出第一個對象鍵
??id?:=?f.queue[0]
????//?更新下queue,相當(dāng)于把第一個元素彈出去了
??f.queue?=?f.queue[1:]
????//?對象計數(shù)減一,當(dāng)減到0就說明外部已經(jīng)全部同步完畢了
??if?f.initialPopulationCount?>?0?{
???f.initialPopulationCount--
??}
????//?取出真正的對象,queue里面是對象鍵
??item,?ok?:=?f.items[id]
??if?!ok?{
???// Item 可能后來被刪除了。
???continue
??}
????//?刪除對象
??delete(f.items,?id)
????//?調(diào)用處理對象的函數(shù)
??err?:=?process(item)
????//?如果處理出錯,那就重新入隊列
??if?e,?ok?:=?err.(ErrRequeue);?ok?{
???f.addIfNotPresent(id,?item)
???err?=?e.Err
??}
??//?這里不需要 copyDeltas,因為我們要把所有權(quán)轉(zhuǎn)移給調(diào)用者。
??return?item,?err
?}
}
然后再簡單看下其他幾個函數(shù)的實現(xiàn):
//?k8s.io/client-go/tools/cache/delta_fifo.go
//?AddIfNotPresent?插入不存在的對象到隊列中
func?(f?*DeltaFIFO)?AddIfNotPresent(obj?interface{})?error?{
??//?放入的必須是?Delta?數(shù)組,就是通過?Pop?彈出的對象
?deltas,?ok?:=?obj.(Deltas)
?if?!ok?{
??return?fmt.Errorf("object?must?be?of?type?deltas,?but?got:?%#v",?obj)
?}
??//?多個?Delta?都是同一個對象,所以用最新的來獲取對象鍵即可
?id,?err?:=?f.KeyOf(deltas.Newest().Object)
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
?f.lock.Lock()
?defer?f.lock.Unlock()
??//?調(diào)用真正的插入實現(xiàn)
?f.addIfNotPresent(id,?deltas)
?return?nil
}
//?插入對象的真正實現(xiàn)
func?(f?*DeltaFIFO)?addIfNotPresent(id?string,?deltas?Deltas)?{
?f.populated?=?true
??//?如果對象已經(jīng)存在了,則忽略
?if?_,?exists?:=?f.items[id];?exists?{
??return
?}
??//?不在隊列中,則插入隊列
?f.queue?=?append(f.queue,?id)
?f.items[id]?=?deltas
??//?通知消費(fèi)者解除阻塞
?f.cond.Broadcast()
}
// Resync 重新同步,帶有 Sync 類型的 Delta 對象。
func?(f?*DeltaFIFO)?Resync()?error?{
?f.lock.Lock()
?defer?f.lock.Unlock()
??//?Indexer?為空,重新同步無意義
?if?f.knownObjects?==?nil?{
??return?nil
?}
??//?獲取?Indexer?中所有的對象鍵
?keys?:=?f.knownObjects.ListKeys()
??//?循環(huán)對象鍵,為每個對象產(chǎn)生一個同步的?Delta
?for?_,?k?:=?range?keys?{
??if?err?:=?f.syncKeyLocked(k);?err?!=?nil?{
???return?err
??}
?}
?return?nil
}
//?對象同步接口的真正實現(xiàn)
func?(f?*DeltaFIFO)?syncKeyLocked(key?string)?error?{
??//?獲取?Indexer?中的對象
?obj,?exists,?err?:=?f.knownObjects.GetByKey(key)
?if?err?!=?nil?{
??klog.Errorf("Unexpected?error?%v?during?lookup?of?key?%v,?unable?to?queue?object?for?sync",?err,?key)
??return?nil
?}?else?if?!exists?{
??klog.Infof("Key?%v?does?not?exist?in?known?objects?store,?unable?to?queue?object?for?sync",?key)
??return?nil
?}
?//?計算對象的鍵值,對象鍵不是已經(jīng)傳入了么?
??//?其實傳入的是存在?Indexer?里面的對象鍵,可能與這里的計算方式不同
??id,?err?:=?f.KeyOf(obj)
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
??//?對象已經(jīng)在存在,說明后續(xù)會通知對象的新變化,所以再加更新也沒意義
?if?len(f.items[id])?>?0?{
??return?nil
?}
??//?添加對象同步的這個?Delta
?if?err?:=?f.queueActionLocked(Sync,?obj);?err?!=?nil?{
??return?fmt.Errorf("couldn't?queue?object:?%v",?err)
?}
?return?nil
}
// HasSynced 如果 Add/Update/Delete/AddIfNotPresent 第一次被調(diào)用則會返回 true。
//?或者通過 Replace 插入的元素都已經(jīng) Pop 完成了,則也會返回 true。
func?(f?*DeltaFIFO)?HasSynced()?bool?{
?f.lock.Lock()
?defer?f.lock.Unlock()
?//?同步就是全量內(nèi)容已經(jīng)進(jìn)入?Indexer,Indexer?已經(jīng)是系統(tǒng)中對象的全量快照了
??//?相當(dāng)于就是全量對象從隊列中全部彈出進(jìn)入?Indexer,證明已經(jīng)同步完成了
?return?f.populated?&&?f.initialPopulationCount?==?0
}
//?關(guān)閉隊列
func?(f?*DeltaFIFO)?Close()?{
?f.closedLock.Lock()
?defer?f.closedLock.Unlock()
?f.closed?=?true
?f.cond.Broadcast()
}
這里是否已同步是根據(jù) populated 和 initialPopulationCount 這兩個變量來判斷的,是否同步指的是第一次從 APIServer 中獲取全量的對象是否全部 Pop 完成,全局同步到了緩存中,也就是 Indexer 中去了,因為 Pop 一次 initialPopulationCount 就會減1,當(dāng)為0的時候就表示 Pop 完成了。
總結(jié)
到這里我們就將完整實現(xiàn)了 DeltaFIFO,然后加上前面的 Reflector 反射器,就可以結(jié)合起來了。
Reflector 通過 ListAndWatch 首先獲取全量的資源對象數(shù)據(jù),然后調(diào)用 DeltaFIFO 的 Replace() 方法全量插入隊列,然后后續(xù)通過 Watch 操作根據(jù)資源對象的操作類型調(diào)用 DeltaFIFO 的 Add、Update、Delete 方法,將數(shù)據(jù)更新到隊列中。我們可以用下圖來總結(jié)這兩個組件之間的關(guān)系:

至于 Pop 出來的元素如何處理,就要看 Pop 的回調(diào)函數(shù) PopProcessFunc 了。我們可以回到最初的 SharedInformer 中,在 sharedIndexInformer 的 Run 函數(shù)中就初始化了 DeltaFIFO,也配置了用于 Pop 回調(diào)處理的函數(shù):
//?k8s.io/client-go/tools/cache/shared_informer.go
func?(s?*sharedIndexInformer)?Run(stopCh?<-chan?struct{})?{
?defer?utilruntime.HandleCrash()
??//?初始化?DeltaFIFO,這里就可以看出來?KnownObjects?就是一個?Indexer
?fifo?:=?NewDeltaFIFOWithOptions(DeltaFIFOOptions{
??KnownObjects:??????????s.indexer,
??EmitDeltaTypeReplaced:?true,
?})
?cfg?:=?&Config{
??Queue:????????????fifo,
??ListerWatcher:????s.listerWatcher,
??ObjectType:???????s.objectType,
??FullResyncPeriod:?s.resyncCheckPeriod,
??RetryOnError:?????false,
??ShouldResync:?????s.processor.shouldResync,
??Process:?s.HandleDeltas,??//?指定?Pop?函數(shù)的回調(diào)處理函數(shù)
?}
?......
}
//?真正的?Pop?回調(diào)處理函數(shù)
func?(s?*sharedIndexInformer)?HandleDeltas(obj?interface{})?error?{
?s.blockDeltas.Lock()
?defer?s.blockDeltas.Unlock()
?//?from?oldest?to?newest
?for?_,?d?:=?range?obj.(Deltas)?{
??switch?d.Type?{
??case?Sync,?Replaced,?Added,?Updated:
???s.cacheMutationDetector.AddObject(d.Object)
???if?old,?exists,?err?:=?s.indexer.Get(d.Object);?err?==?nil?&&?exists?{
????......
???}?else?{
????????//?將對象添加到?Indexer?中
????if?err?:=?s.indexer.Add(d.Object);?err?!=?nil?{
?????return?err
????}
????......
???}
??case?Deleted:
??????//?刪除?Indexer?中的對象
???if?err?:=?s.indexer.Delete(d.Object);?err?!=?nil?{
????return?err
???}
???......
??}
?}
?return?nil
}
從上面可以看出 DeltaFIFO 中的元素被彈出來后被同步到了 Indexer 存儲中,而在 DeltaFIFO 中的 KnownObjects 也就是這個指定的 Indexer,所以接下來我們就需要重點(diǎn)分析 Indexer 組件的實現(xiàn)了。
