K8s client-go 的四種客戶端
前面都在講 kube-apiserver ,后續(xù) Kubernetes 其它組件,都會使用 client-go 庫來調(diào)用 kube-apiserver 提供的 API 服務進行資源對象的操作。
Kubernetes 的源碼 staging/src/k8s.io/client-go 中集成了 client-go ,所有組件都是調(diào)用 vendor 本地庫的方式引入 client-go 庫,不需要考慮和集群版本的兼容性,但如果是平時我們自己基于 Kubernetes 做二次開發(fā)用到 client-go 庫時,則需要注意下所導入的庫和 Kubernetes 版本的對應關系。
對于 Kubernetes 1.27.2 ,client-go 源碼的部分目錄如下:
根據(jù)不同功能,可以將其劃分為四種客戶端:
-
RESTClient:最基礎的客戶端,僅對 HTTP Request 進行了封裝。實現(xiàn)位置在 rest 目錄 -
ClientSet:基于 RESTClient 實現(xiàn),封裝了 Kubernetes 內(nèi)置資源(Resource)和版本(Version)的方法。實現(xiàn)位置在 kubernetes 目錄 -
DynamicClient:動態(tài)客戶端,基于 RESTClient 實現(xiàn),封裝了 Kubernetes 任意資源(包括 CRD 自定義資源)和版本的方法。實現(xiàn)位置在 dynamic 目錄 -
DiscoveryClient:發(fā)現(xiàn)客戶端,基于 RESTClient 實現(xiàn),用于發(fā)現(xiàn) kube-apiserver 所支持的資源組(Group)、資源版本(Versions)和資源信息(Resources)。實現(xiàn)位置在 discovery 目錄
RESTClient
作為最基礎的客戶端,其接口定義如下:
// k8s.io/client-go/rest/client.go
type Interface interface {
// 返回速率限制器
GetRateLimiter() flowcontrol.RateLimiter
// 構(gòu)建 (POST, Put, Patch, Get, DELETE) 請求器
Verb(verb string) *Request
// 構(gòu)建 POST 請求器
Post() *Request
// 構(gòu)建 Put 請求器
Put() *Request
// 構(gòu)建 Patch 請求器
Patch(pt types.PatchType) *Request
// 構(gòu)建 Get 請求器
Get() *Request
// 構(gòu)建 Delete 請求器
Delete() *Request
// 返回 API 版本
APIVersion() schema.GroupVersion
}
RESTClient 的實現(xiàn)很簡單,主要提供一個對外的接口調(diào)用:
// k8s.io/client-go/rest/client.go
type RESTClient struct {
base *url.URL
versionedAPIPath string
content ClientContentConfig
createBackoffMgr func() BackoffManager
rateLimiter flowcontrol.RateLimiter
warningHandler WarningHandler
// http 客戶端
Client *http.Client
}
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
}
base.RawQuery = ""
base.Fragment = ""
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
content: config,
createBackoffMgr: readExpBackoffConfig,
rateLimiter: rateLimiter,
Client: client,
}, nil
}
func (c *RESTClient) Verb(verb string) *Request {
// 構(gòu)建請求器
return NewRequest(c).Verb(verb)
}
func (c *RESTClient) Post() *Request {
return c.Verb("POST")
}
// 其余請求器的構(gòu)建方法省略,同 Post
核心的處理全在 Request 請求器上,來到構(gòu)建請求器的 NewRequest 方法:
// k8s.io/client-go/rest/request.go
type Request struct {
c *RESTClient
namespace string
resource string
// 省略
}
// 傳入 RESTClient 參數(shù)
func NewRequest(c *RESTClient) *Request {
// ...
r := &Request{
c: c,
// ...
}
switch {
case len(c.content.AcceptContentTypes) > 0:
r.SetHeader("Accept", c.content.AcceptContentTypes)
case len(c.content.ContentType) > 0:
r.SetHeader("Accept", c.content.ContentType+", */*")
}
return r
}
// 設置要訪問的命名空間 (<resource>/[ns/<namespace>/]<name>)
func (r *Request) Namespace(namespace string) *Request {
if r.err != nil {
return r
}
if r.namespaceSet {
r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
return r
}
if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
return r
}
r.namespaceSet = true
r.namespace = namespace
return r
}
// 設置要訪問的資源 (<resource>/[ns/<namespace>/]<name>)
func (r *Request) Resource(resource string) *Request {
if r.err != nil {
return r
}
if len(r.resource) != 0 {
r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
return r
}
if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
return r
}
r.resource = resource
return r
}
Request 請求器的本質(zhì)就是對命名空間、資源等 K8s 的概念進行了一個通用的封裝,通過調(diào)用相應的方法就可以構(gòu)造出最終的請求 URL 和參數(shù):
func (r *Request) URL() *url.URL {
p := r.pathPrefix
if r.namespaceSet && len(r.namespace) > 0 {
// 命名空間
p = path.Join(p, "namespaces", r.namespace)
}
if len(r.resource) != 0 {
// 資源
p = path.Join(p, strings.ToLower(r.resource))
}
// Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
p = path.Join(p, r.resourceName, r.subresource, r.subpath)
}
finalURL := &url.URL{}
if r.c.base != nil {
*finalURL = *r.c.base
}
finalURL.Path = p
query := url.Values{}
for key, values := range r.params {
for _, value := range values {
query.Add(key, value)
}
}
// timeout is handled specially here.
if r.timeout != 0 {
// 參數(shù)
query.Set("timeout", r.timeout.String())
}
finalURL.RawQuery = query.Encode()
return finalURL
}
最后調(diào)用 Do 方法發(fā)起請求,并返回請求的結(jié)果:
// 請求結(jié)果
type Result struct {
body []byte
warnings []net.WarningHeader
contentType string
err error
statusCode int
decoder runtime.Decoder
}
func (r *Request) Do(ctx context.Context) Result {
var result Result
// 發(fā)起請求
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
if result.err == nil || len(result.body) > 0 {
metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
}
return result
}
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
// ...
client := r.c.Client
// 如果 http 客戶端沒有初始化,則使用默認的 http 客戶端
if client == nil {
client = http.DefaultClient
}
// ...
// 重試機制
retry := r.retryFn(r.maxRetries)
for {
if err := retry.Before(ctx, r); err != nil {
return retry.WrapPreviousError(err)
}
// 構(gòu)造 http.Request 對象
req, err := r.newHTTPRequest(ctx)
if err != nil {
return err
}
// 調(diào)用 http 客戶端的 Do 方法發(fā)起 HTTP 請求
resp, err := client.Do(req)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
}
retry.After(ctx, r, resp, err)
done := func() bool {
defer readAndCloseResponseBody(resp)
// if the server returns an error in err, the response will be nil.
f := func(req *http.Request, resp *http.Response) {
if resp == nil {
return
}
fn(req, resp)
}
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false
}
f(req, resp)
return true
}()
if done {
return retry.WrapPreviousError(err)
}
}
}
func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
var body io.Reader
switch {
case r.body != nil && r.bodyBytes != nil:
return nil, fmt.Errorf("cannot set both body and bodyBytes")
case r.body != nil:
body = r.body
case r.bodyBytes != nil:
// Create a new reader specifically for this request.
// Giving each request a dedicated reader allows retries to avoid races resetting the request body.
body = bytes.NewReader(r.bodyBytes)
}
// 構(gòu)造請求 url
url := r.URL().String()
// 初始化 http.Request 對象
req, err := http.NewRequest(r.verb, url, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
req.Header = r.headers
return req, nil
}
例如,我們要請求默認命名空間下的 Pod 資源,可以通過如下代碼實現(xiàn):
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 通過 kubeconfig 生成配置
config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
if err != nil {
panic(err)
}
config.APIPath = "api"
// 手動指定版本
config.GroupVersion = &corev1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
// 創(chuàng)建 RESTClient
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}
// 查詢 default 的 pod
result := &corev1.PodList{}
if err := restClient.
Get().
Namespace("default").
Resource("pods").
Do(context.TODO()).
Into(result); err != nil {
panic(err)
}
for _, item := range result.Items {
fmt.Printf("%v \t [%v]\n", item.Name, item.Status.Phase)
}
}
// $ go run main.go
// nginx [Pending]
ClientSet
如果清楚了 RESTClient 的使用,ClientSet 實際很簡單,首先 ClientSet 對外提供了一層各種版本的接口:
// k8s.io/client-go/kubernetes/clientset.go
type Interface interface {
Discovery() discovery.DiscoveryInterface
AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
AppsV1() appsv1.AppsV1Interface
AppsV1beta1() appsv1beta1.AppsV1beta1Interface
AppsV1beta2() appsv1beta2.AppsV1beta2Interface
AuthenticationV1() authenticationv1.AuthenticationV1Interface
AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
AuthorizationV1() authorizationv1.AuthorizationV1Interface
AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
AutoscalingV1() autoscalingv1.AutoscalingV1Interface
AutoscalingV2() autoscalingv2.AutoscalingV2Interface
AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
BatchV1() batchv1.BatchV1Interface
BatchV1beta1() batchv1beta1.BatchV1beta1Interface
CertificatesV1() certificatesv1.CertificatesV1Interface
CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
CoordinationV1() coordinationv1.CoordinationV1Interface
CoreV1() corev1.CoreV1Interface
DiscoveryV1() discoveryv1.DiscoveryV1Interface
DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
EventsV1() eventsv1.EventsV1Interface
EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
NetworkingV1() networkingv1.NetworkingV1Interface
NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
NodeV1() nodev1.NodeV1Interface
NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
NodeV1beta1() nodev1beta1.NodeV1beta1Interface
PolicyV1() policyv1.PolicyV1Interface
PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
RbacV1() rbacv1.RbacV1Interface
RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
SchedulingV1() schedulingv1.SchedulingV1Interface
StorageV1beta1() storagev1beta1.StorageV1beta1Interface
StorageV1() storagev1.StorageV1Interface
StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}
以 CoreV1 為例,又繼續(xù)提供了一層 CoreV1 版本的資源對象接口,其中內(nèi)置了 RESTClient 客戶端:
// k8s.io/client-go/kubernetes/typed/core/v1/core_client.go
type CoreV1Interface interface {
RESTClient() rest.Interface
ComponentStatusesGetter
ConfigMapsGetter
EndpointsGetter
EventsGetter
LimitRangesGetter
NamespacesGetter
NodesGetter
PersistentVolumesGetter
PersistentVolumeClaimsGetter
PodsGetter
PodTemplatesGetter
ReplicationControllersGetter
ResourceQuotasGetter
SecretsGetter
ServicesGetter
ServiceAccountsGetter
}
以 PodsGetter 接口為例,該接口對 Pod 資源的各種操作進行了封裝:
// k8s.io/client-go/kubernetes/typed/core/v1/pod.go
type PodsGetter interface {
Pods(namespace string) PodInterface
}
type PodInterface interface {
Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
PodExpansion
}
對于 Pod 資源 List 方法的實現(xiàn):
type pods struct {
client rest.Interface
ns string
}
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1.PodList{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
實際就是我們之前使用 RESTClient 獲取 Pod 資源列表的寫法。
基于對各種資源操作的封裝,現(xiàn)在使用 ClientSet 客戶端來獲取 Pod 的寫法就簡單多了:
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 通過 kubeconfig 生成配置
config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
if err != nil {
panic(err)
}
// 創(chuàng)建 ClientSet
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
// 查詢 default 的 pod
pods, err := clientSet.
CoreV1().
Pods("default").
List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, pod := range pods.Items {
fmt.Printf("%v \t [%v]\n", pod.Name, pod.Status.Phase)
}
}
// $ go run main.go
// nginx [Pending]
DynamicClient
從 ClientSet 客戶端的源碼實現(xiàn)就可以看出,它只支持 K8s 內(nèi)置資源的操作。對于 CRD 自定義資源,就需要使用 DynamicClient 動態(tài)客戶端。
接口定義如下:
// k8s.io/client-go/dynamic/interface.go
type Interface interface {
Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
}
type ResourceInterface interface {
Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error)
ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error)
}
type NamespaceableResourceInterface interface {
Namespace(string) ResourceInterface
ResourceInterface
}
DynamicClient 客戶端的實現(xiàn),同樣內(nèi)置 RESTClient 客戶端:
// k8s.io/client-go/dynamic/simple.go
type DynamicClient struct {
client rest.Interface
}
func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
return &dynamicResourceClient{client: c, resource: resource}
}
具體的資源操作實現(xiàn)在 dynamicResourceClient ,以 List 方法為例:
type dynamicResourceClient struct {
client *DynamicClient
namespace string
resource schema.GroupVersionResource
}
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
return nil, err
}
// 調(diào)用 RESTClient 客戶端
result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
if err := result.Error(); err != nil {
return nil, err
}
// 返回 []byte 類型的結(jié)果
retBytes, err := result.Raw()
if err != nil {
return nil, err
}
// 將結(jié)果解碼到 UnstructuredList 類型(無法預知的數(shù)據(jù)結(jié)構(gòu))
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
if err != nil {
return nil, err
}
if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
return list, nil
}
list, err := uncastObj.(*unstructured.Unstructured).ToList()
if err != nil {
return nil, err
}
return list, nil
}
DynamicClient 客戶端本質(zhì)上也是調(diào)用 RESTClient 客戶端,只是最后返回的請求結(jié)果是一個 Unstructured 或 UnstructuredList 類型(無法預知的數(shù)據(jù)結(jié)構(gòu)),這也是為什么被稱為動態(tài)客戶端。
type UnstructuredList struct {
Object map[string]interface{}
// Items is a list of unstructured objects.
Items []Unstructured `json:"items"`
}
type Unstructured struct {
// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
// map[string]interface{}
// children.
Object map[string]interface{}
}
DynamicClient 可以處理 ClientSet 無法處理的 CRD 資源,既是優(yōu)點也是缺點,動態(tài)就意味著缺乏類型安全,返回的請求結(jié)果也需要通過反射才能轉(zhuǎn)換成實際的結(jié)構(gòu)體類型,處理起來就略微麻煩了點:
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 通過 kubeconfig 生成配置
config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
if err != nil {
panic(err)
}
// 創(chuàng)建 DynamicClient
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 查詢 default 的 pod
unstructuredList, err := dynamicClient.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}).Namespace("default").List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
// 使用反射將 unstructuredList 的數(shù)據(jù)轉(zhuǎn)成對應的結(jié)構(gòu)體類型 corev1.PodList
pods := &corev1.PodList{}
if err = runtime.DefaultUnstructuredConverter.FromUnstructured(
unstructuredList.UnstructuredContent(),
pods,
); err != nil {
panic(err.Error())
}
for _, pod := range pods.Items {
fmt.Printf("%v \t [%v]\n", pod.Name, pod.Status.Phase)
}
}
// $ go run main.go
// nginx [Pending]
DiscoveryClient
在上面示例中,請求某個資源時需要指定 GVR :資源組(Group)、資源版本(Versions)和資源信息(Resources)。例如 Pod 資源對應的 GVR 為:
schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}
在 K8s 中有很多的資源對象,我們不可能全部記住,除了翻閱官方文檔,還可以通過 DiscoveryClient 客戶端獲取,我們熟悉的 kubectl api-resources 命令實際也是通過該客戶端獲取的:
$ kubectl api-resources
NAME SHORTNAMES APIVERSION NAMESPACED KIND
bindings v1 true Binding
componentstatuses cs v1 false ComponentStatus
configmaps cm v1 true ConfigMap
endpoints ep v1 true Endpoints
events ev v1 true Event
limitranges limits v1 true LimitRange
namespaces ns v1 false Namespace
nodes no v1 false Node
persistentvolumeclaims pvc v1 true PersistentVolumeClaim
persistentvolumes pv v1 false PersistentVolume
pods po v1 true Pod
podtemplates v1 true PodTemplate
replicationcontrollers rc v1 true ReplicationController
resourcequotas quota v1 true ResourceQuota
secrets v1 true Secret
serviceaccounts sa v1 true ServiceAccount
services svc v1 true Service
mutatingwebhookconfigurations admissionregistration.k8s.io/v1 false MutatingWebhookConfiguration
validatingwebhookconfigurations admissionregistration.k8s.io/v1 false ValidatingWebhookConfiguration
customresourcedefinitions crd,crds apiextensions.k8s.io/v1 false CustomResourceDefinition
apiservices apiregistration.k8s.io/v1 false APIService
controllerrevisions apps/v1 true ControllerRevision
daemonsets ds apps/v1 true DaemonSet
deployments deploy apps/v1 true Deployment
replicasets rs apps/v1 true ReplicaSet
statefulsets sts apps/v1 true StatefulSet
tokenreviews authentication.k8s.io/v1 false TokenReview
localsubjectaccessreviews authorization.k8s.io/v1 true LocalSubjectAccessReview
selfsubjectaccessreviews authorization.k8s.io/v1 false SelfSubjectAccessReview
selfsubjectrulesreviews authorization.k8s.io/v1 false SelfSubjectRulesReview
subjectaccessreviews authorization.k8s.io/v1 false SubjectAccessReview
horizontalpodautoscalers hpa autoscaling/v2 true HorizontalPodAutoscaler
cronjobs cj batch/v1 true CronJob
jobs batch/v1 true Job
certificatesigningrequests csr certificates.k8s.io/v1 false CertificateSigningRequest
leases coordination.k8s.io/v1 true Lease
endpointslices discovery.k8s.io/v1 true EndpointSlice
events ev events.k8s.io/v1 true Event
flowschemas flowcontrol.apiserver.k8s.io/v1beta3 false FlowSchema
prioritylevelconfigurations flowcontrol.apiserver.k8s.io/v1beta3 false PriorityLevelConfiguration
ingressclasses networking.k8s.io/v1 false IngressClass
ingresses ing networking.k8s.io/v1 true Ingress
networkpolicies netpol networking.k8s.io/v1 true NetworkPolicy
runtimeclasses node.k8s.io/v1 false RuntimeClass
poddisruptionbudgets pdb policy/v1 true PodDisruptionBudget
clusterrolebindings rbac.authorization.k8s.io/v1 false ClusterRoleBinding
clusterroles rbac.authorization.k8s.io/v1 false ClusterRole
rolebindings rbac.authorization.k8s.io/v1 true RoleBinding
roles rbac.authorization.k8s.io/v1 true Role
priorityclasses pc scheduling.k8s.io/v1 false PriorityClass
csidrivers storage.k8s.io/v1 false CSIDriver
csinodes storage.k8s.io/v1 false CSINode
csistoragecapacities storage.k8s.io/v1 true CSIStorageCapacity
storageclasses sc storage.k8s.io/v1 false StorageClass
volumeattachments storage.k8s.io/v1 false VolumeAttachment
以下是 DiscoveryClient 客戶端的接口定義,可以分為聚合發(fā)現(xiàn)和緩存發(fā)現(xiàn)兩種,同樣都內(nèi)置了 RESTClient 客戶端:
// k8s.io/client-go/discovery/discovery_client.go
// 聚合發(fā)現(xiàn)
type AggregatedDiscoveryInterface interface {
DiscoveryInterface
GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error)
}
// 緩存發(fā)現(xiàn)
type CachedDiscoveryInterface interface {
DiscoveryInterface
Fresh() bool
Invalidate()
}
type DiscoveryInterface interface {
RESTClient() restclient.Interface
ServerGroupsInterface
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
OpenAPIV3SchemaInterface
// Returns copy of current discovery client that will only
// receive the legacy discovery format, or pointer to current
// discovery client if it does not support legacy-only discovery.
WithLegacy() DiscoveryInterface
}
type ServerGroupsInterface interface {
// 返回資源組
ServerGroups() (*metav1.APIGroupList, error)
}
type ServerResourcesInterface interface {
// 返回指定資源組和版本的資源信息
ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error)
// 返回所有資源組和資源信息
ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
// 返回所有資源信息(首選版本)
ServerPreferredResources() ([]*metav1.APIResourceList, error)
// 返回所有支持命名空間的資源信息(首選版本)
ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error)
}
type ServerVersionInterface interface {
ServerVersion() (*version.Info, error)
}
type OpenAPISchemaInterface interface {
OpenAPISchema() (*openapi_v2.Document, error)
}
type OpenAPIV3SchemaInterface interface {
OpenAPIV3() openapi.Client
}
先看聚合發(fā)現(xiàn)客戶端的 ServerGroupsAndResources 方法實現(xiàn),即最基本的 DiscoveryClient :
type DiscoveryClient struct {
restClient restclient.Interface
LegacyPrefix string
// Forces the client to request only "unaggregated" (legacy) discovery.
UseLegacyDiscovery bool
}
var _ AggregatedDiscoveryInterface = &DiscoveryClient{}
func (d *DiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
return withRetries(defaultRetries, func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
// 調(diào)用 ServerGroupsAndResources 方法
return ServerGroupsAndResources(d)
})
}
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var sgs *metav1.APIGroupList
var resources []*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
var err error
if ad, ok := d.(AggregatedDiscoveryInterface); ok {
// 如果是聚合發(fā)現(xiàn)客戶端
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// 調(diào)用 GroupsAndMaybeResources 方法
sgs, resourcesByGV, failedGVs, err = ad.GroupsAndMaybeResources()
for _, resourceList := range resourcesByGV {
// 資源信息
resources = append(resources, resourceList)
}
} else {
sgs, err = d.ServerGroups()
}
if sgs == nil {
return nil, nil, err
}
resultGroups := []*metav1.APIGroup{}
for i := range sgs.Groups {
resultGroups = append(resultGroups, &sgs.Groups[i])
}
if resources != nil {
var ferr error
if len(failedGVs) > 0 {
ferr = &ErrGroupDiscoveryFailed{Groups: failedGVs}
}
// 如果資源信息不為空,則返回
return resultGroups, resources, ferr
}
// 如果沒有查詢到資源信息,繼續(xù)往下處理,此時主要為緩存發(fā)現(xiàn)的邏輯,暫時略過
// ...
}
對于 DiscoveryClient 聚合發(fā)現(xiàn)的主要實現(xiàn)在 GroupsAndMaybeResources 方法:
func (d *DiscoveryClient) GroupsAndMaybeResources() (
*metav1.APIGroupList,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error,
error) {
// 首先從 /api 下載遺留的組和資源
groups, resources, failedGVs, err := d.downloadLegacy()
if err != nil {
return nil, nil, nil, err
}
// 從 /apis 下載發(fā)現(xiàn)的組和(可能的)資源
apiGroups, apiResources, failedApisGVs, aerr := d.downloadAPIs()
if aerr != nil {
return nil, nil, nil, aerr
}
// 將從 /apis 下載的組合并到遺留的組中
for _, group := range apiGroups.Groups {
groups.Groups = append(groups.Groups, group)
}
// 僅當兩個端點都返回資源時,才返回資源
if resources != nil && apiResources != nil {
for gv, resourceList := range apiResources {
resources[gv] = resourceList
}
} else if resources != nil {
resources = nil
}
// 將從 /api 和 /apis 下載的失敗的 GroupVersion 合并
for gv, err := range failedApisGVs {
failedGVs[gv] = err
}
return groups, resources, failedGVs, err
}
func (d *DiscoveryClient) downloadLegacy() (
*metav1.APIGroupList,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error,
error) {
accept := acceptDiscoveryFormats
if d.UseLegacyDiscovery {
accept = AcceptV1
}
var responseContentType string
// 使用 RESTClient 客戶端發(fā)起請求
body, err := d.restClient.Get().
AbsPath("/api").
SetHeader("Accept", accept).
Do(context.TODO()).
ContentType(&responseContentType).
Raw()
apiGroupList := &metav1.APIGroupList{}
failedGVs := map[schema.GroupVersion]error{}
if err != nil {
// Tolerate 404, since aggregated api servers can return it.
if errors.IsNotFound(err) {
// Return empty structures and no error.
emptyGVMap := map[schema.GroupVersion]*metav1.APIResourceList{}
return apiGroupList, emptyGVMap, failedGVs, nil
} else {
return nil, nil, nil, err
}
}
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, nil, err
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
// Default is unaggregated discovery v1.
var v metav1.APIVersions
err = json.Unmarshal(body, &v)
if err != nil {
return nil, nil, nil, err
}
apiGroup := metav1.APIGroup{}
if len(v.Versions) != 0 {
apiGroup = apiVersionsToAPIGroup(&v)
}
apiGroupList.Groups = []metav1.APIGroup{apiGroup}
}
return apiGroupList, resourcesByGV, failedGVs, nil
}
func (d *DiscoveryClient) downloadAPIs() (
*metav1.APIGroupList,
map[schema.GroupVersion]*metav1.APIResourceList,
map[schema.GroupVersion]error,
error) {
accept := acceptDiscoveryFormats
if d.UseLegacyDiscovery {
accept = AcceptV1
}
var responseContentType string
// 使用 RESTClient 客戶端發(fā)起請求
body, err := d.restClient.Get().
AbsPath("/apis").
SetHeader("Accept", accept).
Do(context.TODO()).
ContentType(&responseContentType).
Raw()
if err != nil {
return nil, nil, nil, err
}
apiGroupList := &metav1.APIGroupList{}
failedGVs := map[schema.GroupVersion]error{}
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Switch on content-type server responded with: aggregated or unaggregated.
switch {
case isV2Beta1ContentType(responseContentType):
var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
err = json.Unmarshal(body, &aggregatedDiscovery)
if err != nil {
return nil, nil, nil, err
}
apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
default:
// Default is unaggregated discovery v1.
err = json.Unmarshal(body, apiGroupList)
if err != nil {
return nil, nil, nil, err
}
}
return apiGroupList, resourcesByGV, failedGVs, nil
}
可以看到 DiscoveryClient 客戶端就是通過 RESTClient 客戶端調(diào)用 /api 和 /apis 獲取到所有的資源信息聚合后返回。
使用示例如下:
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 通過 kubeconfig 生成配置
config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
if err != nil {
panic(err)
}
// 創(chuàng)建 DiscoveryClient
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err.Error())
}
_, apiResources, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
panic(err.Error())
}
for _, apiResource := range apiResources {
gv, err := schema.ParseGroupVersion(apiResource.GroupVersion)
if err != nil {
panic(err.Error())
}
for _, resource := range apiResource.APIResources {
fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, resource.Name)
}
}
}
// $ go run main.go
// [G]node.k8s.io [V]v1 [R]runtimeclasses
// [G]authorization.k8s.io [V]v1 [R]localsubjectaccessreviews
// [G]authorization.k8s.io [V]v1 [R]selfsubjectaccessreviews
// [G]authorization.k8s.io [V]v1 [R]selfsubjectrulesreviews
// [G]authorization.k8s.io [V]v1 [R]subjectaccessreviews
// [G]discovery.k8s.io [V]v1 [R]endpointslices
// [G]storage.k8s.io [V]v1 [R]csidrivers
// [G]storage.k8s.io [V]v1 [R]csinodes
// [G]storage.k8s.io [V]v1 [R]csistoragecapacities
// [G]storage.k8s.io [V]v1 [R]storageclasses
// [G]storage.k8s.io [V]v1 [R]volumeattachments
// [G]storage.k8s.io [V]v1 [R]volumeattachments/status
// [G]coordination.k8s.io [V]v1 [R]leases
// [G]autoscaling [V]v1 [R]horizontalpodautoscalers
// [G]autoscaling [V]v1 [R]horizontalpodautoscalers/status
// [G]authentication.k8s.io [V]v1 [R]tokenreviews
// [G]networking.k8s.io [V]v1 [R]ingressclasses
// [G]networking.k8s.io [V]v1 [R]ingresses
// [G]networking.k8s.io [V]v1 [R]ingresses/status
// [G]networking.k8s.io [V]v1 [R]networkpolicies
// [G]networking.k8s.io [V]v1 [R]networkpolicies/status
// [G]admissionregistration.k8s.io [V]v1 [R]mutatingwebhookconfigurations
// [G]admissionregistration.k8s.io [V]v1 [R]validatingwebhookconfigurations
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas/status
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations/status
// [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests
// [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/approval
// [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/status
// [G]events.k8s.io [V]v1 [R]events
// [G]apiregistration.k8s.io [V]v1 [R]apiservices
// [G]apiregistration.k8s.io [V]v1 [R]apiservices/status
// [G] [V]v1 [R]bindings
// [G] [V]v1 [R]componentstatuses
// [G] [V]v1 [R]configmaps
// [G] [V]v1 [R]endpoints
// [G] [V]v1 [R]events
// [G] [V]v1 [R]limitranges
// [G] [V]v1 [R]namespaces
// [G] [V]v1 [R]namespaces/finalize
// [G] [V]v1 [R]namespaces/status
// [G] [V]v1 [R]nodes
// [G] [V]v1 [R]nodes/proxy
// [G] [V]v1 [R]nodes/status
// [G] [V]v1 [R]persistentvolumeclaims
// [G] [V]v1 [R]persistentvolumeclaims/status
// [G] [V]v1 [R]persistentvolumes
// [G] [V]v1 [R]persistentvolumes/status
// [G] [V]v1 [R]pods
// [G] [V]v1 [R]pods/attach
// [G] [V]v1 [R]pods/binding
// [G] [V]v1 [R]pods/ephemeralcontainers
// [G] [V]v1 [R]pods/eviction
// [G] [V]v1 [R]pods/exec
// [G] [V]v1 [R]pods/log
// [G] [V]v1 [R]pods/portforward
// [G] [V]v1 [R]pods/proxy
// [G] [V]v1 [R]pods/status
// [G] [V]v1 [R]podtemplates
// [G] [V]v1 [R]replicationcontrollers
// [G] [V]v1 [R]replicationcontrollers/scale
// [G] [V]v1 [R]replicationcontrollers/status
// [G] [V]v1 [R]resourcequotas
// [G] [V]v1 [R]resourcequotas/status
// [G] [V]v1 [R]secrets
// [G] [V]v1 [R]serviceaccounts
// [G] [V]v1 [R]serviceaccounts/token
// [G] [V]v1 [R]services
// [G] [V]v1 [R]services/proxy
// [G] [V]v1 [R]services/status
// [G]apps [V]v1 [R]controllerrevisions
// [G]apps [V]v1 [R]daemonsets
// [G]apps [V]v1 [R]daemonsets/status
// [G]apps [V]v1 [R]deployments
// [G]apps [V]v1 [R]deployments/scale
// [G]apps [V]v1 [R]deployments/status
// [G]apps [V]v1 [R]replicasets
// [G]apps [V]v1 [R]replicasets/scale
// [G]apps [V]v1 [R]replicasets/status
// [G]apps [V]v1 [R]statefulsets
// [G]apps [V]v1 [R]statefulsets/scale
// [G]apps [V]v1 [R]statefulsets/status
// [G]scheduling.k8s.io [V]v1 [R]priorityclasses
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas/status
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations/status
// [G]policy [V]v1 [R]poddisruptionbudgets
// [G]policy [V]v1 [R]poddisruptionbudgets/status
// [G]rbac.authorization.k8s.io [V]v1 [R]clusterrolebindings
// [G]rbac.authorization.k8s.io [V]v1 [R]clusterroles
// [G]rbac.authorization.k8s.io [V]v1 [R]rolebindings
// [G]rbac.authorization.k8s.io [V]v1 [R]roles
// [G]batch [V]v1 [R]cronjobs
// [G]batch [V]v1 [R]cronjobs/status
// [G]batch [V]v1 [R]jobs
// [G]batch [V]v1 [R]jobs/status
// [G]autoscaling [V]v2 [R]horizontalpodautoscalers
// [G]autoscaling [V]v2 [R]horizontalpodautoscalers/status
// [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions
// [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions/status
對于緩存發(fā)現(xiàn)客戶端,又分為基于內(nèi)存(memCacheClient)和基于磁盤(CachedDiscoveryClient)兩種,其中 CachedDiscoveryClient 實際上套用了 memCacheClient ,而 memCacheClient 則套用了聚合發(fā)現(xiàn) DiscoveryClient :
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
type CachedDiscoveryClient struct {
// 內(nèi)部套的緩存發(fā)現(xiàn)客戶端,這里實際是 memCacheClient
delegate discovery.DiscoveryInterface
// 緩存的磁盤路徑
cacheDirectory string
// 緩存有效期
ttl time.Duration
// ...省略
}
func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
if len(httpCacheDir) > 0 {
config = restclient.CopyConfig(config)
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return newCacheRoundTripper(httpCacheDir, rt)
})
}
// 聚合發(fā)現(xiàn) DiscoveryClient
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
// 構(gòu)建基于磁盤的緩存發(fā)現(xiàn),其中套了一層基于內(nèi)存的,基于內(nèi)存的里面套了一層基本的 DiscoveryClient
return newCachedDiscoveryClient(memory.NewMemCacheClient(discoveryClient), discoveryCacheDir, ttl), nil
}
func newCachedDiscoveryClient(delegate discovery.DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
return &CachedDiscoveryClient{
delegate: delegate,
cacheDirectory: cacheDirectory,
ttl: ttl,
ourFiles: map[string]struct{}{},
fresh: true,
}
}
CachedDiscoveryClient 客戶端的 ServerGroupsAndResources 方法實現(xiàn):
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
func (d *CachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
// 實際上調(diào)用的就是之前的聚合發(fā)現(xiàn) DiscoveryClient 所調(diào)用的 discovery.ServerGroupsAndResources 方法
return discovery.ServerGroupsAndResources(d)
}
看到之前的 discovery.ServerGroupsAndResources 方法:
// k8s.io/client-go/discovery/discovery_client.go
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var sgs *metav1.APIGroupList
var resources []*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
var err error
if ad, ok := d.(AggregatedDiscoveryInterface); ok {
// 聚合發(fā)現(xiàn)客戶端,上面講過了,省略 ...
} else {
// 對于緩存發(fā)現(xiàn)客戶端,調(diào)用其 ServerGroups 方法
sgs, err = d.ServerGroups()
}
// ... 稍后繼續(xù)
}
這里的 d 是 CachedDiscoveryClient ,看到其 ServerGroups 方法:
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
// 從本地磁盤文件中查詢緩存數(shù)據(jù)
filename := filepath.Join(d.cacheDirectory, "servergroups.json")
cachedBytes, err := d.getCachedFile(filename)
if err == nil {
// 從緩存中查詢成功,解碼后返回資源組
cachedGroups := &metav1.APIGroupList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
klog.V(10).Infof("returning cached discovery info from %v", filename)
return cachedGroups, nil
}
}
// 未能從緩存中查詢到,或緩存已過期,則調(diào)用內(nèi)部 delegate 的 ServerGroups 方法
// 這里 delegate 是 memCacheClient
liveGroups, err := d.delegate.ServerGroups()
if err != nil {
klog.V(3).Infof("skipped caching discovery info due to %v", err)
return liveGroups, err
}
if liveGroups == nil || len(liveGroups.Groups) == 0 {
klog.V(3).Infof("skipped caching discovery info, no groups found")
return liveGroups, err
}
// 將數(shù)據(jù)寫入到本地磁盤緩存,以供下次查詢時使用
if err := d.writeCachedFile(filename, liveGroups); err != nil {
klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
}
return liveGroups, nil
}
查詢緩存文件和寫入緩存文件的方式很簡單,實際就是對文件的一個讀寫,以及 ttl 的判斷,直接貼出:
func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
// after invalidation ignore cache files not created by this process
d.mutex.Lock()
_, ourFile := d.ourFiles[filename]
if d.invalidated && !ourFile {
d.mutex.Unlock()
return nil, errors.New("cache invalidated")
}
d.mutex.Unlock()
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
return nil, err
}
if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
return nil, errors.New("cache expired")
}
// the cache is present and its valid. Try to read and use it.
cachedBytes, err := io.ReadAll(file)
if err != nil {
return nil, err
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.fresh = d.fresh && ourFile
return cachedBytes, nil
}
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
return err
}
bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
if err != nil {
return err
}
f, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)+".")
if err != nil {
return err
}
defer os.Remove(f.Name())
_, err = f.Write(bytes)
if err != nil {
return err
}
err = os.Chmod(f.Name(), 0660)
if err != nil {
return err
}
name := f.Name()
err = f.Close()
if err != nil {
return err
}
// atomic rename
d.mutex.Lock()
defer d.mutex.Unlock()
err = os.Rename(name, filename)
if err == nil {
d.ourFiles[filename] = struct{}{}
}
return err
}
主要是 CachedDiscoveryClient 在緩存文件中查詢不到數(shù)據(jù)或者緩存過期后,會調(diào)用 memCacheClient 的 ServerGroups 方法:
// k8s.io/client-go/discovery/cached/memory/memcache.go
// 資源信息緩存實體
type cacheEntry struct {
resourceList *metav1.APIResourceList
err error
}
type memCacheClient struct {
// 內(nèi)部套的發(fā)現(xiàn)客戶端,這里是最基本的聚合發(fā)現(xiàn) DiscoveryClient
delegate discovery.DiscoveryInterface
lock sync.RWMutex
// 資源信息緩存
groupToServerResources map[string]*cacheEntry
// 資源組緩存
groupList *metav1.APIGroupList
cacheValid bool
openapiClient openapi.Client
receivedAggregatedDiscovery bool
}
func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) {
// 調(diào)用 GroupsAndMaybeResources 方法
groups, _, _, err := d.GroupsAndMaybeResources()
if err != nil {
return nil, err
}
return groups, nil
}
func (d *memCacheClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error) {
d.lock.Lock()
defer d.lock.Unlock()
if !d.cacheValid {
// 刷新內(nèi)存的緩存狀態(tài)
if err := d.refreshLocked(); err != nil {
return nil, nil, nil, err
}
}
// 從內(nèi)存中構(gòu)造出需要返回的數(shù)據(jù)結(jié)構(gòu),省略
return d.groupList, resourcesMap, failedGVs, nil
}
func (d *memCacheClient) refreshLocked() error {
var gl *metav1.APIGroupList
var err error
// delegate 是最基本的聚合發(fā)現(xiàn) DiscoveryClient
if ad, ok := d.delegate.(discovery.AggregatedDiscoveryInterface); ok {
var resources map[schema.GroupVersion]*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
// 調(diào)用 DiscoveryClient 的 GroupsAndMaybeResources 方法
// 最開始的時候講過了
// 通過 RESTClient 客戶端調(diào)用 /api 和 /apis 獲取到所有的資源信息聚合后返回
gl, resources, failedGVs, err = ad.GroupsAndMaybeResources()
if resources != nil && err == nil {
// Cache the resources.
d.groupToServerResources = map[string]*cacheEntry{}
d.groupList = gl
for gv, resources := range resources {
d.groupToServerResources[gv.String()] = &cacheEntry{resources, nil}
}
// Cache GroupVersion discovery errors
for gv, err := range failedGVs {
d.groupToServerResources[gv.String()] = &cacheEntry{nil, err}
}
d.receivedAggregatedDiscovery = true
d.cacheValid = true
return nil
}
} else {
gl, err = d.delegate.ServerGroups()
}
// ...
return nil
}
memCacheClient 實際上就是調(diào)用 DiscoveryClient ,然后將數(shù)據(jù)緩存在內(nèi)存中。
總結(jié)一下就是,CachedDiscoveryClient 首先從本地磁盤文件中取緩存的資源組信息,如果取不到或者過期了,就去 memCacheClient 的內(nèi)存中取,內(nèi)存中也取不到或者被刷新了,就調(diào)用 DiscoveryClient 向 kube-apiserver 發(fā)起請求獲取。
回到 discovery.ServerGroupsAndResources 方法:
// k8s.io/client-go/discovery/discovery_client.go
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var sgs *metav1.APIGroupList
var resources []*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
var err error
if ad, ok := d.(AggregatedDiscoveryInterface); ok {
// 聚合發(fā)現(xiàn)客戶端,上面講過了,省略 ...
} else {
// 對于緩存發(fā)現(xiàn)客戶端,調(diào)用其 ServerGroups 方法,得到資源組 sgs
sgs, err = d.ServerGroups()
}
if sgs == nil {
return nil, nil, err
}
// 調(diào)用 fetchGroupVersionResources 傳入資源組 sgs 獲取資源信息
groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs)
// order results by group/version discovery order
result := []*metav1.APIResourceList{}
for _, apiGroup := range sgs.Groups {
for _, version := range apiGroup.Versions {
gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
if resources, ok := groupVersionResources[gv]; ok {
result = append(result, resources)
}
}
}
if len(failedGroups) == 0 {
return resultGroups, result, nil
}
return resultGroups, result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
看到最終的 fetchGroupVersionResources 方法:
// k8s.io/client-go/discovery/discovery_client.go
func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)
wg := &sync.WaitGroup{}
resultLock := &sync.Mutex{}
// 循環(huán)資源組
for _, apiGroup := range apiGroups.Groups {
// 循環(huán)資源版本
for _, version := range apiGroup.Versions {
// 根據(jù)資源組和資源版本去獲取資源信息
groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
wg.Add(1)
go func() {
defer wg.Done()
defer utilruntime.HandleCrash()
// 調(diào)用 CachedDiscoveryClient 的 ServerResourcesForGroupVersion 方法
apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())
// lock to record results
resultLock.Lock()
defer resultLock.Unlock()
if err != nil {
// TODO: maybe restrict this to NotFound errors
failedGroups[groupVersion] = err
}
if apiResourceList != nil {
// even in case of error, some fallback might have been returned
groupVersionResources[groupVersion] = apiResourceList
}
}()
}
}
// 等待所有并發(fā)查詢完成
wg.Wait()
return groupVersionResources, failedGroups
}
fetchGroupVersionResources 方法開啟了并發(fā)去調(diào)用 ServerResourcesForGroupVersion 方法根據(jù)資源組版本來查詢資源信息:
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
// 從本地磁盤緩存文件中查詢資源信息
filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
cachedBytes, err := d.getCachedFile(filename)
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
if err == nil {
cachedResources := &metav1.APIResourceList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
klog.V(10).Infof("returning cached discovery info from %v", filename)
return cachedResources, nil
}
}
// 如果緩存沒有或者過期了,則調(diào)用 memCacheClient 的 ServerResourcesForGroupVersion 方法
liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
klog.V(3).Infof("skipped caching discovery info due to %v", err)
return liveResources, err
}
if liveResources == nil || len(liveResources.APIResources) == 0 {
klog.V(3).Infof("skipped caching discovery info, no resources found")
return liveResources, err
}
// 將緩存寫入本地磁盤緩存
if err := d.writeCachedFile(filename, liveResources); err != nil {
klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
}
return liveResources, nil
}
ServerResourcesForGroupVersion 方法和 ServerGroups 方法的邏輯是完全一樣的,這里不重復了。感興趣自己去看。
最后,基于緩存的發(fā)現(xiàn)客戶端的使用示例如下:
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/tools/clientcmd"
"time"
)
func main() {
// 通過 kubeconfig 生成配置
config, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
if err != nil {
panic(err)
}
// 創(chuàng)建 DiscoveryClient
//discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
// 創(chuàng)建帶緩存的 DiscoveryClient
discoveryClient, err := disk.NewCachedDiscoveryClientForConfig(config, ".kube/cache", ".kube/http-cache", time.Minute*10)
if err != nil {
panic(err.Error())
}
_, apiResources, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
panic(err.Error())
}
for _, apiResource := range apiResources {
gv, err := schema.ParseGroupVersion(apiResource.GroupVersion)
if err != nil {
panic(err.Error())
}
for _, resource := range apiResource.APIResources {
fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, resource.Name)
}
}
}
本地緩存目錄如下:
本回完,下一回介紹 client-go 的 Informer 機制。
