kube-scheduler 的 Cache 解析
在 kube-scheduler 的 SchedulingQueue 調度隊列中都是 Pending 狀態(tài)的 Pod,也就是未調度的 Pod,本文分析的 Cache 中都是已經調度的 Pod(包括假定調度的 Pod)。而 Cache 并不是僅僅為了存儲已調度的 Pod 方便查找,而是為調度提供能非常重要的狀態(tài)信息,甚至已經超越了 Cache 本身定義范疇。
既然定義為 Cache,需要回答如下幾個問題:
cache 誰?kubernetes 的信息都存儲在 etcd 中,而訪問 kubernetes 的 etcd 的唯一方法是通過 apiserver,所以準確的說是緩存 etcd 的信息。 cache 哪些信息?調度器需要將 Pod 調度到滿足需求的 Node 上,所以 cache 至少要緩存 Pod 和 Node 信息,這樣才能提高 kube-scheduler 訪問 apiserver 的性能。 為什么要 cache?因為 client-go 已經提供了 cache 能力,kube-scheduler 增加一層 cache 的目的是什么呢?答案很簡單,為了調度。本文的 Cache 不僅緩存了 Pod 和 Node 信息,更關鍵的是聚合了調度結果,讓調度變得更容易,也就是本文重點內容。
為了避免 Node 的翻譯失去原有的意義,本文直接引用 Node,而不是翻譯成節(jié)點、服務器等。同時,也避免 Bind 的翻譯歧義而直接引用,沒有翻譯為綁定。
本文采用的源碼是 kubenretes 的 release-1.20 分支,最新 Kubernetes 版本文檔鏈接:https://github.com/jindezgm/k8s-src-analysis/blob/master/kube-scheduler/Cache.md
Cache
Cache 的抽象
前文筆者已經回答了 Cache 的 3 個問題,那么就先從 Cache 的接口設計上能不能找到部分答案?源碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/interface.go#L58
type Cache interface {
// 獲取node的數(shù)量,用于單元測試使用,本文不做說明
NodeCount() int
// 獲取Pod的數(shù)量,用于單元測試使用,本文不做說明
PodCount() (int, error)
// 此處需要給出一個概念:假定Pod,就是將Pod假定調度到指定的Node,但還沒有Bind完成。
// 為什么要這么設計?因為kube-scheduler是通過異步的方式實現(xiàn)Bind,在Bind完成前,
// 調度器還要調度新的Pod,此時就先假定Pod調度完成了。至于什么是Bind?為什么Bind?
// 怎么Bind?筆者會在其他文章中解析,此處簡單理解為:需要將Pod的調度結果寫入etcd,
// 持久化調度結果,所以也是相對比較耗時的操作。
// AssumePod會將Pod的資源需求累加到Node上,這樣kube-scheduler在調度其他Pod的時候,
// 就不會占用這部分資源。
AssumePod(pod *v1.Pod) error
// 前面提到了,Bind是一個異步過程,當Bind完成后需要調用這個接口通知Cache,
// 如果完成Bind的Pod長時間沒有被確認(確認方法是AddPod),那么Cache就會清理掉假定過期的Pod。
FinishBinding(pod *v1.Pod) error
// 刪除假定的Pod,kube-scheduler在調用AssumePod后如果遇到其他錯誤,就需要調用這個接口
ForgetPod(pod *v1.Pod) error
// 添加Pod既確認了假定的Pod,也會將假定過期的Pod重新添加回來。
AddPod(pod *v1.Pod) error
// 更新Pod,其實就是刪除再添加
UpdatePod(oldPod, newPod *v1.Pod) error
// 刪除Pod.
RemovePod(pod *v1.Pod) error
// 獲取Pod.
GetPod(pod *v1.Pod) (*v1.Pod, error)
// 判斷Pod是否假定調度
IsAssumedPod(pod *v1.Pod) (bool, error)
// 添加Node的全部信息
AddNode(node *v1.Node) error
// 更新Node的全部信息
UpdateNode(oldNode, newNode *v1.Node) error
// 刪除Node的全部信息
RemoveNode(node *v1.Node) error
// 其實就是產生Cache的快照并輸出到nodeSnapshot中,那為什么是更新呢?
// 因為快照比較大,產生快照也是一個比較重的任務,如果能夠基于上次快照把增量的部分更新到上一次快照中,
// 就會變得沒那么重了,這就是接口名字是更新快照的原因。文章后面會重點分析這個函數(shù),
// 因為其他接口非常簡單,理解了這個接口基本上就理解了Cache的精髓所在。
UpdateSnapshot(nodeSnapshot *Snapshot) error
// Dump會快照Cache,用于調試使用,不是重點,所以本文不會對該函數(shù)做說明。
Dump() *Dump
}
從 Cache 的接口設計上可以看出,Cache 只緩存了 Pod 和 Node 信息,而 Pod 和 Node 信息存儲在 etcd 中(可以通過 kubectl 增刪改查),所以可以確認 Cache 緩存了 etcd 中的 Pod 和 Node 信息。
NodeInfo 的定義
在 SchedulingQueue 中,調度隊列定義了 QueuedPodInfo 類型,在 Pod API 基礎上擴展了與調度隊列相關的屬性。同樣的道理,Node API 只是 Node 的公共屬性,而 Cache 中的 Node 需要擴展與 Cache 相關的屬性,所以就有了 NodeInfo 這個類型。源碼連接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/framework/types.go#L189
// NodeInfo是Node層的匯聚信息
type NodeInfo struct {
// Node API對象,無需過多解釋
node *v1.Node
// 運行在Node上的所有Pod,PodInfo的定義讀者自己查看,本文不再擴展了
Pods []*PodInfo
// PodsWithAffinity是Pods的子集,所有的Pod都聲明了親和性
PodsWithAffinity []*PodInfo
// PodsWithRequiredAntiAffinity是Pods子集,所有的Pod都聲明了反親和性
PodsWithRequiredAntiAffinity []*PodInfo
// 本文無關,忽略
UsedPorts HostPortInfo
// 此Node上所有Pod的總Request資源,包括假定的Pod,調度器已發(fā)送該Pod進行綁定,但可能尚未對其進行調度。
Requested *Resource
// Pod的容器資源請求有的時候是0,kube-scheduler為這類容器設置默認的資源最小值,并累加到NonZeroRequested.
// 也就是說,NonZeroRequested等于Requested加上所有按照默認最小值累加的零資源
// 這并不反映此節(jié)點的實際資源請求,而是用于避免將許多零資源請求的Pod調度到一個Node上。
NonZeroRequested *Resource
// Node的可分配的資源量
Allocatable *Resource
// 鏡像狀態(tài),比如Node上有哪些鏡像,鏡像的大小,有多少Node相應的鏡像等。
ImageStates map[string]*ImageStateSummary
// 與本文無關,忽略
TransientInfo *TransientSchedulerInfo
// 類似于版本,NodeInfo的任何狀態(tài)變化都會使得Generation增加,比如有新的Pod調度到Node上
// 這個Generation很重要,可以用于只復制變化的Node對象,后面更新鏡像的時候會詳細說明
Generation int64
}
nodeTree
nodeTree 是按照區(qū)域(zone)將 Node 組織成樹狀結構,當需要按區(qū)域列舉或者全量列舉按照區(qū)域排序,nodeTree 就會用的上。為什么有這個需求,還是那句話,調度需要。舉一個可能不恰當?shù)睦樱罕热缍鄠€ Pod 的副本需要部署在同一個區(qū)域亦或是不同的區(qū)域。
源碼連接:https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/internal/cache/node_tree.go#L32
type nodeTree struct {
// map的鍵是zone名字,map的值是該區(qū)域內所有Node的名字。
tree map[string][]string
// 所有的zone的名字
zones []string
// Node的數(shù)量
numNodes int
}
nodeTree 只是把 Node 名字組織成樹狀,如果需要 NodeInfo 還需要根據(jù) Node 的名字查找 NodeInfo。
快照
快照是對 Cache 某一時刻的復制,隨著時間的推移,Cache 的狀態(tài)在持續(xù)更新,kube-scheduler 在調度一個 Pod 的時候需要獲取 Cache 的快照。相比于直接訪問 Cache,快照可以解決如下幾個問題:
快照不會再有任何變化,可以理解為只讀,那么訪問快照不需要加鎖保證保證原子性; 快照和 Cache 讓讀寫分離,可以避免大范圍的鎖造成 Cache 訪問性能下降;
雖然快照的狀態(tài)從創(chuàng)建開始就落后于(因為 Cache 可能隨時都會更新)Cache,但是對于 kube-scheduler 調度一個 Pod 來說是沒問題的,至于原因筆者會在解析調度流程中加以說明。
源碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/snapshot.go#L29
// 從定義上看,快照只有Node信息,沒有Pod信息,其實Node信息中已經有Pod信息了,這個在NodeInfo中已經說明了
type Snapshot struct {
// nodeInfoMap用于根據(jù)Node的key(NS+Name)快速查找Node
nodeInfoMap map[string]*framework.NodeInfo
// nodeInfoList是Cache中Node全集列表(不包含已刪除的Node),按照nodeTree排序.
nodeInfoList []*framework.NodeInfo
// 只要Node上有任何Pod聲明了親和性,那么該Node就要放入havePodsWithAffinityNodeInfoList。
// 為什么要有這個變量?當然是為了調度,比如PodA需要和PodB調度在一個Node上。
havePodsWithAffinityNodeInfoList []*framework.NodeInfo
// havePodsWithRequiredAntiAffinityNodeInfoList和havePodsWithAffinityNodeInfoList相似,
// 只是Pod聲明了反親和,比如PodA不能和PodB調度在一個Node上
havePodsWithRequiredAntiAffinityNodeInfoList []*framework.NodeInfo
// generation是所有NodeInfo.Generation的最大值,因為所有NodeInfo.Generation都源于一個全局的Generation變量,
// 那么Cache中的NodeInfo.Gerneraion大于該值的就是在快照產生后更新過的Node。
// kube-scheduler調用Cache.UpdateSnapshot的時候只需要更新快照之后有變化的Node即可
generation int64
}
Cache 實現(xiàn)
前面鋪墊了已經足夠了,現(xiàn)在開始進入重點內容,先來看看 Cache 實現(xiàn)類 schedulerCache 的定義。源碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L57
// schedulerCache實現(xiàn)了Cache接口
type schedulerCache struct {
// 這個比較好理解,用來通知schedulerCache停止的chan,說明schedulerCache有自己的協(xié)程
stop <-chan struct{}
// 假定Pod一旦完成綁定,就要在指定的時間內確認,否則就會超時,ttl就是指定的過期時間,默認30秒
ttl time.Duration
// 定時清理“假定過期”的Pod,period就是定時周期,默認是1秒鐘
// 前面提到了schedulerCache有自己的協(xié)程,就是定時清理超時的假定Pod.
period time.Duration
// 鎖,說明schedulerCache利用互斥鎖實現(xiàn)協(xié)程安全,而不是用chan與其他協(xié)程交互。
// 這一點實現(xiàn)和SchedulingQueue是一樣的。
mu sync.RWMutex
// 假定Pod集合,map的key與podStates相同,都是Pod的NS+NAME,值為true就是假定Pod
// 其實assumedPods的值沒有false的可能,感覺assumedPods用set類型(map[string]struct{}{})更合適
assumedPods map[string]bool
// 所有的Pod,此處用的是podState,后面有說明,與SchedulingQueue中提到的QueuedPodInfo類似,
// podState繼承了Pod的API定義,增加了Cache需要的屬性
podStates map[string]*podState
// 所有的Node,鍵是Node.Name,值是nodeInfoListItem,后面會有說明,只需要知道m(xù)ap類型就可以了
nodes map[string]*nodeInfoListItem
// 所有的Node再通過雙向鏈表連接起來
headNode *nodeInfoListItem
// 節(jié)點按照zone組織成樹狀,前面提到用nodeTree中Node的名字再到nodes中就可以查找到NodeInfo.
nodeTree *nodeTree
// 鏡像狀態(tài),本文不做重點說明,只需要知道Cache還統(tǒng)計了鏡像的信息就可以了。
imageStates map[string]*imageState
}
// podState與繼承了Pod的API類型定義,同時擴展了schedulerCache需要的屬性.
type podState struct {
pod *v1.Pod
// 假定Pod的超時截止時間,用于判斷假定Pod是否過期。
deadline *time.Time
// 調用Cache.AssumePod的假定Pod不是所有的都需要判斷是否過期,因為有些假定Pod可能還在Binding
// bindingFinished就是用于標記已經Bind完成的Pod,然后開始計時,計時的方法就是設置deadline
// 還記得Cache.FinishBinding接口么?就是用來設置bindingFinished和deadline的,后面代碼會有解析
bindingFinished bool
}
// nodeInfoListItem定義了nodeInfoList雙向鏈表的item,nodeInfoList的實現(xiàn)非常簡單,不多解釋。
type nodeInfoListItem struct {
info *framework.NodeInfo
next *nodeInfoListItem
prev *nodeInfoListItem
}
問題來了,既然已經有了 nodes(map 類型)變量,為什么還要再加一個 headNode(list 類型)的變量?這不是多此一舉么?其實不然,nodes 可以根據(jù) Node 的名字快速找到 Node,而 headNode 則是根據(jù)某個規(guī)則排過序的。這一點和 SchedulingQueue 中介紹的用 map/slice 實現(xiàn)隊列是一個道理,至于為什么用 list 而不是 slice,肯定是排序方法鏈表的效率高于 slice,后面在更新 headNode 的地方再做說明,此處先排除疑慮。
從 schedulerCache 的定義基本可以猜到大部分 Cache 接口的實現(xiàn),本文對于比較簡單的接口實現(xiàn)只做簡要說明,將文字落在一些重點的函數(shù)上。PodCount 和 NodeCount 兩個函數(shù)因為用于單元測試使用,本文不做說明。
AssumePod
當 kube-scheduler 找到最優(yōu)的 Node 調度 Pod 的時候會調用 AssumePod 假定 Pod 調度,在通過另一個協(xié)程異步 Bind。假定其實就是預先占住資源,kube-scheduler 調度下一個 Pod 的時候不會把這部分資源搶走,直到收到確認消息 AddPod 確認調度成功,亦或是 Bind 失敗 ForgetPod 取消假定調度。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L361
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
// 獲取Pod的唯一key,就是NS+Name,因為kube-scheduler調度整個集群的Pod
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
// 如果Pod已經存在,則不能假定調度。因為在Cache中的Pod要么是假定調度的,要么是完成調度的
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
}
// 見下面代碼注釋
cache.addPod(pod)
ps := &podState{
pod: pod,
}
// 把Pod添加到map中,并標記為assumed
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}
func (cache *schedulerCache) addPod(pod *v1.Pod) {
// 查找Pod調度的Node,如果不存在則創(chuàng)建一個虛Node,虛Node只是沒有Node API對象。
// 為什么會這樣?可能kube-scheduler調度Pod的時候Node被刪除了,可能很快還會添加回來
// 也可能就徹底刪除了,此時先放在這個虛的Node上,如果Node不存在后期還會被遷移。
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[pod.Spec.NodeName] = n
}
// AddPod就是把Pod的資源累加到NodeInfo中,本文不做詳細說明,感興趣的讀者自行查看源碼
// 但需要知道的是n.info.AddPod(pod)會更新NodeInfo.Generation,表示NodeInfo是最新的
n.info.AddPod(pod)
// 將Node放到schedulerCache.headNode隊列頭部,因為NodeInfo當前是最新的,所以放在頭部。
// 此處可以解答為什么用list而不是slice,因為每次都是將Node直接放在第一個位置,明顯list效率更高
// 所以headNode是按照最近更新排序的
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
ForgetPod
假定 Pod 預先占用了一些資源,如果之后的操作(比如 Bind)有什么錯誤,就需要取消假定調度,釋放出資源。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L406
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
// 獲取Pod唯一key
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
// 這里有意思了,也就是說Cache假定Pod的Node名字與傳入的Pod的Node名字不一致,則返回錯誤
// 這種情況會不會發(fā)生呢?有可能,但是可能性不大,畢竟多協(xié)程修改Pod調度狀態(tài)會有各種可能性。
// 此處留挖一個坑,在解析kube-scheduler調度流程的時候看看到底什么極致的情況會觸發(fā)這種問題。
currState, ok := cache.podStates[key]
if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
}
switch {
// 只有假定Pod可以被Forget,因為Forget就是為了取消假定Pod的。
case ok && cache.assumedPods[key]:
// removePod()就是把假定Pod的資源從NodeInfo中減去,見下面代碼注釋
err := cache.removePod(pod)
if err != nil {
return err
}
// 刪除Pod和假定狀態(tài)
delete(cache.assumedPods, key)
delete(cache.podStates, key)
// 要么Pod不存在,要么Pod已確認調度,這兩者都不能夠被Forget
default:
return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
}
return nil
}
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
// 找到假定Pod調度的Node
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
return nil
}
// 減去假定Pod的資源,并從NodeInfo的Pod列表移除假定Pod
// 和n.info.AddPod相同,也會更新NodeInfo.Generation
if err := n.info.RemovePod(pod); err != nil {
return err
}
// 如果NodeInfo的Pod列表沒有任何Pod并且Node被刪除,則Node從Cache中刪除
// 否則將NodeInfo移到列表頭,因為NodeInfo被更新,需要放到表頭
// 這里需要知道的是,Node被刪除Cache不會立刻刪除該Node,需要等到Node上所有的Pod從Node中遷移后才刪除,
// 具體實現(xiàn)邏輯后續(xù)文章會給出,此處先知道即可。
if len(n.info.Pods) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(pod.Spec.NodeName)
} else {
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
return nil
}
FinishBinding
當假定 Pod 綁定完成后,需要調用 FinishBinding 通知 Cache 開始計時,直到假定 Pod 過期如果依然沒有收到 AddPod 的請求,則將過期假定 Pod 刪除。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L382
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
// 取當前時間
return cache.finishBinding(pod, time.Now())
}
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
// 獲取Pod唯一key
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.RLock()
defer cache.mu.RUnlock()
klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
// Pod存在并且是假定狀態(tài)才行
currState, ok := cache.podStates[key]
if ok && cache.assumedPods[key] {
// 標記為完成Binding,并且設置過期時間,還記得ttl默認是多少么?30秒。
dl := now.Add(cache.ttl)
currState.bindingFinished = true
currState.deadline = &dl
}
return nil
}
AddPod
當 Pod Bind 成功,kube-scheduler 會收到消息,然后調用 AddPod 確認調度結果。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L476
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
// 獲取Pod唯一key
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
// 以下是根據(jù)Pod在Cache中的狀態(tài)決定需要如何處理
currState, ok := cache.podStates[key]
switch {
// Pod是假定調度
case ok && cache.assumedPods[key]:
// Pod實際調度的Node和假定的不一致?
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
// 如果不一致,先從假定調度的NodeInfo中減去Pod占用的資源,然后在累加到新NodeInfo中
// 這種情況會在什么時候發(fā)生?還是留給后續(xù)文章分解吧
if err = cache.removePod(currState.pod); err != nil {
klog.Errorf("removing pod error: %v", err)
}
cache.addPod(pod)
}
// 刪除假定狀態(tài)
delete(cache.assumedPods, key)
// 清空假定過期時間,理論上從cache.assumedPods刪除,假定過期時間自然也就失效了
cache.podStates[key].deadline = nil
// 這里有意思了,為什么要在賦值一次?currState中不是已經在AssumePod的時候設置了么?
// 道理很簡單,這是同一個Pod的兩個副本,而當前參數(shù)‘pod’版本更新
cache.podStates[key].pod = pod
// Pod不存在
case !ok:
// Pod可能已經假定過期被刪除了,需要重新添加回來
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
// Pod已經執(zhí)行過AddPod,有句高大上名詞叫什么來著?對了,冪等!
default:
return fmt.Errorf("pod %v was already in added state", key)
}
return nil
}
RemovePod
kube-scheduler 收到刪除 Pod 的請求,如果 Pod 在 Cache 中,就需要調用 RemovePod。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L541
func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
// 獲取Pod唯一key
key, err := framework.GetPodKey(pod)
if err != nil {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
// 根據(jù)Pod在Cache中的狀態(tài)執(zhí)行相應的操作
currState, ok := cache.podStates[key]
switch {
// 只有執(zhí)行AddPod的Pod才能夠執(zhí)行RemovePod,假定Pod是不會執(zhí)行RemovePod的,為什么?
// 我只能說就是這么設計的,假定Pod是不會執(zhí)行這個函數(shù)的,這涉及到Pod刪除的全流程,
// 已經超綱了。。。,我肯定會有文章解析,此處再挖一個坑。
case ok && !cache.assumedPods[key]:
// 臥槽,Pod的Node和AddPod時的Node不一樣?這回的選擇非常直接,奔潰,已經超時異常解決范圍了
// 如果再繼續(xù)下去可能會造成調度狀態(tài)的混亂,不如重啟再來。
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
// 從NodeInfo中減去Pod的資源
err := cache.removePod(currState.pod)
if err != nil {
return err
}
// 從Cache中刪除Pod
delete(cache.podStates, key)
default:
return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
}
return nil
}
AddNode
有新的 Node 添加到集群,kube-scheduler 調用該接口通知 Cache。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L605
func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n, ok := cache.nodes[node.Name]
if !ok {
// 如果NodeInfo不存在則創(chuàng)建
n = newNodeInfoListItem(framework.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
// 已存在,先刪除鏡像狀態(tài),因為后面還會在添加回來
cache.removeNodeImageStates(n.info.Node())
}
// 將Node放到列表頭
cache.moveNodeInfoToHead(node.Name)
// 添加到nodeTree中
cache.nodeTree.addNode(node)
// 添加Node的鏡像狀態(tài),感興趣的讀者自行了解,本文不做重點
cache.addNodeImageStates(node, n.info)
// 只有SetNode的NodeInfo才是真實的Node,否則就是前文提到的虛的Node
return n.info.SetNode(node)
}
RemoveNode
Node 從集群中刪除,kube-scheduler 調用該接口通知 Cache。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L648
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
// 如果Node不存在返回錯誤
n, ok := cache.nodes[node.Name]
if !ok {
return fmt.Errorf("node %v is not found", node.Name)
}
// RemoveNode就是將*v1.Node設置為nil,此時Node就是虛的了
n.info.RemoveNode()
// 當Node上沒有運行Pod的時候刪除Node,否則把Node放在列表頭,因為Node狀態(tài)更新了
// 熟悉etcd的同學會知道,watch兩個路徑(Node和Pod)是兩個通道,這樣會造成兩個通道的事件不會按照嚴格時序到達
// 這應該是存在虛Node的原因之一。
if len(n.info.Pods) == 0 {
cache.removeNodeInfoFromList(node.Name)
} else {
cache.moveNodeInfoToHead(node.Name)
}
// 雖然nodes只有在NodeInfo中Pod數(shù)量為零的時候才會被刪除,但是nodeTree會直接刪除
// 說明nodeTree中體現(xiàn)了實際的Node狀態(tài),kube-scheduler調度Pod的時候也是利用nodeTree
// 這樣就不會將Pod調度到已經刪除的Node上了。
if err := cache.nodeTree.removeNode(node); err != nil {
return err
}
cache.removeNodeImageStates(node)
return nil
}
后期清理協(xié)程函數(shù) run
前文提到過,Cache 有自己的協(xié)程,就是用來清理假定到期的 Pod。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L724
func (cache *schedulerCache) run() {
// 定時1秒鐘執(zhí)行一次cleanupExpiredAssumedPods
go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
// 取當前時間
cache.cleanupAssumedPods(time.Now())
}
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
cache.mu.Lock()
defer cache.mu.Unlock()
defer cache.updateMetrics()
// 遍歷假定Pod
for key := range cache.assumedPods {
// 獲取Pod
ps, ok := cache.podStates[key]
if !ok {
klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
}
// 如果Pod沒有標記為結束Binding,則忽略,說明Pod還在Binding中
// 說白了就是沒有調用FinishBinding的Pod不用處理
if !ps.bindingFinished {
klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
// 如果當前時間已經超過了Pod假定過期時間,說明Pod假定時間已過期
if now.After(*ps.deadline) {
// 此類情況屬于異常情況,所以日志等級是waning
klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
// 清理假定過期的Pod
if err := cache.expirePod(key, ps); err != nil {
klog.Errorf("ExpirePod failed for %s: %v", key, err)
}
}
}
}
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
// 從NodeInfo中減去Pod資源、鏡像等狀態(tài)
if err := cache.removePod(ps.pod); err != nil {
return err
}
// 從Cache中刪除Pod
delete(cache.assumedPods, key)
delete(cache.podStates, key)
return nil
}
其實這里有一個比較嚴重的問題:如果假定過期的 Pod 資源剛剛會被釋放,又有新 Pod 調度到了與剛剛假定過期 Pod 相同的 Node 上,此后 Pod 被 AddPod 添加回來,可能會讓 Node 的資源過載。
UpdateSnapshot
好了,前文那么多的鋪墊,都是為了 UpdateSnapshot,因為 Cache 存在的核心目的就是給 kube-scheduler 提供 Node 鏡像,讓 kube-scheduler 根據(jù) Node 的狀態(tài)調度新的 Pod。而 Cache 中的 Pod 是為了計算 Node 的資源狀態(tài)存在的,畢竟二者在 etcd 中是兩個路徑。話不多說,直接上代碼。代碼鏈接:https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/scheduler/internal/cache/cache.go#L203
// UpdateSnapshot更新的是參數(shù)nodeSnapshot,不是更新Cache.
// 也就是Cache需要找到當前與nodeSnapshot的差異,然后更新它,這樣nodeSnapshot就與Cache狀態(tài)一致了
// 至少從函數(shù)執(zhí)行完畢后是一致的。
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
// 與本文關系不大,鑒于不增加復雜性原則,先忽略他,從命名上看很容易立理解
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
// 獲取nodeSnapshot的版本,筆者習慣叫版本,其實就是版本的概念。
// 此處需要多說一點:kube-scheudler為Node定義了全局的generation變量,每個Node狀態(tài)變化都會造成generation+=1然后賦值給該Node
// nodeSnapshot.generation就是最新NodeInfo.Generation,就是表頭的那個NodeInfo。
snapshotGeneration := nodeSnapshot.generation
// 介紹Snapshot的時候提到了,快照中有三個列表,分別是全量、親和性和反親和性列表
// 全量列表在沒有Node添加或者刪除的時候,是不需要更新的
updateAllLists := false
// 當有Node的親和性狀態(tài)發(fā)生了變化(以前沒有任何Pod有親和性聲明現(xiàn)在有了,抑或反過來),
// 則需要更新快照中的親和性列表
updateNodesHavePodsWithAffinity := false
// 同上
updateNodesHavePodsWithRequiredAntiAffinity := false
// 遍歷Node列表,為什么不遍歷Node的map?因為Node列表是按照Generation排序的
// 只要找到大于nodeSnapshot.generation的所有Node然后把他們更新到nodeSnapshot中就可以了
for node := cache.headNode; node != nil; node = node.next {
// 說明Node的狀態(tài)已經在nodeSnapshot中了,因為但凡Node有任何更新,那么NodeInfo.Generation
// 肯定會大于snapshotGeneration,同時該Node后面的所有Node也不用在遍歷了,因為他們的版本更低
if node.info.Generation <= snapshotGeneration {
break
}
// 先忽略
if balancedVolumesEnabled && node.info.TransientInfo != nil {
// Transient scheduler info is reset here.
node.info.TransientInfo.ResetTransientSchedulerInfo()
}
// node.info.Node()獲取*v1.Node,前文說了,如果Node被刪除,那么該值就是為nil
// 所以只有未被刪除的Node才會被更新到nodeSnapshot,因為快照中的全量Node列表是按照nodeTree排序的
// 而nodeTree都是真實的node
if np := node.info.Node(); np != nil {
// 如果nodeSnapshot中沒有該Node,則在nodeSnapshot中創(chuàng)建Node,并標記更新全量列表,因為創(chuàng)建了新的Node
existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
if !ok {
updateAllLists = true
existing = &framework.NodeInfo{}
nodeSnapshot.nodeInfoMap[np.Name] = existing
}
// 克隆NodeInfo,這個比較好理解,肯定不能簡單的把指針設置過去,這樣會造成多協(xié)程讀寫同一個對象
// 因為克隆操作比較重,所以能少做就少做,這也是利用Generation實現(xiàn)增量更新的原因
clone := node.info.Clone()
// 如果Pod以前或者現(xiàn)在有任何親和性聲明,則需要更新nodeSnapshot中的親和性列表
if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
updateNodesHavePodsWithAffinity = true
}
// 同上,需要更新非親和性列表
if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
updateNodesHavePodsWithRequiredAntiAffinity = true
}
// 將NodeInfo的拷貝更新到nodeSnapshot中
*existing = *clone
}
}
// Cache的表頭Node的版本是最新的,所以也就代表了此時更新鏡像后鏡像的版本了
if cache.headNode != nil {
nodeSnapshot.generation = cache.headNode.info.Generation
}
// 如果nodeSnapshot中node的數(shù)量大于nodeTree中的數(shù)量,說明有node被刪除
// 所以要從快照的nodeInfoMap中刪除已刪除的Node,同時標記需要更新node的全量列表
if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
updateAllLists = true
}
// 如果需要更新Node的全量或者親和性或者反親和性列表,則更新nodeSnapshot中的Node列表
if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
}
// 如果此時nodeSnapshot的node列表與nodeTree的數(shù)量還不一致,需要再做一次node全列表更新
// 此處應該是一個保險操作,理論上不會發(fā)生,誰知道會不會有Bug發(fā)生呢?多一些容錯沒有壞處
if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
", length of NodeInfoMap=%v, length of nodes in cache=%v"+
", trying to recover",
len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
klog.Error(errMsg)
// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
// error to surface the problem, the error will likely cause a failure to the current scheduling cycle.
cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
return fmt.Errorf(errMsg)
}
return nil
}
// 先思考一個問題:為什么有Node添加或者刪除需要更新快照中的全量列表?如果是Node刪除了,
// 需要找到Node在全量列表中的位置,然后刪除它,最悲觀的復雜度就是遍歷一遍列表,然后再挪動它后面的Node
// 因為快照的Node列表是用slice實現(xiàn),所以一旦快照中Node列表有任何更新,復雜度都是Node的數(shù)量。
// 那如果是有新的Node添加呢?并不知道應該插在哪里,所以重新創(chuàng)建一次全量列表最為簡單有效。
// 親和性和反親和性列表道理也是一樣的。
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
// 快照創(chuàng)建親和性和反親和性列表
snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
// 如果更新全量列表
if updateAll {
// 創(chuàng)建快照全量列表
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
nodesList, err := cache.nodeTree.list()
if err != nil {
klog.Error(err)
}
// 遍歷nodeTree的Node
for _, nodeName := range nodesList {
// 理論上快照的nodeInfoMap與nodeTree的狀態(tài)是一致,此處做了判斷用來檢測BUG,下面的錯誤日志也是這么寫的
if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
// 追加全量、親和性(按需)、反親和性列表(按需)
snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
} else {
klog.Errorf("node %q exist in nodeTree but not in NodeInfoMap, this should not happen.", nodeName)
}
}
} else {
// 如果更新全量列表,只需要遍歷快照中的全量列表就可以了
for _, nodeInfo := range snapshot.nodeInfoList {
// 按需追加親和性和反親和性列表
if len(nodeInfo.PodsWithAffinity) > 0 {
snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
}
if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
}
}
}
}
想想快照中 Node 的列表用來干什么?前面應該提到了,就是 map 沒有任何序,而列表按照 nodeTree 排序,對于調度更有利,讀者應該能想明白了。
總結
Cache 緩存了 Pod 和 Node 信息,并且 Node 信息聚合了運行在該 Node 上所有 Pod 的資源量和鏡像信息;Node 有虛實之分,已刪除的 Node,Cache 不會立刻刪除它,而是繼續(xù)維護一個虛的 Node,直到 Node 上的 Pod 清零后才會被刪除;但是 nodeTree 中維護的是實際的 Node,調度使用 nodeTree 就可以避免將 Pod 調度到虛 Node 上; kube-scheduler 利用 client-go 監(jiān)控(watch)Pod 和 Node 狀態(tài),當有事件發(fā)生時調用 Cache 的 AddPod,RemovePod,UpdatePod,AddNode,RemoveNode,UpdateNode 更新 Cache 中 Pod 和 Node 的狀態(tài),這樣 kube-scheduler 開始新一輪調度的時候可以獲得最新的狀態(tài); kube-scheduler 每一輪調度都會調用 UpdateSnapshot 更新本地(局部變量)的 Node 狀態(tài),因為 Cache 中的 Node 按照最近更新排序,只需要將 Cache 中 Node.Generation 大于 kube-scheduler 本地的快照 generation 的 Node 更新到 snapshot 中即可,這樣可以避免大量不必要的拷貝; kube-scheduler 找到合適的 Node 調度 Pod 后,需要調用 Cache.AssumePod 假定 Pod 已調度,然后啟動協(xié)程異步 Bind Pod 到 Node 上,當 Pod 完成 Bind 后,調用 Cache.FinishBinding 通知 Cache; kube-scheudler 調用 Cache.AssumePod 后續(xù)的所有造作一旦有錯誤就會調用 Cache.ForgetPod 刪除假定的 Pod,釋放資源; 完成 Bind 的 Pod 默認超時為 30 秒,Cache 有一個協(xié)程定時(1 秒)清理超時的 Bind 超時的 Pod,如果超時依然沒有收到 Pod 確認消息(調用 AddPod),則將刪除超時的 Pod,進而釋放出 Cache.AssumePod 占用的資源; Cache 的核心功能就是統(tǒng)計 Node 的調度狀態(tài)(比如累加 Pod 的資源量、統(tǒng)計鏡像),然后以鏡像的形式輸出給 kube-scheduler,kube-scheduler 從調度隊列(SchedulingQueue)中取出等待調度的 Pod,根據(jù)鏡像計算最合適的 Node;
此時再來看看源碼中關于 Pod 狀態(tài)機的注釋就非常容易理解了:
// State Machine of a pod's events in scheduler's cache:
//
//
// +-------------------------------------------+ +----+
// | Add | | |
// | | | | Update
// + Assume Add v v |
//Initial +--------> Assumed +------------+---> Added <--+
// ^ + + | +
// | | | | |
// | | | Add | | Remove
// | | | | |
// | | | + |
// +----------------+ +-----------> Expired +----> Deleted
// Forget Expire
//
上面總結中描述了 kube-scheduler 大致調度一個 Pod 的流程,其實 kube-scheduler 調度一個 Pod 的流程非常復雜,此處為了方便理解 Cache 在 kube-scheduler 中的位置和作用,劇透了部分內容。筆者會在后續(xù)文章中詳細解析 kube-scheduler 調度 Pod 的詳細流程。
“原文鏈接:https://blog.csdn.net/weixin_42663840/article/details/112004228
”
K8S 進階訓練營
點擊屏末 | 閱讀原文 | 即刻學習
