k8s調(diào)度器啟動流程分析 | 視頻文字稿

本文主要對 Kubernetes 調(diào)度器(v1.19.3版本)初始化啟動過程進(jìn)行分析。kube-scheduler 組件有很多可以配置的啟動參數(shù),其核心也是通過 cobra 開發(fā)的一個 CLI 工具,所以要掌握 kube-scheduler 的啟動配置,需要我們對 cobra 有一個基本的了解,kube-scheduler 主要有兩種類型的配置參數(shù):
調(diào)度策略相關(guān)的參數(shù),例如啟用那些調(diào)度插件,以及給某些調(diào)度插件配置一些參數(shù) 通用參數(shù),就是一些普通的參數(shù),比如配置服務(wù)端口號等等
這里我們主要是了解調(diào)度器的核心調(diào)度框架和算法,所以主要關(guān)注第一種參數(shù)即可。
參數(shù)配置
kube-scheduler 的啟動入口位于 cmd/kube-scheduler/scheduler.go 文件,該文件中就包含一個 main 入口函數(shù):
// cmd/kube-scheduler/scheduler.go
func main() {
rand.Seed(time.Now().UnixNano())
// 初始化 Cobra.Command 對象
command := app.NewSchedulerCommand()
// 將命令行參數(shù)進(jìn)行標(biāo)準(zhǔn)化(_替換成-)
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// 初始化日志
logs.InitLogs()
defer logs.FlushLogs()
// 執(zhí)行命令
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
其中最核心的就是通過 app.NewSchedulerCommand() 或者一個 Cobra 的 Command 對象,然后最下面調(diào)用 command.Execute() 函數(shù)執(zhí)行這個命令,所以核心就是 NewSchedulerCommand 函數(shù)的實(shí)現(xiàn):
// cmd/kube-scheduler/app/server.go
// Option 配置一個 framework.Registry
type Option func(runtime.Registry) error
// NewSchedulerCommand 使用默認(rèn)參數(shù)和 registryOptions 創(chuàng)建一個 *cobra.Command 對象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
// 獲取默認(rèn)的配置信息
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `......`,
// 真正執(zhí)行的函數(shù)入口
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
......
}
......
return cmd
}
如果我們熟悉 Cobra 的基本用法的話應(yīng)該知道當(dāng)我們執(zhí)行 Cobra 的命令的時候,實(shí)際上真正執(zhí)行的是 Cobra.Command 對象中的 Run 函數(shù),也就是這里的 runCommand 函數(shù):
// cmd/kube-scheduler/app/server.go
if err := runCommand(cmd, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
其中有兩個非常重要的參數(shù) opts 與 registryOptions,opts 是一個 Options 對象,該參數(shù)包含所有的運(yùn)行 Scheduler 需要的參數(shù):
// cmd/kube-scheduler/app/options/options.go
// Options 擁有所有運(yùn)行 Scheduler 需要的參數(shù)
type Options struct {
// 默認(rèn)值,如果設(shè)置了 ConfigFile 或 InsecureServing 中的值,這些設(shè)置將被覆蓋
// KubeSchedulerConfiguration 類似與 Deployment 都是k8s的資源對象,這是這個對象是用于配置調(diào)度器使用的
ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration
SecureServing *apiserveroptions.SecureServingOptionsWithLoopback
// 可為 Healthz 和 metrics 配置兩個不安全的標(biāo)志
CombinedInsecureServing *CombinedInsecureServingOptions
Authentication *apiserveroptions.DelegatingAuthenticationOptions
Authorization *apiserveroptions.DelegatingAuthorizationOptions
Metrics *metrics.Options
Logs *logs.Options
Deprecated *DeprecatedOptions
// ConfigFile 指定是調(diào)度程序服務(wù)的配置文件的位置
ConfigFile string
// WriteConfigTo 將默認(rèn)配置寫入的文件路徑
WriteConfigTo string
Master string
}
其中第一個參數(shù) ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration 是我們需要重點(diǎn)關(guān)注的用于配置調(diào)度策略相關(guān)參數(shù)的地方,通過 NewOptions() 來獲取默認(rèn)配置參數(shù):
// cmd/kube-scheduler/app/options/options.go
// NewOptions 返回一個默認(rèn)的調(diào)度器應(yīng)用 options 參數(shù)。
func NewOptions() (*Options, error) {
cfg, err := newDefaultComponentConfig()
if err != nil {
return nil, err
}
......
o := &Options{
ComponentConfig: *cfg,
......
}
......
return o, nil
}
上面是初始化 Options 的函數(shù),這里我們只關(guān)心核心的 ComponentConfig 參數(shù),該參數(shù)是通過函數(shù) newDefaultComponentConfig() 來生成默認(rèn)的配置:
// cmd/kube-scheduler/app/options/options.go
func newDefaultComponentConfig() (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
versionedCfg := kubeschedulerconfigv1beta1.KubeSchedulerConfiguration{}
// 可用于配置是否開啟 Debug 相關(guān)特性,比如 profiling
versionedCfg.DebuggingConfiguration = *configv1alpha1.NewRecommendedDebuggingConfiguration()
kubeschedulerscheme.Scheme.Default(&versionedCfg)
cfg := kubeschedulerconfig.KubeSchedulerConfiguration{}
if err := kubeschedulerscheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
return &cfg, nil
}
上面的函數(shù)會創(chuàng)建一個默認(rèn)的 KubeSchedulerConfiguration 對象,用于配置調(diào)度器,默認(rèn)配置參數(shù)通過 Options 構(gòu)造完成后,在構(gòu)造整個 cobra.Command 命令后會為其添加命令行參數(shù):
// cmd/kube-scheduler/app/server.go
// NewSchedulerCommand 使用默認(rèn)參數(shù)和 registryOptions 創(chuàng)建一個 *cobra.Command 對象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
// 獲取默認(rèn)的配置信息
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
......
}
fs := cmd.Flags()
// 調(diào)用 Options 的 Flags 方法
namedFlagSets := opts.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
// 將默認(rèn)的所有參數(shù)添加到 cmd.Flags 中去
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
......
return cmd
}
其中的 opts.Flags() 方法就是將默認(rèn)的 Options 配置轉(zhuǎn)換成命令行參數(shù)的函數(shù):
// cmd/kube-scheduler/app/options/options.go
func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
fs := nfs.FlagSet("misc")
fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, `The path to the configuration file. The following flags can overwrite fields in this file:
--address
--port
--use-legacy-policy-config
--policy-configmap
--policy-config-file
--algorithm-provider`)
fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
o.Authentication.AddFlags(nfs.FlagSet("authentication"))
o.Authorization.AddFlags(nfs.FlagSet("authorization"))
o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)
options.BindLeaderElectionFlags(&o.ComponentConfig.LeaderElection, nfs.FlagSet("leader election"))
utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
o.Metrics.AddFlags(nfs.FlagSet("metrics"))
o.Logs.AddFlags(nfs.FlagSet("logs"))
return nfs
}
其中第一個參數(shù) --config 就可以用來指定配置文件。到這里我們就獲取到了調(diào)度器所有默認(rèn)的配置參數(shù)了。
啟動調(diào)度器
接下來分析真正運(yùn)行調(diào)度器的 runCommand 函數(shù)的實(shí)現(xiàn)。
// cmd/kube-scheduler/app/server.go
// 運(yùn)行調(diào)度器的真正函數(shù)
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
// 比如執(zhí)行 --version 這樣的操作,則打印版本后直接退出了
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 根據(jù)命令行 args 和 options 創(chuàng)建完整的配置和調(diào)度程序
cc, sched, err := Setup(ctx, opts, registryOptions...)
if err != nil {
return err
}
// 如果指定了 WriteConfigTo 參數(shù)
if len(opts.WriteConfigTo) > 0 {
// 將配置寫入到指定的文件中
if err := options.WriteConfigFile(opts.WriteConfigTo, &cc.ComponentConfig); err != nil {
return err
}
klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
return nil
}
// 真正去啟動調(diào)度器
return Run(ctx, cc, sched)
}
上面的函數(shù)首先判斷是否是執(zhí)行類似于 --version 這樣的操作,如果是這打印后直接退出,然后根據(jù)命令行參數(shù)和選項(xiàng)通過 Setup 函數(shù)構(gòu)造 CompletedConfig 配置和 Scheduler 調(diào)度器對象。
// cmd/kube-scheduler/app/server.go/
// 根據(jù)命令行參數(shù)和選項(xiàng)構(gòu)造完整的配置和調(diào)度器對象
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
// 校驗(yàn)命令行選項(xiàng)
if errs := opts.Validate(); len(errs) > 0 {
return nil, nil, utilerrors.NewAggregate(errs)
}
// 獲取調(diào)度器Config對象,該對象擁有一個調(diào)度器所有的上下文信息
c, err := opts.Config()
if err != nil {
return nil, nil, err
}
// 獲取 completed 配置
cc := c.Complete()
outOfTreeRegistry := make(runtime.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return nil, nil, err
}
}
recorderFactory := getRecorderFactory(&cc)
// 創(chuàng)建調(diào)度器
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return nil, nil, err
}
return &cc, sched, nil
}
該函數(shù)首先調(diào)用 opts.Validate() 函數(shù)對所有參數(shù)進(jìn)行校驗(yàn),接著使用 opts.Config() 函數(shù)創(chuàng)建 *schedulerappconfig.Config 對象,該對象擁有一個調(diào)度器所有的上下文信息。
// cmd/kube-scheduler/app/options/options.go
// Config 返回一個調(diào)度器配置對象
func (o *Options) Config() (*schedulerappconfig.Config, error) {
......
c := &schedulerappconfig.Config{}
if err := o.ApplyTo(c); err != nil {
return nil, err
}
// 創(chuàng)建 kube 客戶端
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
if err != nil {
return nil, err
}
......
c.Client = client
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
c.PodInformer = scheduler.NewPodInformer(client, 0)
c.LeaderElection = leaderElectionConfig
return c, nil
}
上面函數(shù)的核心是通過 o.ApplyTo(c) 函數(shù)將 Options 轉(zhuǎn)換成了 *schedulerappconfig.Config 對象,
// cmd/kube-scheduler/app/options/options.go
// 將調(diào)度程序 options 轉(zhuǎn)換成調(diào)度程序應(yīng)用配置
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
if len(o.ConfigFile) == 0 {
c.ComponentConfig = o.ComponentConfig
// 如果未加載任何配置文件,則應(yīng)用 deprecated flags(這是舊的行為)
o.Deprecated.ApplyTo(&c.ComponentConfig)
if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
return err
}
} else {
cfg, err := loadConfigFromFile(o.ConfigFile)
if err != nil {
return err
}
if err := validation.ValidateKubeSchedulerConfiguration(cfg).ToAggregate(); err != nil {
return err
}
c.ComponentConfig = *cfg
......
}
......
return nil
}
上面的轉(zhuǎn)換函數(shù)會首先判斷是否配置了 ConfigFile(也就是 --config 參數(shù)),如果配置了則會加載對應(yīng)的配置文件轉(zhuǎn)換成對應(yīng)的 KubeSchedulerConfiguration 對象,然后校驗(yàn)有效性,如果都正常則將其賦給 schedulerappconfig.Config 的 ComponentConfig 屬性;如果沒有配置 ConfigFile,則使用舊的參數(shù)進(jìn)行配置。
接著會去調(diào)用 scheduler.New() 函數(shù)去構(gòu)造一個真正的調(diào)度器對象,該函數(shù)的具體實(shí)現(xiàn)如下所示:
// pkg/scheduler/scheduler.go
// 配置調(diào)度器
type Option func(*schedulerOptions)
var defaultSchedulerOptions = schedulerOptions{
profiles: []schedulerapi.KubeSchedulerProfile{
// Profiles 的默認(rèn)插件是從算法提供程序配置的
{SchedulerName: v1.DefaultSchedulerName}, // 默認(rèn)的調(diào)度器名稱為 default-scheduler
},
schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
Provider: defaultAlgorithmSourceProviderName(), // 默認(rèn)的算法源提供器名稱為 DefaultProvider
},
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}
// 返回一個 Scheduler 對象
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
podInformer coreinformers.PodInformer,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {
stopEverything := stopCh
if stopEverything == nil {
stopEverything = wait.NeverStop
}
// 默認(rèn)的調(diào)度器配置
options := defaultSchedulerOptions
for _, opt := range opts {
// 將默認(rèn)的調(diào)度器配置調(diào)用 Option 重新配置一次
opt(&options)
}
......
var sched *Scheduler
// SchedulerAlgorithmSource 是調(diào)度程序算法的源
// 包含Policy與Provider兩種方式,必須指定一個源字段,并且源字段是互斥的
source := options.schedulerAlgorithmSource
switch {
case source.Provider != nil:
// 從一個算法 provider 中創(chuàng)建配置,這是我們現(xiàn)在需要重點(diǎn)關(guān)注的方式
sc, err := configurator.createFromProvider(*source.Provider)
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
}
sched = sc
case source.Policy != nil:
// 從用戶指定的策略源中創(chuàng)建配置,這是以前的擴(kuò)展方式
policy := &schedulerapi.Policy{}
......
default:
return nil, fmt.Errorf("unsupported algorithm source: %v", source)
}
......
// addAllEventHandlers 是在測試和 Scheduler 中使用的幫助程序函數(shù),用于為各種 informers 添加事件處理程序
addAllEventHandlers(sched, informerFactory, podInformer)
return sched, nil
}
首先將默認(rèn)的調(diào)度器配置通過傳遞的 Option 參數(shù)進(jìn)行一一配置,然后重點(diǎn)就是根據(jù)應(yīng)用過后的配置來判斷調(diào)度算法的源是 Provider 還是 Policy 方式,我們現(xiàn)在的重點(diǎn)是調(diào)度框架,所以主要關(guān)注 Provider 這種配置,Policy 是以前的擴(kuò)展調(diào)度器的方式。所以調(diào)度器的實(shí)例化核心是通過 configurator.createFromProvider(*source.Provider) 該函數(shù)來實(shí)現(xiàn)的。
// pkg/scheduler/factory.go
// 從一組已注冊的插件集合中創(chuàng)建一個調(diào)度器
func (c *Configurator) create() (*Scheduler, error) {
var extenders []framework.Extender
var ignoredExtendedResources []string
if len(c.extenders) != 0 {
// Extender 方式擴(kuò)展調(diào)度器
......
}
......
// Profiles 需要提供有效的 queue sort 插件
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
// 將優(yōu)先級隊(duì)列初始化為調(diào)度隊(duì)列
podQueue := internalqueue.NewSchedulingQueue(
lessFn,
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
)
......
// 創(chuàng)建一個 genericScheduler 對象,該對象實(shí)現(xiàn)了 ScheduleAlgorithm 接口,具體的調(diào)度實(shí)現(xiàn)就是這個對象實(shí)現(xiàn)的
algo := core.NewGenericScheduler(
c.schedulerCache,
c.nodeInfoSnapshot,
extenders,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
c.disablePreemption,
c.percentageOfNodesToScore,
)
return &Scheduler{
SchedulerCache: c.schedulerCache,
Algorithm: algo,
Profiles: profiles,
NextPod: internalqueue.MakeNextPodFunc(podQueue),
Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
StopEverything: c.StopEverything,
SchedulingQueue: podQueue,
}, nil
}
// createFromProvider 從注冊的算法提供器來創(chuàng)建調(diào)度器
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
// 獲取算法提供器集合
r := algorithmprovider.NewRegistry()
// 獲取指定算法提供器的插件集合
defaultPlugins, exist := r[providerName]
if !exist {
return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
}
for i := range c.profiles {
prof := &c.profiles[i]
plugins := &schedulerapi.Plugins{}
plugins.Append(defaultPlugins)
// Apply 合并來自自定義插件的插件配置
plugins.Apply(prof.Plugins)
prof.Plugins = plugins
}
return c.create()
}
通過上面的一些列操作后就實(shí)例化了真正的調(diào)度器對象,最后我們需要去啟動一系列的資源對象的事件監(jiān)聽程序,比如 Pod、Node 等對象,上面實(shí)例化函數(shù)中通過 addAllEventHandlers(sched, informerFactory, podInformer) 來實(shí)現(xiàn)的,關(guān)于這些資源對象對應(yīng)的 onAdd、onUpdate、onDelete 操作均在 pkg/scheduler/eventhandlers.go 文件中實(shí)現(xiàn)的,這樣比如當(dāng)創(chuàng)建一個 Pod 過后,我們的調(diào)度器通過 watch 就會在 onAdd 事件中接收到該操作,然后我們就可以根據(jù) queue sort 插件將器加入到帶調(diào)度隊(duì)列中去開始調(diào)度了。
最后就是去調(diào)用 Run 函數(shù)來真正啟動調(diào)度器了,首先會等待所有的 cache 同步完成,然后開始進(jìn)行調(diào)度操作。
// cmd/kube-scheduler/app/server.go/
// Run 根據(jù)指定的配置執(zhí)行調(diào)度程序,僅在出現(xiàn)錯誤或上下文完成時才返回
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
......
// 啟動 healthz 以及 metrics 相關(guān)服務(wù)
......
// 啟動所有 informers
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())
// 調(diào)度之前等待所有 caches 同步完成
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// 開啟了 leader election
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// 如果沒有開啟 Leader election,這直接調(diào)用調(diào)度器對象的 Run 函數(shù)
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}
// pkg/scheduler/scheduler.go
// 等待 cache 同步完成,然后開始調(diào)度
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}
在接下來的課程中我們就接著來分析是如何進(jìn)行具體的 Pod 調(diào)度的。
點(diǎn)擊屏末 | 閱讀原文 | 學(xué)習(xí)k8s開發(fā)課程