kubernetes client-go源碼閱讀10之Informer
只要讀k8s源代碼一定會(huì)讀informer的代碼的,因?yàn)閕nformer相當(dāng)優(yōu)秀,大多數(shù)分布式項(xiàng)目(比如OpenStack)在解決組件間通信的問(wèn)題時(shí)都會(huì)選擇如kafka,rabbitmaq之類的消息隊(duì)列,但是k8s不走尋常路,選擇了自己解決,解決的方案是informer。
假設(shè)我們沒(méi)有informer,那么我們應(yīng)該如何從api server獲取數(shù)據(jù)?
一般而言,我們有兩種方式, 一是全量獲取,二是增量獲取,兩種都各有優(yōu)缺點(diǎn),前者優(yōu)點(diǎn)是,每次可以獲取全量的最新狀態(tài), 邏輯簡(jiǎn)單,但是缺點(diǎn)很明顯,如果請(qǐng)求頻次過(guò)于頻繁,就會(huì)有比較大的性能消耗, 如果頻次過(guò)低就不夠?qū)崟r(shí),但是依舊有比較大的性能消耗,想象一個(gè)100節(jié)點(diǎn)的集群,1000個(gè)deployment, 1000個(gè)ReplicaSet, 5000千個(gè)pod, 加個(gè)每個(gè)對(duì)象都只占5k, 就接近50MB, 這顯然會(huì)占用比較多的帶寬,這是讓人難以接受的,而且數(shù)據(jù)的時(shí)效性不夠高也是難以接受的,所以對(duì)于一個(gè)中大型集群而言,不能使用這種方式。
第二種方式是增量獲取更新,這種方式的優(yōu)點(diǎn)是時(shí)效性高占用資源低,但是相較于第一種方式而言,實(shí)現(xiàn)起來(lái)稍顯復(fù)雜,復(fù)雜度在于兩點(diǎn),一是我們需要有健壯的容錯(cuò)機(jī)制,比如出錯(cuò)怎么辦? 如果跳過(guò)可能導(dǎo)致狀態(tài)不一致, 比如漏掉一個(gè)更新的請(qǐng)求, 那么對(duì)應(yīng)的資源一直得不到正確的處理, 所以我們需要一種重試機(jī)制, 二是, 我們需要緩存全量的數(shù)據(jù)用于快速的檢索, 比如定時(shí)輪訓(xùn)的檢查資源,但我們不可能總是等收到增量更新才開始業(yè)務(wù)邏輯,所以增量更新的邏輯比較復(fù)雜, 并且增量更新不能單獨(dú)存在, 因?yàn)槲覀冃枰康馁Y源, 所以需要配合第一種方式。
那么怎么平衡這兩種獲取資源的方式呢? k8s的選擇是,**我全都要!!!**。

