Pod 拓撲分布約束使用及調(diào)度原理
在 k8s 集群調(diào)度中,“親和性”相關的概念本質(zhì)上都是控制 Pod 如何被調(diào)度 - 堆疊或是打散。目前 k8s 提供了 podAffinity 以及 podAntiAffinity 兩個特性對 Pod 在不同拓撲域的分布進行了一些控制,podAffinity 可以將無數(shù)個 Pod 調(diào)度到特定的某一個拓撲域,這是堆疊的體現(xiàn);podAntiAffinity 則可以控制一個拓撲域只存在一個 Pod,這是打散的體現(xiàn)。但這兩種情況都太極端了,在不少場景下都無法達到理想的效果,例如為了實現(xiàn)容災和高可用,將業(yè)務 Pod 盡可能均勻的分布在不同可用區(qū)就很難實現(xiàn)。
PodTopologySpread 特性的提出正是為了對 Pod 的調(diào)度分布提供更精細的控制,以提高服務可用性以及資源利用率,PodTopologySpread 由 EvenPodsSpread 特性門所控制,在 v1.16 版本第一次發(fā)布,并在 v1.18 版本進入 beta 階段默認啟用。再了解這個插件是如何實現(xiàn)之前,我們首先需要搞清楚這個特性是如何使用的。
使用規(guī)范
在 Pod 的 Spec 規(guī)范中新增了一個 topologySpreadConstraints 字段:
spec:
topologySpreadConstraints:
- maxSkew: <integer>
topologyKey: <string>
whenUnsatisfiable: <string>
labelSelector: <object>
由于這個新增的字段是在 Pod spec 層面添加,因此更高層級的控制 (Deployment、DaemonSet、StatefulSet) 也能使用 PodTopologySpread 功能。

讓我們結(jié)合上圖來理解 topologySpreadConstraints 中各個字段的含義和作用:
labelSelector: 用來查找匹配的 Pod,我們能夠計算出每個拓撲域中匹配該 label selector 的 Pod 數(shù)量,在上圖中,假如 label selector 是 app:foo,那么 zone1 的匹配個數(shù)為 2, zone2 的匹配個數(shù)為 0。topologyKey: 是 Node label 的 key,如果兩個 Node 的 label 同時具有該 key 并且 label 值相同,就說它們在同一個拓撲域。在上圖中,指定 topologyKey 為 zone, 具有 zone=zone1標簽的 Node 被分在一個拓撲域,具有zone=zone2標簽的 Node 被分在另一個拓撲域。maxSkew: 描述了 Pod 在不同拓撲域中不均勻分布的最大程度,maxSkew 的取值必須大于 0。每個拓撲域都有一個 skew,計算的公式是: skew[i] = 拓撲域[i]中匹配的 Pod 個數(shù) - min{其他拓撲域中匹配的 Pod 個數(shù)}。在上圖中,我們新建一個帶有app=foo標簽的 Pod:如果該 Pod 被調(diào)度到 zone1,那么 zone1 中 Node 的 skew 值變?yōu)?3,zone2 中 Node 的 skew 值變?yōu)?0 (zone1 有 3 個匹配的 Pod,zone2 有 0 個匹配的 Pod ) 如果該 Pod 被調(diào)度到 zone2,那么 zone1 中 Node 的 skew 值變?yōu)?1,zone2 中 Node 的 skew 值變?yōu)?0 (zone2 有 1 個匹配的 Pod,擁有全局最小匹配 Pod 數(shù)的拓撲域正是 zone2 自己 ) whenUnsatisfiable: 描述了如果 Pod 不滿足分布約束條件該采取何種策略: DoNotSchedule (默認) 告訴調(diào)度器不要調(diào)度該 Pod,因此也可以叫作硬策略; ScheduleAnyway 告訴調(diào)度器根據(jù)每個 Node 的 skew 值打分排序后仍然調(diào)度,因此也可以叫作軟策略。
下面我們用兩個實際的示例來進一步說明。
單個 TopologySpreadConstraint
假設你擁有一個 4 節(jié)點集群,其中標記為 foo:bar 的 3 個 Pod 分別位于 node1、node2 和 node3 中:

