KEDA 工作原理解析
文章中源碼是基于KEDA 2.0( 50bec80)來進(jìn)行分析
keda 2.0 安裝要求k8s集群版本 >=1.16
KEDA 在2020年11月4號(hào)release了2.0版本,包含了一些新的比較有用的特性,比如ScaledObject/ScaledJob中支持多觸發(fā)器、支持HPA原始的CPU、Memory scaler等。
具體的安裝使用請(qǐng)參考上一篇文章使用keda完成基于事件的彈性伸縮,這篇文章主要深入的看下KEDA內(nèi)部機(jī)制以及是如何工作的。
我們先提出幾個(gè)問題,帶著問題去看代碼,方便我們理解整個(gè)機(jī)制:
KEDA是如何獲取到多種事件的指標(biāo),以及如何判斷擴(kuò)縮容的? KEDA是如何做到將應(yīng)用的副本數(shù)縮容0,依據(jù)是什么?
代碼結(jié)構(gòu)
對(duì)一些主要目錄說明,其他一些MD文件主要是文字說明:
├──?BRANDING.md
├──?BUILD.md????//如何在本地編譯和運(yùn)行
├──?CHANGELOG.md
├──?CONTRIBUTING.md???//如何參與貢獻(xiàn)次項(xiàng)目
├──?CREATE-NEW-SCALER.md
├──?Dockerfile
├──?Dockerfile.adapter
├──?GOVERNANCE.md
├──?LICENSE
├──?MAINTAINERS.md
├──?Makefile????//?構(gòu)建編譯相關(guān)命令
├──?PROJECT
├──?README.md
├──?RELEASE-PROCESS.MD
├──?adapter??//?keda-metrics-apiserver?組件入口
├──?api?//?自定義資源定義,例如ScaledObject的定義
├──?bin?
├──?config?//組件yaml資源,通過kustomization工具生成
├──?controllers?//kubebuilder?中controller?代碼控制crd資源
├──?go.mod
├──?go.sum
├──?hack
├──?images
├──?main.go?//keda-operator??controller入口
├──?pkg???//包含組件核心代碼實(shí)現(xiàn)
├──?tests?//e2e測(cè)試
├──?tools??
├──?vendor
└──?version
keda中主要是兩個(gè)組件keda-operator以及keda-metrics-apiserver。
keda-operator :負(fù)責(zé)創(chuàng)建/更新HPA以及通過Loop控制應(yīng)用副本數(shù) keda-metrics-apiserver:實(shí)現(xiàn)external-metrics接口,以對(duì)接給HPA的external類型的指標(biāo)查詢(比如各種prometheus指標(biāo),mysql等)
keda-operator
項(xiàng)目中用到了kubebuilder SDK,用來完成這個(gè)Operator的編寫。
對(duì)于k8s中的自定義controller不了解的可以看看這邊文章:如何在Kubernetes中創(chuàng)建一個(gè)自定義Controller?。
keda controller的主要流程,畫了幅圖:
組件啟動(dòng)入口在于main.go文件中:
通過controller-runtime組件啟動(dòng)兩個(gè)自定義controller:ScaledObjectReconciler,ScaledJobReconciler:
mgr,?err?:=?ctrl.NewManager(ctrl.GetConfigOrDie(),?ctrl.Options{
??Scheme:?????????????????scheme,
??MetricsBindAddress:?????metricsAddr,
??HealthProbeBindAddress:?":8081",
??Port:???????????????????9443,
??LeaderElection:?????????enableLeaderElection,
??LeaderElectionID:???????"operator.keda.sh",
?})
...
//?Add?readiness?probe?
err?=?mgr.AddReadyzCheck("ready-ping",?healthz.Ping)
...
//?Add?liveness?probe
err?=?mgr.AddHealthzCheck("health-ping",?healthz.Ping)
....
//注冊(cè)?ScaledObject?處理的controller
if?err?=?(&controllers.ScaledObjectReconciler{
?Client:?mgr.GetClient(),
?Log:????ctrl.Log.WithName("controllers").WithName("ScaledObject"),
?Scheme:?mgr.GetScheme(),
}).SetupWithManager(mgr);?err?!=?nil?{
?setupLog.Error(err,?"unable?to?create?controller",?"controller",?"ScaledObject")
?os.Exit(1)
}
////注冊(cè)?ScaledJob?處理的controller
if?err?=?(&controllers.ScaledJobReconciler{
?Client:?mgr.GetClient(),
?Log:????ctrl.Log.WithName("controllers").WithName("ScaledJob"),
?Scheme:?mgr.GetScheme(),
}).SetupWithManager(mgr);?err?!=?nil?{
?setupLog.Error(err,?"unable?to?create?controller",?"controller",?"ScaledJob")
?os.Exit(1)
}
if?err?:=?mgr.Start(ctrl.SetupSignalHandler());?err?!=?nil?{
?setupLog.Error(err,?"problem?running?manager")
?os.Exit(1)
}
ScaledObjectReconciler 處理
我們主要關(guān)注Reconcile方法,當(dāng)ScaledObject發(fā)生變化時(shí)將會(huì)觸發(fā)該方法:方法內(nèi)部主要功能實(shí)現(xiàn):
...
//?處理刪除ScaledObject的情況
if?scaledObject.GetDeletionTimestamp()?!=?nil?{
??????//進(jìn)入垃圾回收(比如停止goroutine中Loop,恢復(fù)原有副本數(shù))
?return?ctrl.Result{},?r.finalizeScaledObject(reqLogger,?scaledObject)
}
//?給ScaledObject資源加上Finalizer:finalizer.keda.sh
if?err?:=?r.ensureFinalizer(reqLogger,?scaledObject);?err?!=?nil?{
?return?ctrl.Result{},?err
}
...
//?真正處理ScaledObject資源
msg,?err?:=?r.reconcileScaledObject(reqLogger,?scaledObject)
//?設(shè)置Status字段說明
conditions?:=?scaledObject.Status.Conditions.DeepCopy()
if?err?!=?nil?{
?reqLogger.Error(err,?msg)
?conditions.SetReadyCondition(metav1.ConditionFalse,?"ScaledObjectCheckFailed",?msg)
?conditions.SetActiveCondition(metav1.ConditionUnknown,?"UnkownState",?"ScaledObject?check?failed")
}?else?{
?reqLogger.V(1).Info(msg)
?conditions.SetReadyCondition(metav1.ConditionTrue,?"ScaledObjectReady",?msg)
}
kedacontrollerutil.SetStatusConditions(r.Client,?reqLogger,?scaledObject,?&conditions)
return?ctrl.Result{},?err
r.reconcileScaledObject方法:
這個(gè)方法中主要兩個(gè)動(dòng)作:
ensureHPAForScaledObjectExists創(chuàng)建HPA資源進(jìn)入 requestScaleLoop(不斷的檢測(cè)scaler 是否active,進(jìn)行副本數(shù)的修改)
ensureHPAForScaledObjectExists
通過跟蹤進(jìn)入到newHPAForScaledObject方法:
scaledObjectMetricSpecs,?err?:=?r.getScaledObjectMetricSpecs(logger,?scaledObject)
...省略代碼
hpa?:=?&autoscalingv2beta2.HorizontalPodAutoscaler{
?Spec:?autoscalingv2beta2.HorizontalPodAutoscalerSpec{
??MinReplicas:?getHPAMinReplicas(scaledObject),
??MaxReplicas:?getHPAMaxReplicas(scaledObject),
??Metrics:?????scaledObjectMetricSpecs,
??Behavior:????behavior,
??ScaleTargetRef:?autoscalingv2beta2.CrossVersionObjectReference{
???Name:???????scaledObject.Spec.ScaleTargetRef.Name,
???Kind:???????gvkr.Kind,
???APIVersion:?gvkr.GroupVersion().String(),
??}},
?ObjectMeta:?metav1.ObjectMeta{
??Name:??????getHPAName(scaledObject),
??Namespace:?scaledObject.Namespace,
??Labels:????labels,
?},
?TypeMeta:?metav1.TypeMeta{
??APIVersion:?"v2beta2",
?},
}
可以看到創(chuàng)建ScalerObject其實(shí)最終也是創(chuàng)建了HPA,其實(shí)還是通過HPA本身的特性來控制應(yīng)用的彈性伸縮。
其中g(shù)etScaledObjectMetricSpecs方法中就是獲取到triggers中的metrics指標(biāo)。
這里有區(qū)分一下External的metrics和resource metrics,因?yàn)镃PU/Memory scaler是通過resource metrics 來獲取的。
requestScaleLoop
requestScaleLoop方法中用來循環(huán)check Scaler中的IsActive狀態(tài)并作出對(duì)應(yīng)的處理,比如修改副本數(shù),直接來看最終的處理吧:這里有兩種模型來觸發(fā)RequestScale:
Pull模型:即主動(dòng)的調(diào)用scaler 中的 IsActive方法Push模型:由Scaler來觸發(fā), PushScaler多了一個(gè)Run方法,通過channel傳入active狀態(tài)。
IsActive是由Scaler實(shí)現(xiàn)的,比如對(duì)于prometheus來說,可能指標(biāo)為0則為false
這個(gè)具體的scaler實(shí)現(xiàn)后續(xù)再講,我們來看看RequestScale做了什么事:
//當(dāng)前副本數(shù)為0,并是所有scaler屬于active狀態(tài),則修改副本數(shù)為MinReplicaCount?或?1
if?currentScale.Spec.Replicas?==?0?&&?isActive?{
?e.scaleFromZero(ctx,?logger,?scaledObject,?currentScale)
}?else?if?!isActive?&&
?currentScale.Spec.Replicas?>?0?&&
?(scaledObject.Spec.MinReplicaCount?==?nil?||?*scaledObject.Spec.MinReplicaCount?==?0)?{
???//?所有scaler都處理not?active狀態(tài),并且當(dāng)前副本數(shù)大于0,且MinReplicaCount設(shè)定為0
???//?則縮容副本數(shù)為0
?e.scaleToZero(ctx,?logger,?scaledObject,?currentScale)
}?else?if?!isActive?&&
?scaledObject.Spec.MinReplicaCount?!=?nil?&&
?currentScale.Spec.Replicas?*scaledObject.Spec.MinReplicaCount?{
??//?所有scaler都處理not?active狀態(tài),并且當(dāng)前副本數(shù)小于MinReplicaCount,則修改為MinReplicaCount
?currentScale.Spec.Replicas?=?*scaledObject.Spec.MinReplicaCount
?err?:=?e.updateScaleOnScaleTarget(ctx,?scaledObject,?currentScale)
?....
}?else?if?isActive?{
//??處理active狀態(tài),并且副本數(shù)大于0,則更新LastActiveTime
?e.updateLastActiveTime(ctx,?logger,?scaledObject)
}?else?{
//?不處理
?logger.V(1).Info("ScaleTarget?no?change")
}
ScaledJobReconciler 處理
ScaledJobReconciler相比ScalerObject少了創(chuàng)建HPA的步驟,其余的步驟主要是通過checkScaledJobScalers,RequestJobScale兩個(gè)方法來判斷Job創(chuàng)建:
checkScaledJobScalers方法,用于計(jì)算isActive,maxValue的值RequestJobScale方法,用于負(fù)責(zé)創(chuàng)建Job,里面還涉及到三種擴(kuò)容策略
這里直接看代碼吧,不貼代碼了。
如何停止Loop
這里有個(gè)問題就是startPushScalers和startScaleLoop都是在Goroutine中處理的,所以當(dāng)ScaleObject/ScalerJob被刪除的時(shí)候,這里需要能夠被刪除,這里就用到了context.Cancel方法,在Goroutine啟動(dòng)的時(shí)候就將,context保存在scaleLoopContexts *sync.Map中(如果已經(jīng)有了,就先Cancel一次),在刪除資源的時(shí)候,進(jìn)行刪除:
func?(h?*scaleHandler)?DeleteScalableObject(scalableObject?interface{})?error?{
?withTriggers,?err?:=?asDuckWithTriggers(scalableObject)
?if?err?!=?nil?{
??h.logger.Error(err,?"error?duck?typing?object?into?withTrigger")
??return?err
?}
?key?:=?generateKey(withTriggers)
?result,?ok?:=?h.scaleLoopContexts.Load(key)
?if?ok?{
??cancel,?ok?:=?result.(context.CancelFunc)
??if?ok?{
???cancel()
??}
??h.scaleLoopContexts.Delete(key)
?}?else?{
??h.logger.V(1).Info("ScaleObject?was?not?found?in?controller?cache",?"key",?key)
?}
?return?nil
}
ps: 這里的妙啊,學(xué)到了
keda-metrics-apiserver
keda-metrics-apiserver實(shí)現(xiàn)了ExternalMetricsProvider接口:
type?ExternalMetricsProvider?interface?{
???GetExternalMetric(namespace?string,?metricSelector?labels.Selector,?info?ExternalMetricInfo)?(*external_metrics.ExternalMetricValueList,?error)
???ListAllExternalMetrics()?[]ExternalMetricInfo
}
GetExternalMetric 用于返回Scaler的指標(biāo),調(diào)用 scaler.GetMetrics方法ListAllExternalMetrics 返回所有支持的external metrics,例如prometheus,mysql等
當(dāng)代碼寫好之后,再通過apiservice注冊(cè)到apiservier上(當(dāng)然還涉及到鑒權(quán),這里不啰嗦了):
apiVersion:?apiregistration.k8s.io/v1
kind:?APIService
metadata:
??labels:
????app.kubernetes.io/name:?v1beta1.external.metrics.k8s.io
????app.kubernetes.io/version:?latest
????app.kubernetes.io/part-of:?keda-operator
??name:?v1beta1.external.metrics.k8s.io
spec:
??service:
????name:?keda-metrics-apiserver
????namespace:?keda
??group:?external.metrics.k8s.io
??version:?v1beta1
??insecureSkipTLSVerify:?true
??groupPriorityMinimum:?100
??versionPriority:?100
實(shí)現(xiàn)一個(gè)Scaler
其實(shí)有兩種Scaler,即上面將的一個(gè)pull,一個(gè)push的模型,PushScaler多了一個(gè)Run方法:
實(shí)現(xiàn)一個(gè)Scaler,主要實(shí)現(xiàn)以下接口:
//?Scaler?interface
type?Scaler?interface?{
?//?返回external_metrics.ExternalMetricValue對(duì)象,其實(shí)就是用于?keda-metrics-apiserver中獲取到scaler的指標(biāo)
?GetMetrics(ctx?context.Context,?metricName?string,?metricSelector?labels.Selector)?([]external_metrics.ExternalMetricValue,?error)
?//?返回v2beta2.MetricSpec?結(jié)構(gòu),主要用于ScalerObject描述創(chuàng)建HPA的類型和Target指標(biāo)等
?GetMetricSpecForScaling()?[]v2beta2.MetricSpec
??????//?返回該Scaler是否Active,可能會(huì)影響Loop中直接修改副本數(shù)
?IsActive(ctx?context.Context)?(bool,?error)
?//調(diào)用完一次上面的方法就會(huì)調(diào)用一次Close?
?Close()?error
}
//?PushScaler?interface
type?PushScaler?interface?{
?Scaler
?//?通過scaler實(shí)現(xiàn)Run方法,往active?channel中,寫入值,而非上面的直接調(diào)用IsActive放回
?Run(ctx?context.Context,?active?chan<-?bool)
}
總結(jié)
回過頭來我們解答下在開頭留下的問題:
KEDA是如何獲取到多種事件的指標(biāo),以及如何判斷擴(kuò)縮容的?
答:keda controler中生成了external 類型的hpa,并且實(shí)現(xiàn)了external metrics 的api,所以其實(shí)keda只是對(duì)hpa的一個(gè)更高級(jí)的抽象。
KEDA是如何做到將應(yīng)用的副本數(shù)縮容0,依據(jù)是什么?
答:keda 內(nèi)部有個(gè)loop,不斷的check isActive狀態(tài),會(huì)主動(dòng)的修改應(yīng)用副本
?點(diǎn)擊屏末?|?閱讀原文?|?即刻學(xué)習(xí)