client-go 之 Indexer 的理解
前面我們講到 DeltaFIFO 中的元素通過 Pop 函數(shù)彈出后,在指定的回調(diào)函數(shù)中將元素添加到了 Indexer 中。Indexer 是什么?字面意思是索引器,它就是 Informer 中的 LocalStore 部分,我們可以和數(shù)據(jù)庫進行類比,數(shù)據(jù)庫是建立在存儲之上的,索引也是構(gòu)建在存儲之上,只是和數(shù)據(jù)做了一個映射,使得按照某些條件查詢速度會非常快,所以說 Indexer 本身也是一個存儲,只是它在存儲的基礎上擴展了索引功能。從 Indexer 接口的定義可以證明這一點:
//?k8s.io/client-go/tools/cache/indexer.go
//?Indexer?使用多個索引擴展了?Store,并限制了每個累加器只能容納當前對象
//?這里有3種字符串需要說明:
//?1.?一個存儲鍵,在?Store?接口中定義(其實就是對象鍵)
//?2.?一個索引的名稱(相當于索引分類名稱)
//?3.?索引鍵,由?IndexFunc?生成,可以是一個字段值或從對象中計算出來的任何字符串
type?Indexer?interface?{
?Store??//?繼承了?Store?存儲接口,所以說?Indexer?也是存儲
?//?indexName?是索引類名稱,obj?是對象,計算?obj?在?indexName?索引類中的索引鍵,然后通過索引鍵把所有的對象取出來
??//?獲取?obj?對象在索引類中的索引鍵相匹配的對象
?Index(indexName?string,?obj?interface{})?([]interface{},?error)
?//?indexKey?是?indexName?索引分類中的一個索引鍵
??//?函數(shù)返回?indexKey?指定的所有對象鍵?IndexKeys?
?IndexKeys(indexName,?indexedValue?string)?([]string,?error)
?ListIndexFuncValues(indexName?string)?[]string
?ByIndex(indexName,?indexedValue?string)?([]interface{},?error)
?GetIndexers()?Indexers
?//?添加新的索引在存儲中
?AddIndexers(newIndexers?Indexers)?error
}
Indexer
在去查看 Indexer 的接口具體實現(xiàn)之前,我們需要了解 Indexer 中幾個非常重要的概念:Indices、Index、Indexers 及 IndexFunc。
//?k8s.io/client-go/tools/cache/indexer.go
//?用于計算一個對象的索引鍵集合
type?IndexFunc?func(obj?interface{})?([]string,?error)
//?索引鍵與對象鍵集合的映射
type?Index?map[string]sets.String
//?索引器名稱(或者索引分類)與?IndexFunc?的映射,相當于存儲索引的各種分類
type?Indexers?map[string]IndexFunc
//?索引器名稱與?Index?索引的映射
type?Indices?map[string]Index
這4個數(shù)據(jù)結(jié)構(gòu)的命名非常容易讓大家混淆,直接查看源碼也不是那么容易的。這里我們來仔細解釋下。首先什么叫索引,索引就是為了快速查找的,比如我們需要查找某個節(jié)點上的所有 Pod,那就讓 Pod 按照節(jié)點名稱排序列舉出來,對應的就是 Index 這個類型,具體的就是 map[node]sets.pod,但是如何去查找可以有多種方式,就是上面的 Indexers 這個類型的作用。我們可以用一個比較具體的示例來解釋他們的關(guān)系和含義,如下所示:
package?main
import?(
?"fmt"
?v1?"k8s.io/api/core/v1"
?"k8s.io/apimachinery/pkg/api/meta"
?metav1?"k8s.io/apimachinery/pkg/apis/meta/v1"
?"k8s.io/client-go/tools/cache"
)
const?(
?NamespaceIndexName?=?"namespace"
?NodeNameIndexName??=?"nodeName"
)
func?NamespaceIndexFunc(obj?interface{})?([]string,?error)?{
?m,?err?:=?meta.Accessor(obj)
?if?err?!=?nil?{
??return?[]string{""},?fmt.Errorf("object?has?no?meta:?%v",?err)
?}
?return?[]string{m.GetNamespace()},?nil
}
func?NodeNameIndexFunc(obj?interface{})?([]string,?error)?{
?pod,?ok?:=?obj.(*v1.Pod)
?if?!ok?{
??return?[]string{},?nil
?}
?return?[]string{pod.Spec.NodeName},?nil
}
func?main()?{
?index?:=?cache.NewIndexer(cache.MetaNamespaceKeyFunc,?cache.Indexers{
??NamespaceIndexName:?NamespaceIndexFunc,
??NodeNameIndexName:??NodeNameIndexFunc,
?})
?pod1?:=?&v1.Pod{
??ObjectMeta:?metav1.ObjectMeta{
???Name:??????"index-pod-1",
???Namespace:?"default",
??},
??Spec:?v1.PodSpec{NodeName:?"node1"},
?}
?pod2?:=?&v1.Pod{
??ObjectMeta:?metav1.ObjectMeta{
???Name:??????"index-pod-2",
???Namespace:?"default",
??},
??Spec:?v1.PodSpec{NodeName:?"node2"},
?}
?pod3?:=?&v1.Pod{
??ObjectMeta:?metav1.ObjectMeta{
???Name:??????"index-pod-3",
???Namespace:?"kube-system",
??},
??Spec:?v1.PodSpec{NodeName:?"node2"},
?}
?_?=?index.Add(pod1)
?_?=?index.Add(pod2)
?_?=?index.Add(pod3)
?// ByIndex 兩個參數(shù):IndexName(索引器名稱)和 indexKey(需要檢索的key)
?pods,?err?:=?index.ByIndex(NamespaceIndexName,?"default")
?if?err?!=?nil?{
??panic(err)
?}
?for?_,?pod?:=?range?pods?{
??fmt.Println(pod.(*v1.Pod).Name)
?}
?fmt.Println("==========================")
?pods,?err?=?index.ByIndex(NodeNameIndexName,?"node2")
?if?err?!=?nil?{
??panic(err)
?}
?for?_,?pod?:=?range?pods?{
??fmt.Println(pod.(*v1.Pod).Name)
?}
}
//?輸出結(jié)果為:
index-pod-1
index-pod-2
==========================
index-pod-2
index-pod-3
在上面的示例中首先通過 NewIndexer 函數(shù)實例化 Indexer 對象,第一個參數(shù)就是用于計算資源對象鍵的函數(shù),這里我們使用的是 MetaNamespaceKeyFunc 這個默認的對象鍵函數(shù);第二個參數(shù)是 Indexers,也就是存儲索引器,上面我們知道 Indexers 的定義為 map[string]IndexFunc,為什么要定義成一個 map 呢?我們可以類比數(shù)據(jù)庫中,我們要查詢某項數(shù)據(jù),索引的方式是不是多種多樣啊?為了擴展,Kubernetes 中就使用一個 map 來存儲各種各樣的存儲索引器,至于存儲索引器如何生成,就使用一個 IndexFunc 暴露出去,給使用者自己實現(xiàn)即可。
這里我們定義的了兩個索引鍵生成函數(shù):NamespaceIndexFunc 與 NodeNameIndexFunc,一個根據(jù)資源對象的命名空間來進行索引,一個根據(jù)資源對象所在的節(jié)點進行索引。然后定義了3個 Pod,前兩個在 default 命名空間下面,另外一個在 kube-system 命名空間下面,然后通過 index.Add 函數(shù)添加這3個 Pod 資源對象。然后通過 index.ByIndex 函數(shù)查詢在名為 namespace 的索引器下面匹配索引鍵為 default 的 Pod 列表。也就是查詢 default 這個命名空間下面的所有 Pod,這里就是前兩個定義的 Pod。
對上面的示例如果我們理解了,那么就很容易理解上面定義的4個數(shù)據(jù)結(jié)構(gòu)了:
IndexFunc:索引器函數(shù),用于計算一個資源對象的索引值列表,上面示例是指定命名空間為索引值結(jié)果,當然我們也可以根據(jù)需求定義其他的,比如根據(jù) Label 標簽、Annotation 等屬性來生成索引值列表。 Index:存儲數(shù)據(jù),對于上面的示例,我們要查找某個命名空間下面的 Pod,那就要讓 Pod 按照其命名空間進行索引,對應的 Index 類型就是 map[namespace]sets.pod。Indexers:存儲索引器,key 為索引器名稱,value 為索引器的實現(xiàn)函數(shù),上面的示例就是 map["namespace"]MetaNamespaceIndexFunc。Indices:存儲緩存器,key 為索引器名稱,value 為緩存的數(shù)據(jù),對于上面的示例就是 map["namespace"]map[namespace]sets.pod。
可能最容易混淆的是 Indexers 和 Indices 這兩個概念,因為平時很多時候我們沒有怎么區(qū)分二者的關(guān)系,這里我們可以這樣理解:Indexers 是存儲索引(生成索引鍵)的,Indices 里面是存儲的真正的數(shù)據(jù)(對象鍵),這樣可能更好理解。

