<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          K8s client-go 的四種客戶端

          共 92358字,需瀏覽 185分鐘

           ·

          2023-09-07 21:04

          前面都在講 kube-apiserver ,后續(xù) Kubernetes 其它組件,都會使用 client-go 庫來調(diào)用 kube-apiserver 提供的 API 服務進行資源對象的操作。

          Kubernetes 的源碼 staging/src/k8s.io/client-go 中集成了 client-go ,所有組件都是調(diào)用 vendor 本地庫的方式引入 client-go 庫,不需要考慮和集群版本的兼容性,但如果是平時我們自己基于 Kubernetes 做二次開發(fā)用到 client-go 庫時,則需要注意下所導入的庫和 Kubernetes 版本的對應關系。

          對于 Kubernetes 1.27.2 ,client-go 源碼的部分目錄如下:

          根據(jù)不同功能,可以將其劃分為四種客戶端:

          • RESTClient:最基礎的客戶端,僅對 HTTP Request 進行了封裝。實現(xiàn)位置在 rest 目錄
          • ClientSet:基于 RESTClient 實現(xiàn),封裝了 Kubernetes 內(nèi)置資源(Resource)和版本(Version)的方法。實現(xiàn)位置在 kubernetes 目錄
          • DynamicClient:動態(tài)客戶端,基于 RESTClient 實現(xiàn),封裝了 Kubernetes 任意資源(包括 CRD 自定義資源)和版本的方法。實現(xiàn)位置在 dynamic 目錄
          • DiscoveryClient:發(fā)現(xiàn)客戶端,基于 RESTClient 實現(xiàn),用于發(fā)現(xiàn) kube-apiserver 所支持的資源組(Group)、資源版本(Versions)和資源信息(Resources)。實現(xiàn)位置在 discovery 目錄

          RESTClient

          作為最基礎的客戶端,其接口定義如下:

          // k8s.io/client-go/rest/client.go

          type Interface interface {
           // 返回速率限制器
           GetRateLimiter() flowcontrol.RateLimiter
           // 構(gòu)建 (POST, Put, Patch, Get, DELETE) 請求器
           Verb(verb string) *Request
           // 構(gòu)建 POST 請求器
           Post() *Request
           // 構(gòu)建 Put 請求器
           Put() *Request
           // 構(gòu)建 Patch 請求器
           Patch(pt types.PatchType) *Request
           // 構(gòu)建 Get 請求器
           Get() *Request
           // 構(gòu)建 Delete 請求器
           Delete() *Request
           // 返回 API 版本
           APIVersion() schema.GroupVersion
          }

          RESTClient 的實現(xiàn)很簡單,主要提供一個對外的接口調(diào)用:

          // k8s.io/client-go/rest/client.go

          type RESTClient struct {
           base *url.URL
           versionedAPIPath string
           content ClientContentConfig
           createBackoffMgr func() BackoffManager
           rateLimiter flowcontrol.RateLimiter
           warningHandler WarningHandler
           // http 客戶端
           Client *http.Client
          }

          func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
           if len(config.ContentType) == 0 {
            config.ContentType = "application/json"
           }

           base := *baseURL
           if !strings.HasSuffix(base.Path, "/") {
            base.Path += "/"
           }
           base.RawQuery = ""
           base.Fragment = ""

           return &RESTClient{
            base:             &base,
            versionedAPIPath: versionedAPIPath,
            content:          config,
            createBackoffMgr: readExpBackoffConfig,
            rateLimiter:      rateLimiter,

            Client: client,
           }, nil
          }

          func (c *RESTClient) Verb(verb string) *Request {
           // 構(gòu)建請求器
           return NewRequest(c).Verb(verb)
          }

          func (c *RESTClient) Post() *Request {
           return c.Verb("POST")
          }

          // 其余請求器的構(gòu)建方法省略,同 Post

          核心的處理全在 Request 請求器上,來到構(gòu)建請求器的 NewRequest 方法:

          // k8s.io/client-go/rest/request.go

          type Request struct {
           c *RESTClient
           namespace    string
           resource     string
           // 省略
          }

          // 傳入 RESTClient 參數(shù)
          func NewRequest(c *RESTClient) *Request {
           // ...
           r := &Request{
            c:              c,
            // ...
           }

           switch {
           case len(c.content.AcceptContentTypes) > 0:
            r.SetHeader("Accept", c.content.AcceptContentTypes)
           case len(c.content.ContentType) > 0:
            r.SetHeader("Accept", c.content.ContentType+", */*")
           }
           return r
          }

          // 設置要訪問的命名空間 (<resource>/[ns/<namespace>/]<name>)
          func (r *Request) Namespace(namespace string) *Request {
           if r.err != nil {
            return r
           }
           if r.namespaceSet {
            r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
            return r
           }
           if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
            r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
            return r
           }
           r.namespaceSet = true
           r.namespace = namespace
           return r
          }

          // 設置要訪問的資源 (<resource>/[ns/<namespace>/]<name>)
          func (r *Request) Resource(resource string) *Request {
           if r.err != nil {
            return r
           }
           if len(r.resource) != 0 {
            r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
            return r
           }
           if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
            r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
            return r
           }
           r.resource = resource
           return r
          }

          Request 請求器的本質(zhì)就是對命名空間、資源等 K8s 的概念進行了一個通用的封裝,通過調(diào)用相應的方法就可以構(gòu)造出最終的請求 URL 和參數(shù):

          func (r *Request) URL() *url.URL {
           p := r.pathPrefix
           if r.namespaceSet && len(r.namespace) > 0 {
            // 命名空間
            p = path.Join(p, "namespaces", r.namespace)
           }
           if len(r.resource) != 0 {
            // 資源
            p = path.Join(p, strings.ToLower(r.resource))
           }
           // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
           if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
            p = path.Join(p, r.resourceName, r.subresource, r.subpath)
           }

           finalURL := &url.URL{}
           if r.c.base != nil {
            *finalURL = *r.c.base
           }
           finalURL.Path = p

           query := url.Values{}
           for key, values := range r.params {
            for _, value := range values {
             query.Add(key, value)
            }
           }

           // timeout is handled specially here.
           if r.timeout != 0 {
            // 參數(shù)
            query.Set("timeout", r.timeout.String())
           }
           finalURL.RawQuery = query.Encode()
           return finalURL
          }

          最后調(diào)用 Do 方法發(fā)起請求,并返回請求的結(jié)果:

          // 請求結(jié)果
          type Result struct {
           body        []byte
           warnings    []net.WarningHeader
           contentType string
           err         error
           statusCode  int

           decoder runtime.Decoder
          }

          func (r *Request) Do(ctx context.Context) Result {
           var result Result
           // 發(fā)起請求
           err := r.request(ctx, func(req *http.Request, resp *http.Response) {
            result = r.transformResponse(resp, req)
           })
           if err != nil {
            return Result{err: err}
           }
           if result.err == nil || len(result.body) > 0 {
            metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
           }
           return result
          }

          func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)error {
           // ...

           client := r.c.Client
           // 如果 http 客戶端沒有初始化,則使用默認的 http 客戶端
           if client == nil {
            client = http.DefaultClient
           }

           // ...

           // 重試機制
           retry := r.retryFn(r.maxRetries)
           for {
            if err := retry.Before(ctx, r); err != nil {
             return retry.WrapPreviousError(err)
            }
            // 構(gòu)造 http.Request 對象
            req, err := r.newHTTPRequest(ctx)
            if err != nil {
             return err
            }
            // 調(diào)用 http 客戶端的 Do 方法發(fā)起 HTTP 請求
            resp, err := client.Do(req)
            // The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
            // https://pkg.go.dev/net/http#Request
            if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
             metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
            }
            retry.After(ctx, r, resp, err)

            done := func() bool {
             defer readAndCloseResponseBody(resp)

             // if the server returns an error in err, the response will be nil.
             f := func(req *http.Request, resp *http.Response) {
              if resp == nil {
               return
              }
              fn(req, resp)
             }

             if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
              return false
             }

             f(req, resp)
             return true
            }()
            if done {
             return retry.WrapPreviousError(err)
            }
           }
          }

          func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
           var body io.Reader
           switch {
           case r.body != nil && r.bodyBytes != nil:
            return nil, fmt.Errorf("cannot set both body and bodyBytes")
           case r.body != nil:
            body = r.body
           case r.bodyBytes != nil:
            // Create a new reader specifically for this request.
            // Giving each request a dedicated reader allows retries to avoid races resetting the request body.
            body = bytes.NewReader(r.bodyBytes)
           }

           // 構(gòu)造請求 url
           url := r.URL().String()
           // 初始化 http.Request 對象
           req, err := http.NewRequest(r.verb, url, body)
           if err != nil {
            return nil, err
           }
           req = req.WithContext(ctx)
           req.Header = r.headers
           return req, nil
          }

          例如,我們要請求默認命名空間下的 Pod 資源,可以通過如下代碼實現(xiàn):

          package main

          import (
           "context"
           "fmt"

           corev1 "k8s.io/api/core/v1"
           "k8s.io/client-go/kubernetes/scheme"
           "k8s.io/client-go/rest"
           "k8s.io/client-go/tools/clientcmd"
          )

          func main() {
           // 通過 kubeconfig 生成配置
           config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
           if err != nil {
            panic(err)
           }

           config.APIPath = "api"
           // 手動指定版本
           config.GroupVersion = &corev1.SchemeGroupVersion
           config.NegotiatedSerializer = scheme.Codecs

           // 創(chuàng)建 RESTClient
           restClient, err := rest.RESTClientFor(config)
           if err != nil {
            panic(err)
           }

           // 查詢 default 的 pod
           result := &corev1.PodList{}
           if err := restClient.
            Get().
            Namespace("default").
            Resource("pods").
            Do(context.TODO()).
            Into(result); err != nil {
            panic(err)
           }

           for _, item := range result.Items {
            fmt.Printf("%v \t [%v]\n", item.Name, item.Status.Phase)
           }
          }

          // $ go run main.go
          // nginx    [Pending]

          ClientSet

          如果清楚了 RESTClient 的使用,ClientSet 實際很簡單,首先 ClientSet 對外提供了一層各種版本的接口:

          // k8s.io/client-go/kubernetes/clientset.go

          type Interface interface {
           Discovery() discovery.DiscoveryInterface
           AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
           AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
           AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
           InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
           AppsV1() appsv1.AppsV1Interface
           AppsV1beta1() appsv1beta1.AppsV1beta1Interface
           AppsV1beta2() appsv1beta2.AppsV1beta2Interface
           AuthenticationV1() authenticationv1.AuthenticationV1Interface
           AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
           AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
           AuthorizationV1() authorizationv1.AuthorizationV1Interface
           AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
           AutoscalingV1() autoscalingv1.AutoscalingV1Interface
           AutoscalingV2() autoscalingv2.AutoscalingV2Interface
           AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
           AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
           BatchV1() batchv1.BatchV1Interface
           BatchV1beta1() batchv1beta1.BatchV1beta1Interface
           CertificatesV1() certificatesv1.CertificatesV1Interface
           CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
           CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
           CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
           CoordinationV1() coordinationv1.CoordinationV1Interface
           CoreV1() corev1.CoreV1Interface
           DiscoveryV1() discoveryv1.DiscoveryV1Interface
           DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
           EventsV1() eventsv1.EventsV1Interface
           EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
           ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
           FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
           FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
           FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
           FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
           NetworkingV1() networkingv1.NetworkingV1Interface
           NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
           NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
           NodeV1() nodev1.NodeV1Interface
           NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
           NodeV1beta1() nodev1beta1.NodeV1beta1Interface
           PolicyV1() policyv1.PolicyV1Interface
           PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
           RbacV1() rbacv1.RbacV1Interface
           RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
           RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
           ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
           SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
           SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
           SchedulingV1() schedulingv1.SchedulingV1Interface
           StorageV1beta1() storagev1beta1.StorageV1beta1Interface
           StorageV1() storagev1.StorageV1Interface
           StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
          }

          以 CoreV1 為例,又繼續(xù)提供了一層 CoreV1 版本的資源對象接口,其中內(nèi)置了 RESTClient 客戶端:

          // k8s.io/client-go/kubernetes/typed/core/v1/core_client.go

          type CoreV1Interface interface {
           RESTClient() rest.Interface
           ComponentStatusesGetter
           ConfigMapsGetter
           EndpointsGetter
           EventsGetter
           LimitRangesGetter
           NamespacesGetter
           NodesGetter
           PersistentVolumesGetter
           PersistentVolumeClaimsGetter
           PodsGetter
           PodTemplatesGetter
           ReplicationControllersGetter
           ResourceQuotasGetter
           SecretsGetter
           ServicesGetter
           ServiceAccountsGetter
          }

          以 PodsGetter 接口為例,該接口對 Pod 資源的各種操作進行了封裝:

          // k8s.io/client-go/kubernetes/typed/core/v1/pod.go

          type PodsGetter interface {
           Pods(namespace string) PodInterface
          }

          type PodInterface interface {
           Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
           Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
           UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
           Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
           DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
           Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
           List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
           Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
           Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
           Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
           ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
           UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)

           PodExpansion
          }

          對于 Pod 資源 List 方法的實現(xiàn):

          type pods struct {
           client rest.Interface
           ns     string
          }

          func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
           var timeout time.Duration
           if opts.TimeoutSeconds != nil {
            timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
           }
           result = &v1.PodList{}
           err = c.client.Get().
            Namespace(c.ns).
            Resource("pods").
            VersionedParams(&opts, scheme.ParameterCodec).
            Timeout(timeout).
            Do(ctx).
            Into(result)
           return
          }

          實際就是我們之前使用 RESTClient 獲取 Pod 資源列表的寫法。

          基于對各種資源操作的封裝,現(xiàn)在使用 ClientSet 客戶端來獲取 Pod 的寫法就簡單多了:

          package main

          import (
           "context"
           "fmt"

           metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
           "k8s.io/client-go/kubernetes"

           "k8s.io/client-go/tools/clientcmd"
          )

          func main() {
           // 通過 kubeconfig 生成配置
           config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
           if err != nil {
            panic(err)
           }

           // 創(chuàng)建 ClientSet
           clientSet, err := kubernetes.NewForConfig(config)
           if err != nil {
            panic(err)
           }

           // 查詢 default 的 pod
           pods, err := clientSet.
            CoreV1().
            Pods("default").
            List(context.TODO(), metav1.ListOptions{})
           if err != nil {
            panic(err)
           }

           for _, pod := range pods.Items {
            fmt.Printf("%v \t [%v]\n", pod.Name, pod.Status.Phase)
           }
          }

          // $ go run main.go
          // nginx    [Pending]

          DynamicClient

          從 ClientSet 客戶端的源碼實現(xiàn)就可以看出,它只支持 K8s 內(nèi)置資源的操作。對于 CRD 自定義資源,就需要使用 DynamicClient 動態(tài)客戶端。

          接口定義如下:

          // k8s.io/client-go/dynamic/interface.go

          type Interface interface {
           Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
          }

          type ResourceInterface interface {
           Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
           Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
           UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
           Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
           DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
           Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
           List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
           Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
           Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
           Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error)
           ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error)
          }

          type NamespaceableResourceInterface interface {
           Namespace(string) ResourceInterface
           ResourceInterface
          }

          DynamicClient 客戶端的實現(xiàn),同樣內(nèi)置 RESTClient 客戶端:

          // k8s.io/client-go/dynamic/simple.go

          type DynamicClient struct {
           client rest.Interface
          }

          func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
           return &dynamicResourceClient{client: c, resource: resource}
          }

          具體的資源操作實現(xiàn)在 dynamicResourceClient ,以 List 方法為例:

          type dynamicResourceClient struct {
           client    *DynamicClient
           namespace string
           resource  schema.GroupVersionResource
          }

          func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
           if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
            return nil, err
           }
           // 調(diào)用 RESTClient 客戶端
           result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
           if err := result.Error(); err != nil {
            return nil, err
           }
           // 返回 []byte 類型的結(jié)果
           retBytes, err := result.Raw()
           if err != nil {
            return nil, err
           }
           // 將結(jié)果解碼到 UnstructuredList 類型(無法預知的數(shù)據(jù)結(jié)構(gòu))
           uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
           if err != nil {
            return nil, err
           }
           if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
            return list, nil
           }

           list, err := uncastObj.(*unstructured.Unstructured).ToList()
           if err != nil {
            return nil, err
           }
           return list, nil
          }

          DynamicClient 客戶端本質(zhì)上也是調(diào)用 RESTClient 客戶端,只是最后返回的請求結(jié)果是一個 Unstructured 或 UnstructuredList 類型(無法預知的數(shù)據(jù)結(jié)構(gòu)),這也是為什么被稱為動態(tài)客戶端。

          type UnstructuredList struct {
           Object map[string]interface{}

           // Items is a list of unstructured objects.
           Items []Unstructured `json:"items"`
          }

          type Unstructured struct {
           // Object is a JSON compatible map with string, float, int, bool, []interface{}, or
           // map[string]interface{}
           // children.
           Object map[string]interface{}
          }

          DynamicClient 可以處理 ClientSet 無法處理的 CRD 資源,既是優(yōu)點也是缺點,動態(tài)就意味著缺乏類型安全,返回的請求結(jié)果也需要通過反射才能轉(zhuǎn)換成實際的結(jié)構(gòu)體類型,處理起來就略微麻煩了點:

          package main

          import (
           "context"
           "fmt"
           corev1 "k8s.io/api/core/v1"
           "k8s.io/apimachinery/pkg/runtime"
           "k8s.io/apimachinery/pkg/runtime/schema"
           "k8s.io/client-go/dynamic"

           metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
           "k8s.io/client-go/tools/clientcmd"
          )

          func main() {
           // 通過 kubeconfig 生成配置
           config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
           if err != nil {
            panic(err)
           }

           // 創(chuàng)建 DynamicClient
           dynamicClient, err := dynamic.NewForConfig(config)
           if err != nil {
            panic(err.Error())
           }

           // 查詢 default 的 pod
           unstructuredList, err := dynamicClient.Resource(schema.GroupVersionResource{
            Group:    "",
            Version:  "v1",
            Resource: "pods",
           }).Namespace("default").List(context.TODO(), metav1.ListOptions{})
           if err != nil {
            panic(err.Error())
           }

           // 使用反射將 unstructuredList 的數(shù)據(jù)轉(zhuǎn)成對應的結(jié)構(gòu)體類型 corev1.PodList
           pods := &corev1.PodList{}
           if err = runtime.DefaultUnstructuredConverter.FromUnstructured(
            unstructuredList.UnstructuredContent(),
            pods,
           ); err != nil {
            panic(err.Error())
           }

           for _, pod := range pods.Items {
            fmt.Printf("%v \t [%v]\n", pod.Name, pod.Status.Phase)
           }
          }

          // $ go run main.go
          // nginx    [Pending]

          DiscoveryClient

          在上面示例中,請求某個資源時需要指定 GVR :資源組(Group)、資源版本(Versions)和資源信息(Resources)。例如 Pod 資源對應的 GVR 為:

          schema.GroupVersionResource{
            Group:    "",
            Version:  "v1",
            Resource: "pods",
          }

          在 K8s 中有很多的資源對象,我們不可能全部記住,除了翻閱官方文檔,還可以通過 DiscoveryClient 客戶端獲取,我們熟悉的 kubectl api-resources 命令實際也是通過該客戶端獲取的:

          $ kubectl api-resources
          NAME                              SHORTNAMES   APIVERSION                             NAMESPACED   KIND
          bindings                                       v1                                     true         Binding
          componentstatuses                 cs           v1                                     false        ComponentStatus
          configmaps                        cm           v1                                     true         ConfigMap
          endpoints                         ep           v1                                     true         Endpoints
          events                            ev           v1                                     true         Event
          limitranges                       limits       v1                                     true         LimitRange
          namespaces                        ns           v1                                     false        Namespace
          nodes                             no           v1                                     false        Node
          persistentvolumeclaims            pvc          v1                                     true         PersistentVolumeClaim
          persistentvolumes                 pv           v1                                     false        PersistentVolume
          pods                              po           v1                                     true         Pod
          podtemplates                                   v1                                     true         PodTemplate
          replicationcontrollers            rc           v1                                     true         ReplicationController
          resourcequotas                    quota        v1                                     true         ResourceQuota
          secrets                                        v1                                     true         Secret
          serviceaccounts                   sa           v1                                     true         ServiceAccount
          services                          svc          v1                                     true         Service
          mutatingwebhookconfigurations                  admissionregistration.k8s.io/v1        false        MutatingWebhookConfiguration
          validatingwebhookconfigurations                admissionregistration.k8s.io/v1        false        ValidatingWebhookConfiguration
          customresourcedefinitions         crd,crds     apiextensions.k8s.io/v1                false        CustomResourceDefinition
          apiservices                                    apiregistration.k8s.io/v1              false        APIService
          controllerrevisions                            apps/v1                                true         ControllerRevision
          daemonsets                        ds           apps/v1                                true         DaemonSet
          deployments                       deploy       apps/v1                                true         Deployment
          replicasets                       rs           apps/v1                                true         ReplicaSet
          statefulsets                      sts          apps/v1                                true         StatefulSet
          tokenreviews                                   authentication.k8s.io/v1               false        TokenReview
          localsubjectaccessreviews                      authorization.k8s.io/v1                true         LocalSubjectAccessReview
          selfsubjectaccessreviews                       authorization.k8s.io/v1                false        SelfSubjectAccessReview
          selfsubjectrulesreviews                        authorization.k8s.io/v1                false        SelfSubjectRulesReview
          subjectaccessreviews                           authorization.k8s.io/v1                false        SubjectAccessReview
          horizontalpodautoscalers          hpa          autoscaling/v2                         true         HorizontalPodAutoscaler
          cronjobs                          cj           batch/v1                               true         CronJob
          jobs                                           batch/v1                               true         Job
          certificatesigningrequests        csr          certificates.k8s.io/v1                 false        CertificateSigningRequest
          leases                                         coordination.k8s.io/v1                 true         Lease
          endpointslices                                 discovery.k8s.io/v1                    true         EndpointSlice
          events                            ev           events.k8s.io/v1                       true         Event
          flowschemas                                    flowcontrol.apiserver.k8s.io/v1beta3   false        FlowSchema
          prioritylevelconfigurations                    flowcontrol.apiserver.k8s.io/v1beta3   false        PriorityLevelConfiguration
          ingressclasses                                 networking.k8s.io/v1                   false        IngressClass
          ingresses                         ing          networking.k8s.io/v1                   true         Ingress
          networkpolicies                   netpol       networking.k8s.io/v1                   true         NetworkPolicy
          runtimeclasses                                 node.k8s.io/v1                         false        RuntimeClass
          poddisruptionbudgets              pdb          policy/v1                              true         PodDisruptionBudget
          clusterrolebindings                            rbac.authorization.k8s.io/v1           false        ClusterRoleBinding
          clusterroles                                   rbac.authorization.k8s.io/v1           false        ClusterRole
          rolebindings                                   rbac.authorization.k8s.io/v1           true         RoleBinding
          roles                                          rbac.authorization.k8s.io/v1           true         Role
          priorityclasses                   pc           scheduling.k8s.io/v1                   false        PriorityClass
          csidrivers                                     storage.k8s.io/v1                      false        CSIDriver
          csinodes                                       storage.k8s.io/v1                      false        CSINode
          csistoragecapacities                           storage.k8s.io/v1                      true         CSIStorageCapacity
          storageclasses                    sc           storage.k8s.io/v1                      false        StorageClass
          volumeattachments                              storage.k8s.io/v1                      false        VolumeAttachment

          以下是 DiscoveryClient 客戶端的接口定義,可以分為聚合發(fā)現(xiàn)和緩存發(fā)現(xiàn)兩種,同樣都內(nèi)置了 RESTClient 客戶端:

          // k8s.io/client-go/discovery/discovery_client.go

          // 聚合發(fā)現(xiàn)
          type AggregatedDiscoveryInterface interface {
           DiscoveryInterface
           GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error)
          }

          // 緩存發(fā)現(xiàn)
          type CachedDiscoveryInterface interface {
           DiscoveryInterface
           Fresh() bool
           Invalidate()
          }

          type DiscoveryInterface interface {
           RESTClient() restclient.Interface
           ServerGroupsInterface
           ServerResourcesInterface
           ServerVersionInterface
           OpenAPISchemaInterface
           OpenAPIV3SchemaInterface
           // Returns copy of current discovery client that will only
           // receive the legacy discovery format, or pointer to current
           // discovery client if it does not support legacy-only discovery.
           WithLegacy() DiscoveryInterface
          }

          type ServerGroupsInterface interface {
           // 返回資源組
           ServerGroups() (*metav1.APIGroupList, error)
          }

          type ServerResourcesInterface interface {
            // 返回指定資源組和版本的資源信息
           ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error)
           // 返回所有資源組和資源信息
           ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
           // 返回所有資源信息(首選版本)
           ServerPreferredResources() ([]*metav1.APIResourceList, error)
           // 返回所有支持命名空間的資源信息(首選版本)
           ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error)
          }

          type ServerVersionInterface interface {
           ServerVersion() (*version.Info, error)
          }

          type OpenAPISchemaInterface interface {
           OpenAPISchema() (*openapi_v2.Document, error)
          }

          type OpenAPIV3SchemaInterface interface {
           OpenAPIV3() openapi.Client
          }

          先看聚合發(fā)現(xiàn)客戶端的 ServerGroupsAndResources 方法實現(xiàn),即最基本的 DiscoveryClient :

          type DiscoveryClient struct {
           restClient restclient.Interface

           LegacyPrefix string
           // Forces the client to request only "unaggregated" (legacy) discovery.
           UseLegacyDiscovery bool
          }

          var _ AggregatedDiscoveryInterface = &DiscoveryClient{}

          func (d *DiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
           return withRetries(defaultRetries, func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
            // 調(diào)用 ServerGroupsAndResources 方法
            return ServerGroupsAndResources(d)
           })
          }

          func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
           var sgs *metav1.APIGroupList
           var resources []*metav1.APIResourceList
           var failedGVs map[schema.GroupVersion]error
           var err error

           if ad, ok := d.(AggregatedDiscoveryInterface); ok {
            // 如果是聚合發(fā)現(xiàn)客戶端
            var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
            // 調(diào)用 GroupsAndMaybeResources 方法
            sgs, resourcesByGV, failedGVs, err = ad.GroupsAndMaybeResources()
            for _, resourceList := range resourcesByGV {
             // 資源信息
             resources = append(resources, resourceList)
            }
           } else {
            sgs, err = d.ServerGroups()
           }

           if sgs == nil {
            return nilnil, err
           }
           resultGroups := []*metav1.APIGroup{}
           for i := range sgs.Groups {
            resultGroups = append(resultGroups, &sgs.Groups[i])
           }
           if resources != nil {
            var ferr error
            if len(failedGVs) > 0 {
             ferr = &ErrGroupDiscoveryFailed{Groups: failedGVs}
            }
            // 如果資源信息不為空,則返回
            return resultGroups, resources, ferr
           }

           // 如果沒有查詢到資源信息,繼續(xù)往下處理,此時主要為緩存發(fā)現(xiàn)的邏輯,暫時略過
           // ...
          }

          對于 DiscoveryClient 聚合發(fā)現(xiàn)的主要實現(xiàn)在 GroupsAndMaybeResources 方法:

          func (d *DiscoveryClient) GroupsAndMaybeResources() (
           *metav1.APIGroupList,
           map[schema.GroupVersion]*metav1.APIResourceList,
           map[schema.GroupVersion]error,
           error)
           {
           // 首先從 /api 下載遺留的組和資源
           groups, resources, failedGVs, err := d.downloadLegacy()
           if err != nil {
            return nilnilnil, err
           }
           // 從 /apis 下載發(fā)現(xiàn)的組和(可能的)資源
           apiGroups, apiResources, failedApisGVs, aerr := d.downloadAPIs()
           if aerr != nil {
            return nilnilnil, aerr
           }
           // 將從 /apis 下載的組合并到遺留的組中
           for _, group := range apiGroups.Groups {
            groups.Groups = append(groups.Groups, group)
           }
           // 僅當兩個端點都返回資源時,才返回資源
           if resources != nil && apiResources != nil {
            for gv, resourceList := range apiResources {
             resources[gv] = resourceList
            }
           } else if resources != nil {
            resources = nil
           }
           // 將從 /api 和 /apis 下載的失敗的 GroupVersion 合并
           for gv, err := range failedApisGVs {
            failedGVs[gv] = err
           }
           return groups, resources, failedGVs, err
          }

          func (d *DiscoveryClient) downloadLegacy() (
           *metav1.APIGroupList,
           map[schema.GroupVersion]*metav1.APIResourceList,
           map[schema.GroupVersion]error,
           error)
           {
           accept := acceptDiscoveryFormats
           if d.UseLegacyDiscovery {
            accept = AcceptV1
           }
           var responseContentType string
           // 使用 RESTClient 客戶端發(fā)起請求
           body, err := d.restClient.Get().
            AbsPath("/api").
            SetHeader("Accept", accept).
            Do(context.TODO()).
            ContentType(&responseContentType).
            Raw()
           apiGroupList := &metav1.APIGroupList{}
           failedGVs := map[schema.GroupVersion]error{}
           if err != nil {
            // Tolerate 404, since aggregated api servers can return it.
            if errors.IsNotFound(err) {
             // Return empty structures and no error.
             emptyGVMap := map[schema.GroupVersion]*metav1.APIResourceList{}
             return apiGroupList, emptyGVMap, failedGVs, nil
            } else {
             return nilnilnil, err
            }
           }

           var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
           // Switch on content-type server responded with: aggregated or unaggregated.
           switch {
           case isV2Beta1ContentType(responseContentType):
            var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
            err = json.Unmarshal(body, &aggregatedDiscovery)
            if err != nil {
             return nilnilnil, err
            }
            apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
           default:
            // Default is unaggregated discovery v1.
            var v metav1.APIVersions
            err = json.Unmarshal(body, &v)
            if err != nil {
             return nilnilnil, err
            }
            apiGroup := metav1.APIGroup{}
            if len(v.Versions) != 0 {
             apiGroup = apiVersionsToAPIGroup(&v)
            }
            apiGroupList.Groups = []metav1.APIGroup{apiGroup}
           }

           return apiGroupList, resourcesByGV, failedGVs, nil
          }

          func (d *DiscoveryClient) downloadAPIs() (
           *metav1.APIGroupList,
           map[schema.GroupVersion]*metav1.APIResourceList,
           map[schema.GroupVersion]error,
           error)
           {
           accept := acceptDiscoveryFormats
           if d.UseLegacyDiscovery {
            accept = AcceptV1
           }
           var responseContentType string
           // 使用 RESTClient 客戶端發(fā)起請求
           body, err := d.restClient.Get().
            AbsPath("/apis").
            SetHeader("Accept", accept).
            Do(context.TODO()).
            ContentType(&responseContentType).
            Raw()
           if err != nil {
            return nilnilnil, err
           }

           apiGroupList := &metav1.APIGroupList{}
           failedGVs := map[schema.GroupVersion]error{}
           var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
           // Switch on content-type server responded with: aggregated or unaggregated.
           switch {
           case isV2Beta1ContentType(responseContentType):
            var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
            err = json.Unmarshal(body, &aggregatedDiscovery)
            if err != nil {
             return nilnilnil, err
            }
            apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
           default:
            // Default is unaggregated discovery v1.
            err = json.Unmarshal(body, apiGroupList)
            if err != nil {
             return nilnilnil, err
            }
           }

           return apiGroupList, resourcesByGV, failedGVs, nil
          }

          可以看到 DiscoveryClient 客戶端就是通過 RESTClient 客戶端調(diào)用 /api 和 /apis 獲取到所有的資源信息聚合后返回。

          使用示例如下:

          package main

          import (
           "fmt"
           "k8s.io/apimachinery/pkg/runtime/schema"
           "k8s.io/client-go/discovery"
           "k8s.io/client-go/tools/clientcmd"
          )

          func main() {
           // 通過 kubeconfig 生成配置
           config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
           if err != nil {
            panic(err)
           }

           // 創(chuàng)建 DiscoveryClient
           discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
           if err != nil {
            panic(err.Error())
           }

           _, apiResources, err := discoveryClient.ServerGroupsAndResources()
           if err != nil {
            panic(err.Error())
           }

           for _, apiResource := range apiResources {
            gv, err := schema.ParseGroupVersion(apiResource.GroupVersion)
            if err != nil {
             panic(err.Error())
            }
            for _, resource := range apiResource.APIResources {
             fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, resource.Name)
            }
           }
          }

          // $ go run main.go
          //  [G]node.k8s.io [V]v1 [R]runtimeclasses
          //  [G]authorization.k8s.io [V]v1 [R]localsubjectaccessreviews
          //  [G]authorization.k8s.io [V]v1 [R]selfsubjectaccessreviews
          //  [G]authorization.k8s.io [V]v1 [R]selfsubjectrulesreviews
          //  [G]authorization.k8s.io [V]v1 [R]subjectaccessreviews
          //  [G]discovery.k8s.io [V]v1 [R]endpointslices
          //  [G]storage.k8s.io [V]v1 [R]csidrivers
          //  [G]storage.k8s.io [V]v1 [R]csinodes
          //  [G]storage.k8s.io [V]v1 [R]csistoragecapacities
          //  [G]storage.k8s.io [V]v1 [R]storageclasses
          //  [G]storage.k8s.io [V]v1 [R]volumeattachments
          //  [G]storage.k8s.io [V]v1 [R]volumeattachments/status
          //  [G]coordination.k8s.io [V]v1 [R]leases
          //  [G]autoscaling [V]v1 [R]horizontalpodautoscalers
          //  [G]autoscaling [V]v1 [R]horizontalpodautoscalers/status
          //  [G]authentication.k8s.io [V]v1 [R]tokenreviews
          //  [G]networking.k8s.io [V]v1 [R]ingressclasses
          //  [G]networking.k8s.io [V]v1 [R]ingresses
          //  [G]networking.k8s.io [V]v1 [R]ingresses/status
          //  [G]networking.k8s.io [V]v1 [R]networkpolicies
          //  [G]networking.k8s.io [V]v1 [R]networkpolicies/status
          //  [G]admissionregistration.k8s.io [V]v1 [R]mutatingwebhookconfigurations
          //  [G]admissionregistration.k8s.io [V]v1 [R]validatingwebhookconfigurations
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas/status
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations/status
          //  [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests
          //  [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/approval
          //  [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/status
          //  [G]events.k8s.io [V]v1 [R]events
          //  [G]apiregistration.k8s.io [V]v1 [R]apiservices
          //  [G]apiregistration.k8s.io [V]v1 [R]apiservices/status
          //  [G] [V]v1 [R]bindings
          //  [G] [V]v1 [R]componentstatuses
          //  [G] [V]v1 [R]configmaps
          //  [G] [V]v1 [R]endpoints
          //  [G] [V]v1 [R]events
          //  [G] [V]v1 [R]limitranges
          //  [G] [V]v1 [R]namespaces
          //  [G] [V]v1 [R]namespaces/finalize
          //  [G] [V]v1 [R]namespaces/status
          //  [G] [V]v1 [R]nodes
          //  [G] [V]v1 [R]nodes/proxy
          //  [G] [V]v1 [R]nodes/status
          //  [G] [V]v1 [R]persistentvolumeclaims
          //  [G] [V]v1 [R]persistentvolumeclaims/status
          //  [G] [V]v1 [R]persistentvolumes
          //  [G] [V]v1 [R]persistentvolumes/status
          //  [G] [V]v1 [R]pods
          //  [G] [V]v1 [R]pods/attach
          //  [G] [V]v1 [R]pods/binding
          //  [G] [V]v1 [R]pods/ephemeralcontainers
          //  [G] [V]v1 [R]pods/eviction
          //  [G] [V]v1 [R]pods/exec
          //  [G] [V]v1 [R]pods/log
          //  [G] [V]v1 [R]pods/portforward
          //  [G] [V]v1 [R]pods/proxy
          //  [G] [V]v1 [R]pods/status
          //  [G] [V]v1 [R]podtemplates
          //  [G] [V]v1 [R]replicationcontrollers
          //  [G] [V]v1 [R]replicationcontrollers/scale
          //  [G] [V]v1 [R]replicationcontrollers/status
          //  [G] [V]v1 [R]resourcequotas
          //  [G] [V]v1 [R]resourcequotas/status
          //  [G] [V]v1 [R]secrets
          //  [G] [V]v1 [R]serviceaccounts
          //  [G] [V]v1 [R]serviceaccounts/token
          //  [G] [V]v1 [R]services
          //  [G] [V]v1 [R]services/proxy
          //  [G] [V]v1 [R]services/status
          //  [G]apps [V]v1 [R]controllerrevisions
          //  [G]apps [V]v1 [R]daemonsets
          //  [G]apps [V]v1 [R]daemonsets/status
          //  [G]apps [V]v1 [R]deployments
          //  [G]apps [V]v1 [R]deployments/scale
          //  [G]apps [V]v1 [R]deployments/status
          //  [G]apps [V]v1 [R]replicasets
          //  [G]apps [V]v1 [R]replicasets/scale
          //  [G]apps [V]v1 [R]replicasets/status
          //  [G]apps [V]v1 [R]statefulsets
          //  [G]apps [V]v1 [R]statefulsets/scale
          //  [G]apps [V]v1 [R]statefulsets/status
          //  [G]scheduling.k8s.io [V]v1 [R]priorityclasses
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas/status
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations
          //  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations/status
          //  [G]policy [V]v1 [R]poddisruptionbudgets
          //  [G]policy [V]v1 [R]poddisruptionbudgets/status
          //  [G]rbac.authorization.k8s.io [V]v1 [R]clusterrolebindings
          //  [G]rbac.authorization.k8s.io [V]v1 [R]clusterroles
          //  [G]rbac.authorization.k8s.io [V]v1 [R]rolebindings
          //  [G]rbac.authorization.k8s.io [V]v1 [R]roles
          //  [G]batch [V]v1 [R]cronjobs
          //  [G]batch [V]v1 [R]cronjobs/status
          //  [G]batch [V]v1 [R]jobs
          //  [G]batch [V]v1 [R]jobs/status
          //  [G]autoscaling [V]v2 [R]horizontalpodautoscalers
          //  [G]autoscaling [V]v2 [R]horizontalpodautoscalers/status
          //  [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions
          //  [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions/status

          對于緩存發(fā)現(xiàn)客戶端,又分為基于內(nèi)存(memCacheClient)和基于磁盤(CachedDiscoveryClient)兩種,其中 CachedDiscoveryClient 實際上套用了 memCacheClient ,而 memCacheClient 則套用了聚合發(fā)現(xiàn) DiscoveryClient :

          // k8s.io/client-go/discovery/cached/disk/cached_discovery.go

          type CachedDiscoveryClient struct {
           // 內(nèi)部套的緩存發(fā)現(xiàn)客戶端,這里實際是 memCacheClient
           delegate discovery.DiscoveryInterface
           // 緩存的磁盤路徑
           cacheDirectory string
           // 緩存有效期
           ttl time.Duration
           // ...省略
          }

          func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
           if len(httpCacheDir) > 0 {
            config = restclient.CopyConfig(config)
            config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
             return newCacheRoundTripper(httpCacheDir, rt)
            })
           }

           // 聚合發(fā)現(xiàn) DiscoveryClient
           discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
           if err != nil {
            return nil, err
           }

           // 構(gòu)建基于磁盤的緩存發(fā)現(xiàn),其中套了一層基于內(nèi)存的,基于內(nèi)存的里面套了一層基本的 DiscoveryClient
           return newCachedDiscoveryClient(memory.NewMemCacheClient(discoveryClient), discoveryCacheDir, ttl), nil
          }

          func newCachedDiscoveryClient(delegate discovery.DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
           return &CachedDiscoveryClient{
            delegate:       delegate,
            cacheDirectory: cacheDirectory,
            ttl:            ttl,
            ourFiles:       map[string]struct{}{},
            fresh:          true,
           }
          }

          CachedDiscoveryClient 客戶端的 ServerGroupsAndResources 方法實現(xiàn):

          // k8s.io/client-go/discovery/cached/disk/cached_discovery.go

          func (d *CachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
           // 實際上調(diào)用的就是之前的聚合發(fā)現(xiàn) DiscoveryClient 所調(diào)用的 discovery.ServerGroupsAndResources 方法
           return discovery.ServerGroupsAndResources(d)
          }

          看到之前的 discovery.ServerGroupsAndResources 方法:

          // k8s.io/client-go/discovery/discovery_client.go

          func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
           var sgs *metav1.APIGroupList
           var resources []*metav1.APIResourceList
           var failedGVs map[schema.GroupVersion]error
           var err error

           if ad, ok := d.(AggregatedDiscoveryInterface); ok {
            // 聚合發(fā)現(xiàn)客戶端,上面講過了,省略 ...
           } else {
            // 對于緩存發(fā)現(xiàn)客戶端,調(diào)用其 ServerGroups 方法
            sgs, err = d.ServerGroups()
           }

           // ... 稍后繼續(xù)
          }

          這里的 d 是 CachedDiscoveryClient ,看到其 ServerGroups 方法:

          // k8s.io/client-go/discovery/cached/disk/cached_discovery.go

          func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
           // 從本地磁盤文件中查詢緩存數(shù)據(jù)
           filename := filepath.Join(d.cacheDirectory, "servergroups.json")
           cachedBytes, err := d.getCachedFile(filename)
           if err == nil {
            // 從緩存中查詢成功,解碼后返回資源組
            cachedGroups := &metav1.APIGroupList{}
            if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
             klog.V(10).Infof("returning cached discovery info from %v", filename)
             return cachedGroups, nil
            }
           }

           // 未能從緩存中查詢到,或緩存已過期,則調(diào)用內(nèi)部 delegate 的 ServerGroups 方法
           // 這里 delegate 是 memCacheClient
           liveGroups, err := d.delegate.ServerGroups()
           if err != nil {
            klog.V(3).Infof("skipped caching discovery info due to %v", err)
            return liveGroups, err
           }
           if liveGroups == nil || len(liveGroups.Groups) == 0 {
            klog.V(3).Infof("skipped caching discovery info, no groups found")
            return liveGroups, err
           }

           // 將數(shù)據(jù)寫入到本地磁盤緩存,以供下次查詢時使用
           if err := d.writeCachedFile(filename, liveGroups); err != nil {
            klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
           }

           return liveGroups, nil
          }

          查詢緩存文件和寫入緩存文件的方式很簡單,實際就是對文件的一個讀寫,以及 ttl 的判斷,直接貼出:

          func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
           // after invalidation ignore cache files not created by this process
           d.mutex.Lock()
           _, ourFile := d.ourFiles[filename]
           if d.invalidated && !ourFile {
            d.mutex.Unlock()
            return nil, errors.New("cache invalidated")
           }
           d.mutex.Unlock()

           file, err := os.Open(filename)
           if err != nil {
            return nil, err
           }
           defer file.Close()

           fileInfo, err := file.Stat()
           if err != nil {
            return nil, err
           }

           if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
            return nil, errors.New("cache expired")
           }

           // the cache is present and its valid.  Try to read and use it.
           cachedBytes, err := io.ReadAll(file)
           if err != nil {
            return nil, err
           }

           d.mutex.Lock()
           defer d.mutex.Unlock()
           d.fresh = d.fresh && ourFile

           return cachedBytes, nil
          }

          func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
           if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
            return err
           }

           bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
           if err != nil {
            return err
           }

           f, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)+".")
           if err != nil {
            return err
           }
           defer os.Remove(f.Name())
           _, err = f.Write(bytes)
           if err != nil {
            return err
           }

           err = os.Chmod(f.Name(), 0660)
           if err != nil {
            return err
           }

           name := f.Name()
           err = f.Close()
           if err != nil {
            return err
           }

           // atomic rename
           d.mutex.Lock()
           defer d.mutex.Unlock()
           err = os.Rename(name, filename)
           if err == nil {
            d.ourFiles[filename] = struct{}{}
           }
           return err
          }

          主要是 CachedDiscoveryClient 在緩存文件中查詢不到數(shù)據(jù)或者緩存過期后,會調(diào)用 memCacheClient 的 ServerGroups 方法:

          // k8s.io/client-go/discovery/cached/memory/memcache.go

          // 資源信息緩存實體
          type cacheEntry struct {
           resourceList *metav1.APIResourceList
           err          error
          }

          type memCacheClient struct {
           // 內(nèi)部套的發(fā)現(xiàn)客戶端,這里是最基本的聚合發(fā)現(xiàn) DiscoveryClient
           delegate discovery.DiscoveryInterface

           lock                        sync.RWMutex
           // 資源信息緩存
           groupToServerResources      map[string]*cacheEntry
           // 資源組緩存
           groupList                   *metav1.APIGroupList
           cacheValid                  bool
           openapiClient               openapi.Client
           receivedAggregatedDiscovery bool
          }

          func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) {
           // 調(diào)用 GroupsAndMaybeResources 方法
           groups, _, _, err := d.GroupsAndMaybeResources()
           if err != nil {
            return nil, err
           }
           return groups, nil
          }

          func (d *memCacheClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error) {
           d.lock.Lock()
           defer d.lock.Unlock()

           if !d.cacheValid {
            // 刷新內(nèi)存的緩存狀態(tài)
            if err := d.refreshLocked(); err != nil {
             return nilnilnil, err
            }
           }
           // 從內(nèi)存中構(gòu)造出需要返回的數(shù)據(jù)結(jié)構(gòu),省略
           return d.groupList, resourcesMap, failedGVs, nil
          }

          func (d *memCacheClient) refreshLocked() error {
           var gl *metav1.APIGroupList
           var err error

           // delegate 是最基本的聚合發(fā)現(xiàn) DiscoveryClient
           if ad, ok := d.delegate.(discovery.AggregatedDiscoveryInterface); ok {
            var resources map[schema.GroupVersion]*metav1.APIResourceList
            var failedGVs map[schema.GroupVersion]error
            // 調(diào)用 DiscoveryClient 的 GroupsAndMaybeResources 方法
            // 最開始的時候講過了
            // 通過 RESTClient 客戶端調(diào)用 /api 和 /apis 獲取到所有的資源信息聚合后返回
            gl, resources, failedGVs, err = ad.GroupsAndMaybeResources()
            if resources != nil && err == nil {
             // Cache the resources.
             d.groupToServerResources = map[string]*cacheEntry{}
             d.groupList = gl
             for gv, resources := range resources {
              d.groupToServerResources[gv.String()] = &cacheEntry{resources, nil}
             }
             // Cache GroupVersion discovery errors
             for gv, err := range failedGVs {
              d.groupToServerResources[gv.String()] = &cacheEntry{nil, err}
             }
             d.receivedAggregatedDiscovery = true
             d.cacheValid = true
             return nil
            }
           } else {
            gl, err = d.delegate.ServerGroups()
           }
           // ...
           return nil
          }

          memCacheClient 實際上就是調(diào)用 DiscoveryClient ,然后將數(shù)據(jù)緩存在內(nèi)存中。

          總結(jié)一下就是,CachedDiscoveryClient 首先從本地磁盤文件中取緩存的資源組信息,如果取不到或者過期了,就去 memCacheClient 的內(nèi)存中取,內(nèi)存中也取不到或者被刷新了,就調(diào)用 DiscoveryClient 向 kube-apiserver 發(fā)起請求獲取。

          回到 discovery.ServerGroupsAndResources 方法:

          // k8s.io/client-go/discovery/discovery_client.go

          func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
           var sgs *metav1.APIGroupList
           var resources []*metav1.APIResourceList
           var failedGVs map[schema.GroupVersion]error
           var err error

           if ad, ok := d.(AggregatedDiscoveryInterface); ok {
            // 聚合發(fā)現(xiàn)客戶端,上面講過了,省略 ...
           } else {
            // 對于緩存發(fā)現(xiàn)客戶端,調(diào)用其 ServerGroups 方法,得到資源組 sgs
            sgs, err = d.ServerGroups()
           }

           if sgs == nil {
            return nilnil, err
           }
           // 調(diào)用 fetchGroupVersionResources 傳入資源組 sgs 獲取資源信息
           groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs)

           // order results by group/version discovery order
           result := []*metav1.APIResourceList{}
           for _, apiGroup := range sgs.Groups {
            for _, version := range apiGroup.Versions {
             gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
             if resources, ok := groupVersionResources[gv]; ok {
              result = append(result, resources)
             }
            }
           }

           if len(failedGroups) == 0 {
            return resultGroups, result, nil
           }

           return resultGroups, result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
          }

          看到最終的 fetchGroupVersionResources 方法:

          // k8s.io/client-go/discovery/discovery_client.go

          func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
           groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
           failedGroups := make(map[schema.GroupVersion]error)

           wg := &sync.WaitGroup{}
           resultLock := &sync.Mutex{}
           // 循環(huán)資源組
           for _, apiGroup := range apiGroups.Groups {
            // 循環(huán)資源版本
            for _, version := range apiGroup.Versions {
             // 根據(jù)資源組和資源版本去獲取資源信息
             groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
             wg.Add(1)
             go func() {
              defer wg.Done()
              defer utilruntime.HandleCrash()

              // 調(diào)用 CachedDiscoveryClient 的 ServerResourcesForGroupVersion 方法
              apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())

              // lock to record results
              resultLock.Lock()
              defer resultLock.Unlock()

              if err != nil {
               // TODO: maybe restrict this to NotFound errors
               failedGroups[groupVersion] = err
              }
              if apiResourceList != nil {
               // even in case of error, some fallback might have been returned
               groupVersionResources[groupVersion] = apiResourceList
              }
             }()
            }
           }
           // 等待所有并發(fā)查詢完成
           wg.Wait()

           return groupVersionResources, failedGroups
          }

          fetchGroupVersionResources 方法開啟了并發(fā)去調(diào)用 ServerResourcesForGroupVersion 方法根據(jù)資源組版本來查詢資源信息:

          // k8s.io/client-go/discovery/cached/disk/cached_discovery.go

          func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
           // 從本地磁盤緩存文件中查詢資源信息
           filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
           cachedBytes, err := d.getCachedFile(filename)
           // don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
           if err == nil {
            cachedResources := &metav1.APIResourceList{}
            if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
             klog.V(10).Infof("returning cached discovery info from %v", filename)
             return cachedResources, nil
            }
           }

           // 如果緩存沒有或者過期了,則調(diào)用 memCacheClient 的 ServerResourcesForGroupVersion 方法
           liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
           if err != nil {
            klog.V(3).Infof("skipped caching discovery info due to %v", err)
            return liveResources, err
           }
           if liveResources == nil || len(liveResources.APIResources) == 0 {
            klog.V(3).Infof("skipped caching discovery info, no resources found")
            return liveResources, err
           }

           // 將緩存寫入本地磁盤緩存
           if err := d.writeCachedFile(filename, liveResources); err != nil {
            klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
           }

           return liveResources, nil
          }

          ServerResourcesForGroupVersion 方法和 ServerGroups 方法的邏輯是完全一樣的,這里不重復了。感興趣自己去看。

          最后,基于緩存的發(fā)現(xiàn)客戶端的使用示例如下:

          package main

          import (
           "fmt"
           "k8s.io/apimachinery/pkg/runtime/schema"
           "k8s.io/client-go/discovery/cached/disk"
           "k8s.io/client-go/tools/clientcmd"
           "time"
          )

          func main() {
           // 通過 kubeconfig 生成配置
           config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
           if err != nil {
            panic(err)
           }

           // 創(chuàng)建 DiscoveryClient
           //discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)

           // 創(chuàng)建帶緩存的 DiscoveryClient
           discoveryClient, err := disk.NewCachedDiscoveryClientForConfig(config, ".kube/cache"".kube/http-cache", time.Minute*10)
           if err != nil {
            panic(err.Error())
           }

           _, apiResources, err := discoveryClient.ServerGroupsAndResources()
           if err != nil {
            panic(err.Error())
           }

           for _, apiResource := range apiResources {
            gv, err := schema.ParseGroupVersion(apiResource.GroupVersion)
            if err != nil {
             panic(err.Error())
            }
            for _, resource := range apiResource.APIResources {
             fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, resource.Name)
            }
           }
          }

          本地緩存目錄如下:

          本回完,下一回介紹 client-go 的 Informer 機制。

          瀏覽 1990
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  我爱操逼| 久久久久久久三级片AV | 免费的黄色大片 | 五月丁香综合激情 | 黄片网站免费观看 |