
          A Deep Dive into Calico Network Policy

          2022-03-16 06:51


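          This article examines how the Calico project implements Kubernetes network policy (NetworkPolicy).

          A network policy is an application-centric construct whose rules specify how a Pod may communicate with various network "entities". The Kubernetes documentation puts it this way: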

          NetworkPolicies are an application-centric construct which allow you to specify how a pod is allowed to communicate with various network “entities” (we use the word “entity” here to avoid overloading the more common terms such as “endpoints” and “services”, which have specific Kubernetes connotations) over the network.

          $ kubectl create deployment --namespace=policy-demo nginx --image=nginx
          deployment.apps/nginx created
          $ kubectl create -f - <<EOF
          kind: NetworkPolicy
          apiVersion: networking.k8s.io/v1
          metadata:
            name: access-nginx
            namespace: policy-demo
          spec:
            podSelector:
              matchLabels:
                app: nginx
            ingress:
              - from:
                - podSelector:
                    matchLabels:
                      run: access
          EOF
          $ kubectl get netpol -n policy-demo
          NAME           POD-SELECTOR   AGE
          access-nginx   app=nginx      12s

          This NetworkPolicy allows Pods labeled run: access to reach Pods labeled app: nginx within the policy-demo namespace.
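
The matching rule behind this policy can be sketched in a few lines of Go. This is only an illustration of matchLabels semantics, not Calico or Kubernetes code: the `ingressPolicy` type and the `matchLabels` and `decide` helpers are hypothetical names introduced here, and the sketch ignores namespaces, ports, and the interaction of multiple policies.

```go
package main

import "fmt"

// matchLabels reports whether every key/value pair in selector also appears
// in labels. An empty selector matches any label set.
func matchLabels(selector, labels map[string]string) bool {
	for key, want := range selector {
		if got, ok := labels[key]; !ok || got != want {
			return false
		}
	}
	return true
}

// ingressPolicy is a hypothetical, trimmed-down stand-in for a NetworkPolicy
// with one podSelector and one ingress "from" podSelector.
type ingressPolicy struct {
	podSelector  map[string]string // Pods the policy applies to
	fromSelector map[string]string // peers allowed to connect
}

// decide returns whether the policy selects dst and, if it does, whether
// src is an allowed peer.
func (p ingressPolicy) decide(src, dst map[string]string) (selected, allowed bool) {
	if !matchLabels(p.podSelector, dst) {
		return false, false
	}
	return true, matchLabels(p.fromSelector, src)
}

func main() {
	accessNginx := ingressPolicy{
		podSelector:  map[string]string{"app": "nginx"},
		fromSelector: map[string]string{"run": "access"},
	}
	nginx := map[string]string{"app": "nginx"}
	for _, src := range []map[string]string{{"run": "access"}, {"run": "other"}} {
		selected, allowed := accessNginx.decide(src, nginx)
		fmt.Printf("src=%v selected=%t allowed=%t\n", src, selected, allowed)
	}
}
```

A Pod that the policy does not select is simply outside its scope, which is why `decide` reports `selected` separately from `allowed`.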

          Once Calico is deployed, every node in the Kubernetes cluster runs a daemon named calico-node.

          manifest:https://projectcalico.docs.tigera.io/manifests/calico.yaml

          containers:
            # Runs calico-node container on each Kubernetes node. This
            # container programs network policy and routes on each
            # host.
            - name: calico-node
              image: docker.io/calico/node:v3.22.1
              envFrom:
              - configMapRef:
                  # Allow KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT to be overridden for eBPF mode.
                  name: kubernetes-services-endpoint
                  optional: true
              env:
                # Use Kubernetes API as the backing datastore.
                - name: DATASTORE_TYPE
                  value: "kubernetes"
                # Wait for the datastore.
                - name: WAIT_FOR_DATASTORE
                  value: "true"
                # Set based on the k8s node name.
                - name: NODENAME
                  valueFrom:
                    fieldRef:
                      fieldPath: spec.nodeName
                # Choose the backend to use.
                - name: CALICO_NETWORKING_BACKEND
                  valueFrom:
                    configMapKeyRef:
                      name: calico-config
                      key: calico_backend
                # Cluster type to identify the deployment type
                - name: CLUSTER_TYPE
                  value: "k8s,bgp"
                # Auto-detect the BGP IP address.
                - name: IP
                  value: "autodetect"
                # Enable IPIP
                - name: CALICO_IPV4POOL_IPIP
                  value: "Always"
                # Enable or Disable VXLAN on the default IP pool.
                - name: CALICO_IPV4POOL_VXLAN
                  value: "Never"
                # Set MTU for tunnel device used if ipip is enabled
                - name: FELIX_IPINIPMTU
                  valueFrom:
                    configMapKeyRef:
                      name: calico-config
                      key: veth_mtu
                # Set MTU for the VXLAN tunnel device.
                - name: FELIX_VXLANMTU
                  valueFrom:
                    configMapKeyRef:
                      name: calico-config
                      key: veth_mtu
                # Set MTU for the Wireguard tunnel device.
                - name: FELIX_WIREGUARDMTU
                  valueFrom:
                    configMapKeyRef:
                      name: calico-config
                      key: veth_mtu
                # The default IPv4 pool to create on startup if none exists. Pod IPs will be
                # chosen from this range. Changing this value after installation will have
                # no effect. This should fall within `--cluster-cidr`.
                # - name: CALICO_IPV4POOL_CIDR
                #   value: "192.168.0.0/16"
                # Disable file logging so `kubectl logs` works.
                - name: CALICO_DISABLE_FILE_LOGGING
                  value: "true"
                # Set Felix endpoint to host default action to ACCEPT.
                - name: FELIX_DEFAULTENDPOINTTOHOSTACTION
                  value: "ACCEPT"
                # Disable IPv6 on Kubernetes.
                - name: FELIX_IPV6SUPPORT
                  value: "false"
                - name: FELIX_HEALTHENABLED
                  value: "true"

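          The DaemonSet definition does not reveal what calico-node actually starts; the startup commands are presumably baked into the image[1]. So we use ps here to inspect the command lines of the calico-node processes: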

          $ ps -ef | grep calico-node
          root     10726 10718  2 06:44          00:00:16 calico-node -felix
          root     10727 10725  0 06:44          00:00:00 calico-node -monitor-token
          root     10728 10719  0 06:44          00:00:00 calico-node -monitor-addresses
          root     10729 10721  0 06:44          00:00:00 calico-node -status-reporter
          root     10731 10724  0 06:44          00:00:00 calico-node -confd
          root     10733 10720  0 06:44          00:00:00 calico-node -allocate-tunnel-addrs
          root     22100 10594  0 06:53 pts/0    00:00:00 grep --color=auto calico-node

          A spoiler: the process that watches NetworkPolicy objects in the Kubernetes cluster is felix.

          The source file for the calico-node command is https://github.com/projectcalico/calico/blob/v3.22.1/node/cmd/calico-node/main.go:

          var runFelix = flagSet.Bool("felix", false, "Run Felix")

          func main() {
              // a lot of code here

              if *version {
                  fmt.Println(startup.VERSION)
                  os.Exit(0)
              } else if *runFelix {
                  logrus.SetFormatter(&logutils.Formatter{Component: "felix"})
                  felix.Run("/etc/calico/felix.cfg", buildinfo.GitVersion, buildinfo.BuildDate, buildinfo.GitRevision)
              }

              // a lot of code here
          }
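
The pattern of one binary that runs as a different component depending on a boolean flag can be tried in isolation. The following is a minimal sketch using only the standard `flag` package; `pickComponent` and the flag set name are hypothetical, and only the `-felix` and `-confd` flag names come from the ps output above (the `-version` flag is an assumption made for this example).

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// pickComponent parses calico-node-style boolean flags and returns the name
// of the component that would be started. It is a hypothetical helper.
func pickComponent(args []string) (string, error) {
	fs := flag.NewFlagSet("node-sketch", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text out of the example output
	version := fs.Bool("version", false, "Display version")
	runFelix := fs.Bool("felix", false, "Run Felix")
	runConfd := fs.Bool("confd", false, "Run confd")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	// The first flag that is set wins, as in an if / else-if chain.
	switch {
	case *version:
		return "version", nil
	case *runFelix:
		return "felix", nil
	case *runConfd:
		return "confd", nil
	}
	return "", fmt.Errorf("no component selected")
}

func main() {
	for _, args := range [][]string{{"-felix"}, {"-confd"}, {}} {
		name, err := pickComponent(args)
		fmt.Printf("args=%v component=%q err=%v\n", args, name, err)
	}
}
```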

          Next, look at the function that starts the calico-node felix instance, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L88-L652:

          func Run(configFile string, gitVersion string, buildDate string, gitRevision string) {
              // a lot of code here
              backendClient, err = backend.NewClient(datastoreConfig)
              if err != nil {
                  log.WithError(err).Error("Failed to (re)connect to datastore")
                  time.Sleep(1 * time.Second)
                  continue configRetry
              }
              // a lot of code here
              var syncer Startable
              if typhaAddr != "" {
                  // a lot of code here
              } else {
                  // Use the syncer locally.
                  syncer = felixsyncer.New(backendClient, datastoreConfig.Spec, syncerToValidator, configParams.IsLeader())
              }
          }

          Because this Calico deployment uses Kubernetes as its datastore, backendClient here is simply a kube client that talks to the apiserver. Next comes felixsyncer.New, which initializes the syncer instance, https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/syncersv1/felixsyncer/felixsyncerv1.go#L28-L111:

          func New(client api.Client, cfg apiconfig.CalicoAPIConfigSpec, callbacks api.SyncerCallbacks, isLeader bool) api.Syncer {
              // a lot of code here
              if cfg.DatastoreType == apiconfig.Kubernetes {
                  additionalTypes = append(additionalTypes, watchersyncer.ResourceType{
                      ListInterface:   model.ResourceListOptions{Kind: model.KindKubernetesNetworkPolicy},
                      UpdateProcessor: updateprocessors.NewNetworkPolicyUpdateProcessor(),
                  })
                  additionalTypes = append(additionalTypes, watchersyncer.ResourceType{
                      ListInterface: model.ResourceListOptions{Kind: model.KindKubernetesEndpointSlice},
                  })
              }
              // a lot of code here

              return watchersyncer.New(
                  client,
                  resourceTypes,
                  callbacks,
              )
          }

          It returns an instance of the watcherSyncer[2] struct, which implements the api.Syncer[3] interface:

          type watcherSyncer struct {
              status        api.SyncStatus
              watcherCaches []*watcherCache
              results       chan interface{}
              numSynced     int
              callbacks     api.SyncerCallbacks
              wgwc          *sync.WaitGroup
              wgws          *sync.WaitGroup
              cancel        context.CancelFunc
          }

          With Kubernetes as the datastore, the syncer watches Kubernetes NetworkPolicy resources in addition to Calico's own CRD resources (GlobalNetworkPolicy, for example).

          Back in daemon.go[4], once the syncer has been instantiated the next step is to start it, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L489-L493:

              if syncer != nil {
                  log.Infof("Starting the datastore Syncer")
                  syncer.Start()
              }

          Follow the call into https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchersyncer.go:

          func (ws *watcherSyncer) Start() {
              // a lot of code here
              go func() {
                  ws.run(ctx)
                  log.Debug("Watcher syncer run completed")
              }()
          }

          func (ws *watcherSyncer) run(ctx context.Context) {
              for _, wc := range ws.watcherCaches {
                  ws.wgwc.Add(1)
                  go func(wc *watcherCache) {
                      wc.run(ctx)
                      ws.wgwc.Done()
                  }(wc)
              }
          }

          It iterates over the watcherCache slice and runs each cache's run method in its own goroutine, https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchercache.go#L76-L142:

          func (wc *watcherCache) run(ctx context.Context) {
              // a lot of code here
          mainLoop:
              for {
                  if wc.watch == nil {
                      // The watcher will be nil if the context cancelled during a resync.
                      wc.logger.Debug("Watch is nil. Returning")
                      break mainLoop
                  }
                  select {
                  case <-ctx.Done():
                      wc.logger.Debug("Context is done. Returning")
                      wc.cleanExistingWatcher()
                      break mainLoop
                  case event, ok := <-wc.watch.ResultChan():
                      if !ok {
                          // If the channel is closed then resync/recreate the watch.
                          wc.logger.Info("Watch channel closed by remote - recreate watcher")
                          wc.resyncAndCreateWatcher(ctx)
                          continue
                      }
                      wc.logger.WithField("RC", wc.watch.ResultChan()).Debug("Reading event from results channel")

                      // Handle the specific event type.
                      switch event.Type {
                      case api.WatchAdded, api.WatchModified:
                          kvp := event.New
                          wc.handleWatchListEvent(kvp)
                      case api.WatchDeleted:
                          // Nil out the value to indicate a delete.
                          kvp := event.Old
                          if kvp == nil {
                              // Bug, we're about to panic when we hit the nil pointer, log something useful.
                              wc.logger.WithField("watcher", wc).WithField("event", event).Panic("Deletion event without old value")
                          }
                          kvp.Value = nil
                          wc.handleWatchListEvent(kvp)
                      case api.WatchError:
                          // Handle a WatchError. This error triggered from upstream, all type
                          // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info
                          // because errors may occur due to compaction causing revisions to no longer be valid - in this case
                          // we simply need to do a full resync.
                          wc.logger.WithError(event.Error).Infof("Watch error received from Upstream")
                          wc.currentWatchRevision = "0"
                          wc.resyncAndCreateWatcher(ctx)
                      default:
                          // Unknown event type - not much we can do other than log.
                          wc.logger.WithField("EventType", event.Type).Errorf("Unknown event type received from the datastore")
                      }
                  }
              }

              // a lot of code here
          }

          The loop watches for resource create, modify, and delete events and handles each one accordingly. The code jumps around a lot here, so it is not reproduced. For example, when we create a NetworkPolicy object in the Kubernetes cluster, an "update" message is sent to the results channel of the watcherCache responsible for NetworkPolicy, https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchercache.go#L333-L372:

              wc.results <- []api.Update{{
                  UpdateType: api.UpdateTypeKVNew,
                  KVPair:     *kvp,
              }}

          Each watcherCache holds a reference to the watcherSyncer's results channel (https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchercache.go#L64-L74).
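
The relationship between the per-resource caches and the shared results channel is a fan-in: one goroutine per cache, all writing to a channel owned by the syncer. Below is a simplified sketch of that shape. The `cache`, `update`, and `runSyncer` names are hypothetical, the events are canned strings rather than datastore watches, and closing the channel when the caches finish is a simplification made so the example terminates.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// update is a hypothetical stand-in for an api.Update.
type update struct {
	kind string
	name string
}

// cache plays the role of a watcherCache: it handles one resource kind and
// writes to a results channel that it does not own.
type cache struct {
	kind    string
	events  []string      // canned object names this cache will "observe"
	results chan<- update // shared channel owned by the syncer
}

func (c *cache) run() {
	for _, name := range c.events {
		c.results <- update{kind: c.kind, name: name}
	}
}

// runSyncer starts one goroutine per cache and drains the shared channel.
func runSyncer(kinds map[string][]string) []update {
	results := make(chan update)
	var wg sync.WaitGroup
	for kind, events := range kinds {
		c := &cache{kind: kind, events: events, results: results}
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.run()
		}()
	}
	// Close the channel once every cache has finished so the loop below ends.
	go func() {
		wg.Wait()
		close(results)
	}()
	var got []update
	for u := range results {
		got = append(got, u) // a real syncer would invoke its callbacks here
	}
	// Arrival order across goroutines is not deterministic, so sort for display.
	sort.Slice(got, func(i, j int) bool {
		if got[i].kind != got[j].kind {
			return got[i].kind < got[j].kind
		}
		return got[i].name < got[j].name
	})
	return got
}

func main() {
	for _, u := range runSyncer(map[string][]string{
		"NetworkPolicy":    {"access-nginx"},
		"WorkloadEndpoint": {"nginx", "access"},
	}) {
		fmt.Println(u.kind, u.name)
	}
}
```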

          This eventually triggers the api.SyncerCallbacks callback that was set when the watcherSyncer was initialized, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L459:

          syncerToValidator := calc.NewSyncerCallbacksDecoupler()

          There is a lot of related code here, so it is not covered in detail.


          When the felix instance starts, it instantiates and starts an AsyncCalcGraph object, which dynamically computes IP set membership and "activates" policies, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L531-L597:

              asyncCalcGraph := calc.NewAsyncCalcGraph(
                  configParams.Copy(), // Copy to avoid concurrent access.
                  calcGraphClientChannels,
                  healthAggregator,
              )

              // a lot of code here
              asyncCalcGraph.Start()

          asyncCalcGraph starts a loop that listens for update events, https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/async_calc_graph.go#L136-L194:

          func (acg *AsyncCalcGraph) loop() {
              for {
                  select {
                  case update := <-acg.inputEvents:
                      switch update := update.(type) {
                      case []api.Update:
                          // Update; send it to the dispatcher.
                          log.Debug("Pulled []KVPair off channel")
                          for i, upd := range update {
                              // Send the updates individually so that we can report live in between
                              // each update.  (The dispatcher sends individual updates anyway so this makes
                              // no difference.)
                              updStartTime := time.Now()
                              acg.AllUpdDispatcher.OnUpdates(update[i : i+1]) // here
                              summaryUpdateTime.Observe(time.Since(updStartTime).Seconds())
                              // Record stats for the number of messages processed.
                              typeName := reflect.TypeOf(upd.Key).Name()
                              count := countUpdatesProcessed.WithLabelValues(typeName)
                              count.Inc()
                              acg.reportHealth()
                          }
                          // a lot of code here
                      }
                  }
                  acg.maybeFlush()
              }
          }

          Next, look at the AllUpdDispatcher member. It is initialized in the `NewAsyncCalcGraph`[5] function, which leads to https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L114-L408:

          • allUpdDispatcher := dispatcher.NewDispatcher() instantiates an _allUpdDispatcher_ object[6]
          • The resource-specific Dispatchers are then registered with allUpdDispatcher:
            • local endpoints[7]
            • service index[8]
            • ipset member[9]
            • endpoint policy resolver[10]
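
The registration step above amounts to routing each update to the handlers interested in its key type. Here is a minimal sketch of such a dispatcher, assuming handlers are looked up by the reflected type of the update key. The `dispatcher`, `register`, and `onUpdates` names and the two key types are hypothetical and much simpler than Felix's own dispatcher package.

```go
package main

import (
	"fmt"
	"reflect"
)

// update is a hypothetical stand-in for an api.Update: a typed key plus a value.
type update struct {
	key   interface{}
	value interface{}
}

// Two illustrative key types; Felix has many more.
type policyKey struct{ name string }
type endpointKey struct{ id string }

type handler func(update)

// dispatcher routes each update to the handlers registered for its key type.
type dispatcher struct {
	handlers map[reflect.Type][]handler
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: map[reflect.Type][]handler{}}
}

// register subscribes h to every update whose key has the same type as keyExample.
func (d *dispatcher) register(keyExample interface{}, h handler) {
	t := reflect.TypeOf(keyExample)
	d.handlers[t] = append(d.handlers[t], h)
}

// onUpdates delivers updates one at a time; keys with no handler are ignored.
func (d *dispatcher) onUpdates(updates []update) {
	for _, u := range updates {
		for _, h := range d.handlers[reflect.TypeOf(u.key)] {
			h(u)
		}
	}
}

func main() {
	d := newDispatcher()
	d.register(policyKey{}, func(u update) { fmt.Println("policy resolver saw", u.key) })
	d.register(endpointKey{}, func(u update) { fmt.Println("endpoint index saw", u.key) })
	d.onUpdates([]update{
		{key: policyKey{name: "access-nginx"}},
		{key: endpointKey{id: "nginx"}},
	})
}
```

Keying on the type keeps each calculation-graph component unaware of resources it does not care about.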

          This article covers only the NetworkPolicy-related part, which lives in https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/policy_resolver.go:

          func (pr *PolicyResolver) OnUpdate(update api.Update) (filterOut bool) {
              policiesDirty := false
              switch key := update.Key.(type) {
              case model.WorkloadEndpointKey, model.HostEndpointKey:
                  if update.Value != nil {
                      pr.endpoints[key] = update.Value
                  } else {
                      delete(pr.endpoints, key)
                  }
                  pr.dirtyEndpoints.Add(key)
                  gaugeNumActiveEndpoints.Set(float64(len(pr.endpoints)))
              case model.PolicyKey:
                  log.Debugf("Policy update: %v", key)
                  policiesDirty = pr.policySorter.OnUpdate(update)
                  if policiesDirty {
                      pr.markEndpointsMatchingPolicyDirty(key)
                  }
              }
              pr.sortRequired = pr.sortRequired || policiesDirty
              pr.maybeFlush()
              gaugeNumActivePolicies.Set(float64(pr.policyIDToEndpointIDs.Len()))
              return
          }

          func (pr *PolicyResolver) maybeFlush() {
              if !pr.InSync {
                  log.Debugf("Not in sync, skipping flush")
                  return
              }
              if pr.sortRequired {
                  pr.refreshSortOrder()
              }
              pr.dirtyEndpoints.Iter(pr.sendEndpointUpdate)
              pr.dirtyEndpoints = set.New()
          }
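
The dirty-set idea in maybeFlush can be isolated: changes accumulate in a set, and nothing is emitted until the initial sync has completed. A minimal sketch follows; the `resolver` type and its methods are hypothetical, and policy sorting is left out entirely.

```go
package main

import (
	"fmt"
	"sort"
)

// resolver is a hypothetical, reduced version of the dirty-tracking idea:
// it remembers which endpoints changed and only emits them once in sync.
type resolver struct {
	inSync bool
	dirty  map[string]struct{}
	sent   []string // endpoint IDs emitted so far, in flush order
}

func newResolver() *resolver {
	return &resolver{dirty: map[string]struct{}{}}
}

// onEndpointUpdate marks the endpoint dirty and tries to flush.
func (r *resolver) onEndpointUpdate(id string) {
	r.dirty[id] = struct{}{}
	r.maybeFlush()
}

// markInSync records that the initial snapshot is complete and flushes
// everything that accumulated in the meantime.
func (r *resolver) markInSync() {
	r.inSync = true
	r.maybeFlush()
}

// maybeFlush emits each dirty endpoint once and then resets the set.
func (r *resolver) maybeFlush() {
	if !r.inSync {
		return // still loading the snapshot; keep accumulating
	}
	ids := make([]string, 0, len(r.dirty))
	for id := range r.dirty {
		ids = append(ids, id)
	}
	sort.Strings(ids) // map iteration order is random; sort for stable output
	r.sent = append(r.sent, ids...)
	r.dirty = map[string]struct{}{}
}

func main() {
	r := newResolver()
	r.onEndpointUpdate("nginx")
	r.onEndpointUpdate("access")
	r.onEndpointUpdate("nginx") // duplicate while not in sync
	fmt.Println("before sync:", r.sent)
	r.markInSync()
	fmt.Println("after sync:", r.sent)
}
```

Deferring the flush means an endpoint touched several times during startup is recomputed once rather than once per update.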

          Whenever a NetworkPolicy or an Endpoint is added, removed, or modified, the corresponding event is fired through the PolicyResolverCallbacks callback. Those callbacks are what we are after; the NetworkPolicy-related ones are declared as interfaces in https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L49-L106:

          type PipelineCallbacks interface {
              ipSetUpdateCallbacks
              rulesUpdateCallbacks
              endpointCallbacks
              configCallbacks
              passthruCallbacks
              routeCallbacks
              vxlanCallbacks
          }

          type rulesUpdateCallbacks interface {
              OnPolicyActive(model.PolicyKey, *ParsedRules)
              OnPolicyInactive(model.PolicyKey)
              OnProfileActive(model.ProfileRulesKey, *ParsedRules)
              OnProfileInactive(model.ProfileRulesKey)
          }

          The `RuleScanner`[11] struct has an OnPolicyActive method of its own, which parses the policy's rules and then invokes OnPolicyActive on the rulesUpdateCallbacks interface:

          https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/rule_scanner.go#L165-L168

          func (rs *RuleScanner) OnPolicyActive(key model.PolicyKey, policy *model.Policy) {
              parsedRules := rs.updateRules(key, policy.InboundRules, policy.OutboundRules, policy.DoNotTrack, policy.PreDNAT, policy.Namespace)
              rs.RulesUpdateCallbacks.OnPolicyActive(key, parsedRules)
          }

          This is getting increasingly complicated, so we stop tracing this path here.


          Back in the felix Run function, a DataplaneDriver is instantiated and started, https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go#L398-L403:

              dpDriver, dpDriverCmd = dp.StartDataplaneDriver(
                  configParams.Copy(), // Copy to avoid concurrent access.
                  healthAggregator,
                  configChangedRestartCallback,
                  fatalErrorCallback,
                  k8sClientSet)

          Jump to the dp.StartDataplaneDriver[12] function:

          func StartDataplaneDriver(configParams *config.Config,
              healthAggregator *health.HealthAggregator,
              configChangedRestartCallback func(),
              fatalErrorCallback func(error),
              k8sClientSet *kubernetes.Clientset) (DataplaneDriver, *exec.Cmd) {
              // a lot of code here
                  intDP := intdataplane.NewIntDataplaneDriver(dpConfig)
                  intDP.Start()
              // a lot of code here
          }

          Continue into the intdataplane.NewIntDataplaneDriver[13] function:

          func NewIntDataplaneDriver(config Config) *InternalDataplane {
              // a lot of code here
              if !config.BPFEnabled {
                  dp.RegisterManager(newPolicyManager(rawTableV4, mangleTableV4, filterTableV4, ruleRenderer, 4))
              }
              // a lot of code here
          }

          If eBPF is not enabled, a PolicyManager[14] is registered. The relevant code is in https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/policy_mgr.go:

          func newPolicyManager(rawTable, mangleTable, filterTable iptablesTable, ruleRenderer policyRenderer, ipVersion uint8) *policyManager {
              return &policyManager{
                  rawTable:     rawTable,
                  mangleTable:  mangleTable,
                  filterTable:  filterTable,
                  ruleRenderer: ruleRenderer,
                  ipVersion:    ipVersion,
              }
          }

          This hints that iptables is involved. Now look at the PolicyManager's `OnUpdate`[15] method:

          func (m *policyManager) OnUpdate(msg interface{}) {
              switch msg := msg.(type) {
              case *proto.ActivePolicyUpdate:
                  if m.rawEgressOnly && !msg.Policy.Untracked {
                      log.WithField("id", msg.Id).Debug("Ignore non-untracked policy")
                      return
                  }
                  log.WithField("id", msg.Id).Debug("Updating policy chains")
                  chains := m.ruleRenderer.PolicyToIptablesChains(msg.Id, msg.Policy, m.ipVersion)
                  if m.rawEgressOnly {
                      neededIPSets := set.New()
                      filteredChains := []*iptables.Chain(nil)
                      for _, chain := range chains {
                          if strings.Contains(chain.Name, string(rules.PolicyOutboundPfx)) {
                              filteredChains = append(filteredChains, chain)
                              neededIPSets.AddAll(chain.IPSetNames())
                          }
                      }
                      chains = filteredChains
                      m.mergeNeededIPSets(msg.Id, neededIPSets)
                  }
                  // We can't easily tell whether the policy is in use in a particular table, and, if the policy
                  // type gets changed it may move between tables.  Hence, we put the policy into all tables.
                  // The iptables layer will avoid programming it if it is not actually used.
                  m.rawTable.UpdateChains(chains)
                  m.mangleTable.UpdateChains(chains)
                  m.filterTable.UpdateChains(chains)
              case *proto.ActivePolicyRemove:
                  log.WithField("id", msg.Id).Debug("Removing policy chains")
                  if m.rawEgressOnly {
                      m.mergeNeededIPSets(msg.Id, nil)
                  }
                  inName := rules.PolicyChainName(rules.PolicyInboundPfx, msg.Id)
                  outName := rules.PolicyChainName(rules.PolicyOutboundPfx, msg.Id)
                  // As above, we need to clean up in all the tables.
                  m.filterTable.RemoveChainByName(inName)
                  m.filterTable.RemoveChainByName(outName)
                  m.mangleTable.RemoveChainByName(inName)
                  m.mangleTable.RemoveChainByName(outName)
                  m.rawTable.RemoveChainByName(inName)
                  m.rawTable.RemoveChainByName(outName)
              // a lot of code here
              }
          }

          The PolicyManager converts a NetworkPolicy into iptables chains (in the raw, mangle, and filter tables) and updates the iptables rules on the host, https://github.com/projectcalico/calico/blob/v3.22.1/felix/iptables/table.go#L499-L528:

          func (t *Table) UpdateChains(chains []*Chain) {
              for _, chain := range chains {
                  t.UpdateChain(chain)
              }
          }

          func (t *Table) UpdateChain(chain *Chain) {
              t.logCxt.WithField("chainName", chain.Name).Info("Queueing update of chain.")
              oldNumRules := 0

              // Incref any newly-referenced chains, then decref the old ones.  By incrementing first we
              // avoid marking a still-referenced chain as dirty.
              t.increfReferredChains(chain.Rules)
              if oldChain := t.chainNameToChain[chain.Name]; oldChain != nil {
                  oldNumRules = len(oldChain.Rules)
                  t.decrefReferredChains(oldChain.Rules)
              }
              t.chainNameToChain[chain.Name] = chain
              numRulesDelta := len(chain.Rules) - oldNumRules
              t.gaugeNumRules.Add(float64(numRulesDelta))
              if t.chainRefCounts[chain.Name] > 0 {
                  t.dirtyChains.Add(chain.Name)
              }

              // Defensive: make sure we re-read the dataplane state before we make updates.  While the
              // code was originally designed not to need this, we found that other users of
              // iptables-restore can still clobber our updates so it's safest to re-read the state before
              // each write.
              t.InvalidateDataplaneCache("chain update")
          }

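          The code above only updates the Table's cache, that is, the desired state of the raw, mangle, and filter tables. The iptables rules on the host are actually synchronized only when the Apply[16] method is called.

          Let's look at the iptables rules on a host in the Kubernetes cluster: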

          Chain cali-pi-_PfqSzIS1AirpjL0oXbg (1 references)
          pkts bytes target prot opt in out source destination
          0 0 MARK all -- any any anywhere anywhere /* cali:vPwStapFDttX0Qmr */ /* Policy policy-demo/knp.default.access-nginx ingress */ match-set cali40s:AMAB7BNa5u3MmIiwZrblkWt src MARK or 0x10000
          0 0 RETURN all -- any any anywhere anywhere /* cali:JmKby7c_zvSDZn-y */ mark match 0x10000/0x10000

          Chain cali-tw-calic6866a25fab (1 references)
          pkts bytes target prot opt in out source destination
          0 0 ACCEPT all -- any any anywhere anywhere /* cali:5QeAipZ91RWxAvAP */ ctstate RELATED,ESTABLISHED
          0 0 DROP all -- any any anywhere anywhere /* cali:nUFU9ZCyUibQGJeM */ ctstate INVALID
          0 0 MARK all -- any any anywhere anywhere /* cali:Fb8j6_8umAFH-caO */ MARK and 0xfffeffff
          0 0 MARK all -- any any anywhere anywhere /* cali:5EvW0trrqBzvohhg */ /* Start of policies */ MARK and 0xfffdffff
          0 0 cali-pi-_PfqSzIS1AirpjL0oXbg all -- any any anywhere anywhere /* cali:3WtlYwIIeClqiGDi */ mark match 0x0/0x20000
          0 0 RETURN all -- any any anywhere anywhere /* cali:F9Es9Pigmj_w4QXL */ /* Return if policy accepted */ mark match 0x10000/0x10000
          0 0 DROP all -- any any anywhere anywhere /* cali:IxzxRAfuJGtkJERO */ /* Drop if no policies passed packet */ mark match 0x0/0x20000
          0 0 cali-pri-kns.policy-demo all -- any any anywhere anywhere /* cali:y1Vh7ciW6qiyN7yE */
          0 0 RETURN all -- any any anywhere anywhere /* cali:ocU3P7TVKNDR3w0C */ /* Return if profile accepted */ mark match 0x10000/0x10000
          0 0 cali-pri-_QdhVZ8TmSXm2EvYAKH all -- any any anywhere anywhere /* cali:KYnUV6tBjKhOVhvl */
          0 0 RETURN all -- any any anywhere anywhere /* cali:-3c2S3tuRZHwglRj */ /* Return if profile accepted */ mark match 0x10000/0x10000
          0 0 DROP all -- any any anywhere anywhere /* cali:1X5WKlXPf0ajKXrw */ /* Drop if no profiles matched */

          As shown, Calico uses the iptables mark feature to mark and match network packets, which is how it controls traffic inside the Kubernetes cluster.
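
The mark arithmetic in the listing above can be replayed in Go. This sketch follows the policy portion of the cali-tw chain for a packet that was not already handled by the conntrack rules. The constant names `acceptBit` and `passBit` and the `matches` and `evaluate` helpers are labels chosen for this example; only the hexadecimal values and the rule order come from the iptables output.

```go
package main

import "fmt"

const (
	acceptBit uint32 = 0x10000 // set by "MARK or 0x10000" in the policy chain
	passBit   uint32 = 0x20000 // cleared by "MARK and 0xfffdffff"
)

// matches implements iptables "mark match value/mask".
func matches(mark, value, mask uint32) bool {
	return mark&mask == value
}

// evaluate replays the policy rules of the workload chain. srcInIPSet says
// whether the packet's source address is a member of the policy's ipset.
func evaluate(mark uint32, srcInIPSet bool) (uint32, string) {
	mark &= 0xfffeffff // MARK and 0xfffeffff: clear the accept bit
	mark &= 0xfffdffff // MARK and 0xfffdffff: clear the pass bit

	// Jump to the policy chain while the pass bit is unset.
	if matches(mark, 0x0, passBit) {
		if srcInIPSet {
			mark |= acceptBit // MARK or 0x10000
		}
	}
	if matches(mark, acceptBit, acceptBit) {
		return mark, "RETURN" // "Return if policy accepted"
	}
	if matches(mark, 0x0, passBit) {
		return mark, "DROP" // "Drop if no policies passed packet"
	}
	// Only reachable if a policy set the pass bit, which the sample policy
	// never does; the profile chains would be consulted next.
	return mark, "PROFILES"
}

func main() {
	for _, inSet := range []bool{true, false} {
		mark, verdict := evaluate(0, inSet)
		fmt.Printf("srcInIPSet=%t mark=%#x verdict=%s\n", inSet, mark, verdict)
	}
}
```

Because each decision lives in its own bit, the chain can clear and test one bit without disturbing unrelated bits in the packet mark.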

          References

          [1] Baked into the image: https://github.com/projectcalico/calico/blob/v3.22.1/node/Dockerfile.amd64
          [2] watcherSyncer: https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/watchersyncer/watchersyncer.go#L76-L86
          [3] api.Syncer: https://github.com/projectcalico/calico/blob/v3.22.1/libcalico-go/lib/backend/api/api.go#L127-L133
          [4] daemon.go: https://github.com/projectcalico/calico/blob/v3.22.1/felix/daemon/daemon.go
          [5] NewAsyncCalcGraph: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/async_calc_graph.go#L96-L120
          [6] allUpdDispatcher := dispatcher.NewDispatcher() instantiates an allUpdDispatcher object: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L118-L131
          [7] local endpoints: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L133-L151
          [8] service index: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L203-L204
          [9] ipset member: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L225-L252
          [10] endpoint policy resolver: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/calc_graph.go#L293-L319
          [11] RuleScanner: https://github.com/projectcalico/calico/blob/v3.22.1/felix/calc/rule_scanner.go#L48-L84
          [12] dp.StartDataplaneDriver: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/driver.go#L57-L390
          [13] intdataplane.NewIntDataplaneDriver: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/int_dataplane.go#L307-L866
          [14] PolicyManager: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/policy_mgr.go#L29-L40
          [15] OnUpdate: https://github.com/projectcalico/calico/blob/v3.22.1/felix/dataplane/linux/policy_mgr.go#L86-L144
          [16] Apply: https://github.com/projectcalico/calico/blob/v3.22.1/felix/iptables/table.go#L946-L1039

          Original article: https://blog.crazytaxii.com/posts/k8s_calico_network_policy/