按照上面的理解我們可以得到上面示例的索引數(shù)據(jù)如下所示:
//?Indexers?就是包含的所有索引器(分類)以及對應實現(xiàn)
Indexers:?{??
??"namespace":?NamespaceIndexFunc,
??"nodeName":?NodeNameIndexFunc,
}
//?Indices?就是包含的所有索引分類中所有的索引數(shù)據(jù)
Indices:?{
?"namespace":?{??//namespace?這個索引分類下的所有索引數(shù)據(jù)
??"default":?["pod-1",?"pod-2"],??//?Index?就是一個索引鍵下所有的對象鍵列表
??"kube-system":?["pod-3"]???//?Index
?},
?"nodeName":?{??//nodeName?這個索引分類下的所有索引數(shù)據(jù)(對象鍵列表)
??"node1":?["pod-1"],??//?Index
??"node2":?["pod-2",?"pod-3"]??//?Index
?}
}
ThreadSafeMap
上面我們理解了 Indexer 中的幾個重要的數(shù)據(jù)類型,下面我們來看下 Indexer 接口的具體實現(xiàn) cache,位于文件 k8s.io/client-go/tools/cache/store.go 中:
//?[k8s.io/client-go/tools/cache/store.go](http://k8s.io/client-go/tools/cache/store.go)
//?cache?用一個?ThreadSafeStore?和一個關(guān)聯(lián)的?KeyFunc?來實現(xiàn)?Indexer
type?cache?struct?{
?//?cacheStorage?是一個線程安全的存儲
?cacheStorage?ThreadSafeStore
??//?keyFunc?用于計算對象鍵
?keyFunc?KeyFunc
}
我們可以看到這個 cache 包含一個 ThreadSafeStore 的屬性,這是一個并發(fā)安全的存儲,因為是存儲,所以自然就有存儲相關(guān)的增、刪、改、查等操作,Indexer 就是在 ThreadSafeMap 基礎上進行封裝的,實現(xiàn)了索引相關(guān)的功能。接下來我們先來看看 ThreadSafeStore 的定義,位于 k8s.io/client-go/tools/cache/thread_safe_store.go 文件中:
type?ThreadSafeStore?interface?{
?Add(key?string,?obj?interface{})
?Update(key?string,?obj?interface{})
?Delete(key?string)
?Get(key?string)?(item?interface{},?exists?bool)
?List()?[]interface{}
?ListKeys()?[]string
?Replace(map[string]interface{},?string)
?Index(indexName?string,?obj?interface{})?([]interface{},?error)
?IndexKeys(indexName,?indexKey?string)?([]string,?error)
?ListIndexFuncValues(name?string)?[]string
?ByIndex(indexName,?indexKey?string)?([]interface{},?error)
?GetIndexers()?Indexers
?AddIndexers(newIndexers?Indexers)?error
?Resync()?error
}
從接口的定義可以看出 ThreadSafeStore 和 Index 基本上差不多,但還是有一些區(qū)別的,這個接口是需要通過對象鍵來進行索引的。接下來我們來看看這個接口的具體實現(xiàn) threadSafeMap 的定義:
//?k8s.io/client-go/tools/cache/thread_safe_store.go
//?threadSafeMap?實現(xiàn)了?ThreadSafeStore
type?threadSafeMap?struct?{
?lock??sync.RWMutex
??//?存儲資源對象數(shù)據(jù),key(對象鍵)?通過?keyFunc?得到
??//?這就是真正存儲的數(shù)據(jù)(對象鍵?->?對象)
?items?map[string]interface{}
?//?indexers?索引分類與索引鍵函數(shù)的映射
?indexers?Indexers
?//?indices?通過索引可以快速找到對象鍵
?indices?Indices
}
不要把索引鍵和對象鍵搞混了,索引鍵是用于對象快速查找的;對象鍵是對象在存儲中的唯一命名,對象是通過名字+對象的方式存儲的。接下來我們來仔細看下接口的具體實現(xiàn),首先還是比較簡單的 Add、Delete、Update 幾個函數(shù)的實現(xiàn):
//?k8s.io/client-go/tools/cache/thread_safe_store.go
//?添加對象
func?(c?*threadSafeMap)?Add(key?string,?obj?interface{})?{
?c.lock.Lock()
?defer?c.lock.Unlock()
??//?獲取老的對象
?oldObject?:=?c.items[key]
??//?寫入新的對象,items?中存的是?objKey?->?obj?的映射
?c.items[key]?=?obj
??//?添加了新的對象,所以要更新索引
?c.updateIndices(oldObject,?obj,?key)
}
//?更新對象,可以看到實現(xiàn)和?Add?是一樣的
func?(c?*threadSafeMap)?Update(key?string,?obj?interface{})?{
?c.lock.Lock()
?defer?c.lock.Unlock()
?oldObject?:=?c.items[key]
?c.items[key]?=?obj
?c.updateIndices(oldObject,?obj,?key)
}
//?刪除對象
func?(c?*threadSafeMap)?Delete(key?string)?{
?c.lock.Lock()
?defer?c.lock.Unlock()
??//?判斷對象是否存在,存在才執(zhí)行刪除操作
?if?obj,?exists?:=?c.items[key];?exists?{
????//?刪除對象索引
??c.deleteFromIndices(obj,?key)
????//?刪除對象本身
??delete(c.items,?key)
?}
}
可以看到基本的實現(xiàn)比較簡單,就是添加、更新、刪除對象數(shù)據(jù)后,然后更新或刪除對應的索引,所以我們需要查看下更新或刪除索引的具體實現(xiàn):
//?k8s.io/client-go/tools/cache/thread_safe_store.go
//?updateIndices?更新索引
func?(c?*threadSafeMap)?updateIndices(oldObj?interface{},?newObj?interface{},?key?string)?{
?//?如果有舊的對象,需要先從索引中刪除這個對象
?if?oldObj?!=?nil?{
??c.deleteFromIndices(oldObj,?key)
?}
??//?循環(huán)所有的索引器
?for?name,?indexFunc?:=?range?c.indexers?{
????//?獲取對象的索引鍵
??indexValues,?err?:=?indexFunc(newObj)
??if?err?!=?nil?{
???panic(fmt.Errorf("unable?to?calculate?an?index?entry?for?key?%q?on?index?%q:?%v",?key,?name,?err))
??}
????//?得到當前索引器的索引
??index?:=?c.indices[name]
??if?index?==?nil?{
??????//?沒有對應的索引,則初始化一個索引
???index?=?Index{}
???c.indices[name]?=?index
??}
????//?循環(huán)所有的索引鍵
??for?_,?indexValue?:=?range?indexValues?{
??????//?得到索引鍵對應的對象鍵列表
???set?:=?index[indexValue]
???if?set?==?nil?{
????????//?沒有對象鍵列表則初始化一個空列表
????set?=?sets.String{}
????index[indexValue]?=?set
???}
??????//?將對象鍵插入到集合中,方便索引
???set.Insert(key)
??}
?}
}
//?deleteFromIndices?刪除對象索引
func?(c?*threadSafeMap)?deleteFromIndices(obj?interface{},?key?string)?{
??//?循環(huán)所有的索引器
?for?name,?indexFunc?:=?range?c.indexers?{
????//?獲取刪除對象的索引鍵列表
??indexValues,?err?:=?indexFunc(obj)
??if?err?!=?nil?{
???panic(fmt.Errorf("unable?to?calculate?an?index?entry?for?key?%q?on?index?%q:?%v",?key,?name,?err))
??}
????//?獲取當前索引器的索引
??index?:=?c.indices[name]
??if?index?==?nil?{
???continue
??}
????//?循環(huán)所有索引鍵
??for?_,?indexValue?:=?range?indexValues?{
??????//?獲取索引鍵對應的對象鍵列表
???set?:=?index[indexValue]
???if?set?!=?nil?{
????????//?從對象鍵列表中刪除當前要刪除的對象鍵
????set.Delete(key)
????//?如果當集合為空的時候不刪除set,那么具有高基數(shù)的短生命資源的 indices 會導致未使用的空集合隨時間增加內(nèi)存。
????????//?`kubernetes/kubernetes/issues/84959`.
????????if?len(set)?==?0?{
?????delete(index,?indexValue)
????}
???}
??}
?}
}
添加索引和刪除索引的實現(xiàn)都挺簡單的,其實主要還是要對 indices、indexs 這些數(shù)據(jù)結(jié)構(gòu)非常了解,這樣就非常容易了,我們可以將 indexFunc 當成當前對象的命名空間來看待,這樣對于上面的索引更新和刪除的理解就肯定沒問題了。
然后接下來就是幾個查詢相關(guān)的接口實現(xiàn):
//?k8s.io/client-go/tools/cache/thread_safe_store.go
//?獲取對象
func?(c?*threadSafeMap)?Get(key?string)?(item?interface{},?exists?bool)?{
?c.lock.RLock()??//?只需要讀鎖
?defer?c.lock.RUnlock()
??//?直接從?map?中讀取值
?item,?exists?=?c.items[key]
?return?item,?exists
}
//?對象列舉
func?(c?*threadSafeMap)?List()?[]interface{}?{
?c.lock.RLock()
?defer?c.lock.RUnlock()
?list?:=?make([]interface{},?0,?len(c.items))
?for?_,?item?:=?range?c.items?{
??list?=?append(list,?item)
?}
?return?list
}
//?返回?threadSafeMap?中所有的對象鍵列表
func?(c?*threadSafeMap)?ListKeys()?[]string?{
?c.lock.RLock()
?defer?c.lock.RUnlock()
?list?:=?make([]string,?0,?len(c.items))
?for?key?:=?range?c.items?{
??list?=?append(list,?key)
?}
?return?list
}
//?替換所有對象,相當于重新構(gòu)建索引
func?(c?*threadSafeMap)?Replace(items?map[string]interface{},?resourceVersion?string)?{
?c.lock.Lock()
?defer?c.lock.Unlock()
??//?直接覆蓋之前的對象
?c.items?=?items
?//?重新構(gòu)建索引
?c.indices?=?Indices{}
?for?key,?item?:=?range?c.items?{
????//?更新元素的索引
??c.updateIndices(nil,?item,?key)
?}
}
然后接下來就是和索引相關(guān)的幾個接口實現(xiàn),第一個就是 Index 函數(shù):
//?k8s.io/client-go/tools/cache/thread_safe_store.go
//?通過指定的索引器和對象獲取符合這個對象特征的所有對象
func?(c?*threadSafeMap)?Index(indexName?string,?obj?interface{})?([]interface{},?error)?{
?c.lock.RLock()
?defer?c.lock.RUnlock()
??//?獲得索引器?indexName?的索引鍵計算函數(shù)
?indexFunc?:=?c.indexers[indexName]
?if?indexFunc?==?nil?{
??return?nil,?fmt.Errorf("Index?with?name?%s?does?not?exist",?indexName)
?}
??//?獲取指定?obj?對象的索引鍵
?indexedValues,?err?:=?indexFunc(obj)
?if?err?!=?nil?{
??return?nil,?err
?}
??//?獲得索引器?indexName?的所有索引
?index?:=?c.indices[indexName]
?
??//?用來存儲對象鍵的集合
?var?storeKeySet?sets.String
?if?len(indexedValues)?==?1?{
????//?大多數(shù)情況下只有一個值匹配(默認獲取的索引鍵就是對象的?namespace)
????//?直接拿到這個索引鍵的對象鍵集合
??storeKeySet?=?index[indexedValues[0]]
?}?else?{
????//?由于有多個索引鍵,則可能有重復的對象鍵出現(xiàn),索引需要去重
??storeKeySet?=?sets.String{}
????//?循環(huán)索引鍵
??for?_,?indexedValue?:=?range?indexedValues?{
??????//?循環(huán)索引鍵下面的對象鍵,因為要去重
???for?key?:=?range?index[indexedValue]?{
????storeKeySet.Insert(key)
???}
??}
?}
??//?拿到了所有的對象鍵集合過后,循環(huán)拿到所有的對象集合
?list?:=?make([]interface{},?0,?storeKeySet.Len())
?for?storeKey?:=?range?storeKeySet?{
??list?=?append(list,?c.items[storeKey])
?}
?return?list,?nil
}
這個 Index 函數(shù)就是獲取一個指定對象的索引鍵,然后把這個索引鍵下面的所有的對象全部獲取到,比如我們要獲取一個 Pod 所在命名空間下面的所有 Pod,如果更抽象一點,就是符合對象某些特征的所有對象,而這個特征就是我們指定的索引鍵函數(shù)計算出來的。然后接下來就是一個比較重要的 ByIndex 函數(shù)的實現(xiàn):
//?k8s.io/client-go/tools/cache/thread_safe_store.go
//?和上面的?Index?函數(shù)類似,只是是直接指定的索引鍵
func?(c?*threadSafeMap)?ByIndex(indexName,?indexedValue?string)?([]interface{},?error)?{
?c.lock.RLock()
?defer?c.lock.RUnlock()
??
??//?獲得索引器?indexName?的索引鍵計算函數(shù)
?indexFunc?:=?c.indexers[indexName]
?if?indexFunc?==?nil?{
??return?nil,?fmt.Errorf("Index?with?name?%s?does?not?exist",?indexName)
?}
??//?獲得索引器?indexName?的所有索引
?index?:=?c.indices[indexName]
??//?獲取指定索引鍵的所有所有對象鍵
?set?:=?index[indexedValue]
??//?然后根據(jù)對象鍵遍歷獲取對象
?list?:=?make([]interface{},?0,?set.Len())
?for?key?:=?range?set?{
??list?=?append(list,?c.items[key])
?}
?return?list,?nil
}
可以很清楚地看到 ByIndex 函數(shù)和 Index 函數(shù)比較類似,但是更簡單了,直接獲取一個指定的索引鍵的全部資源對象。然后是其他幾個索引相關(guān)的函數(shù):
//?k8s.io/client-go/tools/cache/thread_safe_store.go
//?IndexKeys?和上面的?ByIndex?幾乎是一樣的,只是這里是直接返回對象鍵列表
func?(c?*threadSafeMap)?IndexKeys(indexName,?indexedValue?string)?([]string,?error)?{
?c.lock.RLock()
?defer?c.lock.RUnlock()
??//?獲取索引器?indexName?的索引鍵計算函數(shù)
?indexFunc?:=?c.indexers[indexName]
?if?indexFunc?==?nil?{
??return?nil,?fmt.Errorf("Index?with?name?%s?does?not?exist",?indexName)
?}
??//?獲取索引器?indexName?的所有索引
?index?:=?c.indices[indexName]
?//?直接獲取指定索引鍵的對象鍵集合
?set?:=?index[indexedValue]
?return?set.List(),?nil
}
//?獲取索引器下面的所有索引鍵
func?(c?*threadSafeMap)?ListIndexFuncValues(indexName?string)?[]string?{
?c.lock.RLock()
?defer?c.lock.RUnlock()
??//?獲取索引器?indexName?的所有索引
?index?:=?c.indices[indexName]
?names?:=?make([]string,?0,?len(index))
??//?遍歷索引得到索引鍵
?for?key?:=?range?index?{
??names?=?append(names,?key)
?}
?return?names
}
//?直接返回?indexers
func?(c?*threadSafeMap)?GetIndexers()?Indexers?{
?return?c.indexers
}
//?添加一個新的?Indexers
func?(c?*threadSafeMap)?AddIndexers(newIndexers?Indexers)?error?{
?c.lock.Lock()
?defer?c.lock.Unlock()
?if?len(c.items)?>?0?{
??return?fmt.Errorf("cannot?add?indexers?to?running?index")
?}
??//?獲取舊的索引器和新的索引器keys
?oldKeys?:=?sets.StringKeySet(c.indexers)
?newKeys?:=?sets.StringKeySet(newIndexers)
??
??//?如果包含新的索引器,則提示沖突
?if?oldKeys.HasAny(newKeys.List()...)?{
??return?fmt.Errorf("indexer?conflict:?%v",?oldKeys.Intersection(newKeys))
?}
??//?將新的索引器添加到?Indexers?中
?for?k,?v?:=?range?newIndexers?{
??c.indexers[k]?=?v
?}
?return?nil
}
//?沒有真正實現(xiàn)?Resync?操作?
func?(c?*threadSafeMap)?Resync()?error?{
?return?nil
}
這里我們就將 ThreadSafeMap 的實現(xiàn)進行了分析說明。整體來說比較方便,一個就是將對象數(shù)據(jù)存入到一個 map 中,然后就是維護索引,方便根據(jù)索引來查找到對應的對象。
cache
接下來再回過頭去看 cache 的實現(xiàn)就非常簡單了,因為 cache 就是對 ThreadSafeStore 的一個再次封裝,很多操作都是直接調(diào)用的 ThreadSafeStore 的操作實現(xiàn)的,如下所示:
//?k8s.io/client-go/tools/cache/store.go
//?Add?插入一個元素到?cache?中
func?(c?*cache)?Add(obj?interface{})?error?{
?key,?err?:=?c.keyFunc(obj)??//?生成對象鍵
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
??//?將對象添加到底層的?ThreadSafeStore?中
?c.cacheStorage.Add(key,?obj)
?return?nil
}
//?更新cache中的對象
func?(c?*cache)?Update(obj?interface{})?error?{
?key,?err?:=?c.keyFunc(obj)
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
?c.cacheStorage.Update(key,?obj)
?return?nil
}
//?刪除cache中的對象
func?(c?*cache)?Delete(obj?interface{})?error?{
?key,?err?:=?c.keyFunc(obj)
?if?err?!=?nil?{
??return?KeyError{obj,?err}
?}
?c.cacheStorage.Delete(key)
?return?nil
}
//?得到cache中所有的對象
func?(c?*cache)?List()?[]interface{}?{
?return?c.cacheStorage.List()
}
//?得到cache中所有的對象鍵
func?(c?*cache)?ListKeys()?[]string?{
?return?c.cacheStorage.ListKeys()
}
//?得到cache中的Indexers
func?(c?*cache)?GetIndexers()?Indexers?{
?return?c.cacheStorage.GetIndexers()
}
//?得到對象obj與indexName索引器關(guān)聯(lián)的所有對象
func?(c?*cache)?Index(indexName?string,?obj?interface{})?([]interface{},?error)?{
?return?c.cacheStorage.Index(indexName,?obj)
}
func?(c?*cache)?IndexKeys(indexName,?indexKey?string)?([]string,?error)?{
?return?c.cacheStorage.IndexKeys(indexName,?indexKey)
}
func?(c?*cache)?ListIndexFuncValues(indexName?string)?[]string?{
?return?c.cacheStorage.ListIndexFuncValues(indexName)
}
func?(c?*cache)?ByIndex(indexName,?indexKey?string)?([]interface{},?error)?{
?return?c.cacheStorage.ByIndex(indexName,?indexKey)
}
func?(c?*cache)?AddIndexers(newIndexers?Indexers)?error?{
?return?c.cacheStorage.AddIndexers(newIndexers)
}
func?(c?*cache)?Get(obj?interface{})?(item?interface{},?exists?bool,?err?error)?{
?key,?err?:=?c.keyFunc(obj)
?if?err?!=?nil?{
??return?nil,?false,?KeyError{obj,?err}
?}
?return?c.GetByKey(key)
}
func?(c?*cache)?GetByKey(key?string)?(item?interface{},?exists?bool,?err?error)?{
?item,?exists?=?c.cacheStorage.Get(key)
?return?item,?exists,?nil
}
//?替換cache中所有的對象
func?(c?*cache)?Replace(list?[]interface{},?resourceVersion?string)?error?{
?items?:=?make(map[string]interface{},?len(list))
?for?_,?item?:=?range?list?{
??key,?err?:=?c.keyFunc(item)
??if?err?!=?nil?{
???return?KeyError{item,?err}
??}
??items[key]?=?item
?}
?c.cacheStorage.Replace(items,?resourceVersion)
?return?nil
}
func?(c?*cache)?Resync()?error?{
?return?nil
}
可以看到 cache 沒有自己獨特的實現(xiàn)方式,都是調(diào)用的包含的 ThreadSafeStore 操作接口。
總結(jié)
前面我們已經(jīng)知道了 Reflector 通過 ListAndWatch 把數(shù)據(jù)傳入 DeltaFIFO 后,經(jīng)過 DeltaFIFO 的 Pop 函數(shù)將資源對象存入到了本地的一個存儲 Indexer 中,而這個底層真正的存儲其實就是上面的 ThreadSafeStore。
要理解 Indexer 組件,最主要就是要把索引、索引器(索引分類)、索引鍵、對象鍵這幾個概念弄清楚,有時候確實容易混亂,我們將上面的示例理解了應該就很好理解了,比如我們按照命名空間來進行索引,可以簡單的理解為這個 Indexer 就是簡單的把相同命名空間的對象放在一個集合中,然后基于命名空間來查找對象。
K8S進階訓練營,點擊下方圖片了解詳情