如果希望新來的 Pod 均勻分布在現(xiàn)有的可用區(qū)域,則可以按如下設置其約束:
kind: Pod
apiVersion: v1
metadata:
name: mypod
labels:
foo: bar
spec:
topologySpreadConstraints:
- maxSkew: 1
topologyKey: zone
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
foo: bar
containers:
- name: pause
image: k8s.gcr.io/pause:3.1
topologyKey: zone 意味著均勻分布將只應用于存在標簽鍵值對為 zone:<any value> 的節(jié)點。 whenUnsatisfiable: DoNotSchedule 告訴調(diào)度器如果新的 Pod 不滿足約束,則不可調(diào)度。如果調(diào)度器將新的 Pod 放入 "zoneA",Pods 分布將變?yōu)?[3, 1],因此實際的偏差為 2(3 - 1),這違反了 maxSkew: 1 的約定。此示例中,新 Pod 只能放置在 "zoneB" 上:

或者

你可以調(diào)整 Pod 約束以滿足各種要求:
將 maxSkew更改為更大的值,比如 "2",這樣新的 Pod 也可以放在 "zoneA" 上。將 topologyKey更改為 "node",以便將 Pod 均勻分布在節(jié)點上而不是區(qū)域中。在上面的例子中,如果maxSkew保持為 "1",那么傳入的 Pod 只能放在 "node4" 上。將 whenUnsatisfiable: DoNotSchedule更改為whenUnsatisfiable: ScheduleAnyway, 以確保新的 Pod 可以被調(diào)度。
多個 TopologySpreadConstraint
上面是單個 Pod 拓撲分布約束的情況,下面的例子建立在前面例子的基礎上來對多個 Pod 拓撲分布約束進行說明。假設你擁有一個 4 節(jié)點集群,其中 3 個標記為 foo:bar 的 Pod 分別位于 node1、node2 和 node3 上:

可以使用 2 個 TopologySpreadConstraint 來控制 Pod 在 區(qū)域和節(jié)點兩個維度上的分布:
# two-constraints.yaml
kind: Pod
apiVersion: v1
metadata:
name: mypod
labels:
foo: bar
spec:
topologySpreadConstraints:
- maxSkew: 1
topologyKey: zone
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
foo: bar
- maxSkew: 1
topologyKey: node
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
foo: bar
containers:
- name: pause
image: k8s.gcr.io/pause:3.1
在這種情況下,為了匹配第一個約束,新的 Pod 只能放置在 "zoneB" 中;而在第二個約束中, 新的 Pod 只能放置在 "node4" 上,最后兩個約束的結(jié)果加在一起,唯一可行的選擇是放置 在 "node4" 上。
多個約束之間可能存在沖突,假設有一個跨越 2 個區(qū)域的 3 節(jié)點集群:

如果對集群應用 two-constraints.yaml,會發(fā)現(xiàn) "mypod" 處于 Pending 狀態(tài),這是因為為了滿足第一個約束,"mypod" 只能放在 "zoneB" 中,而第二個約束要求 "mypod" 只能放在 "node2" 上,Pod 調(diào)度無法滿足這兩種約束,所以就沖突了。
為了克服這種情況,你可以增加 maxSkew 或修改其中一個約束,讓其使用 whenUnsatisfiable: ScheduleAnyway。
集群默認約束
除了為單個 Pod 設置拓撲分布約束,也可以為集群設置默認的拓撲分布約束,默認拓撲分布約束在且僅在以下條件滿足 時才會應用到 Pod 上:
Pod 沒有在其 .spec.topologySpreadConstraints設置任何約束;Pod 隸屬于某個服務、副本控制器、ReplicaSet 或 StatefulSet。
你可以在 調(diào)度方案(Schedulingg Profile)中將默認約束作為 PodTopologySpread 插件參數(shù)的一部分來進行設置。約束的設置采用和前面 Pod 中的規(guī)范一致,只是 labelSelector 必須為空。配置的示例可能看起來像下面這個樣子:
apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
profiles:
- pluginConfig:
- name: PodTopologySpread
args:
defaultConstraints:
- maxSkew: 1
topologyKey: topology.kubernetes.io/zone
whenUnsatisfiable: ScheduleAnyway
defaultingType: List
預選
前面了解了如何使用 Pod 拓撲分布約束,接下來我們就可以來看下調(diào)度器中對應插件是如何實現(xiàn)的了。
PreFilter
首先也是去查看這個插件的 PreFilter 函數(shù)的實現(xiàn):
// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
// 在 prefilter 擴展點調(diào)用
func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
s, err := pl.calPreFilterState(pod)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
cycleState.Write(preFilterStateKey, s)
return nil
}
這里最核心的就是 calPreFilterState 函數(shù),該函數(shù)用來計算描述如何在拓撲域上傳遞 Pod 的 preFilterState 狀態(tài)數(shù)據(jù),在了解該函數(shù)如何實現(xiàn)之前,我們需要先弄明白 preFilterState 的定義:
// pkg/scheduler/framework/plugins/podtopologyspread/common.go
type topologyPair struct {
key string
value string
}
// 拓撲分布約束定義
type topologySpreadConstraint struct {
MaxSkew int32
TopologyKey string
Selector labels.Selector
}
// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
const preFilterStateKey = "PreFilter" + Name
// preFilterState 在 PreFilter 處進行計算,在 Filter 中使用。
// 它結(jié)合了 TpKeyToCriticalPaths 和 TpPairToMatchNum 來表示。
// (1) 最少的 Pod 在每個分布約束上匹配的關鍵路徑。
// (2) 在每個分布約束上匹配的 Pod 數(shù)量。
// 一個 nil preFilterState 表示沒有設置(在 PreFilter 階段);
// 一個空的 preFilterState 對象是一個合法的狀態(tài),在 PreFilter 階段進行設置。
type preFilterState struct {
// demo: {{
// MaxSkew: 1,
// TopologyKey: "zone",
// Selector: ......,
// }}
Constraints []topologySpreadConstraint
// 這里記錄2條關鍵路徑,而不是所有的關鍵路徑。
// criticalPaths[0].MatchNum 總是保持最小的匹配數(shù)。
// criticalPaths[1].MatchNum 總是大于或等于criticalPaths[0].MatchNum,但不能保證是第2個最小匹配數(shù)。
// demo: {
// "zone": {{"zone3", 0}, {"zone2", 2}},
// "node": {{"node-b", 1}, {"node-a", 2}},
// }
TpKeyToCriticalPaths map[string]*criticalPaths
// TpPairToMatchNum 以 topologyPair 為 key,匹配的 Pods 數(shù)量為 value 值
// demo: {key: "zone", value: "zone1"}: pointer.Int32Ptr(3),
// {key: "zone", value: "zone2"}: pointer.Int32Ptr(2),
// {key: "zone", value: "zone3"}: pointer.Int32Ptr(0),
// {key: "node", value: "node-a"}: pointer.Int32Ptr(2),
// {key: "node", value: "node-b"}: pointer.Int32Ptr(1),
TpPairToMatchNum map[topologyPair]*int32
}
type criticalPaths [2]struct {
// TopologyValue 拓撲Key對應的拓撲值
TopologyValue string
// MatchNum 匹配的 Pod 數(shù)量
MatchNum int32
}
preFilterState 中定義了3個屬性,在 PreFilter 處進行計算,在 Filter 中使用:
Constraints 用來保存定義的所有拓撲分布約束信息 TpKeyToCriticalPaths 是一個 map,以定義的拓撲 Key 為 Key,值是一個 criticalPaths指針,criticalPaths的定義不太好理解,是一個兩個長度的結(jié)構體數(shù)組,結(jié)構體里面保存的是定義的拓撲對應的 Value 值以及該拓撲下匹配的 Pod 數(shù)量,而且需要注意的是這個數(shù)組的第一個元素中匹配數(shù)量是最小的(其實這里定義一個結(jié)構體就可以,只是為了保證獲取到的是最小的匹配數(shù)量,就定義了兩個,第二個是用來臨時比較用的,真正有用的是第一個結(jié)構體)TpPairToMatchNum 同樣是一個 map,對應的 Key 是 topologyPair,這個類型其實就是一個拓撲對,Values 值就是這個拓撲對下匹配的 Pod 數(shù)
這里可能不是很好理解,我們用測試代碼中的一段測試用例來進行說明可能更好理解:
// pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go
{
name: "normal case with two spreadConstraints",
pod: st.MakePod().Name("p").Label("foo", "").
SpreadConstraint(1, "zone", v1.DoNotSchedule, fooSelector).
SpreadConstraint(1, "node", v1.DoNotSchedule, fooSelector).
Obj(),
nodes: []*v1.Node{
st.MakeNode().Name("node-a").Label("zone", "zone1").Label("node", "node-a").Obj(),
st.MakeNode().Name("node-b").Label("zone", "zone1").Label("node", "node-b").Obj(),
st.MakeNode().Name("node-x").Label("zone", "zone2").Label("node", "node-x").Obj(),
st.MakeNode().Name("node-y").Label("zone", "zone2").Label("node", "node-y").Obj(),
},
existingPods: []*v1.Pod{
st.MakePod().Name("p-a1").Node("node-a").Label("foo", "").Obj(),
st.MakePod().Name("p-a2").Node("node-a").Label("foo", "").Obj(),
st.MakePod().Name("p-b1").Node("node-b").Label("foo", "").Obj(),
st.MakePod().Name("p-y1").Node("node-y").Label("foo", "").Obj(),
st.MakePod().Name("p-y2").Node("node-y").Label("foo", "").Obj(),
st.MakePod().Name("p-y3").Node("node-y").Label("foo", "").Obj(),
st.MakePod().Name("p-y4").Node("node-y").Label("foo", "").Obj(),
},
want: &preFilterState{
Constraints: []topologySpreadConstraint{
{
MaxSkew: 1,
TopologyKey: "zone",
Selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
},
{
MaxSkew: 1,
TopologyKey: "node",
Selector: mustConvertLabelSelectorAsSelector(t, fooSelector),
},
},
TpKeyToCriticalPaths: map[string]*criticalPaths{
"zone": {{"zone1", 3}, {"zone2", 4}},
"node": {{"node-x", 0}, {"node-b", 1}},
},
TpPairToMatchNum: map[topologyPair]*int32{
{key: "zone", value: "zone1"}: pointer.Int32Ptr(3),
{key: "zone", value: "zone2"}: pointer.Int32Ptr(4),
{key: "node", value: "node-a"}: pointer.Int32Ptr(2),
{key: "node", value: "node-b"}: pointer.Int32Ptr(1),
{key: "node", value: "node-x"}: pointer.Int32Ptr(0),
{key: "node", value: "node-y"}: pointer.Int32Ptr(4),
},
},
}
理解了 preFilterState 的定義,接下來我們就可以來分析 calPreFilterState 函數(shù)的實現(xiàn)了:
// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, error) {
// 獲取所有節(jié)點信息
allNodes, err := pl.sharedLister.NodeInfos().List()
if err != nil {
return nil, fmt.Errorf("listing NodeInfos: %v", err)
}
var constraints []topologySpreadConstraint
if len(pod.Spec.TopologySpreadConstraints) > 0 {
// 如果 Pod 中配置了 TopologySpreadConstraints,轉(zhuǎn)換成這里的 topologySpreadConstraint 對象
constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule)
if err != nil {
return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %v", err)
}
} else {
// 獲取默認配置的拓撲分布約束
constraints, err = pl.defaultConstraints(pod, v1.DoNotSchedule)
if err != nil {
return nil, fmt.Errorf("setting default hard topology spread constraints: %v", err)
}
}
// 沒有約束,直接返回
if len(constraints) == 0 {
return &preFilterState{}, nil
}
// 初始化 preFilterState 狀態(tài)
s := preFilterState{
Constraints: constraints,
TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
TpPairToMatchNum: make(map[topologyPair]*int32, sizeHeuristic(len(allNodes), constraints)),
}
for _, n := range allNodes {
node := n.Node()
if node == nil {
klog.Error("node not found")
continue
}
// 如果定義了 NodeAffinity 或者 NodeSelector,則應該分布到這些過濾器的節(jié)點
if !helper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
continue
}
// 保證現(xiàn)在的節(jié)點的標簽包含 Constraints 中的所有 topologyKeys
if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
continue
}
// 根據(jù)約束初始化拓撲對
for _, c := range constraints {
pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
s.TpPairToMatchNum[pair] = new(int32)
}
}
processNode := func(i int) {
nodeInfo := allNodes[i]
node := nodeInfo.Node()
// 計算每一個拓撲對下匹配的 Pod 總數(shù)
for _, constraint := range constraints {
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
tpCount := s.TpPairToMatchNum[pair]
if tpCount == nil {
continue
}
// 計算約束的拓撲域中匹配的 Pod 數(shù)
count := countPodsMatchSelector(nodeInfo.Pods, constraint.Selector, pod.Namespace)
atomic.AddInt32(tpCount, int32(count))
}
}
parallelize.Until(context.Background(), len(allNodes), processNode)
// 計算每個拓撲的最小匹配度(保證第一個Path下是最小的值)
for i := 0; i < len(constraints); i++ {
key := constraints[i].TopologyKey
s.TpKeyToCriticalPaths[key] = newCriticalPaths()
}
for pair, num := range s.TpPairToMatchNum {
s.TpKeyToCriticalPaths[pair.key].update(pair.value, *num)
}
return &s, nil
}
// update 函數(shù)就是來保證 criticalPaths 中的第一個元素是最小的 Pod 匹配數(shù)
func (p *criticalPaths) update(tpVal string, num int32) {
// first verify if `tpVal` exists or not
i := -1
if tpVal == p[0].TopologyValue {
i = 0
} else if tpVal == p[1].TopologyValue {
i = 1
}
if i >= 0 {
// `tpVal` exists
p[i].MatchNum = num
if p[0].MatchNum > p[1].MatchNum {
// swap paths[0] and paths[1]
p[0], p[1] = p[1], p[0]
}
} else {
// `tpVal` doesn't exist
if num < p[0].MatchNum {
// update paths[1] with paths[0]
p[1] = p[0]
// update paths[0]
p[0].TopologyValue, p[0].MatchNum = tpVal, num
} else if num < p[1].MatchNum {
// update paths[1]
p[1].TopologyValue, p[1].MatchNum = tpVal, num
}
}
}
首先判斷 Pod 中是否定義了 TopologySpreadConstraint ,如果定義了就獲取轉(zhuǎn)換成 preFilterState 中的 Constraints,如果沒有定義需要查看是否為調(diào)度器配置了默認的拓撲分布約束,如果都沒有這就直接返回了。
然后循環(huán)所有的節(jié)點,先根據(jù) NodeAffinity 或者 NodeSelector 進行過濾,然后根據(jù)約束中定義的 topologyKeys 過濾節(jié)點。
接著計算每個節(jié)點下的拓撲對匹配的 Pod 數(shù)量,存入 TpPairToMatchNum 中,最后就是要把所有約束中匹配的 Pod 數(shù)量最?。ɑ蛏源螅┑姆湃?TpKeyToCriticalPaths 中保存起來。整個 preFilterState 保存下來傳遞到后續(xù)的插件中使用,比如在 filter 擴展點中同樣也注冊了這個插件,所以我們可以來查看下在 filter 中是如何實現(xiàn)的。
Filter
在 preFilter 階段將 Pod 拓撲分布約束的相關信息存入到了 CycleState 中,下面在 filter 階段中就可以來直接使用這些數(shù)據(jù)了:
// pkg/scheduler/framework/plugins/podtopologyspread/filtering.go
func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
// 獲取 preFilsterState
s, err := getPreFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
// 如果沒有拓撲匹配的數(shù)量或者沒有約束,則直接返回
if len(s.TpPairToMatchNum) == 0 || len(s.Constraints) == 0 {
return nil
}
podLabelSet := labels.Set(pod.Labels)
// 循環(huán)Pod設置的約束
for _, c := range s.Constraints {
tpKey := c.TopologyKey // 拓撲Key
// 檢查當前節(jié)點是否有的對應拓撲Key
tpVal, ok := node.Labels[c.TopologyKey]
if !ok {
klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNodeLabelNotMatch)
}
// 如果拓撲約束的selector匹配pod本身標簽,則selfMatchNum=1
selfMatchNum := int32(0)
if c.Selector.Matches(podLabelSet) {
selfMatchNum = 1
}
// zone=zoneA zone=zoneB
// p p p p p
// 一個拓撲域
pair := topologyPair{key: tpKey, value: tpVal}
// 獲取指定拓撲key的路徑匹配數(shù)量
// [{zoneB 2}]
paths, ok := s.TpKeyToCriticalPaths[tpKey]
if !ok {
klog.Errorf("internal error: get paths from key %q of %#v", tpKey, s.TpKeyToCriticalPaths)
continue
}
// 獲取最小匹配數(shù)量
minMatchNum := paths[0].MatchNum
matchNum := int32(0)
// 獲取當前節(jié)點所在的拓撲域匹配的Pod數(shù)量
if tpCount := s.TpPairToMatchNum[pair]; tpCount != nil {
matchNum = *tpCount
}
// 如果匹配的Pod數(shù)量 + 1或者0 - 最小的匹配數(shù)量 > MaxSkew
// 則證明不滿足約束條件
skew := matchNum + selfMatchNum - minMatchNum
if skew > c.MaxSkew {
klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: MatchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.MaxSkew)
return framework.NewStatus(framework.Unschedulable, ErrReasonConstraintsNotMatch)
}
}
return nil
}
首先通過 CycleState 獲取 preFilterState ,如果沒有配置約束或者拓撲對匹配數(shù)量為0就直接返回了。
然后循環(huán)定義的拓撲約束,先檢查當前節(jié)點是否有對應的 TopologyKey,沒有就返回錯誤,然后判斷拓撲對的分布程度是否大于 MaxSkew,判斷方式為拓撲中匹配的 Pod 數(shù)量 + 1/0(如果 Pod 本身也匹配則為1) - 最小的 Pod 匹配數(shù)量 > MaxSkew ,這個也是前面我們在關于 Pod 拓撲分布約束中的 maxSkew 的含義描述的意思**。**
優(yōu)選
PodTopologySpread 除了在預選階段會用到,在打分階段其實也會用到,在默認的插件注冊函數(shù)中可以看到:
func getDefaultConfig() *schedulerapi.Plugins {
return &schedulerapi.Plugins{
......
PreFilter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: podtopologyspread.Name},
......
},
},
Filter: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: podtopologyspread.Name},
......
},
},
......
PreScore: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
{Name: podtopologyspread.Name},
......
},
},
Score: &schedulerapi.PluginSet{
Enabled: []schedulerapi.Plugin{
// Weight is doubled because:
// - This is a score coming from user preference.
// - It makes its signal comparable to NodeResourcesLeastAllocated.
{Name: podtopologyspread.Name, Weight: 2},
......
},
},
......
}
}
PreScore 與 Score
同樣首先需要調(diào)用 PreScore 函數(shù)進行打分前的一些準備,把打分的數(shù)據(jù)存儲起來:
// pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
// preScoreState 在 PreScore 時計算,在 Score 時使用。
type preScoreState struct {
// 定義的約束
Constraints []topologySpreadConstraint
// IgnoredNodes 是一組 miss 掉 Constraints[*].topologyKey 的節(jié)點名稱
IgnoredNodes sets.String
// TopologyPairToPodCounts 以 topologyPair 為鍵,以匹配的 Pod 數(shù)量為值
TopologyPairToPodCounts map[topologyPair]*int64
// TopologyNormalizingWeight 是我們給每個拓撲的計數(shù)的權重
// 這使得較小的拓撲的 Pod 數(shù)不會被較大的稀釋
TopologyNormalizingWeight []float64
}
// initPreScoreState 迭代 "filteredNodes" 來過濾掉沒有設置 topologyKey 的節(jié)點,并進行初始化:
// 1) s.TopologyPairToPodCounts: 以符合條件的拓撲對和節(jié)點名稱為鍵
// 2) s.IgnoredNodes: 不應得分的節(jié)點集合
// 3) s.TopologyNormalizingWeight: 根據(jù)拓撲結(jié)構中的數(shù)值數(shù)量給予每個約束的權重
func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*v1.Node) error {
var err error
// 將 Pod 或者默認定義的約束轉(zhuǎn)換到 Constraints 中
if len(pod.Spec.TopologySpreadConstraints) > 0 {
s.Constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway)
if err != nil {
return fmt.Errorf("obtaining pod's soft topology spread constraints: %v", err)
}
} else {
s.Constraints, err = pl.defaultConstraints(pod, v1.ScheduleAnyway)
if err != nil {
return fmt.Errorf("setting default soft topology spread constraints: %v", err)
}
}
if len(s.Constraints) == 0 {
return nil
}
topoSize := make([]int, len(s.Constraints))
// 循環(huán)過濾節(jié)點得到的所有節(jié)點
for _, node := range filteredNodes {
if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) {
// 后面打分時,沒有全部所需 topologyKeys 的節(jié)點會被忽略
s.IgnoredNodes.Insert(node.Name)
continue
}
// 循環(huán)約束條件
for i, constraint := range s.Constraints {
if constraint.TopologyKey == v1.LabelHostname {
continue
}
// 拓撲對 初始化
pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
if s.TopologyPairToPodCounts[pair] == nil {
s.TopologyPairToPodCounts[pair] = new(int64)
topoSize[i]++ // 拓撲對數(shù)量+1
}
}
}
s.TopologyNormalizingWeight = make([]float64, len(s.Constraints))
for i, c := range s.Constraints {
sz := topoSize[i] // 拓撲約束數(shù)量
if c.TopologyKey == v1.LabelHostname {
// 如果 TopologyKey 是 Hostname 標簽
sz = len(filteredNodes) - len(s.IgnoredNodes)
}
// 計算拓撲約束的權重
s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz)
}
return nil
}
// topologyNormalizingWeight 根據(jù)拓撲存在的值的數(shù)量,計算拓撲的權重。
// 由于<size>至少為1(所有通過 Filters 的節(jié)點都在同一個拓撲結(jié)構中)
// 而k8s支持5k個節(jié)點,所以結(jié)果在區(qū)間<1.09,8.52>。
//
// 注意:當沒有節(jié)點具有所需的拓撲結(jié)構時,<size> 也可以為0
// 然而在這種情況下,我們并不關心拓撲結(jié)構的權重
// 因為我們對所有節(jié)點都返回0分。
func topologyNormalizingWeight(size int) float64 {
return math.Log(float64(size + 2))
}
// PreScore 構建寫入 CycleState 用于后面的 Score 和 NormalizeScore 使用
func (pl *PodTopologySpread) PreScore(
ctx context.Context,
cycleState *framework.CycleState,
pod *v1.Pod,
filteredNodes []*v1.Node,
) *framework.Status {
// 獲取所有節(jié)點
allNodes, err := pl.sharedLister.NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("error when getting all nodes: %v", err))
}
// 過濾后的節(jié)點或者當前沒有節(jié)點,表示沒有節(jié)點用于打分
if len(filteredNodes) == 0 || len(allNodes) == 0 {
return nil
}
// 初始化 preScoreState 狀態(tài)
state := &preScoreState{
IgnoredNodes: sets.NewString(),
TopologyPairToPodCounts: make(map[topologyPair]*int64),
}
err = pl.initPreScoreState(state, pod, filteredNodes)
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("error when calculating preScoreState: %v", err))
}
// 如果傳入的 pod 沒有軟拓撲傳播約束,則返回
if len(state.Constraints) == 0 {
cycleState.Write(preScoreStateKey, state)
return nil
}
processAllNode := func(i int) {
nodeInfo := allNodes[i]
node := nodeInfo.Node()
if node == nil {
return
}
// (1) `node`應滿足傳入 pod 的 NodeSelector/NodeAffinity
// (2) 所有的 topologyKeys 都需要存在于`node`中。
if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
!nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
return
}
for _, c := range state.Constraints {
// 拓撲對
pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
// 如果當前拓撲對沒有與任何候選節(jié)點相關聯(lián),則繼續(xù)避免不必要的計算
// 每個節(jié)點的計數(shù)也被跳過,因為它們是在 Score 期間進行的
tpCount := state.TopologyPairToPodCounts[pair]
if tpCount == nil {
continue
}
// 計算節(jié)點上匹配的所有 Pod 數(shù)量
count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
atomic.AddInt64(tpCount, int64(count))
}
}
parallelize.Until(ctx, len(allNodes), processAllNode)
cycleState.Write(preScoreStateKey, state)
return nil
}
上面的處理邏輯整體比較簡單,最重要的是計算每個拓撲約束的權重,這樣才方便后面打分的時候計算分數(shù),存入到 CycleState 后就可以了來查看具體的 Score 函數(shù)的實現(xiàn)了:
// pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
// 在 Score 擴展點調(diào)用
func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
if err != nil || nodeInfo.Node() == nil {
return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
}
node := nodeInfo.Node()
s, err := getPreScoreState(cycleState)
if err != nil {
return 0, framework.NewStatus(framework.Error, err.Error())
}
// 如果該節(jié)點不合格,則返回
if s.IgnoredNodes.Has(node.Name) {
return 0, nil
}
// 每出現(xiàn)一個 <pair>,當前節(jié)點就會得到一個 <matchSum> 的分數(shù)。
// 而我們將<matchSum>相加,作為這個節(jié)點的分數(shù)返回。
var score float64
for i, c := range s.Constraints {
if tpVal, ok := node.Labels[c.TopologyKey]; ok {
var cnt int64
if c.TopologyKey == v1.LabelHostname {
// 如果 TopologyKey 是 Hostname 則 cnt 為節(jié)點上匹配約束的 selector 的 Pod 數(shù)量
cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
} else {
// 拓撲對下匹配的 Pod 數(shù)量
pair := topologyPair{key: c.TopologyKey, value: tpVal}
cnt = *s.TopologyPairToPodCounts[pair]
}
// 計算當前節(jié)點所得分數(shù)
score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
}
}
return int64(score), nil
}
// scoreForCount 根據(jù)拓撲域中匹配的豆莢數(shù)量、約束的maxSkew和拓撲權重計算得分。
// `maxSkew-1`加到分數(shù)中,這樣拓撲域之間的差異就會被淡化,控制分數(shù)對偏斜的容忍度。
func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {
return float64(cnt)*tpWeight + float64(maxSkew-1)
}
在 Score 階段就是為當前的節(jié)點去計算一個分數(shù),這個分數(shù)就是通過拓撲對下匹配的 Pod 數(shù)量和對應權重的結(jié)果得到的一個分數(shù),另外在計算分數(shù)的時候還加上了 maxSkew-1,這樣可以淡化拓撲域之間的差異。
NormalizeScore
當所有節(jié)點的分數(shù)計算完成后,還需要調(diào)用 NormalizeScore 擴展插件:
// pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
// NormalizeScore 在對所有節(jié)點打分過后調(diào)用
func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
s, err := getPreScoreState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
if s == nil {
return nil
}
// 計算 <minScore> and <maxScore>
var minScore int64 = math.MaxInt64
var maxScore int64
for _, score := range scores {
if s.IgnoredNodes.Has(score.Name) {
continue
}
if score.Score < minScore {
minScore = score.Score
}
if score.Score > maxScore {
maxScore = score.Score
}
}
// 循環(huán) scores({node score}集合)
for i := range scores {
nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
node := nodeInfo.Node()
// 節(jié)點被忽略了,分數(shù)記為0
if s.IgnoredNodes.Has(node.Name) {
scores[i].Score = 0
continue
}
// 如果 maxScore 為0,指定當前節(jié)點的分數(shù)為 MaxNodeScore
if maxScore == 0 {
scores[i].Score = framework.MaxNodeScore
continue
}
// 計算當前節(jié)點分數(shù)
s := scores[i].Score
scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
}
return nil
}
NormalizeScore 擴展是在 Score 擴展執(zhí)行完成后,為每個 ScorePlugin 并行運行 NormalizeScore 方法,然后并為每個 ScorePlugin 應用評分默認權重,然后總結(jié)所有插件調(diào)用過后的分數(shù),最后選擇一個分數(shù)最高的節(jié)點。
到這里我們就完成了對 PodTopologySpread 的實現(xiàn)分析,我們利用該特性可以實現(xiàn)對 Pod 更加細粒度的控制,我們可以把 Pod 分布到不同的拓撲域,從而實現(xiàn)高可用性,這也有助于工作負載的滾動更新和平穩(wěn)地擴展副本。
不過如果對 Deployment 進行縮容操作可能會導致 Pod 的分布不均衡,此外具有污點的節(jié)點上的 Pods 也會被統(tǒng)計到。
K8S 進階訓練營
點擊屏末 | 閱讀原文 | 即刻學習

