<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Pod 拓撲分布約束使用及調(diào)度原理

          共 37303字,需瀏覽 75分鐘

           ·

          2021-03-26 19:06

          在 k8s 集群調(diào)度中,“親和性”相關的概念本質(zhì)上都是控制 Pod 如何被調(diào)度 - 堆疊或是打散。目前 k8s 提供了 podAffinity 以及 podAntiAffinity 兩個特性對 Pod 在不同拓撲域的分布進行了一些控制,podAffinity 可以將無數(shù)個 Pod 調(diào)度到特定的某一個拓撲域,這是堆疊的體現(xiàn);podAntiAffinity 則可以控制一個拓撲域只存在一個 Pod,這是打散的體現(xiàn)。但這兩種情況都太極端了,在不少場景下都無法達到理想的效果,例如為了實現(xiàn)容災和高可用,將業(yè)務 Pod 盡可能均勻的分布在不同可用區(qū)就很難實現(xiàn)。

          PodTopologySpread 特性的提出正是為了對 Pod 的調(diào)度分布提供更精細的控制,以提高服務可用性以及資源利用率,PodTopologySpreadEvenPodsSpread 特性門所控制,在 v1.16 版本第一次發(fā)布,并在 v1.18 版本進入 beta 階段默認啟用。再了解這個插件是如何實現(xiàn)之前,我們首先需要搞清楚這個特性是如何使用的。

          使用規(guī)范

          在 Pod 的 Spec 規(guī)范中新增了一個 topologySpreadConstraints 字段:

          spec:
            topologySpreadConstraints:
            - maxSkew: <integer>
              topologyKey: <string>
              whenUnsatisfiable: <string>
              labelSelector: <object>

          由于這個新增的字段是在 Pod spec 層面添加,因此更高層級的控制 (Deployment、DaemonSet、StatefulSet) 也能使用 PodTopologySpread 功能。

          讓我們結(jié)合上圖來理解 topologySpreadConstraints 中各個字段的含義和作用:

          • labelSelector: 用來查找匹配的 Pod,我們能夠計算出每個拓撲域中匹配該 label selector 的 Pod 數(shù)量,在上圖中,假如 label selector 是 app:foo,那么 zone1 的匹配個數(shù)為 2, zone2 的匹配個數(shù)為 0。
          • topologyKey: 是 Node label 的 key,如果兩個 Node 的 label 同時具有該 key 并且 label 值相同,就說它們在同一個拓撲域。在上圖中,指定 topologyKey 為 zone, 具有 zone=zone1  標簽的 Node 被分在一個拓撲域,具有 zone=zone2 標簽的 Node 被分在另一個拓撲域。
          • maxSkew: 描述了 Pod 在不同拓撲域中不均勻分布的最大程度,maxSkew 的取值必須大于 0。每個拓撲域都有一個 skew,計算的公式是: skew[i] = 拓撲域[i]中匹配的 Pod 個數(shù) - min{其他拓撲域中匹配的 Pod 個數(shù)}。在上圖中,我們新建一個帶有 app=foo 標簽的 Pod:
            • 如果該 Pod 被調(diào)度到 zone1,那么 zone1 中 Node 的 skew 值變?yōu)?3,zone2 中 Node 的 skew 值變?yōu)?0 (zone1 有 3 個匹配的 Pod,zone2 有 0 個匹配的 Pod )
            • 如果該 Pod 被調(diào)度到 zone2,那么 zone1 中 Node 的 skew 值變?yōu)?1,zone2 中 Node 的 skew 值變?yōu)?0 (zone2 有 1 個匹配的 Pod,擁有全局最小匹配 Pod 數(shù)的拓撲域正是 zone2 自己 )
          • whenUnsatisfiable: 描述了如果 Pod 不滿足分布約束條件該采取何種策略:
            • DoNotSchedule (默認) 告訴調(diào)度器不要調(diào)度該 Pod,因此也可以叫作硬策略;
            • ScheduleAnyway 告訴調(diào)度器根據(jù)每個 Node 的 skew 值打分排序后仍然調(diào)度,因此也可以叫作軟策略。

          下面我們用兩個實際的示例來進一步說明。

          單個 TopologySpreadConstraint

          假設你擁有一個 4 節(jié)點集群,其中標記為 foo:bar 的 3 個 Pod 分別位于 node1、node2 和 node3 中:

          如果希望新來的 Pod 均勻分布在現(xiàn)有的可用區(qū)域,則可以按如下設置其約束:

          kind: Pod
          apiVersion: v1
          metadata:
            name: mypod
            labels:
              foo: bar
          spec:
            topologySpreadConstraints:
            - maxSkew: 1
              topologyKey: zone
              whenUnsatisfiable: DoNotSchedule
              labelSelector:
                matchLabels:
                  foo: bar
            containers:
            - name: pause
              image: k8s.gcr.io/pause:3.1

          topologyKey: zone 意味著均勻分布將只應用于存在標簽鍵值對為 zone:<any value> 的節(jié)點。 whenUnsatisfiable: DoNotSchedule 告訴調(diào)度器如果新的 Pod 不滿足約束,則不可調(diào)度。如果調(diào)度器將新的 Pod 放入 "zoneA",Pods 分布將變?yōu)?[3, 1],因此實際的偏差為 2(3 - 1),這違反了 maxSkew: 1 的約定。此示例中,新 Pod 只能放置在 "zoneB" 上:

          或者

          你可以調(diào)整 Pod 約束以滿足各種要求:

          • 將 maxSkew 更改為更大的值,比如 "2",這樣新的 Pod 也可以放在 "zoneA" 上。
          • 將 topologyKey 更改為 "node",以便將 Pod 均勻分布在節(jié)點上而不是區(qū)域中。在上面的例子中,如果 maxSkew 保持為 "1",那么傳入的 Pod 只能放在 "node4" 上。
          • 將 whenUnsatisfiable: DoNotSchedule 更改為 whenUnsatisfiable: ScheduleAnyway, 以確保新的 Pod 可以被調(diào)度。

          多個 TopologySpreadConstraint

          上面是單個 Pod 拓撲分布約束的情況,下面的例子建立在前面例子的基礎上來對多個 Pod 拓撲分布約束進行說明。假設你擁有一個 4 節(jié)點集群,其中 3 個標記為 foo:bar 的 Pod 分別位于 node1、node2 和 node3 上:

          可以使用 2 個 TopologySpreadConstraint 來控制 Pod 在 區(qū)域和節(jié)點兩個維度上的分布:

          # two-constraints.yaml
          kind: Pod
          apiVersion: v1
          metadata:
            name: mypod
            labels:
              foo: bar
          spec:
            topologySpreadConstraints:
            - maxSkew: 1
              topologyKey: zone
              whenUnsatisfiable: DoNotSchedule
              labelSelector:
                matchLabels:
                  foo: bar
            - maxSkew: 1
              topologyKey: node
              whenUnsatisfiable: DoNotSchedule
              labelSelector:
                matchLabels:
                  foo: bar
            containers:
            - name: pause
              image: k8s.gcr.io/pause:3.1

          在這種情況下,為了匹配第一個約束,新的 Pod 只能放置在 "zoneB" 中;而在第二個約束中, 新的 Pod 只能放置在 "node4" 上,最后兩個約束的結(jié)果加在一起,唯一可行的選擇是放置 在 "node4" 上。

          多個約束之間可能存在沖突,假設有一個跨越 2 個區(qū)域的 3 節(jié)點集群:

          如果對集群應用 two-constraints.yaml,會發(fā)現(xiàn) "mypod" 處于 Pending 狀態(tài),這是因為為了滿足第一個約束,"mypod" 只能放在 "zoneB" 中,而第二個約束要求 "mypod" 只能放在 "node2" 上,Pod 調(diào)度無法滿足這兩種約束,所以就沖突了。

          為了克服這種情況,你可以增加 maxSkew 或修改其中一個約束,讓其使用 whenUnsatisfiable: ScheduleAnyway

          集群默認約束

          除了為單個 Pod 設置拓撲分布約束,也可以為集群設置默認的拓撲分布約束,默認拓撲分布約束在且僅在以下條件滿足 時才會應用到 Pod 上:

          • Pod 沒有在其 .spec.topologySpreadConstraints 設置任何約束;
          • Pod 隸屬于某個服務、副本控制器、ReplicaSet 或 StatefulSet。

          你可以在 調(diào)度方案(Schedulingg Profile)中將默認約束作為 PodTopologySpread 插件參數(shù)的一部分來進行設置。約束的設置采用和前面 Pod 中的規(guī)范一致,只是 labelSelector 必須為空。配置的示例可能看起來像下面這個樣子:

          apiVersion: kubescheduler.config.k8s.io/v1beta1
          kind: KubeSchedulerConfiguration

          profiles:
            - pluginConfig:
                - name: PodTopologySpread
                  args:
                    defaultConstraints:
                      - maxSkew: 1
                        topologyKey: topology.kubernetes.io/zone
                        whenUnsatisfiable: ScheduleAnyway
                    defaultingType: List

          預選

          前面了解了如何使用 Pod 拓撲分布約束,接下來我們就可以來看下調(diào)度器中對應插件是如何實現(xiàn)的了。

          PreFilter

          首先也是去查看這個插件的 PreFilter 函數(shù)的實現(xiàn):

          // pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

          // 在 prefilter 擴展點調(diào)用
          func (pl *PodTopologySpread) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
           s, err := pl.calPreFilterState(pod)
           if err != nil {
            return framework.NewStatus(framework.Error, err.Error())
           }
           cycleState.Write(preFilterStateKey, s)
           return nil
          }

          這里最核心的就是 calPreFilterState 函數(shù),該函數(shù)用來計算描述如何在拓撲域上傳遞 Pod 的 preFilterState 狀態(tài)數(shù)據(jù),在了解該函數(shù)如何實現(xiàn)之前,我們需要先弄明白 preFilterState 的定義:

          // pkg/scheduler/framework/plugins/podtopologyspread/common.go

          type topologyPair struct {
           key   string
           value string
          }

          // 拓撲分布約束定義
          type topologySpreadConstraint struct {
           MaxSkew     int32
           TopologyKey string
           Selector    labels.Selector
          }

          // pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

          const preFilterStateKey = "PreFilter" + Name

          // preFilterState 在 PreFilter 處進行計算,在 Filter 中使用。
          // 它結(jié)合了 TpKeyToCriticalPaths 和 TpPairToMatchNum 來表示。
          // (1) 最少的 Pod 在每個分布約束上匹配的關鍵路徑。
          // (2) 在每個分布約束上匹配的 Pod 數(shù)量。
          // 一個 nil preFilterState 表示沒有設置(在 PreFilter 階段);
          // 一個空的 preFilterState 對象是一個合法的狀態(tài),在 PreFilter 階段進行設置。
          type preFilterState struct {
            // demo: {{
           //     MaxSkew:     1,
           //     TopologyKey: "zone",
           //     Selector:    ......,
           //    }}
           Constraints []topologySpreadConstraint
            // 這里記錄2條關鍵路徑,而不是所有的關鍵路徑。
            // criticalPaths[0].MatchNum 總是保持最小的匹配數(shù)。
            // criticalPaths[1].MatchNum 總是大于或等于criticalPaths[0].MatchNum,但不能保證是第2個最小匹配數(shù)。
            // demo: {
           //    "zone": {{"zone3", 0}, {"zone2", 2}},
           //    "node": {{"node-b", 1}, {"node-a", 2}},   
           //   }
           TpKeyToCriticalPaths map[string]*criticalPaths
           // TpPairToMatchNum 以 topologyPair 為 key,匹配的 Pods 數(shù)量為 value 值
            // demo: {key: "zone", value: "zone1"}: pointer.Int32Ptr(3),
           //    {key: "zone", value: "zone2"}: pointer.Int32Ptr(2),
           //    {key: "zone", value: "zone3"}: pointer.Int32Ptr(0),
            //       {key: "node", value: "node-a"}: pointer.Int32Ptr(2),
           //   {key: "node", value: "node-b"}: pointer.Int32Ptr(1),
           TpPairToMatchNum map[topologyPair]*int32
          }

          type criticalPaths [2]struct {
           // TopologyValue 拓撲Key對應的拓撲值
           TopologyValue string
           // MatchNum 匹配的 Pod 數(shù)量
           MatchNum int32
          }

          preFilterState 中定義了3個屬性,在 PreFilter 處進行計算,在 Filter 中使用:

          • Constraints 用來保存定義的所有拓撲分布約束信息
          • TpKeyToCriticalPaths 是一個 map,以定義的拓撲 Key 為 Key,值是一個 criticalPaths 指針,criticalPaths 的定義不太好理解,是一個兩個長度的結(jié)構體數(shù)組,結(jié)構體里面保存的是定義的拓撲對應的 Value 值以及該拓撲下匹配的 Pod 數(shù)量,而且需要注意的是這個數(shù)組的第一個元素中匹配數(shù)量是最小的(其實這里定義一個結(jié)構體就可以,只是為了保證獲取到的是最小的匹配數(shù)量,就定義了兩個,第二個是用來臨時比較用的,真正有用的是第一個結(jié)構體
          • TpPairToMatchNum 同樣是一個 map,對應的 Key 是 topologyPair,這個類型其實就是一個拓撲對,Values 值就是這個拓撲對下匹配的 Pod 數(shù)

          這里可能不是很好理解,我們用測試代碼中的一段測試用例來進行說明可能更好理解:

          // pkg/scheduler/framework/plugins/podtopologyspread/filtering_test.go

          {
            name: "normal case with two spreadConstraints",
            pod: st.MakePod().Name("p").Label("foo""").
             SpreadConstraint(1"zone", v1.DoNotSchedule, fooSelector).
             SpreadConstraint(1"node", v1.DoNotSchedule, fooSelector).
             Obj(),
            nodes: []*v1.Node{
             st.MakeNode().Name("node-a").Label("zone""zone1").Label("node""node-a").Obj(),
             st.MakeNode().Name("node-b").Label("zone""zone1").Label("node""node-b").Obj(),
             st.MakeNode().Name("node-x").Label("zone""zone2").Label("node""node-x").Obj(),
             st.MakeNode().Name("node-y").Label("zone""zone2").Label("node""node-y").Obj(),
            },
            existingPods: []*v1.Pod{
             st.MakePod().Name("p-a1").Node("node-a").Label("foo""").Obj(),
             st.MakePod().Name("p-a2").Node("node-a").Label("foo""").Obj(),
             st.MakePod().Name("p-b1").Node("node-b").Label("foo""").Obj(),
             st.MakePod().Name("p-y1").Node("node-y").Label("foo""").Obj(),
             st.MakePod().Name("p-y2").Node("node-y").Label("foo""").Obj(),
             st.MakePod().Name("p-y3").Node("node-y").Label("foo""").Obj(),
             st.MakePod().Name("p-y4").Node("node-y").Label("foo""").Obj(),
            },
            want: &preFilterState{
             Constraints: []topologySpreadConstraint{
              {
               MaxSkew:     1,
               TopologyKey: "zone",
               Selector:    mustConvertLabelSelectorAsSelector(t, fooSelector),
              },
              {
               MaxSkew:     1,
               TopologyKey: "node",
               Selector:    mustConvertLabelSelectorAsSelector(t, fooSelector),
              },
             },
             TpKeyToCriticalPaths: map[string]*criticalPaths{
              "zone": {{"zone1", 3}, {"zone2", 4}},
              "node": {{"node-x", 0}, {"node-b", 1}},
             },
             TpPairToMatchNum: map[topologyPair]*int32{
              {key: "zone", value: "zone1"}:  pointer.Int32Ptr(3),
              {key: "zone", value: "zone2"}:  pointer.Int32Ptr(4),
              {key: "node", value: "node-a"}: pointer.Int32Ptr(2),
              {key: "node", value: "node-b"}: pointer.Int32Ptr(1),
              {key: "node", value: "node-x"}: pointer.Int32Ptr(0),
              {key: "node", value: "node-y"}: pointer.Int32Ptr(4),
             },
            },
           }

          理解了 preFilterState  的定義,接下來我們就可以來分析 calPreFilterState 函數(shù)的實現(xiàn)了:

          // pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

          func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, error) {
           // 獲取所有節(jié)點信息
            allNodes, err := pl.sharedLister.NodeInfos().List()
           if err != nil {
            return nil, fmt.Errorf("listing NodeInfos: %v", err)
           }
           var constraints []topologySpreadConstraint
           if len(pod.Spec.TopologySpreadConstraints) > 0 {
            // 如果 Pod 中配置了 TopologySpreadConstraints,轉(zhuǎn)換成這里的 topologySpreadConstraint 對象
            constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.DoNotSchedule)
            if err != nil {
             return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %v", err)
            }
           } else {
              // 獲取默認配置的拓撲分布約束
            constraints, err = pl.defaultConstraints(pod, v1.DoNotSchedule)
            if err != nil {
             return nil, fmt.Errorf("setting default hard topology spread constraints: %v", err)
            }
           }
            // 沒有約束,直接返回
           if len(constraints) == 0 {
            return &preFilterState{}, nil
           }
            // 初始化 preFilterState 狀態(tài)
           s := preFilterState{
            Constraints:          constraints,
            TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
            TpPairToMatchNum:     make(map[topologyPair]*int32, sizeHeuristic(len(allNodes), constraints)),
           }
           for _, n := range allNodes {
            node := n.Node()
            if node == nil {
             klog.Error("node not found")
             continue
            }
              // 如果定義了 NodeAffinity 或者 NodeSelector,則應該分布到這些過濾器的節(jié)點
            if !helper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
             continue
            }
            // 保證現(xiàn)在的節(jié)點的標簽包含 Constraints 中的所有 topologyKeys
            if !nodeLabelsMatchSpreadConstraints(node.Labels, constraints) {
             continue
            }
              // 根據(jù)約束初始化拓撲對
            for _, c := range constraints {
             pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
             s.TpPairToMatchNum[pair] = new(int32)
            }
           }
           
           processNode := func(i int) {
            nodeInfo := allNodes[i]
            node := nodeInfo.Node()
              // 計算每一個拓撲對下匹配的 Pod 總數(shù)
            for _, constraint := range constraints {
             pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
                tpCount := s.TpPairToMatchNum[pair]
             if tpCount == nil {
              continue
             }
                // 計算約束的拓撲域中匹配的 Pod 數(shù)
             count := countPodsMatchSelector(nodeInfo.Pods, constraint.Selector, pod.Namespace)
             atomic.AddInt32(tpCount, int32(count))
            }
           }
           parallelize.Until(context.Background(), len(allNodes), processNode)

           // 計算每個拓撲的最小匹配度(保證第一個Path下是最小的值)
           for i := 0; i < len(constraints); i++ {
            key := constraints[i].TopologyKey
            s.TpKeyToCriticalPaths[key] = newCriticalPaths()
           }
           for pair, num := range s.TpPairToMatchNum {
            s.TpKeyToCriticalPaths[pair.key].update(pair.value, *num)
           }

           return &s, nil
          }

          // update 函數(shù)就是來保證 criticalPaths 中的第一個元素是最小的 Pod 匹配數(shù)
          func (p *criticalPaths) update(tpVal string, num int32) {
           // first verify if `tpVal` exists or not
           i := -1
           if tpVal == p[0].TopologyValue {
            i = 0
           } else if tpVal == p[1].TopologyValue {
            i = 1
           }
           if i >= 0 {
            // `tpVal` exists
            p[i].MatchNum = num
            if p[0].MatchNum > p[1].MatchNum {
             // swap paths[0] and paths[1]
             p[0], p[1] = p[1], p[0]
            }
           } else {
            // `tpVal` doesn't exist
            if num < p[0].MatchNum {
             // update paths[1] with paths[0]
             p[1] = p[0]
             // update paths[0]
             p[0].TopologyValue, p[0].MatchNum = tpVal, num
            } else if num < p[1].MatchNum {
             // update paths[1]
             p[1].TopologyValue, p[1].MatchNum = tpVal, num
            }
           }
          }

          首先判斷 Pod 中是否定義了 TopologySpreadConstraint ,如果定義了就獲取轉(zhuǎn)換成 preFilterState 中的 Constraints,如果沒有定義需要查看是否為調(diào)度器配置了默認的拓撲分布約束,如果都沒有這就直接返回了。

          然后循環(huán)所有的節(jié)點,先根據(jù) NodeAffinity 或者 NodeSelector 進行過濾,然后根據(jù)約束中定義的 topologyKeys 過濾節(jié)點。

          接著計算每個節(jié)點下的拓撲對匹配的 Pod 數(shù)量,存入 TpPairToMatchNum 中,最后就是要把所有約束中匹配的 Pod 數(shù)量最?。ɑ蛏源螅┑姆湃?TpKeyToCriticalPaths 中保存起來。整個 preFilterState 保存下來傳遞到后續(xù)的插件中使用,比如在 filter 擴展點中同樣也注冊了這個插件,所以我們可以來查看下在 filter 中是如何實現(xiàn)的。

          Filter

          在 preFilter 階段將 Pod 拓撲分布約束的相關信息存入到了 CycleState  中,下面在 filter 階段中就可以來直接使用這些數(shù)據(jù)了:

          // pkg/scheduler/framework/plugins/podtopologyspread/filtering.go

          func (pl *PodTopologySpread) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
           node := nodeInfo.Node()
           if node == nil {
            return framework.NewStatus(framework.Error, "node not found")
           }
            // 獲取 preFilsterState
           s, err := getPreFilterState(cycleState)
           if err != nil {
            return framework.NewStatus(framework.Error, err.Error())
           }
           // 如果沒有拓撲匹配的數(shù)量或者沒有約束,則直接返回
           if len(s.TpPairToMatchNum) == 0 || len(s.Constraints) == 0 {
            return nil
           }
            
           podLabelSet := labels.Set(pod.Labels)
            // 循環(huán)Pod設置的約束
           for _, c := range s.Constraints {
            tpKey := c.TopologyKey  // 拓撲Key
              // 檢查當前節(jié)點是否有的對應拓撲Key
            tpVal, ok := node.Labels[c.TopologyKey]  
            if !ok {
             klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
             return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNodeLabelNotMatch)
            }
              // 如果拓撲約束的selector匹配pod本身標簽,則selfMatchNum=1
            selfMatchNum := int32(0)
            if c.Selector.Matches(podLabelSet) {
             selfMatchNum = 1
            }
              // zone=zoneA   zone=zoneB
              //    p p p        p p
              // 一個拓撲域
            pair := topologyPair{key: tpKey, value: tpVal}
              // 獲取指定拓撲key的路徑匹配數(shù)量
              // [{zoneB 2}]
            paths, ok := s.TpKeyToCriticalPaths[tpKey]
            if !ok {
             klog.Errorf("internal error: get paths from key %q of %#v", tpKey, s.TpKeyToCriticalPaths)
             continue
            }
            
            // 獲取最小匹配數(shù)量
            minMatchNum := paths[0].MatchNum
            matchNum := int32(0)
              // 獲取當前節(jié)點所在的拓撲域匹配的Pod數(shù)量
            if tpCount := s.TpPairToMatchNum[pair]; tpCount != nil {
             matchNum = *tpCount
            }
              // 如果匹配的Pod數(shù)量 + 1或者0 - 最小的匹配數(shù)量 > MaxSkew
              // 則證明不滿足約束條件
            skew := matchNum + selfMatchNum - minMatchNum
            if skew > c.MaxSkew {
             klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: MatchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.MaxSkew)
             return framework.NewStatus(framework.Unschedulable, ErrReasonConstraintsNotMatch)
            }
           }

           return nil
          }

          首先通過 CycleState 獲取 preFilterState ,如果沒有配置約束或者拓撲對匹配數(shù)量為0就直接返回了。

          然后循環(huán)定義的拓撲約束,先檢查當前節(jié)點是否有對應的 TopologyKey,沒有就返回錯誤,然后判斷拓撲對的分布程度是否大于 MaxSkew,判斷方式為拓撲中匹配的 Pod 數(shù)量 + 1/0(如果 Pod 本身也匹配則為1) - 最小的 Pod 匹配數(shù)量 > MaxSkew ,這個也是前面我們在關于 Pod 拓撲分布約束中的 maxSkew 的含義描述的意思**。**

          優(yōu)選

          PodTopologySpread 除了在預選階段會用到,在打分階段其實也會用到,在默認的插件注冊函數(shù)中可以看到:

          func getDefaultConfig() *schedulerapi.Plugins {
           return &schedulerapi.Plugins{
            ......
            PreFilter: &schedulerapi.PluginSet{
             Enabled: []schedulerapi.Plugin{
              {Name: podtopologyspread.Name},
              ......
             },
            },
            Filter: &schedulerapi.PluginSet{
             Enabled: []schedulerapi.Plugin{
              {Name: podtopologyspread.Name},
              ......
             },
            },
            ......
            PreScore: &schedulerapi.PluginSet{
             Enabled: []schedulerapi.Plugin{
              {Name: podtopologyspread.Name},
              ......
             },
            },
            Score: &schedulerapi.PluginSet{
             Enabled: []schedulerapi.Plugin{
              // Weight is doubled because:
              // - This is a score coming from user preference.
              // - It makes its signal comparable to NodeResourcesLeastAllocated.
              {Name: podtopologyspread.Name, Weight: 2},
              ......
             },
            },
            ......
           }
          }

          PreScore 與 Score

          同樣首先需要調(diào)用 PreScore 函數(shù)進行打分前的一些準備,把打分的數(shù)據(jù)存儲起來:

          // pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

          // preScoreState 在 PreScore 時計算,在 Score 時使用。
          type preScoreState struct {
            // 定義的約束
           Constraints []topologySpreadConstraint
           // IgnoredNodes 是一組 miss 掉 Constraints[*].topologyKey 的節(jié)點名稱
           IgnoredNodes sets.String
           // TopologyPairToPodCounts 以 topologyPair 為鍵,以匹配的 Pod 數(shù)量為值
           TopologyPairToPodCounts map[topologyPair]*int64
           // TopologyNormalizingWeight 是我們給每個拓撲的計數(shù)的權重
           // 這使得較小的拓撲的 Pod 數(shù)不會被較大的稀釋
           TopologyNormalizingWeight []float64
          }

          // initPreScoreState 迭代 "filteredNodes" 來過濾掉沒有設置 topologyKey 的節(jié)點,并進行初始化:
          // 1) s.TopologyPairToPodCounts: 以符合條件的拓撲對和節(jié)點名稱為鍵
          // 2) s.IgnoredNodes: 不應得分的節(jié)點集合
          // 3) s.TopologyNormalizingWeight: 根據(jù)拓撲結(jié)構中的數(shù)值數(shù)量給予每個約束的權重
          func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*v1.Node) error {
           var err error
            // 將 Pod 或者默認定義的約束轉(zhuǎn)換到 Constraints 中
           if len(pod.Spec.TopologySpreadConstraints) > 0 {
            s.Constraints, err = filterTopologySpreadConstraints(pod.Spec.TopologySpreadConstraints, v1.ScheduleAnyway)
            if err != nil {
             return fmt.Errorf("obtaining pod's soft topology spread constraints: %v", err)
            }
           } else {
            s.Constraints, err = pl.defaultConstraints(pod, v1.ScheduleAnyway)
            if err != nil {
             return fmt.Errorf("setting default soft topology spread constraints: %v", err)
            }
           }
           if len(s.Constraints) == 0 {
            return nil
           }
           topoSize := make([]intlen(s.Constraints))
            // 循環(huán)過濾節(jié)點得到的所有節(jié)點
           for _, node := range filteredNodes {
            if !nodeLabelsMatchSpreadConstraints(node.Labels, s.Constraints) {
             // 后面打分時,沒有全部所需 topologyKeys 的節(jié)點會被忽略
             s.IgnoredNodes.Insert(node.Name)
             continue
            }
              // 循環(huán)約束條件
            for i, constraint := range s.Constraints {
             if constraint.TopologyKey == v1.LabelHostname {
              continue
             }
                // 拓撲對  初始化
             pair := topologyPair{key: constraint.TopologyKey, value: node.Labels[constraint.TopologyKey]}
             if s.TopologyPairToPodCounts[pair] == nil {
              s.TopologyPairToPodCounts[pair] = new(int64)
              topoSize[i]++  // 拓撲對數(shù)量+1
             }
            }
           }
            
           s.TopologyNormalizingWeight = make([]float64len(s.Constraints))
           for i, c := range s.Constraints {
            sz := topoSize[i]  // 拓撲約束數(shù)量
            if c.TopologyKey == v1.LabelHostname {
                // 如果 TopologyKey 是 Hostname 標簽
             sz = len(filteredNodes) - len(s.IgnoredNodes)
            }
              // 計算拓撲約束的權重
            s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz)
           }
           return nil
          }

          // topologyNormalizingWeight 根據(jù)拓撲存在的值的數(shù)量,計算拓撲的權重。
          // 由于<size>至少為1(所有通過 Filters 的節(jié)點都在同一個拓撲結(jié)構中)
          // 而k8s支持5k個節(jié)點,所以結(jié)果在區(qū)間<1.09,8.52>。
          //
          // 注意:當沒有節(jié)點具有所需的拓撲結(jié)構時,<size> 也可以為0
          // 然而在這種情況下,我們并不關心拓撲結(jié)構的權重
          // 因為我們對所有節(jié)點都返回0分。
          func topologyNormalizingWeight(size int) float64 {
           return math.Log(float64(size + 2))
          }

          // PreScore 構建寫入 CycleState 用于后面的 Score 和 NormalizeScore 使用
          func (pl *PodTopologySpread) PreScore(
           ctx context.Context,
           cycleState *framework.CycleState,
           pod *v1.Pod,
           filteredNodes []*v1.Node,
          )
           *framework.Status
           {
            // 獲取所有節(jié)點
           allNodes, err := pl.sharedLister.NodeInfos().List()
           if err != nil {
            return framework.NewStatus(framework.Error, fmt.Sprintf("error when getting all nodes: %v", err))
           }
            // 過濾后的節(jié)點或者當前沒有節(jié)點,表示沒有節(jié)點用于打分
           if len(filteredNodes) == 0 || len(allNodes) == 0 {
            return nil
           }
            // 初始化 preScoreState 狀態(tài)
           state := &preScoreState{
            IgnoredNodes:            sets.NewString(),
            TopologyPairToPodCounts: make(map[topologyPair]*int64),
           }
           err = pl.initPreScoreState(state, pod, filteredNodes)
           if err != nil {
            return framework.NewStatus(framework.Error, fmt.Sprintf("error when calculating preScoreState: %v", err))
           }

           // 如果傳入的 pod 沒有軟拓撲傳播約束,則返回
           if len(state.Constraints) == 0 {
            cycleState.Write(preScoreStateKey, state)
            return nil
           }

           processAllNode := func(i int) {
            nodeInfo := allNodes[i]
            node := nodeInfo.Node()
            if node == nil {
             return
            }
            // (1) `node`應滿足傳入 pod 的 NodeSelector/NodeAffinity
            // (2) 所有的 topologyKeys 都需要存在于`node`中。
            if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
             !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
             return
            }

            for _, c := range state.Constraints {
                // 拓撲對
             pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
             // 如果當前拓撲對沒有與任何候選節(jié)點相關聯(lián),則繼續(xù)避免不必要的計算
             // 每個節(jié)點的計數(shù)也被跳過,因為它們是在 Score 期間進行的
             tpCount := state.TopologyPairToPodCounts[pair]
             if tpCount == nil {
              continue
             }
                // 計算節(jié)點上匹配的所有 Pod 數(shù)量
             count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
             atomic.AddInt64(tpCount, int64(count))
            }
           }
           parallelize.Until(ctx, len(allNodes), processAllNode)

           cycleState.Write(preScoreStateKey, state)
           return nil
          }

          上面的處理邏輯整體比較簡單,最重要的是計算每個拓撲約束的權重,這樣才方便后面打分的時候計算分數(shù),存入到 CycleState 后就可以了來查看具體的 Score 函數(shù)的實現(xiàn)了:

          // pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

          // 在 Score 擴展點調(diào)用
          func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
           nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
           if err != nil || nodeInfo.Node() == nil {
            return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v, node is nil: %v", nodeName, err, nodeInfo.Node() == nil))
           }

           node := nodeInfo.Node()
           s, err := getPreScoreState(cycleState)
           if err != nil {
            return 0, framework.NewStatus(framework.Error, err.Error())
           }

           // 如果該節(jié)點不合格,則返回
           if s.IgnoredNodes.Has(node.Name) {
            return 0nil
           }

           // 每出現(xiàn)一個 <pair>,當前節(jié)點就會得到一個 <matchSum> 的分數(shù)。
            // 而我們將<matchSum>相加,作為這個節(jié)點的分數(shù)返回。
           var score float64
           for i, c := range s.Constraints {
            if tpVal, ok := node.Labels[c.TopologyKey]; ok {
             var cnt int64
             if c.TopologyKey == v1.LabelHostname {
                  // 如果 TopologyKey 是 Hostname 則 cnt 為節(jié)點上匹配約束的 selector 的 Pod 數(shù)量
              cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
             } else {
                  // 拓撲對下匹配的 Pod 數(shù)量
              pair := topologyPair{key: c.TopologyKey, value: tpVal}
              cnt = *s.TopologyPairToPodCounts[pair]
             }
                // 計算當前節(jié)點所得分數(shù)
             score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
            }
           }
           return int64(score), nil
          }

          // scoreForCount 根據(jù)拓撲域中匹配的豆莢數(shù)量、約束的maxSkew和拓撲權重計算得分。
          // `maxSkew-1`加到分數(shù)中,這樣拓撲域之間的差異就會被淡化,控制分數(shù)對偏斜的容忍度。
          func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {
           return float64(cnt)*tpWeight + float64(maxSkew-1)
          }

          在 Score 階段就是為當前的節(jié)點去計算一個分數(shù),這個分數(shù)就是通過拓撲對下匹配的 Pod 數(shù)量和對應權重的結(jié)果得到的一個分數(shù),另外在計算分數(shù)的時候還加上了 maxSkew-1,這樣可以淡化拓撲域之間的差異。

          NormalizeScore

          當所有節(jié)點的分數(shù)計算完成后,還需要調(diào)用 NormalizeScore 擴展插件:

          // pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

          // NormalizeScore 在對所有節(jié)點打分過后調(diào)用
          func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
           s, err := getPreScoreState(cycleState)
           if err != nil {
            return framework.NewStatus(framework.Error, err.Error())
           }
           if s == nil {
            return nil
           }

           // 計算 <minScore> and <maxScore>
           var minScore int64 = math.MaxInt64
           var maxScore int64
           for _, score := range scores {
            if s.IgnoredNodes.Has(score.Name) {
             continue
            }
            if score.Score < minScore {
             minScore = score.Score
            }
            if score.Score > maxScore {
             maxScore = score.Score
            }
           }
            // 循環(huán) scores({node score}集合)
           for i := range scores {
            nodeInfo, err := pl.sharedLister.NodeInfos().Get(scores[i].Name)
            if err != nil {
             return framework.NewStatus(framework.Error, err.Error())
            }
            node := nodeInfo.Node()
              // 節(jié)點被忽略了,分數(shù)記為0
            if s.IgnoredNodes.Has(node.Name) {
             scores[i].Score = 0
             continue
            }
              // 如果 maxScore 為0,指定當前節(jié)點的分數(shù)為 MaxNodeScore
            if maxScore == 0 {
             scores[i].Score = framework.MaxNodeScore
             continue
            }
              // 計算當前節(jié)點分數(shù)
            s := scores[i].Score
            scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
           }
           return nil
          }

          NormalizeScore 擴展是在 Score 擴展執(zhí)行完成后,為每個 ScorePlugin 并行運行 NormalizeScore 方法,然后并為每個 ScorePlugin 應用評分默認權重,然后總結(jié)所有插件調(diào)用過后的分數(shù),最后選擇一個分數(shù)最高的節(jié)點。

          到這里我們就完成了對 PodTopologySpread 的實現(xiàn)分析,我們利用該特性可以實現(xiàn)對 Pod 更加細粒度的控制,我們可以把 Pod 分布到不同的拓撲域,從而實現(xiàn)高可用性,這也有助于工作負載的滾動更新和平穩(wěn)地擴展副本。

          不過如果對 Deployment 進行縮容操作可能會導致 Pod 的分布不均衡,此外具有污點的節(jié)點上的 Pods 也會被統(tǒng)計到。


          K8S 進階訓練營


           點擊屏末  | 即刻學習
          瀏覽 193
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91豆花视频在线资源 | 大香蕉人在线 | 手机看片欧美+日韩+国产 | 中文字幕在线字幕中文乱码区别 | 一级a免一级a做片免费 |