快速入門
一般來(lái)說(shuō)informer會(huì)跟workque, controller在一起,這點(diǎn)從k8s的源代碼可以很明顯的看到,不過(guò)為了簡(jiǎn)單起見,這里只看informer的部分。
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"k8s.io/client-go/util/homedir"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
var kubeconfig *string
var master string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.StringVar(&master, "master", "", "master url")
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
if err != nil {
klog.Fatal(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}
// 1.
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
// 2.
_, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// 3.
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Add: ", key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Update: ", key)
}
},
DeleteFunc: func(obj interface{}) {
// 4.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Delete: ", key)
}
},
})
// 5.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()
// 6.
informer.Run(ctx.Done())
}
輸出如下:
2023-06-03 14:19:21 Add: default/example-fcsjwbzzf2
2023-06-03 14:20:21 Update: default/example-fcsjwbzzf2
2023-06-03 14:21:21 Update: default/example-fcsjwbzzf2
注意: 每分鐘以Update的形式再次調(diào)用UpdateFunc
實(shí)名吐槽Golang的時(shí)間格式化!!!
代碼分解如下:
- 創(chuàng)建
ListWatch對(duì)象,用于獲取資源最新列表及后續(xù)更新 - 創(chuàng)建informer對(duì)象,傳入必要的參數(shù)
- 注冊(cè)各種回調(diào)函數(shù), 如AddFunc等
- 刪除的對(duì)象和其他對(duì)象不同,所以需要不同的方法來(lái)獲取key
- 設(shè)置退出信號(hào)量,k8s的慣用操作了
- 啟動(dòng)informer
通過(guò)上面的代碼可以知道,創(chuàng)建informer有兩件比較重要的事情,一是創(chuàng)建ListWatch,二是注冊(cè)回調(diào)函數(shù)。
ListWatch
ListWatch就如名字指明的那樣,List,Watch,前者是拉取指定資源的資源列表,比如default命名空間下的所有Pod資源,Watch是在前者拉取完成之后開始監(jiān)聽之后所有的資源變化(前者會(huì)得到一個(gè)版本號(hào),watch可以借助這個(gè)版本號(hào),只獲取版本號(hào)之后的資源),比如新增,更新,刪除等變化。
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
// 1.
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
// 2.
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(context.TODO()).
Get()
}
// 3.
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(context.TODO())
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
代碼分解如下:
- 設(shè)置Options, Options以函數(shù)的形式傳入也是k8s一個(gè)比較常用的模式了。
- 簡(jiǎn)單的在靜態(tài)客戶端的
Get方法上包裝一層函數(shù) - 簡(jiǎn)單的在靜態(tài)客戶端的
Watch方法上包裝一層函數(shù)
可以看到ListWatch的內(nèi)部構(gòu)造并不復(fù)雜,僅僅是將Get和Watch方法組合起來(lái)而已。
Informer
因?yàn)楸疚闹饕治鰅nformer,所以會(huì)略過(guò)其中Store的部分,我們暫且將其作為一個(gè)存儲(chǔ)的黑盒子即可,以后有文章再詳細(xì)說(shuō)明。
_, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
// 略過(guò)代碼部分
}
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// 1.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// 2.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
// 3.
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
代碼分解如下:
- 創(chuàng)建一個(gè)Store, 它用來(lái)存儲(chǔ)informer獲取到的資源
- 將Store再包裝一層(k8s的傳統(tǒng)操作了),提供先入先出(fifo)的功能
- informer處理的主函數(shù),根據(jù)對(duì)象類型調(diào)用對(duì)應(yīng)的回調(diào)函數(shù),以及將對(duì)象更新到綁定的Store
這里有一個(gè)值得注意的點(diǎn),informer是一個(gè)符合Controller接口的對(duì)象,閱讀過(guò)k8s源代碼或者寫過(guò)operator的對(duì)controller應(yīng)該不會(huì)陌生,這是k8s比較重要的對(duì)象了,或者說(shuō)模式。
總的來(lái)說(shuō),informer的初始化過(guò)程還是比較清晰的,主要分為兩步,創(chuàng)建隊(duì)列(fifo),配置處理邏輯(Process),既然初始化不復(fù)雜,那么復(fù)雜的就是Run方法。
Run
那么看看informer怎么運(yùn)行的吧
informer.Run(ctx.Done())
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1.
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
// 2.
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// 3.
wg.StartWithChannel(stopCh, r.Run)
// 4.
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
代碼分解如下:
- 創(chuàng)建Reflector, reflector負(fù)責(zé)和apiserver通信,不斷的將數(shù)據(jù)同步給informer
- 配置Reflector的各項(xiàng)參數(shù)
- 啟動(dòng)Reflector
- 啟動(dòng)informer的主循環(huán)
由于processLoop比較簡(jiǎn)單,我們先看看它的源代碼。
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
代碼很簡(jiǎn)單,應(yīng)該不需要特別的說(shuō)明,就是傳入之前的Process方法用于處理隊(duì)列傳入的各個(gè)對(duì)象。
Reflector
Reflector的初始化并不復(fù)雜,代碼如下
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
// 省略其他代碼
r.setExpectedType(expectedType)
return r
}
func (r *Reflector) setExpectedType(expectedType interface{}) {
r.expectedType = reflect.TypeOf(expectedType)
if obj, ok := expectedType.(*unstructured.Unstructured); ok {
gvk := obj.GroupVersionKind()
r.expectedGVK = &gvk
r.expectedTypeName = gvk.String()
}
}
上面的代碼唯一值得提的是setExpectedType,k8s的對(duì)象總是要知道gvk的。
然后就是Reflector的運(yùn)行邏輯
func (r *Reflector) Run(stopCh <-chan struct{}) {
// 1.
wait.BackoffUntil(func() {
// 2.
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
}
代碼分解如下:
- client-go提供的重試幫助函數(shù),只要沒(méi)有收到終止信號(hào)就會(huì)不斷的重試傳入的方法
- List And Watch, 獲取列表并監(jiān)聽資源更新
重頭戲就是ListAndWatch了。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
// 1.
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
// 2.
if err := func() error {
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
// 3.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
// 4.
list, paginatedResult, err = pager.List(context.Background(), options)
close(listCh)
}()
// 5.
items, err := meta.ExtractList(list)
// 6.
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
// 7.
r.setLastSyncResourceVersion(resourceVersion)
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
// 8.
go func() {
resyncCh, cleanup := r.resyncChan()
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// 9.
for {
// 10.
w, err := r.listerWatcher.Watch(options)
// 11.
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
return nil
}
}
}
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
代碼分解如下:
- 設(shè)置資源版本號(hào)
ResourceVersion, k8s提供了一種以版本號(hào)過(guò)濾的資源的方式,如果是首次,那么是0,如果因?yàn)榫W(wǎng)絡(luò)等原因重試,就可以增量的獲取遺落的資源列表,而不需再次全量的獲取一遍 - 將List的邏輯放在一個(gè)匿名函數(shù)中統(tǒng)一處理錯(cuò)誤,常見操作了。
- 構(gòu)建一個(gè)分頁(yè)器,分批獲取資源列表。
- 開始獲取,這里的List其實(shí)就是調(diào)用之前傳入的
lw.List。 - 獲取列表,這一步會(huì)檢查列表對(duì)象是否合法以及做一定的轉(zhuǎn)換。
- 將數(shù)據(jù)同步到Store,使用它的
Replace方法,這可以在上面源代碼的最后看到具體操作。 - 設(shè)置
ResourceVersion,如果后續(xù)出錯(cuò),就可以從這個(gè)資源版本開始了 - resync, 就是將Store里面的數(shù)據(jù)以Update的事件形式再次傳入informer,會(huì)觸發(fā)UpdateFunc回調(diào)函數(shù)。
- 監(jiān)聽的循環(huán)
- 通過(guò)之前傳入的lw的Watch方法,獲得
watch.Interface,這個(gè)接口會(huì)不斷的給出變更對(duì)象 - 處理上一步傳來(lái)的事件。
最后就是Reflector的核心方法了。
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
// 1.
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
// 2.
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
// 3.
meta, err := meta.Accessor(event.Object)
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
case watch.Bookmark:
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}
return nil
}
代碼分解如下:
- 獲取監(jiān)聽到的事件
- 判斷事件是否正常,是否符合預(yù)期的GVK等
- 不同的事件以不同方法更新,這樣可以觸發(fā)不同的回調(diào)函數(shù)
總的來(lái)說(shuō),reflector做的事情就是將數(shù)據(jù)更新到Store里面,而Informer會(huì)不斷的從Store里面讀取數(shù)據(jù),當(dāng)讀到數(shù)據(jù)后就調(diào)用對(duì)應(yīng)的回調(diào)函數(shù)。
總結(jié)
Informer是k8s里面非常重要的數(shù)據(jù)同步機(jī)制,理解了Informer就可以很容易找到k8s相關(guān)組件的主要業(yè)務(wù)邏輯了,可以想象的到,業(yè)務(wù)邏輯一定注冊(cè)在回調(diào)函數(shù)中,不過(guò)真實(shí)的代碼要多了一層抽象,因?yàn)閗8s源代碼里回調(diào)函數(shù)邏輯一般是判斷一下就扔進(jìn)workqueue了。
client-go的代碼部分差不多結(jié)束了,后面閱讀k8s的源代碼。
