From b5648c3f61ef168e6a1c9c7b682745e28145dc1e Mon Sep 17 00:00:00 2001 From: Michael Taufen Date: Mon, 16 Apr 2018 15:15:03 -0700 Subject: [PATCH] dynamic Kubelet config reconciles ConfigMap updates --- cmd/kubelet/app/server.go | 4 +- pkg/apis/core/validation/validation.go | 14 +- pkg/kubelet/kubeletconfig/checkpoint/BUILD | 5 + .../kubeletconfig/checkpoint/configmap.go | 12 +- .../checkpoint/configmap_test.go | 53 +- .../kubeletconfig/checkpoint/download.go | 149 ++- .../kubeletconfig/checkpoint/download_test.go | 104 ++- .../kubeletconfig/checkpoint/store/fsstore.go | 46 +- .../checkpoint/store/fsstore_test.go | 375 ++++---- .../kubeletconfig/checkpoint/store/store.go | 1 + pkg/kubelet/kubeletconfig/configsync.go | 181 ++-- pkg/kubelet/kubeletconfig/controller.go | 111 ++- pkg/kubelet/kubeletconfig/status/status.go | 9 +- pkg/kubelet/kubeletconfig/watch.go | 59 +- test/e2e_node/dynamic_kubelet_config_test.go | 867 ++++++++++++------ test/integration/auth/node_test.go | 7 +- 16 files changed, 1303 insertions(+), 694 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 7adfb77b7d..f386400a60 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -619,7 +619,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { // If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 && kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce { - kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)) + if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil { + return err + } } if kubeDeps.Auth == nil { diff --git a/pkg/apis/core/validation/validation.go b/pkg/apis/core/validation/validation.go index 6f0e3eaea7..6a01b7f625 100644 --- a/pkg/apis/core/validation/validation.go +++ b/pkg/apis/core/validation/validation.go @@ -4150,11 +4150,10 @@ func validateNodeConfigSourceSpec(source *core.NodeConfigSource, fldPath *field. 
// validation specific to Node.Spec.ConfigSource.ConfigMap func validateConfigMapNodeConfigSourceSpec(source *core.ConfigMapNodeConfigSource, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} - // TODO(#61643): Prevent ref.UID from being set here when we switch from requiring UID to respecting all ConfigMap updates - if string(source.UID) == "" { - allErrs = append(allErrs, field.Required(fldPath.Child("uid"), "uid must be set in spec")) + // uid and resourceVersion must not be set in spec + if string(source.UID) != "" { + allErrs = append(allErrs, field.Forbidden(fldPath.Child("uid"), "uid must not be set in spec")) } - // resourceVersion must not be set in spec if source.ResourceVersion != "" { allErrs = append(allErrs, field.Forbidden(fldPath.Child("resourceVersion"), "resourceVersion must not be set in spec")) } @@ -4196,12 +4195,13 @@ func validateNodeConfigSourceStatus(source *core.NodeConfigSource, fldPath *fiel // validation specific to Node.Status.Config.(Active|Assigned|LastKnownGood).ConfigMap func validateConfigMapNodeConfigSourceStatus(source *core.ConfigMapNodeConfigSource, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} - + // uid and resourceVersion must be set in status if string(source.UID) == "" { allErrs = append(allErrs, field.Required(fldPath.Child("uid"), "uid must be set in status")) } - // TODO(#63221): require ResourceVersion in status when we start respecting ConfigMap mutations (the Kubelet isn't tracking it internally until - // that PR, which makes it difficult to report for now). + if source.ResourceVersion == "" { + allErrs = append(allErrs, field.Required(fldPath.Child("resourceVersion"), "resourceVersion must be set in status")) + } return append(allErrs, validateConfigMapNodeConfigSource(source, fldPath)...) 
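For illustration, a minimal sketch (not part of this patch) of the asymmetry the validation above enforces: Node.Spec.ConfigSource may only name the ConfigMap, while Node.Status.Config must also carry the resolved UID and ResourceVersion. The namespace, ConfigMap name, and UID below are hypothetical.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// Spec side: name the ConfigMap only. Setting UID or ResourceVersion here is now
	// rejected by validateConfigMapNodeConfigSourceSpec with a Forbidden error.
	spec := &corev1.NodeConfigSource{
		ConfigMap: &corev1.ConfigMapNodeConfigSource{
			Namespace:        "kube-system",       // hypothetical
			Name:             "my-kubelet-config", // hypothetical
			KubeletConfigKey: "kubelet",
		},
	}

	// Status side: the Kubelet reports the fully-resolved source, so
	// validateConfigMapNodeConfigSourceStatus requires both UID and ResourceVersion.
	reported := spec.DeepCopy()
	reported.ConfigMap.UID = "uid-from-downloaded-object" // hypothetical
	reported.ConfigMap.ResourceVersion = "1"

	fmt.Printf("spec: %+v\nstatus: %+v\n", spec.ConfigMap, reported.ConfigMap)
}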
} diff --git a/pkg/kubelet/kubeletconfig/checkpoint/BUILD b/pkg/kubelet/kubeletconfig/checkpoint/BUILD index cb21adad99..a0b7446293 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/BUILD +++ b/pkg/kubelet/kubeletconfig/checkpoint/BUILD @@ -19,7 +19,9 @@ go_test( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) @@ -40,8 +42,11 @@ go_library( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/pkg/kubelet/kubeletconfig/checkpoint/configmap.go b/pkg/kubelet/kubeletconfig/checkpoint/configmap.go index 2cdc348f37..b79e67e75b 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/configmap.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/configmap.go @@ -34,9 +34,11 @@ var _ Payload = (*configMapPayload)(nil) // NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) { if cm == nil { - return nil, fmt.Errorf("ConfigMap must be non-nil to be a Payload") - } else if len(cm.ObjectMeta.UID) == 0 { - return nil, fmt.Errorf("ConfigMap must have a UID to be a Payload") + return nil, fmt.Errorf("ConfigMap must be non-nil") + } else if cm.ObjectMeta.UID == "" { + return nil, fmt.Errorf("ConfigMap must have a non-empty UID") + } else if cm.ObjectMeta.ResourceVersion == "" { + return nil, fmt.Errorf("ConfigMap must have a non-empty ResourceVersion") } return &configMapPayload{cm}, nil @@ -46,6 +48,10 @@ func (p *configMapPayload) UID() string { return string(p.cm.UID) } +func (p *configMapPayload) ResourceVersion() string { + return p.cm.ResourceVersion +} + func (p *configMapPayload) Files() map[string]string { return p.cm.Data } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go b/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go index d7770f05a4..db44fab3bc 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/configmap_test.go @@ -34,19 +34,46 @@ func TestNewConfigMapPayload(t *testing.T) { cm *apiv1.ConfigMap err string }{ - {"nil v1/ConfigMap", nil, "must be non-nil"}, - {"empty v1/ConfigMap", &apiv1.ConfigMap{}, "must have a UID"}, - {"populated v1/ConfigMap", - &apiv1.ConfigMap{ + { + desc: "nil", + cm: nil, + err: "ConfigMap must be non-nil", + }, + { + desc: "missing uid", + cm: &apiv1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + ResourceVersion: "rv", + }, + }, + err: "ConfigMap must have a non-empty UID", + }, + { + desc: "missing resourceVersion", + cm: &apiv1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: "name", UID: "uid", }, + }, + err: "ConfigMap must have a non-empty ResourceVersion", + }, + { + desc: "populated v1/ConfigMap", + cm: &apiv1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + UID: "uid", + 
ResourceVersion: "rv", + }, Data: map[string]string{ "key1": "value1", "key2": "value2", }, - }, ""}, + }, + err: "", + }, } for _, c := range cases { @@ -66,7 +93,7 @@ func TestNewConfigMapPayload(t *testing.T) { func TestConfigMapPayloadUID(t *testing.T) { const expect = "uid" - payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect}}) + payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect, ResourceVersion: "rv"}}) if err != nil { t.Fatalf("error constructing payload: %v", err) } @@ -76,6 +103,18 @@ func TestConfigMapPayloadUID(t *testing.T) { } } +func TestConfigMapPayloadResourceVersion(t *testing.T) { + const expect = "rv" + payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: expect}}) + if err != nil { + t.Fatalf("error constructing payload: %v", err) + } + resourceVersion := payload.ResourceVersion() + if expect != resourceVersion { + t.Errorf("expect %q, but got %q", expect, resourceVersion) + } +} + func TestConfigMapPayloadFiles(t *testing.T) { cases := []struct { desc string @@ -96,7 +135,7 @@ func TestConfigMapPayloadFiles(t *testing.T) { } for _, c := range cases { t.Run(c.desc, func(t *testing.T) { - payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid"}, Data: c.data}) + payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: "rv"}, Data: c.data}) if err != nil { t.Fatalf("error constructing payload: %v", err) } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/download.go b/pkg/kubelet/kubeletconfig/checkpoint/download.go index 3b4854c0a5..eb7059806e 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/download.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/download.go @@ -18,12 +18,18 @@ package checkpoint import ( "fmt" + "math/rand" + "time" apiv1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + kuberuntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme" kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1" @@ -35,8 +41,13 @@ import ( // Payload represents a local copy of a config source (payload) object type Payload interface { // UID returns a globally unique (space and time) identifier for the payload. + // The return value is guaranteed non-empty. UID() string + // ResourceVersion returns a resource version for the payload. + // The return value is guaranteed non-empty. + ResourceVersion() string + // Files returns a map of filenames to file contents. Files() map[string]string @@ -46,16 +57,29 @@ type Payload interface { // RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint type RemoteConfigSource interface { - // UID returns a globally unique identifier of the source described by the remote config source object - UID() string // KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files() KubeletFilename() string + // APIPath returns the API path to the remote resource, e.g. 
its SelfLink APIPath() string - // Download downloads the remote config source object returns a Payload backed by the object, - // or a sanitized failure reason and error if the download fails - Download(client clientset.Interface) (Payload, string, error) - // Encode returns a []byte representation of the NodeConfigSource behind the RemoteConfigSource + + // UID returns the globally unique identifier for the most recently downloaded payload targeted by the source. + UID() string + + // ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source. + ResourceVersion() string + + // Download downloads the remote config source's target object and returns a Payload backed by the object, + // or a sanitized failure reason and error if the download fails. + // Download takes an optional store as an argument. If provided, Download will check this store for the + // target object prior to contacting the API server. + // Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload. + Download(client clientset.Interface, store cache.Store) (Payload, string, error) + + // Informer returns an informer that can be used to detect changes to the remote config source + Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer + + // Encode returns a []byte representation of the object behind the RemoteConfigSource Encode() ([]byte, error) // NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object. @@ -104,7 +128,11 @@ func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) { // we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here source, _, err := NewRemoteConfigSource(&cs.Source) - return source, err + if err != nil { + return nil, err + } + + return source, nil } // EqualRemoteConfigSources is a helper for comparing remote config sources by @@ -123,10 +151,6 @@ type remoteConfigMap struct { var _ RemoteConfigSource = (*remoteConfigMap)(nil) -func (r *remoteConfigMap) UID() string { - return string(r.source.ConfigMap.UID) -} - func (r *remoteConfigMap) KubeletFilename() string { return r.source.ConfigMap.KubeletConfigKey } @@ -138,32 +162,82 @@ func (r *remoteConfigMap) APIPath() string { return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name) } -func (r *remoteConfigMap) Download(client clientset.Interface) (Payload, string, error) { - var reason string - uid := string(r.source.ConfigMap.UID) +func (r *remoteConfigMap) UID() string { + return string(r.source.ConfigMap.UID) +} - utillog.Infof("attempting to download ConfigMap with UID %q", uid) +func (r *remoteConfigMap) ResourceVersion() string { + return r.source.ConfigMap.ResourceVersion +} - // get the ConfigMap via namespace/name, there doesn't seem to be a way to get it by UID - cm, err := client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{}) - if err != nil { - return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err) +func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) { + var ( + cm *apiv1.ConfigMap + err error + ) + // check the in-memory store for the ConfigMap, so we can skip unnecessary downloads + if store != nil { + utillog.Infof("checking in-memory store for %s", r.APIPath()) + cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name) + if 
err != nil { + // just log the error, we'll attempt a direct download instead + utillog.Errorf("failed to check in-memory store for %s, error: %v", r.APIPath(), err) + } else if cm != nil { + utillog.Infof("found %s in in-memory store, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion) + } else { + utillog.Infof("did not find %s in in-memory store", r.APIPath()) + } } - - // ensure that UID matches the UID on the source - if r.source.ConfigMap.UID != cm.UID { - reason = fmt.Sprintf(status.UIDMismatchErrorFmt, r.source.ConfigMap.UID, r.APIPath(), cm.UID) - return nil, reason, fmt.Errorf(reason) - } - + // if we didn't find the ConfigMap in the in-memory store, download it from the API server + if cm == nil { + utillog.Infof("attempting to download %s", r.APIPath()) + cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{}) + if err != nil { + return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err) + } + utillog.Infof("successfully downloaded %s, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion) + } // Assert: Now we have a non-nil ConfigMap + // construct Payload from the ConfigMap payload, err := NewConfigMapPayload(cm) if err != nil { - reason = fmt.Sprintf("invalid downloaded object") - return nil, reason, fmt.Errorf("%s, error: %v", reason, err) + // We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should + // never happen on objects in the informer's store, or objects downloaded from the API server + // directly, so we report InternalError. + return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err) + } + // update internal UID and ResourceVersion based on latest ConfigMap + r.source.ConfigMap.UID = cm.UID + r.source.ConfigMap.ResourceVersion = cm.ResourceVersion + return payload, "", nil +} + +func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer { + // select ConfigMap by name + fieldselector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name) + + // add some randomness to resync period, which can help avoid controllers falling into lock-step + minResyncPeriod := 15 * time.Minute + factor := rand.Float64() + 1 + resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor) + + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (kuberuntime.Object, error) { + return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).List(metav1.ListOptions{ + FieldSelector: fieldselector.String(), + }) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Watch(metav1.ListOptions{ + FieldSelector: fieldselector.String(), + ResourceVersion: options.ResourceVersion, + }) + }, } - utillog.Infof("successfully downloaded ConfigMap with UID %q", uid) - return payload, "", nil + informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod) + informer.AddEventHandler(handler) + + return informer } func (r *remoteConfigMap) Encode() ([]byte, error) { @@ -182,3 +256,18 @@ func (r *remoteConfigMap) Encode() ([]byte, error) { func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource { return r.source.DeepCopy() } + +func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) { + key := fmt.Sprintf("%s/%s", namespace, name) + obj, ok, err 
:= store.GetByKey(key) + if err != nil || !ok { + return nil, err + } + cm, ok := obj.(*apiv1.ConfigMap) + if !ok { + err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key) + utillog.Errorf(err.Error()) + return nil, err + } + return cm, nil +} diff --git a/pkg/kubelet/kubeletconfig/checkpoint/download_test.go b/pkg/kubelet/kubeletconfig/checkpoint/download_test.go index 39f394100c..df92ee9f66 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/download_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/download_test.go @@ -25,7 +25,9 @@ import ( apiv1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" fakeclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test" ) @@ -119,75 +121,93 @@ func TestRemoteConfigMapAPIPath(t *testing.T) { func TestRemoteConfigMapDownload(t *testing.T) { cm := &apiv1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Name: "name", - Namespace: "namespace", - UID: "uid", + Name: "name", + Namespace: "namespace", + UID: "uid", + ResourceVersion: "1", }} - client := fakeclient.NewSimpleClientset(cm) - payload, err := NewConfigMapPayload(cm) + + source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Name: "name", + Namespace: "namespace", + KubeletConfigKey: "kubelet", + }} + + expectPayload, err := NewConfigMapPayload(cm) if err != nil { t.Fatalf("error constructing payload: %v", err) } - makeSource := func(source *apiv1.NodeConfigSource) RemoteConfigSource { - s, _, err := NewRemoteConfigSource(source) - if err != nil { - t.Fatalf("error constructing remote config source %v", err) - } - return s + missingStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + hasStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + if err := hasStore.Add(cm); err != nil { + t.Fatalf("unexpected error constructing hasStore") } + missingClient := fakeclient.NewSimpleClientset() + hasClient := fakeclient.NewSimpleClientset(cm) + cases := []struct { desc string - source RemoteConfigSource - expect Payload + client clientset.Interface + store cache.Store err string }{ { - desc: "object doesn't exist", - source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Name: "bogus", - Namespace: "namespace", - UID: "bogus", - KubeletConfigKey: "kubelet", - }}), - expect: nil, + desc: "nil store, object does not exist in API server", + client: missingClient, err: "not found", }, { - desc: "UID is incorrect for namespace/name", - source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Name: "name", - Namespace: "namespace", - UID: "bogus", - KubeletConfigKey: "kubelet", - }}), - expect: nil, - err: "does not match", + desc: "nil store, object exists in API server", + client: hasClient, }, { - desc: "object exists and reference is correct", - source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Name: "name", - Namespace: "namespace", - UID: "uid", - KubeletConfigKey: "kubelet", - }}), - expect: payload, - err: "", + desc: "object exists in store and API server", + store: hasStore, + client: hasClient, + }, + { + desc: "object exists in store, but does not exist in API server", + store: hasStore, + client: missingClient, + }, + { + desc: "object does not exist in store, but exists in API server", + store: missingStore, + client: 
hasClient, + }, + { + desc: "object does not exist in store or API server", + client: missingClient, + store: missingStore, + err: "not found", }, } for _, c := range cases { t.Run(c.desc, func(t *testing.T) { - payload, _, err := c.source.Download(client) + // deep copy so we can always check the UID/ResourceVersion are set after Download + s, _, err := NewRemoteConfigSource(source.DeepCopy()) + if err != nil { + t.Fatalf("error constructing remote config source %v", err) + } + // attempt download + p, _, err := s.Download(c.client, c.store) utiltest.ExpectError(t, err, c.err) if err != nil { return } // downloaded object should match the expected - if !apiequality.Semantic.DeepEqual(c.expect.object(), payload.object()) { - t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(payload)) + if !apiequality.Semantic.DeepEqual(expectPayload.object(), p.object()) { + t.Errorf("expect Checkpoint %s but got %s", spew.Sdump(expectPayload), spew.Sdump(p)) + } + // source UID and ResourceVersion should be updated by Download + if p.UID() != s.UID() { + t.Errorf("expect UID to be updated by Download to match payload: %s, but got source UID: %s", p.UID(), s.UID()) + } + if p.ResourceVersion() != s.ResourceVersion() { + t.Errorf("expect ResourceVersion to be updated by Download to match payload: %s, but got source ResourceVersion: %s", p.ResourceVersion(), s.ResourceVersion()) } }) } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go index 8fcc99e62a..94cd42fea6 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore.go @@ -75,32 +75,46 @@ func (s *fsStore) Initialize() error { return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir)) } -func (s *fsStore) Exists(c checkpoint.RemoteConfigSource) (bool, error) { +func (s *fsStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) { + const errfmt = "failed to determine whether checkpoint exists for source %s, UID: %s, ResourceVersion: %s exists, error: %v" + if len(source.UID()) == 0 { + return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty UID is ambiguous") + } + if len(source.ResourceVersion()) == 0 { + return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty ResourceVersion is ambiguous") + } + // we check whether the directory was created for the resource - uid := c.UID() - ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(uid)) + ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(source.UID(), source.ResourceVersion())) if err != nil { - return false, fmt.Errorf("failed to determine whether checkpoint %q exists, error: %v", uid, err) + return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), err) } return ok, nil } -func (s *fsStore) Save(c checkpoint.Payload) error { +func (s *fsStore) Save(payload checkpoint.Payload) error { + // Note: Payload interface guarantees UID() and ResourceVersion() to be non-empty + path := s.checkpointPath(payload.UID(), payload.ResourceVersion()) + // ensure the parent dir (checkpoints/uid) exists, since ReplaceDir requires the parent of the replacee + // to exist, and we checkpoint as checkpoints/uid/resourceVersion/files-from-configmap + if err := utilfiles.EnsureDir(s.fs, filepath.Dir(path)); err != nil { + return err + } // save the checkpoint's files in the appropriate checkpoint dir - 
return utilfiles.ReplaceDir(s.fs, s.checkpointPath(c.UID()), c.Files()) + return utilfiles.ReplaceDir(s.fs, path, payload.Files()) } func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) { - sourceFmt := fmt.Sprintf("%s:%s", source.APIPath(), source.UID()) + sourceFmt := fmt.Sprintf("%s, UID: %s, ResourceVersion: %s", source.APIPath(), source.UID(), source.ResourceVersion()) // check if a checkpoint exists for the source if ok, err := s.Exists(source); err != nil { - return nil, fmt.Errorf("failed to determine if a checkpoint exists for source %s", sourceFmt) + return nil, err } else if !ok { return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt) } // load the kubelet config file - utillog.Infof("loading kubelet configuration checkpoint for source %s", sourceFmt) - loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID()), source.KubeletFilename())) + utillog.Infof("loading Kubelet configuration checkpoint for source %s", sourceFmt) + loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID(), source.ResourceVersion()), source.KubeletFilename())) if err != nil { return nil, err } @@ -140,8 +154,8 @@ func (s *fsStore) Reset() (bool, error) { return reset(s) } -func (s *fsStore) checkpointPath(uid string) string { - return filepath.Join(s.dir, checkpointsDir, uid) +func (s *fsStore) checkpointPath(uid, resourceVersion string) string { + return filepath.Join(s.dir, checkpointsDir, uid, resourceVersion) } func (s *fsStore) metaPath(name string) string { @@ -163,6 +177,14 @@ func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoin if source == nil { return utilfiles.ReplaceFile(fs, path, []byte{}) } + // check that UID and ResourceVersion are non-empty, + // error to save reference if the checkpoint can't be fully resolved + if source.UID() == "" { + return fmt.Errorf("failed to write RemoteConfigSource, empty UID is ambiguous") + } + if source.ResourceVersion() == "" { + return fmt.Errorf("failed to write RemoteConfigSource, empty ResourceVersion is ambiguous") + } // encode the source and save it to the file data, err := source.Encode() if err != nil { diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go index cd73f7d35c..27414a10d0 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/fsstore_test.go @@ -83,8 +83,8 @@ func TestFsStoreInitialize(t *testing.T) { } // check that checkpoints dir exists - if _, err := store.fs.Stat(store.checkpointPath("")); err != nil { - t.Fatalf("expect %q to exist, but stat failed with error: %v", store.checkpointPath(""), err) + if _, err := store.fs.Stat(filepath.Join(store.dir, checkpointsDir)); err != nil { + t.Fatalf("expect %q to exist, but stat failed with error: %v", filepath.Join(store.dir, checkpointsDir), err) } // check that assignedFile exists @@ -105,21 +105,29 @@ func TestFsStoreExists(t *testing.T) { } // checkpoint a payload - const uid = "uid" - p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid}}) + const ( + uid = "uid" + resourceVersion = "1" + ) + p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid, ResourceVersion: resourceVersion}}) if err != nil { - t.Fatalf("could not construct checkpoint, error: %v", err) + t.Fatalf("could not construct Payload, error: 
%v", err) + } + if err := store.Save(p); err != nil { + t.Fatalf("unexpected error: %v", err) } - store.Save(p) cases := []struct { - desc string - uid types.UID - expect bool - err string + desc string + uid types.UID + resourceVersion string + expect bool + err string }{ - {"exists", uid, true, ""}, - {"does not exist", "bogus-uid", false, ""}, + {"exists", uid, resourceVersion, true, ""}, + {"does not exist", "bogus-uid", "bogus-resourceVersion", false, ""}, + {"ambiguous UID", "", "bogus-resourceVersion", false, "empty UID is ambiguous"}, + {"ambiguous ResourceVersion", "bogus-uid", "", false, "empty ResourceVersion is ambiguous"}, } for _, c := range cases { @@ -129,10 +137,11 @@ func TestFsStoreExists(t *testing.T) { Name: "name", Namespace: "namespace", UID: c.uid, + ResourceVersion: c.resourceVersion, KubeletConfigKey: "kubelet", }}) if err != nil { - t.Fatalf("error constructing remote config source: %v", err) + t.Fatalf("unexpected error: %v", err) } ok, err := store.Exists(source) utiltest.ExpectError(t, err, c.err) @@ -160,38 +169,44 @@ func TestFsStoreSave(t *testing.T) { return s }() + const ( + uid = "uid" + resourceVersion = "1" + ) + cases := []struct { - desc string - files map[string]string - err string + desc string + uid types.UID + resourceVersion string + files map[string]string + err string }{ - {"valid payload", map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""}, - {"empty key name", map[string]string{"": "foocontent"}, "must not be empty"}, - {"key name is not a base file name (foo/bar)", map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"}, - {"key name is not a base file name (/foo)", map[string]string{"/bar": "foocontent"}, "only base names are allowed"}, - {"used .", map[string]string{".": "foocontent"}, "may not be '.' or '..'"}, - {"used ..", map[string]string{"..": "foocontent"}, "may not be '.' or '..'"}, - {"length violation", map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"}, + {"valid payload", uid, resourceVersion, map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""}, + {"empty key name", uid, resourceVersion, map[string]string{"": "foocontent"}, "must not be empty"}, + {"key name is not a base file name (foo/bar)", uid, resourceVersion, map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"}, + {"key name is not a base file name (/foo)", uid, resourceVersion, map[string]string{"/bar": "foocontent"}, "only base names are allowed"}, + {"used .", uid, resourceVersion, map[string]string{".": "foocontent"}, "may not be '.' or '..'"}, + {"used ..", uid, resourceVersion, map[string]string{"..": "foocontent"}, "may not be '.' 
or '..'"}, + {"length violation", uid, resourceVersion, map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"}, } for _, c := range cases { t.Run(c.desc, func(t *testing.T) { // construct the payload p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{UID: "uid"}, + ObjectMeta: metav1.ObjectMeta{UID: c.uid, ResourceVersion: c.resourceVersion}, Data: c.files, }) - if err != nil { - t.Fatalf("error constructing payload: %v", err) + // if no error, save the payload, otherwise skip straight to error handler + if err == nil { + err = store.Save(p) } - // save the payload - err = store.Save(p) utiltest.ExpectError(t, err, c.err) if err != nil { return } // read the saved checkpoint - m, err := mapFromCheckpoint(store, p.UID()) + m, err := mapFromCheckpoint(store, p.UID(), p.ResourceVersion()) if err != nil { t.Fatalf("error loading checkpoint to map: %v", err) } @@ -220,11 +235,12 @@ func TestFsStoreLoad(t *testing.T) { } // construct a payload that contains the kubeletconfig const ( - uid = "uid" - kubeletKey = "kubelet" + uid = "uid" + resourceVersion = "1" + kubeletKey = "kubelet" ) p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)}, + ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), ResourceVersion: resourceVersion}, Data: map[string]string{ kubeletKey: string(data), }, @@ -239,12 +255,15 @@ func TestFsStoreLoad(t *testing.T) { } cases := []struct { - desc string - uid types.UID - err string + desc string + uid types.UID + resourceVersion string + err string }{ - {"checkpoint exists", uid, ""}, - {"checkpoint does not exist", "bogus-uid", "no checkpoint for source"}, + {"checkpoint exists", uid, resourceVersion, ""}, + {"checkpoint does not exist", "bogus-uid", "bogus-resourceVersion", "no checkpoint for source"}, + {"ambiguous UID", "", "bogus-resourceVersion", "empty UID is ambiguous"}, + {"ambiguous ResourceVersion", "bogus-uid", "", "empty ResourceVersion is ambiguous"}, } for _, c := range cases { t.Run(c.desc, func(t *testing.T) { @@ -253,10 +272,11 @@ func TestFsStoreLoad(t *testing.T) { Name: "name", Namespace: "namespace", UID: c.uid, + ResourceVersion: c.resourceVersion, KubeletConfigKey: kubeletKey, }}) if err != nil { - t.Fatalf("error constructing remote config source: %v", err) + t.Fatalf("unexpected error: %v", err) } loaded, err := store.Load(source) utiltest.ExpectError(t, err, c.err) @@ -389,35 +409,80 @@ func TestFsStoreSetAssigned(t *testing.T) { t.Fatalf("error constructing store: %v", err) } - const uid = "uid" - expect := fmt.Sprintf(`apiVersion: kubelet.config.k8s.io/v1beta1 + cases := []struct { + desc string + source *apiv1.NodeConfigSource + expect string + err string + }{ + { + desc: "nil source", + expect: "", // empty file + }, + { + desc: "non-nil source", + source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Name: "name", + Namespace: "namespace", + UID: "uid", + ResourceVersion: "1", + KubeletConfigKey: "kubelet", + }}, + expect: `apiVersion: kubelet.config.k8s.io/v1beta1 kind: SerializedNodeConfigSource source: configMap: kubeletConfigKey: kubelet name: name namespace: namespace - uid: %s -`, uid) - source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Name: "name", - Namespace: "namespace", - UID: types.UID(uid), - KubeletConfigKey: "kubelet", - }}) - if err != nil { - t.Fatalf("unexpected error: %v", err) + resourceVersion: "1" + uid: 
uid +`, + }, + { + desc: "missing UID", + source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Name: "name", + Namespace: "namespace", + ResourceVersion: "1", + KubeletConfigKey: "kubelet", + }}, + err: "failed to write RemoteConfigSource, empty UID is ambiguous", + }, + { + desc: "missing ResourceVersion", + source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Name: "name", + Namespace: "namespace", + UID: "uid", + KubeletConfigKey: "kubelet", + }}, + err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous", + }, } - // save the assigned source - if err := store.SetAssigned(source); err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // check that the source saved as we would expect - data := readTestSourceFile(t, store, assignedFile) - if expect != string(data) { - t.Errorf("expect assigned source file to contain %q, but got %q", expect, string(data)) + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + var source checkpoint.RemoteConfigSource + if c.source != nil { + s, _, err := checkpoint.NewRemoteConfigSource(c.source) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + source = s + } + // save the assigned source + err = store.SetAssigned(source) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + // check that the source saved as we would expect + data := readTestSourceFile(t, store, assignedFile) + if c.expect != string(data) { + t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data)) + } + }) } } @@ -427,35 +492,80 @@ func TestFsStoreSetLastKnownGood(t *testing.T) { t.Fatalf("error constructing store: %v", err) } - const uid = "uid" - expect := fmt.Sprintf(`apiVersion: kubelet.config.k8s.io/v1beta1 + cases := []struct { + desc string + source *apiv1.NodeConfigSource + expect string + err string + }{ + { + desc: "nil source", + expect: "", // empty file + }, + { + desc: "non-nil source", + source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Name: "name", + Namespace: "namespace", + UID: "uid", + ResourceVersion: "1", + KubeletConfigKey: "kubelet", + }}, + expect: `apiVersion: kubelet.config.k8s.io/v1beta1 kind: SerializedNodeConfigSource source: configMap: kubeletConfigKey: kubelet name: name namespace: namespace - uid: %s -`, uid) - source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Name: "name", - Namespace: "namespace", - UID: types.UID(uid), - KubeletConfigKey: "kubelet", - }}) - if err != nil { - t.Fatalf("unexpected error: %v", err) + resourceVersion: "1" + uid: uid +`, + }, + { + desc: "missing UID", + source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Name: "name", + Namespace: "namespace", + ResourceVersion: "1", + KubeletConfigKey: "kubelet", + }}, + err: "failed to write RemoteConfigSource, empty UID is ambiguous", + }, + { + desc: "missing ResourceVersion", + source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Name: "name", + Namespace: "namespace", + UID: "uid", + KubeletConfigKey: "kubelet", + }}, + err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous", + }, } - // save the last known good source - if err := store.SetLastKnownGood(source); err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // check that the source saved as we would expect - data := readTestSourceFile(t, store, lastKnownGoodFile) - if expect != string(data) { 
- t.Errorf("expect last-known-good source file to contain %q, but got %q", expect, string(data)) + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + var source checkpoint.RemoteConfigSource + if c.source != nil { + s, _, err := checkpoint.NewRemoteConfigSource(c.source) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + source = s + } + // save the assigned source + err = store.SetLastKnownGood(source) + utiltest.ExpectError(t, err, c.err) + if err != nil { + return + } + // check that the source saved as we would expect + data := readTestSourceFile(t, store, lastKnownGoodFile) + if c.expect != string(data) { + t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data)) + } + }) } } @@ -536,107 +646,8 @@ func TestFsStoreReset(t *testing.T) { } } -func TestFsStoreReadRemoteConfigSource(t *testing.T) { - store, err := newInitializedFakeFsStore() - if err != nil { - t.Fatalf("error constructing store: %v", err) - } - - source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ - ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Name: "name", - Namespace: "namespace", - UID: "uid", - KubeletConfigKey: "kubelet", - }}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - cases := []struct { - desc string - expect checkpoint.RemoteConfigSource - err string - }{ - {"default source", nil, ""}, - {"non-default source", source, ""}, - } - - const name = "some-source-file" - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - saveTestSourceFile(t, store, name, c.expect) - source, err := readRemoteConfigSource(store.fs, store.metaPath(name)) - utiltest.ExpectError(t, err, c.err) - if err != nil { - return - } - if !checkpoint.EqualRemoteConfigSources(c.expect, source) { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source)) - } - }) - } -} - -func TestFsStoreWriteRemoteConfigSource(t *testing.T) { - store, err := newInitializedFakeFsStore() - if err != nil { - t.Fatalf("error constructing store: %v", err) - } - - source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Name: "name", - Namespace: "namespace", - UID: "uid", - KubeletConfigKey: "kubelet", - }}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - cases := []struct { - desc string - source checkpoint.RemoteConfigSource - }{ - {"nil source", nil}, - {"non-nil source", source}, - } - - const name = "some-source-file" - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - // set the source file - err := writeRemoteConfigSource(store.fs, store.metaPath(name), c.source) - if err != nil { - t.Fatalf("unable to set source file, error: %v", err) - } - // read back the file - data := readTestSourceFile(t, store, name) - str := string(data) - - if c.source != nil { - // expect the contents to match the encoding of the source - data, err := c.source.Encode() - expect := string(data) - if err != nil { - t.Fatalf("couldn't encode source, error: %v", err) - } - if expect != str { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str) - } - } else { - // expect empty file - expect := "" - if expect != str { - t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str) - } - } - }) - } -} - -func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) { - files, err := store.fs.ReadDir(store.checkpointPath(uid)) +func mapFromCheckpoint(store *fsStore, uid, 
resourceVersion string) (map[string]string, error) { + files, err := store.fs.ReadDir(store.checkpointPath(uid, resourceVersion)) if err != nil { return nil, err } @@ -647,7 +658,7 @@ func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) { return nil, fmt.Errorf("expect only regular files in checkpoint dir %q", uid) } // read the file contents and build the map - data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid), f.Name())) + data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid, resourceVersion), f.Name())) if err != nil { return nil, err } diff --git a/pkg/kubelet/kubeletconfig/checkpoint/store/store.go b/pkg/kubelet/kubeletconfig/checkpoint/store/store.go index cd4602876d..0ceb34a600 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/store/store.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/store/store.go @@ -30,6 +30,7 @@ type Store interface { Initialize() error // Exists returns true if the object referenced by `source` has been checkpointed. + // The source must be unambiguous - e.g. if referencing an API object it must specify both uid and resourceVersion. Exists(source checkpoint.RemoteConfigSource) (bool, error) // Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration. // The following payload types are supported: diff --git a/pkg/kubelet/kubeletconfig/configsync.go b/pkg/kubelet/kubeletconfig/configsync.go index 8662e568d5..cb92fb7e65 100644 --- a/pkg/kubelet/kubeletconfig/configsync.go +++ b/pkg/kubelet/kubeletconfig/configsync.go @@ -37,10 +37,10 @@ import ( const ( // KubeletConfigChangedEventReason identifies an event as a change of Kubelet configuration KubeletConfigChangedEventReason = "KubeletConfigChanged" - // EventMessageFmt is the message format for Kubelet config change events - EventMessageFmt = "Kubelet will restart to use: %s" - // LocalConfigMessage is the text to apply to EventMessageFmt when the Kubelet has been configured to use its local config (init or defaults) - LocalConfigMessage = "local config" + // LocalEventMessage is sent when the Kubelet restarts to use local config + LocalEventMessage = "Kubelet restarting to use local config" + // RemoteEventMessageFmt is sent when the Kubelet restarts to use a remote config + RemoteEventMessageFmt = "Kubelet restarting to use %s, UID: %s, ResourceVersion: %s, KubeletConfigKey: %s" ) // pokeConfiSourceWorker tells the worker thread that syncs config sources that work needs to be done @@ -69,111 +69,116 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, eventClient v } }() - node, err := latestNode(cc.informer.GetStore(), nodeName) + // get the latest Node.Spec.ConfigSource from the informer + source, err := latestNodeConfigSource(cc.nodeInformer.GetStore(), nodeName) if err != nil { cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, status.InternalError)) syncerr = fmt.Errorf("%s, error: %v", status.InternalError, err) return } - // check the Node and download any new config - if updated, cur, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil { - cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, reason)) + // a nil source simply means we reset to local defaults + if source == nil { + utillog.Infof("Node.Spec.ConfigSource is empty, will reset assigned and last-known-good to defaults") + if updated, reason, err := cc.resetConfig(); err != nil { + reason = fmt.Sprintf(status.SyncErrorFmt, reason) + 
cc.configStatus.SetErrorOverride(reason) + syncerr = fmt.Errorf("%s, error: %v", reason, err) + return + } else if updated { + restartForNewConfig(eventClient, nodeName, nil) + } + return + } + + // a non-nil source means we should attempt to download the config, and checkpoint it if necessary + utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary") + + // TODO(mtaufen): It would be nice if we could check the payload's metadata before (re)downloading the whole payload + // we at least try pulling the latest configmap out of the local informer store. + + // construct the interface that can dynamically dispatch the correct Download, etc. methods for the given source type + remote, reason, err := checkpoint.NewRemoteConfigSource(source) + if err != nil { + reason = fmt.Sprintf(status.SyncErrorFmt, reason) + cc.configStatus.SetErrorOverride(reason) + syncerr = fmt.Errorf("%s, error: %v", reason, err) + return + } + + // "download" source, either from informer's in-memory store or directly from the API server, if the informer doesn't have a copy + payload, reason, err := cc.downloadConfigPayload(client, remote) + if err != nil { + reason = fmt.Sprintf(status.SyncErrorFmt, reason) + cc.configStatus.SetErrorOverride(reason) + syncerr = fmt.Errorf("%s, error: %v", reason, err) + return + } + + // save a checkpoint for the payload, if one does not already exist + if reason, err := cc.saveConfigCheckpoint(remote, payload); err != nil { + reason = fmt.Sprintf(status.SyncErrorFmt, reason) + cc.configStatus.SetErrorOverride(reason) + syncerr = fmt.Errorf("%s, error: %v", reason, err) + return + } + + // update the local, persistent record of assigned config + if updated, reason, err := cc.setAssignedConfig(remote); err != nil { + reason = fmt.Sprintf(status.SyncErrorFmt, reason) + cc.configStatus.SetErrorOverride(reason) syncerr = fmt.Errorf("%s, error: %v", reason, err) return } else if updated { - path := LocalConfigMessage - if cur != nil { - path = cur.APIPath() - } - // we directly log and send the event, instead of using the event recorder, - // because the event recorder won't flush its queue before we exit (we'd lose the event) - event := eventf(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, EventMessageFmt, path) - glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message) - if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil { - utillog.Errorf("failed to send event, error: %v", err) - } - os.Exit(0) + restartForNewConfig(eventClient, nodeName, remote) } // If we get here: - // - there is no need to restart to update the current config + // - there is no need to restart to use new config // - there was no error trying to sync configuration // - if, previously, there was an error trying to sync configuration, we need to clear that error from the status cc.configStatus.SetErrorOverride("") } -// doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config, -// depending on the `source`, and returns whether the current config in the checkpoint store was updated as a result -func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, checkpoint.RemoteConfigSource, string, error) { - if source == nil { - utillog.Infof("Node.Spec.ConfigSource is empty, will reset current and last-known-good to defaults") - updated, reason, err := cc.resetConfig() - if 
err != nil { - return false, nil, reason, err - } - return updated, nil, "", nil +// Note: source has up-to-date uid and resourceVersion after calling downloadConfigPayload. +func (cc *Controller) downloadConfigPayload(client clientset.Interface, source checkpoint.RemoteConfigSource) (checkpoint.Payload, string, error) { + var store cache.Store + if cc.remoteConfigSourceInformer != nil { + store = cc.remoteConfigSourceInformer.GetStore() } - - // if the NodeConfigSource is non-nil, download the config - utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary") - remote, reason, err := checkpoint.NewRemoteConfigSource(source) - if err != nil { - return false, nil, reason, err - } - reason, err = cc.checkpointConfigSource(client, remote) - if err != nil { - return false, nil, reason, err - } - updated, reason, err := cc.setAssignedConfig(remote) - if err != nil { - return false, nil, reason, err - } - return updated, remote, "", nil + return source.Download(client, store) } -// checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist, -// if a failure occurs, returns a sanitized failure reason and an error -func (cc *Controller) checkpointConfigSource(client clientset.Interface, source checkpoint.RemoteConfigSource) (string, error) { - // if the checkpoint already exists, skip downloading - if ok, err := cc.checkpointStore.Exists(source); err != nil { +func (cc *Controller) saveConfigCheckpoint(source checkpoint.RemoteConfigSource, payload checkpoint.Payload) (string, error) { + ok, err := cc.checkpointStore.Exists(source) + if err != nil { return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err) - } else if ok { - // TODO(mtaufen): update this to include ResourceVersion in #63221 - utillog.Infof("checkpoint already exists for object %s with UID %s, skipping download", source.APIPath(), source.UID()) + } + if ok { + utillog.Infof("checkpoint already exists for %s, UID: %s, ResourceVersion: %s", source.APIPath(), payload.UID(), payload.ResourceVersion()) return "", nil } - - // download - payload, reason, err := source.Download(client) - if err != nil { - return reason, fmt.Errorf("%s, error: %v", reason, err) - } - - // save - err = cc.checkpointStore.Save(payload) - if err != nil { + if err := cc.checkpointStore.Save(payload); err != nil { return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err) } - return "", nil } // setAssignedConfig updates the assigned checkpoint config in the store. -// Returns whether the current config changed as a result, or a sanitized failure reason and an error. +// Returns whether the assigned config changed as a result, or a sanitized failure reason and an error. func (cc *Controller) setAssignedConfig(source checkpoint.RemoteConfigSource) (bool, string, error) { - current, err := cc.checkpointStore.Assigned() + assigned, err := cc.checkpointStore.Assigned() if err != nil { return false, status.InternalError, err } if err := cc.checkpointStore.SetAssigned(source); err != nil { return false, status.InternalError, err } - return !checkpoint.EqualRemoteConfigSources(current, source), "", nil + return !checkpoint.EqualRemoteConfigSources(assigned, source), "", nil } -// resetConfig resets the current and last-known-good checkpoints in the checkpoint store to their default values and -// returns whether the current checkpoint changed as a result, or a sanitized failure reason and an error. 
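For illustration, a minimal sketch (not part of this patch) of the cache-first download pattern that downloadConfigPayload and remoteConfigMap.Download above implement: consult the informer's in-memory store when one is available, and fall back to a direct GET against the API server on a miss or error. The helper name is hypothetical; the client-go calls match the ones used in the patch.

package sketch

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// getConfigMapCacheFirst prefers the informer's store and falls back to the API server.
func getConfigMapCacheFirst(client clientset.Interface, store cache.Store, namespace, name string) (*corev1.ConfigMap, error) {
	if store != nil {
		// the store is keyed the same way as the patch's getConfigMapFromStore: namespace/name
		obj, ok, err := store.GetByKey(namespace + "/" + name)
		if err == nil && ok {
			if cm, isConfigMap := obj.(*corev1.ConfigMap); isConfigMap {
				return cm, nil // cache hit, no API round trip needed
			}
		}
		// on a store error or miss, fall through to a direct download
	}
	cm, err := client.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to download ConfigMap %s/%s: %v", namespace, name, err)
	}
	return cm, nil
}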
+// resetConfig resets the assigned and last-known-good checkpoints in the checkpoint store to their default values and +// returns whether the assigned checkpoint changed as a result, or a sanitized failure reason and an error. func (cc *Controller) resetConfig() (bool, string, error) { updated, err := cc.checkpointStore.Reset() if err != nil { @@ -182,8 +187,26 @@ func (cc *Controller) resetConfig() (bool, string, error) { return updated, "", nil } -// latestNode returns the most recent Node with `nodeName` from `store` -func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) { +// restartForNewConfig presumes the Kubelet is managed by a babysitter, e.g. systemd +// It will send an event before exiting. +func restartForNewConfig(eventClient v1core.EventsGetter, nodeName string, source checkpoint.RemoteConfigSource) { + message := LocalEventMessage + if source != nil { + message = fmt.Sprintf(RemoteEventMessageFmt, source.APIPath(), source.UID(), source.ResourceVersion(), source.KubeletFilename()) + } + // we directly log and send the event, instead of using the event recorder, + // because the event recorder won't flush its queue before we exit (we'd lose the event) + event := makeEvent(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, message) + glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message) + if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil { + utillog.Errorf("failed to send event, error: %v", err) + } + utillog.Infof(message) + os.Exit(0) +} + +// latestNodeConfigSource returns a copy of the most recent NodeConfigSource from the Node with `nodeName` in `store` +func latestNodeConfigSource(store cache.Store, nodeName string) (*apiv1.NodeConfigSource, error) { obj, ok, err := store.GetByKey(nodeName) if err != nil { err := fmt.Errorf("failed to retrieve Node %q from informer's store, error: %v", nodeName, err) @@ -200,13 +223,11 @@ func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) { utillog.Errorf(err.Error()) return nil, err } - return node, nil -} - -// eventf constructs and returns an event containing a formatted message -// similar to k8s.io/client-go/tools/record/event.go -func eventf(nodeName, eventType, reason, messageFmt string, args ...interface{}) *apiv1.Event { - return makeEvent(nodeName, eventType, reason, fmt.Sprintf(messageFmt, args...)) + // Copy the source, so anyone who modifies it after here doesn't mess up the informer's store! + // This was previously the cause of a bug that made the Kubelet frequently resync config; Download updated + // the UID and ResourceVersion on the NodeConfigSource, but the pointer was still drilling all the way + // into the informer's copy! 
+ return node.Spec.ConfigSource.DeepCopy(), nil } // makeEvent constructs an event diff --git a/pkg/kubelet/kubeletconfig/controller.go b/pkg/kubelet/kubeletconfig/controller.go index f665745ab2..098a3ed6ce 100644 --- a/pkg/kubelet/kubeletconfig/controller.go +++ b/pkg/kubelet/kubeletconfig/controller.go @@ -21,6 +21,7 @@ import ( "path/filepath" "time" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -52,8 +53,11 @@ type Controller struct { // configStatus manages the status we report on the Node object configStatus status.NodeConfigStatus - // informer is the informer that watches the Node object - informer cache.SharedInformer + // nodeInformer is the informer that watches the Node object + nodeInformer cache.SharedInformer + + // remoteConfigSourceInformer is the informer that watches the assigned config source + remoteConfigSourceInformer cache.SharedInformer // checkpointStore persists config source checkpoints to a storage layer checkpointStore store.Store @@ -139,51 +143,80 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) { return nil, err } - // update the active source to the non-nil last-known-good source + // set status to indicate the active source is the non-nil last-known-good source cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource()) return lastKnownGoodConfig, nil } -// StartSync launches the controller's sync loops if `client` is non-nil and `nodeName` is non-empty. -// It will always start the Node condition reporting loop, and will also start the dynamic conifg sync loops -// if dynamic config is enabled on the controller. If `nodeName` is empty but `client` is non-nil, an error is logged. -func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) { +// StartSync tells the controller to start the goroutines that sync status/config to/from the API server. +// The clients must be non-nil, and the nodeName must be non-empty. +func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error { + const errFmt = "cannot start Kubelet config sync: %s" if client == nil { - utillog.Infof("nil client, will not start sync loops") - return - } else if len(nodeName) == 0 { - utillog.Errorf("cannot start sync loops with empty nodeName") - return + return fmt.Errorf(errFmt, "nil client") + } + if eventClient == nil { + return fmt.Errorf(errFmt, "nil event client") + } + if nodeName == "" { + return fmt.Errorf(errFmt, "empty nodeName") } - // start the status sync loop - go utilpanic.HandlePanic(func() { - utillog.Infof("starting status sync loop") - wait.JitterUntil(func() { - cc.configStatus.Sync(client, nodeName) - }, 10*time.Second, 0.2, true, wait.NeverStop) - })() - - cc.informer = newSharedNodeInformer(client, nodeName, - cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent) - // start the informer loop // Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet, // we use HandlePanic to manually call the panic handlers and then crash. // We have a better chance of recovering normal operation if we just restart the Kubelet in the event // of a Go runtime error. 
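For illustration, a minimal sketch (not part of this patch) of the pitfall the NOTE above describes. handlePanic below is a hypothetical stand-in with the same shape as utilpanic.HandlePanic as used here: it takes a func and returns a wrapped func, so the caller still has to invoke the result.

package sketch

// handlePanic is a hypothetical stand-in for utilpanic.HandlePanic: wrap a function
// so panic handlers can run before the panic propagates and crashes the process.
func handlePanic(fn func()) func() {
	return func() {
		defer func() {
			if r := recover(); r != nil {
				// run panic handlers here, then re-panic so the process restarts
				panic(r)
			}
		}()
		fn()
	}
}

func startLoops() {
	syncLoop := handlePanic(func() { /* sync work */ })
	go syncLoop() // correct: the wrapper is invoked in the new goroutine

	// go handlePanic(func() { /* sync work */ })
	// WRONG: this only builds the wrapper in a goroutine and discards it; the
	// missing trailing () means the sync work never runs, which is the bug the
	// NOTE above warns about.
}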
- go utilpanic.HandlePanic(func() { - utillog.Infof("starting Node informer sync loop") - cc.informer.Run(wait.NeverStop) - })() + // NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run! + // This was EVIL to debug (difficult to see missing `()`). + // The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion. - // start the config source sync loop - go utilpanic.HandlePanic(func() { - utillog.Infof("starting config source sync loop") + // status sync worker + statusSyncLoopFunc := utilpanic.HandlePanic(func() { + utillog.Infof("starting status sync loop") + wait.JitterUntil(func() { + cc.configStatus.Sync(client, nodeName) + }, 10*time.Second, 0.2, true, wait.NeverStop) + }) + // remote config source informer, if we have a remote source to watch + assignedSource, err := cc.checkpointStore.Assigned() + if err != nil { + return fmt.Errorf(errFmt, err) + } else if assignedSource == nil { + utillog.Infof("local source is assigned, will not start remote config source informer") + } else { + cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{ + AddFunc: cc.onAddRemoteConfigSourceEvent, + UpdateFunc: cc.onUpdateRemoteConfigSourceEvent, + DeleteFunc: cc.onDeleteRemoteConfigSourceEvent, + }, + ) + } + remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() { + if cc.remoteConfigSourceInformer != nil { + utillog.Infof("starting remote config source informer") + cc.remoteConfigSourceInformer.Run(wait.NeverStop) + } + }) + // node informer + cc.nodeInformer = newSharedNodeInformer(client, nodeName, + cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent) + nodeInformerFunc := utilpanic.HandlePanic(func() { + utillog.Infof("starting Node informer") + cc.nodeInformer.Run(wait.NeverStop) + }) + // config sync worker + configSyncLoopFunc := utilpanic.HandlePanic(func() { + utillog.Infof("starting Kubelet config sync loop") wait.JitterUntil(func() { cc.syncConfigSource(client, eventClient, nodeName) }, 10*time.Second, 0.2, true, wait.NeverStop) - })() + }) + go statusSyncLoopFunc() + go remoteConfigSourceInformerFunc() + go nodeInformerFunc() + go configSyncLoopFunc() + return nil } // loadConfig loads Kubelet config from a checkpoint @@ -213,7 +246,6 @@ func (cc *Controller) checkTrial(duration time.Duration) { if trial, err := cc.inTrial(duration); err != nil { utillog.Errorf("failed to check trial period for assigned config, error: %v", err) } else if !trial { - utillog.Infof("assigned config passed trial period, will set as last-known-good") if err := cc.graduateAssignedToLastKnownGood(); err != nil { utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err) } @@ -236,17 +268,28 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) { // graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore // to the same value as the assigned config maintained by the checkpointStore func (cc *Controller) graduateAssignedToLastKnownGood() error { - // get the assigned config + // get assigned assigned, err := cc.checkpointStore.Assigned() if err != nil { return err } - // update the last-known-good config + // get last-known-good + lastKnownGood, err := cc.checkpointStore.LastKnownGood() + if err != nil { + return err + } + // if the sources are equal, no need to change + if assigned == lastKnownGood || + assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned, 
lastKnownGood) { + return nil + } + // update last-known-good err = cc.checkpointStore.SetLastKnownGood(assigned) if err != nil { return err } // update the status to reflect the new last-known-good config cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource()) + utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion()) return nil } diff --git a/pkg/kubelet/kubeletconfig/status/status.go b/pkg/kubelet/kubeletconfig/status/status.go index f683ea212b..ae8cdec069 100644 --- a/pkg/kubelet/kubeletconfig/status/status.go +++ b/pkg/kubelet/kubeletconfig/status/status.go @@ -80,9 +80,12 @@ type nodeConfigStatus struct { // NewNodeConfigStatus returns a new NodeConfigStatus interface func NewNodeConfigStatus() NodeConfigStatus { + // channels must have capacity at least 1, since we signal with non-blocking writes + syncCh := make(chan bool, 1) + // prime new status managers to sync with the API server on the first call to Sync + syncCh <- true return &nodeConfigStatus{ - // channels must have capacity at least 1, since we signal with non-blocking writes - syncCh: make(chan bool, 1), + syncCh: syncCh, } } @@ -142,6 +145,8 @@ func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) { return } + utillog.Infof("updating Node.Status.Config") + // grab the lock s.mux.Lock() defer s.mux.Unlock() diff --git a/pkg/kubelet/kubeletconfig/watch.go b/pkg/kubelet/kubeletconfig/watch.go index 09b22233e0..879fb747c7 100644 --- a/pkg/kubelet/kubeletconfig/watch.go +++ b/pkg/kubelet/kubeletconfig/watch.go @@ -86,6 +86,7 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{}) } if oldObj == nil { // Node was just added, need to sync + utillog.Infof("initial Node watch event") cc.pokeConfigSourceWorker() return } @@ -95,31 +96,59 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{}) return } if !apiequality.Semantic.DeepEqual(oldNode.Spec.ConfigSource, newNode.Spec.ConfigSource) { + utillog.Infof("Node.Spec.ConfigSource was updated") cc.pokeConfigSourceWorker() } } -// onDeleteNodeEvent logs a message if the Node was deleted and may log errors -// if an unexpected DeletedFinalStateUnknown was received. +// onDeleteNodeEvent logs a message if the Node was deleted // We allow the sync-loop to continue, because it is possible that the Kubelet detected // a Node with unexpected externalID and is attempting to delete and re-create the Node // (see pkg/kubelet/kubelet_node_status.go), or that someone accidentally deleted the Node // (the Kubelet will re-create it). func (cc *Controller) onDeleteNodeEvent(deletedObj interface{}) { - node, ok := deletedObj.(*apiv1.Node) + // For this case, we just log the event. + // We don't want to poke the worker, because a temporary deletion isn't worth reporting an error for. + // If the Node is deleted because the VM is being deleted, then the Kubelet has nothing to do. 
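The status manager change above turns syncCh into a coalescing "work pending" signal: capacity 1 so a non-blocking send never queues more than one token, and primed at construction so the very first Sync pushes a full status. A minimal, library-free sketch of that pattern follows; the syncer type and its method names are invented for illustration and are not the patch's implementation.

package main

import "fmt"

type syncer struct{ ch chan bool }

// newSyncer primes the channel so the first call to sync always does work.
func newSyncer() *syncer {
	ch := make(chan bool, 1) // capacity 1: non-blocking sends coalesce repeated pokes
	ch <- true
	return &syncer{ch: ch}
}

// poke marks work as pending without ever blocking the caller.
func (s *syncer) poke() {
	select {
	case s.ch <- true:
	default: // a token is already queued; nothing to do
	}
}

// sync runs only if a token is pending, mirroring the early return in Sync.
func (s *syncer) sync() {
	select {
	case <-s.ch:
		fmt.Println("syncing status")
	default:
		fmt.Println("nothing to sync")
	}
}

func main() {
	s := newSyncer()
	s.sync() // syncing status (primed at construction)
	s.sync() // nothing to sync
	s.poke()
	s.poke() // coalesced with the previous poke
	s.sync() // syncing status
	s.sync() // nothing to sync
}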
+ utillog.Infof("Node was deleted") +} + +// onAddRemoteConfigSourceEvent calls onUpdateConfigMapEvent with the new object and a nil old object +func (cc *Controller) onAddRemoteConfigSourceEvent(newObj interface{}) { + cc.onUpdateRemoteConfigSourceEvent(nil, newObj) +} + +// onUpdateRemoteConfigSourceEvent checks whether the configSource changed between oldObj and newObj, +// and pokes the sync worker if there was a change +func (cc *Controller) onUpdateRemoteConfigSourceEvent(oldObj interface{}, newObj interface{}) { + // since ConfigMap is currently the only source type, we handle that here + newConfigMap, ok := newObj.(*apiv1.ConfigMap) if !ok { - tombstone, ok := deletedObj.(cache.DeletedFinalStateUnknown) - if !ok { - utillog.Errorf("couldn't cast deleted object to DeletedFinalStateUnknown, object: %+v", deletedObj) - return - } - node, ok = tombstone.Obj.(*apiv1.Node) - if !ok { - utillog.Errorf("received DeletedFinalStateUnknown object but it did not contain a Node, object: %+v", deletedObj) - return - } - utillog.Infof("Node was deleted (DeletedFinalStateUnknown), sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node) + utillog.Errorf("failed to cast new object to ConfigMap, couldn't handle event") return } - utillog.Infof("Node was deleted, sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node) + if oldObj == nil { + // ConfigMap was just added, need to sync + utillog.Infof("initial ConfigMap watch event") + cc.pokeConfigSourceWorker() + return + } + oldConfigMap, ok := oldObj.(*apiv1.ConfigMap) + if !ok { + utillog.Errorf("failed to cast old object to ConfigMap, couldn't handle event") + return + } + if !apiequality.Semantic.DeepEqual(oldConfigMap, newConfigMap) { + utillog.Infof("assigned ConfigMap was updated") + cc.pokeConfigSourceWorker() + } +} + +// onDeleteRemoteConfigSourceEvent logs a message if the ConfigMap was deleted and pokes the sync worker +func (cc *Controller) onDeleteRemoteConfigSourceEvent(deletedObj interface{}) { + // If the ConfigMap we're watching is deleted, we log the event and poke the sync worker. + // This requires a sync, because if the Node is still configured to use the deleted ConfigMap, + // the Kubelet should report a DownloadError. + utillog.Infof("assigned ConfigMap was deleted") + cc.pokeConfigSourceWorker() } diff --git a/test/e2e_node/dynamic_kubelet_config_test.go b/test/e2e_node/dynamic_kubelet_config_test.go index eee41be6e5..3d709b99d1 100644 --- a/test/e2e_node/dynamic_kubelet_config_test.go +++ b/test/e2e_node/dynamic_kubelet_config_test.go @@ -18,7 +18,6 @@ package e2e_node import ( "fmt" - "reflect" "strings" "time" @@ -26,6 +25,7 @@ import ( apiv1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" controller "k8s.io/kubernetes/pkg/kubelet/kubeletconfig" @@ -37,18 +37,23 @@ import ( . "github.com/onsi/gomega" ) -type configStateStatus struct { - apiv1.NodeConfigStatus +const itDescription = "status and events should match expectations" - SkipActive bool - SkipAssigned bool - SkipLkg bool +type expectNodeConfigStatus struct { + lastKnownGood *apiv1.NodeConfigSource + err string + // If true, expect Status.Config.Active == Status.Config.LastKnownGood, + // otherwise expect Status.Config.Active == Status.Config.Assigned. 
+ lkgActive bool + // If true, skip checking Status.Config.LastKnownGood == this.lastKnownGood in the status. + skipLkg bool } -type configState struct { +type nodeConfigTestCase struct { desc string configSource *apiv1.NodeConfigSource - expectConfigStatus *configStateStatus + configMap *apiv1.ConfigMap + expectConfigStatus expectNodeConfigStatus expectConfig *kubeletconfig.KubeletConfiguration // whether to expect this substring in an error returned from the API server when updating the config source apierr string @@ -60,63 +65,77 @@ type configState struct { } // This test is marked [Disruptive] because the Kubelet restarts several times during this test. -var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKubeletConfig] [Serial] [Disruptive]", func() { +var _ = framework.KubeDescribe("[Feature:DynamicKubeletConfig] [Serial] [Disruptive]", func() { f := framework.NewDefaultFramework("dynamic-kubelet-configuration-test") - var originalKC *kubeletconfig.KubeletConfiguration - var originalConfigMap *apiv1.ConfigMap + var beforeNode *apiv1.Node + var beforeConfigMap *apiv1.ConfigMap + var beforeKC *kubeletconfig.KubeletConfiguration + var localKC *kubeletconfig.KubeletConfiguration // Dummy context to prevent framework's AfterEach from cleaning up before this test's AfterEach can run Context("", func() { BeforeEach(func() { - var err error - if originalConfigMap == nil { - originalKC, err = getCurrentKubeletConfig() - framework.ExpectNoError(err) - originalConfigMap = newKubeletConfigMap("original-values", originalKC) - originalConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(originalConfigMap) - framework.ExpectNoError(err) - } // make sure Dynamic Kubelet Configuration feature is enabled on the Kubelet we are about to test enabled, err := isKubeletConfigEnabled(f) framework.ExpectNoError(err) if !enabled { framework.ExpectNoError(fmt.Errorf("The Dynamic Kubelet Configuration feature is not enabled.\n" + - "Pass --feature-gates=DynamicKubeletConfig=true to the Kubelet to enable this feature.\n" + + "Pass --feature-gates=DynamicKubeletConfig=true to the Kubelet and API server to enable this feature.\n" + "For `make test-e2e-node`, you can set `TEST_ARGS='--feature-gates=DynamicKubeletConfig=true'`.")) } + // record before state so we can restore it after the test + if beforeNode == nil { + node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + beforeNode = node + } + if source := beforeNode.Spec.ConfigSource; source != nil { + if source.ConfigMap != nil { + cm, err := f.ClientSet.CoreV1().ConfigMaps(source.ConfigMap.Namespace).Get(source.ConfigMap.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + beforeConfigMap = cm + } + } + if beforeKC == nil { + kc, err := getCurrentKubeletConfig() + framework.ExpectNoError(err) + beforeKC = kc + } + // reset the node's assigned/active/last-known-good config by setting the source to nil, + // so each test starts from a clean-slate + (&nodeConfigTestCase{ + desc: "reset via nil config source", + configSource: nil, + }).run(f, setConfigSourceFunc, false, 0) + // record local KC so we can check it during tests that roll back to nil last-known-good + if localKC == nil { + kc, err := getCurrentKubeletConfig() + framework.ExpectNoError(err) + localKC = kc + } }) AfterEach(func() { - // Set the config back to the original values before moving on. 
- // We care that the values are the same, not where they come from, so it - // should be fine to reset the values using a remote config, even if they - // were initially set via the locally provisioned configuration. - // This is the same strategy several other e2e node tests use. - - source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: originalConfigMap.UID, - Namespace: originalConfigMap.Namespace, - Name: originalConfigMap.Name, - KubeletConfigKey: "kubelet", - }} - setAndTestKubeletConfigState(f, &configState{desc: "reset to original values", - configSource: source, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: source, - Assigned: source, - }, - SkipLkg: true, - }, - expectConfig: originalKC, - }, false) + // clean-slate the Node again (prevents last-known-good from any tests from leaking through) + (&nodeConfigTestCase{ + desc: "reset via nil config source", + configSource: nil, + }).run(f, setConfigSourceFunc, false, 0) + // restore the values from before the test before moving on + restore := &nodeConfigTestCase{ + desc: "restore values from before test", + configSource: beforeNode.Spec.ConfigSource, + configMap: beforeConfigMap, + expectConfig: beforeKC, + } + restore.run(f, setConfigSourceFunc, false, 0) }) - Context("When changing NodeConfigSources", func() { - It("the Kubelet should report the appropriate status and configz", func() { + Context("update Node.Spec.ConfigSource: state transitions:", func() { + It(itDescription, func() { var err error - // we base the "correct" configmap off of the current configuration - correctKC := originalKC.DeepCopy() + // we base the "correct" configmap off of the configuration from before the test + correctKC := beforeKC.DeepCopy() correctConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-correct", correctKC) correctConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(correctConfigMap) framework.ExpectNoError(err) @@ -131,42 +150,34 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube failParseConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(failParseConfigMap) framework.ExpectNoError(err) - // fail to validate, we make a copy and set an invalid KubeAPIQPS on kc before serializing + // fail to validate, we make a copy of correct and set an invalid KubeAPIQPS on kc before serializing invalidKC := correctKC.DeepCopy() - invalidKC.KubeAPIQPS = -1 failValidateConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-fail-validate", invalidKC) failValidateConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(failValidateConfigMap) framework.ExpectNoError(err) correctSource := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: correctConfigMap.UID, Namespace: correctConfigMap.Namespace, Name: correctConfigMap.Name, KubeletConfigKey: "kubelet", }} failParseSource := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: failParseConfigMap.UID, Namespace: failParseConfigMap.Namespace, Name: failParseConfigMap.Name, KubeletConfigKey: "kubelet", }} failValidateSource := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: failValidateConfigMap.UID, Namespace: failValidateConfigMap.Namespace, Name: failValidateConfigMap.Name, KubeletConfigKey: "kubelet", }} - // Note: since we start with the nil source (resets lkg), and we don't wait longer than the 10-minute internal - // qualification period before 
changing it again, we can assume lkg source will be nil in the status - // for this entire test, which is why we never set SkipLkg=true here. - - states := []configState{ + cases := []nodeConfigTestCase{ { desc: "Node.Spec.ConfigSource is nil", configSource: nil, - expectConfigStatus: &configStateStatus{}, + expectConfigStatus: expectNodeConfigStatus{}, expectConfig: nil, event: true, }, @@ -178,7 +189,6 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube { desc: "Node.Spec.ConfigSource.ConfigMap is missing namespace", configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: "foo", Name: "bar", KubeletConfigKey: "kubelet", }}, // missing Namespace @@ -187,8 +197,7 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube { desc: "Node.Spec.ConfigSource.ConfigMap is missing name", configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: "foo", - Namespace: "bar", + Namespace: "foo", KubeletConfigKey: "kubelet", }}, // missing Name apierr: "spec.configSource.configMap.name: Required value: name must be set", @@ -196,24 +205,24 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube { desc: "Node.Spec.ConfigSource.ConfigMap is missing kubeletConfigKey", configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: "foo", - Namespace: "bar", - Name: "baz", + Namespace: "foo", + Name: "bar", }}, // missing KubeletConfigKey apierr: "spec.configSource.configMap.kubeletConfigKey: Required value: kubeletConfigKey must be set", }, { - desc: "Node.Spec.ConfigSource.ConfigMap is missing uid", - configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - Namespace: "bar", - Name: "baz", - KubeletConfigKey: "kubelet", - }}, // missing uid - apierr: "spec.configSource.configMap.uid: Required value: uid must be set in spec", - }, - {desc: "Node.Spec.ConfigSource.ConfigMap.ResourceVersion is illegally specified", + desc: "Node.Spec.ConfigSource.ConfigMap.UID is illegally specified", configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ UID: "foo", + Name: "bar", + Namespace: "baz", + KubeletConfigKey: "kubelet", + }}, + apierr: "spec.configSource.configMap.uid: Forbidden: uid must not be set in spec", + }, + { + desc: "Node.Spec.ConfigSource.ConfigMap.ResourceVersion is illegally specified", + configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ Name: "bar", Namespace: "baz", ResourceVersion: "1", @@ -221,106 +230,77 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube }}, apierr: "spec.configSource.configMap.resourceVersion: Forbidden: resourceVersion must not be set in spec", }, - {desc: "Node.Spec.ConfigSource.ConfigMap has invalid namespace", + { + desc: "Node.Spec.ConfigSource.ConfigMap has invalid namespace", configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: "foo", Name: "bar", Namespace: "../baz", KubeletConfigKey: "kubelet", }}, apierr: "spec.configSource.configMap.namespace: Invalid value", }, - {desc: "Node.Spec.ConfigSource.ConfigMap has invalid name", + { + desc: "Node.Spec.ConfigSource.ConfigMap has invalid name", configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: "foo", Name: "../bar", Namespace: "baz", KubeletConfigKey: "kubelet", }}, apierr: "spec.configSource.configMap.name: Invalid value", }, - {desc: 
"Node.Spec.ConfigSource.ConfigMap has invalid kubeletConfigKey", + { + desc: "Node.Spec.ConfigSource.ConfigMap has invalid kubeletConfigKey", configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: "foo", Name: "bar", Namespace: "baz", KubeletConfigKey: "../qux", }}, apierr: "spec.configSource.configMap.kubeletConfigKey: Invalid value", }, - { - // TODO(mtaufen): remove in #63221 - desc: "Node.Spec.ConfigSource.ConfigMap.UID does not align with Namespace/Name", - configSource: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: "foo", - Namespace: correctConfigMap.Namespace, - Name: correctConfigMap.Name, - KubeletConfigKey: "kubelet", - }}, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Error: fmt.Sprintf(status.SyncErrorFmt, fmt.Sprintf(status.UIDMismatchErrorFmt, "foo", configMapAPIPath(correctConfigMap), correctConfigMap.UID)), - }, - // skip assigned and active, because we don't know what the prior source will be - SkipAssigned: true, - SkipActive: true, - }, - expectConfig: nil, - event: false, - }, { desc: "correct", configSource: correctSource, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: correctSource, - Assigned: correctSource, - }, - }, + configMap: correctConfigMap, expectConfig: correctKC, event: true, }, { desc: "fail-parse", configSource: failParseSource, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Assigned: failParseSource, - Error: status.LoadError, - }, - SkipActive: true, + configMap: failParseConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + err: status.LoadError, + lkgActive: true, }, - expectConfig: nil, + expectConfig: localKC, event: true, }, { desc: "fail-validate", configSource: failValidateSource, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Assigned: failValidateSource, - Error: status.ValidateError, - }, - SkipActive: true, + configMap: failValidateConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + err: status.ValidateError, + lkgActive: true, }, - expectConfig: nil, + expectConfig: localKC, event: true, }, } - L := len(states) - for i := 1; i <= L; i++ { // need one less iteration than the number of states - testBothDirections(f, &states[i-1 : i][0], states[i:L], 0) + L := len(cases) + for i := 1; i <= L; i++ { // need one less iteration than the number of cases + testBothDirections(f, setConfigSourceFunc, &cases[i-1 : i][0], cases[i:L], 0) } }) }) - Context("When a remote config becomes the new last-known-good, and then the Kubelet is updated to use a new, bad config", func() { - It("the Kubelet should report a status and configz indicating that it rolled back to the new last-known-good", func() { + Context("update Node.Spec.ConfigSource: recover to last-known-good ConfigMap", func() { + It(itDescription, func() { var err error - // we base the "lkg" configmap off of the current configuration - lkgKC := originalKC.DeepCopy() + // we base the "lkg" configmap off of the configuration from before the test + lkgKC := beforeKC.DeepCopy() lkgConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-intended-lkg", lkgKC) lkgConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(lkgConfigMap) framework.ExpectNoError(err) @@ -336,43 +316,39 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube framework.ExpectNoError(err) lkgSource := &apiv1.NodeConfigSource{ConfigMap: 
&apiv1.ConfigMapNodeConfigSource{ - UID: lkgConfigMap.UID, Namespace: lkgConfigMap.Namespace, Name: lkgConfigMap.Name, KubeletConfigKey: "kubelet", }} + lkgStatus := lkgSource.DeepCopy() + lkgStatus.ConfigMap.UID = lkgConfigMap.UID + lkgStatus.ConfigMap.ResourceVersion = lkgConfigMap.ResourceVersion + badSource := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: badConfigMap.UID, Namespace: badConfigMap.Namespace, Name: badConfigMap.Name, KubeletConfigKey: "kubelet", }} - states := []configState{ - // intended lkg - {desc: "intended last-known-good", + cases := []nodeConfigTestCase{ + { + desc: "intended last-known-good", configSource: lkgSource, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: lkgSource, - Assigned: lkgSource, - }, - SkipLkg: true, + configMap: lkgConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + lastKnownGood: lkgStatus, }, expectConfig: lkgKC, event: true, }, - - // bad config - {desc: "bad config", + { + desc: "bad config", configSource: badSource, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: lkgSource, - Assigned: badSource, - LastKnownGood: lkgSource, - Error: status.LoadError, - }, + configMap: badConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + lastKnownGood: lkgStatus, + err: status.LoadError, + lkgActive: true, }, expectConfig: lkgKC, event: true, @@ -380,59 +356,53 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube } // wait 12 minutes after setting the first config to ensure it has time to pass the trial duration - testBothDirections(f, &states[0], states[1:], 12*time.Minute) + testBothDirections(f, setConfigSourceFunc, &cases[0], cases[1:], 12*time.Minute) }) }) - Context("When a remote config becomes the new last-known-good, and then Node.ConfigSource.ConfigMap.KubeletConfigKey is updated to use a new, bad config", func() { - It("the Kubelet should report a status and configz indicating that it rolled back to the new last-known-good", func() { + Context("update Node.Spec.ConfigSource: recover to last-known-good ConfigMap.KubeletConfigKey", func() { + It(itDescription, func() { const badConfigKey = "bad" var err error - // we base the "lkg" configmap off of the current configuration - lkgKC := originalKC.DeepCopy() + // we base the "lkg" configmap off of the configuration from before the test + lkgKC := beforeKC.DeepCopy() combinedConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-combined", lkgKC) combinedConfigMap.Data[badConfigKey] = "{0xdeadbeef}" combinedConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(combinedConfigMap) framework.ExpectNoError(err) lkgSource := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: combinedConfigMap.UID, Namespace: combinedConfigMap.Namespace, Name: combinedConfigMap.Name, KubeletConfigKey: "kubelet", }} - badSource := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: combinedConfigMap.UID, - Namespace: combinedConfigMap.Namespace, - Name: combinedConfigMap.Name, - KubeletConfigKey: badConfigKey, - }} - states := []configState{ - // intended lkg - {desc: "intended last-known-good", + lkgStatus := lkgSource.DeepCopy() + lkgStatus.ConfigMap.UID = combinedConfigMap.UID + lkgStatus.ConfigMap.ResourceVersion = combinedConfigMap.ResourceVersion + + badSource := lkgSource.DeepCopy() + badSource.ConfigMap.KubeletConfigKey = badConfigKey + + cases := []nodeConfigTestCase{ + { + 
desc: "intended last-known-good", configSource: lkgSource, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: lkgSource, - Assigned: lkgSource, - }, - SkipLkg: true, + configMap: combinedConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + lastKnownGood: lkgStatus, }, expectConfig: lkgKC, event: true, }, - - // bad config - {desc: "bad config", + { + desc: "bad config", configSource: badSource, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: lkgSource, - Assigned: badSource, - LastKnownGood: lkgSource, - Error: status.LoadError, - }, + configMap: combinedConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + lastKnownGood: lkgStatus, + err: status.LoadError, + lkgActive: true, }, expectConfig: lkgKC, event: true, @@ -440,18 +410,17 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube } // wait 12 minutes after setting the first config to ensure it has time to pass the trial duration - testBothDirections(f, &states[0], states[1:], 12*time.Minute) + testBothDirections(f, setConfigSourceFunc, &cases[0], cases[1:], 12*time.Minute) }) }) - // This stress test will help turn up resource leaks across kubelet restarts that can, over time, - // break our ability to dynamically update kubelet config - Context("When changing the configuration 100 times", func() { - It("the Kubelet should report the appropriate status and configz", func() { + // exposes resource leaks across config changes + Context("update Node.Spec.ConfigSource: 100 update stress test", func() { + It(itDescription, func() { var err error // we just create two configmaps with the same config but different names and toggle between them - kc1 := originalKC.DeepCopy() + kc1 := beforeKC.DeepCopy() cm1 := newKubeletConfigMap("dynamic-kubelet-config-test-cm1", kc1) cm1, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(cm1) framework.ExpectNoError(err) @@ -464,113 +433,451 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube framework.ExpectNoError(err) cm1Source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: cm1.UID, Namespace: cm1.Namespace, Name: cm1.Name, KubeletConfigKey: "kubelet", }} + cm2Source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{ - UID: cm2.UID, Namespace: cm2.Namespace, Name: cm2.Name, KubeletConfigKey: "kubelet", }} - states := []configState{ - {desc: "cm1", + cases := []nodeConfigTestCase{ + { + desc: "cm1", configSource: cm1Source, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: cm1Source, - Assigned: cm1Source, - }, - SkipLkg: true, - }, + configMap: cm1, expectConfig: kc1, event: true, }, - - {desc: "cm2", + { + desc: "cm2", configSource: cm2Source, - expectConfigStatus: &configStateStatus{ - NodeConfigStatus: apiv1.NodeConfigStatus{ - Active: cm2Source, - Assigned: cm2Source, - }, - SkipLkg: true, - }, + configMap: cm2, expectConfig: kc2, event: true, }, } for i := 0; i < 50; i++ { // change the config 101 times (changes 3 times in the first iteration, 2 times in each subsequent iteration) - testBothDirections(f, &states[0], states[1:], 0) + testBothDirections(f, setConfigSourceFunc, &cases[0], cases[1:], 0) } }) }) + + // Please note: This behavior is tested to ensure implementation correctness. We do not, however, recommend ConfigMap mutations + // as a usage pattern for dynamic Kubelet config in large clusters. 
It is much safer to create a new ConfigMap, and incrementally + // roll out a new Node.Spec.ConfigSource that references the new ConfigMap. In-place ConfigMap updates, including deletion + // followed by re-creation, will cause all observing Kubelets to immediately restart for new config, because these operations + // change the ResourceVersion of the ConfigMap. + Context("update ConfigMap in-place: state transitions", func() { + It(itDescription, func() { + var err error + // we base the "correct" configmap off of the configuration from before the test + correctKC := beforeKC.DeepCopy() + correctConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-in-place", correctKC) + correctConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(correctConfigMap) + framework.ExpectNoError(err) + + // we reuse the same name, namespace + failParseConfigMap := correctConfigMap.DeepCopy() + failParseConfigMap.Data = map[string]string{ + "kubelet": "{0xdeadbeef}", + } + + // fail to validate, we make a copy and set an invalid KubeAPIQPS on kc before serializing + invalidKC := correctKC.DeepCopy() + invalidKC.KubeAPIQPS = -1 + failValidateConfigMap := correctConfigMap.DeepCopy() + failValidateConfigMap.Data = newKubeletConfigMap("", invalidKC).Data + + // ensure node config source is set to the config map we will mutate in-place, + // since updateConfigMapFunc doesn't mutate Node.Spec.ConfigSource + source := &apiv1.NodeConfigSource{ + ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Namespace: correctConfigMap.Namespace, + Name: correctConfigMap.Name, + KubeletConfigKey: "kubelet", + }, + } + (&nodeConfigTestCase{ + desc: "initial state (correct)", + configSource: source, + configMap: correctConfigMap, + expectConfig: correctKC, + }).run(f, setConfigSourceFunc, false, 0) + + cases := []nodeConfigTestCase{ + { + desc: "correct", + configSource: source, + configMap: correctConfigMap, + expectConfig: correctKC, + event: true, + }, + { + desc: "fail-parse", + configSource: source, + configMap: failParseConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + err: status.LoadError, + lkgActive: true, + }, + expectConfig: localKC, + event: true, + }, + { + desc: "fail-validate", + configSource: source, + configMap: failValidateConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + err: status.ValidateError, + lkgActive: true, + }, + expectConfig: localKC, + event: true, + }, + } + L := len(cases) + for i := 1; i <= L; i++ { // need one less iteration than the number of cases + testBothDirections(f, updateConfigMapFunc, &cases[i-1 : i][0], cases[i:L], 0) + } + }) + }) + + // Please note: This behavior is tested to ensure implementation correctness. We do not, however, recommend ConfigMap mutations + // as a usage pattern for dynamic Kubelet config in large clusters. It is much safer to create a new ConfigMap, and incrementally + // roll out a new Node.Spec.ConfigSource that references the new ConfigMap. In-place ConfigMap updates, including deletion + // followed by re-creation, will cause all observing Kubelets to immediately restart for new config, because these operations + // change the ResourceVersion of the ConfigMap. 
+ Context("update ConfigMap in-place: recover to last-known-good version", func() { + It(itDescription, func() { + var err error + // we base the "lkg" configmap off of the configuration from before the test + lkgKC := beforeKC.DeepCopy() + lkgConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-in-place-lkg", lkgKC) + lkgConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(lkgConfigMap) + framework.ExpectNoError(err) + + // bad config map, we insert some bogus stuff into the configMap + badConfigMap := lkgConfigMap.DeepCopy() + badConfigMap.Data = map[string]string{ + "kubelet": "{0xdeadbeef}", + } + // ensure node config source is set to the config map we will mutate in-place + source := &apiv1.NodeConfigSource{ + ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Namespace: lkgConfigMap.Namespace, + Name: lkgConfigMap.Name, + KubeletConfigKey: "kubelet", + }, + } + + // Even though the first test case will PUT the lkgConfigMap again, no-op writes don't increment + // ResourceVersion, so the expected status we record here will still be correct. + lkgStatus := source.DeepCopy() + lkgStatus.ConfigMap.UID = lkgConfigMap.UID + lkgStatus.ConfigMap.ResourceVersion = lkgConfigMap.ResourceVersion + + (&nodeConfigTestCase{ + desc: "initial state (correct)", + configSource: source, + configMap: lkgConfigMap, + expectConfig: lkgKC, + }).run(f, setConfigSourceFunc, false, 0) // wait 0 here, and we should not expect LastKnownGood to have changed yet (hence nil) + + cases := []nodeConfigTestCase{ + { + desc: "intended last-known-good", + configSource: source, + configMap: lkgConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + lastKnownGood: lkgStatus, + }, + expectConfig: lkgKC, + event: true, + }, + { + // NOTE(mtaufen): If you see a strange "expected assigned x but got assigned y" error on this case, + // it is possible that the Kubelet didn't start the informer that watches the currently assigned + // ConfigMap, or didn't get updates from that informer. Other tests don't always catch this because + // they quickly change config. The sync loop will always happen once, a bit after the Kubelet starts + // up, because other informers' initial "add" events can queue a sync. If you wait long enough before + // changing config (waiting for the config to become last-known-good, for example), the syncs queued by + // add events will have already been processed, and the lack of a running ConfigMap informer will result + // in a missed update, no config change, and the above error when we check the status. + desc: "bad config", + configSource: source, + configMap: badConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + lastKnownGood: lkgStatus, + err: status.LoadError, + lkgActive: true, + }, + expectConfig: lkgKC, + event: true, + }, + } + + // wait 12 minutes after setting the first config to ensure it has time to pass the trial duration + testBothDirections(f, updateConfigMapFunc, &cases[0], cases[1:], 12*time.Minute) + }) + }) + + // Please note: This behavior is tested to ensure implementation correctness. We do not, however, recommend ConfigMap mutations + // as a usage pattern for dynamic Kubelet config in large clusters. It is much safer to create a new ConfigMap, and incrementally + // roll out a new Node.Spec.ConfigSource that references the new ConfigMap. 
In-place ConfigMap updates, including deletion + // followed by re-creation, will cause all observing Kubelets to immediately restart for new config, because these operations + // change the ResourceVersion of the ConfigMap. + Context("delete and recreate ConfigMap: state transitions", func() { + It(itDescription, func() { + var err error + // we base the "correct" configmap off of the configuration from before the test + correctKC := beforeKC.DeepCopy() + correctConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-delete-createe", correctKC) + correctConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(correctConfigMap) + framework.ExpectNoError(err) + + // we reuse the same name, namespace + failParseConfigMap := correctConfigMap.DeepCopy() + failParseConfigMap.Data = map[string]string{ + "kubelet": "{0xdeadbeef}", + } + + // fail to validate, we make a copy and set an invalid KubeAPIQPS on kc before serializing + invalidKC := correctKC.DeepCopy() + invalidKC.KubeAPIQPS = -1 + failValidateConfigMap := correctConfigMap.DeepCopy() + failValidateConfigMap.Data = newKubeletConfigMap("", invalidKC).Data + + // ensure node config source is set to the config map we will mutate in-place, + // since recreateConfigMapFunc doesn't mutate Node.Spec.ConfigSource + source := &apiv1.NodeConfigSource{ + ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Namespace: correctConfigMap.Namespace, + Name: correctConfigMap.Name, + KubeletConfigKey: "kubelet", + }, + } + (&nodeConfigTestCase{ + desc: "initial state (correct)", + configSource: source, + configMap: correctConfigMap, + expectConfig: correctKC, + }).run(f, setConfigSourceFunc, false, 0) + + cases := []nodeConfigTestCase{ + { + desc: "correct", + configSource: source, + configMap: correctConfigMap, + expectConfig: correctKC, + event: true, + }, + { + desc: "fail-parse", + configSource: source, + configMap: failParseConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + err: status.LoadError, + lkgActive: true, + }, + expectConfig: localKC, + event: true, + }, + { + desc: "fail-validate", + configSource: source, + configMap: failValidateConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + err: status.ValidateError, + lkgActive: true, + }, + expectConfig: localKC, + event: true, + }, + } + L := len(cases) + for i := 1; i <= L; i++ { // need one less iteration than the number of cases + testBothDirections(f, recreateConfigMapFunc, &cases[i-1 : i][0], cases[i:L], 0) + } + }) + }) + + // Please note: This behavior is tested to ensure implementation correctness. We do not, however, recommend ConfigMap mutations + // as a usage pattern for dynamic Kubelet config in large clusters. It is much safer to create a new ConfigMap, and incrementally + // roll out a new Node.Spec.ConfigSource that references the new ConfigMap. In-place ConfigMap updates, including deletion + // followed by re-creation, will cause all observing Kubelets to immediately restart for new config, because these operations + // change the ResourceVersion of the ConfigMap. 
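The in-place mutation cases above rely on the Kubelet's ConfigMap informer waking the sync worker only when the watched object actually changed, so relists that redeliver an identical ConfigMap stay quiet. Below is a rough stand-alone sketch of that guard, using reflect.DeepEqual in place of apiequality.Semantic and invented types; it is not the controller's code.

package main

import (
	"fmt"
	"reflect"
)

// configMap is a stand-in for apiv1.ConfigMap.
type configMap struct {
	ResourceVersion string
	Data            map[string]string
}

// onUpdate pokes the worker only when the watched object actually changed.
func onUpdate(oldObj, newObj *configMap, poke func()) {
	if oldObj == nil { // initial add: always sync once
		poke()
		return
	}
	if !reflect.DeepEqual(oldObj, newObj) {
		poke()
	}
}

func main() {
	poke := func() { fmt.Println("poke sync worker") }
	a := &configMap{ResourceVersion: "1", Data: map[string]string{"kubelet": "{}"}}
	b := &configMap{ResourceVersion: "2", Data: map[string]string{"kubelet": "{...}"}}
	onUpdate(nil, a, poke) // poke (initial add)
	onUpdate(a, a, poke)   // quiet (object unchanged)
	onUpdate(a, b, poke)   // poke (in-place update)
}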
+ Context("delete and recreate ConfigMap: error while ConfigMap is absent", func() { + It(itDescription, func() { + var err error + // we base the "correct" configmap off of the configuration from before the test + correctKC := beforeKC.DeepCopy() + correctConfigMap := newKubeletConfigMap("dynamic-kubelet-config-test-delete-createe", correctKC) + correctConfigMap, err = f.ClientSet.CoreV1().ConfigMaps("kube-system").Create(correctConfigMap) + framework.ExpectNoError(err) + + // ensure node config source is set to the config map we will mutate in-place, + // since our mutation functions don't mutate Node.Spec.ConfigSource + source := &apiv1.NodeConfigSource{ + ConfigMap: &apiv1.ConfigMapNodeConfigSource{ + Namespace: correctConfigMap.Namespace, + Name: correctConfigMap.Name, + KubeletConfigKey: "kubelet", + }, + } + (&nodeConfigTestCase{ + desc: "correct", + configSource: source, + configMap: correctConfigMap, + expectConfig: correctKC, + }).run(f, setConfigSourceFunc, false, 0) + + // delete the ConfigMap, and ensure an error is reported by the Kubelet while the ConfigMap is absent + (&nodeConfigTestCase{ + desc: "correct", + configSource: source, + configMap: correctConfigMap, + expectConfigStatus: expectNodeConfigStatus{ + err: fmt.Sprintf(status.SyncErrorFmt, status.DownloadError), + }, + expectConfig: correctKC, + }).run(f, deleteConfigMapFunc, false, 0) + + // re-create the ConfigMap, and ensure the error disappears + (&nodeConfigTestCase{ + desc: "correct", + configSource: source, + configMap: correctConfigMap, + expectConfig: correctKC, + }).run(f, createConfigMapFunc, false, 0) + }) + }) }) }) -// testBothDirections tests the state change represented by each edge, where each state is a vertex, -// and there are edges in each direction between first and each of the states. -func testBothDirections(f *framework.Framework, first *configState, states []configState, waitAfterFirst time.Duration) { +// testBothDirections tests the state change represented by each edge, where each case is a vertex, +// and there are edges in each direction between first and each of the cases. 
+func testBothDirections(f *framework.Framework, fn func(f *framework.Framework, tc *nodeConfigTestCase) error, + first *nodeConfigTestCase, cases []nodeConfigTestCase, waitAfterFirst time.Duration) { // set to first and check that everything got set up properly - By(fmt.Sprintf("setting configSource to state %q", first.desc)) + By(fmt.Sprintf("setting initial state %q", first.desc)) // we don't always expect an event here, because setting "first" might not represent // a change from the current configuration - setAndTestKubeletConfigState(f, first, false) + first.run(f, fn, false, waitAfterFirst) - time.Sleep(waitAfterFirst) + // for each case, set up, check expectations, then reset to first and check again + for i := range cases { + tc := &cases[i] + By(fmt.Sprintf("from %q to %q", first.desc, tc.desc)) + // from first -> tc, tc.event fully describes whether we should get a config change event + tc.run(f, fn, tc.event, 0) - // for each state, set to that state, check expectations, then reset to first and check again - for i := range states { - By(fmt.Sprintf("from %q to %q", first.desc, states[i].desc)) - // from first -> states[i], states[i].event fully describes whether we should get a config change event - setAndTestKubeletConfigState(f, &states[i], states[i].event) - - By(fmt.Sprintf("back to %q from %q", first.desc, states[i].desc)) - // whether first -> states[i] should have produced a config change event partially determines whether states[i] -> first should produce an event - setAndTestKubeletConfigState(f, first, first.event && states[i].event) + By(fmt.Sprintf("back to %q from %q", first.desc, tc.desc)) + // whether first -> tc should have produced a config change event partially determines whether tc -> first should produce an event + first.run(f, fn, first.event && tc.event, 0) } } -// setAndTestKubeletConfigState tests that after setting the config source, the node spec, status, configz, and latest event match +// run tests that, after performing fn, the node spec, status, configz, and latest event match // the expectations described by state. 
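At its call sites, testBothDirections receives `&cases[i-1 : i][0]`, which is just a verbose spelling of `&cases[i-1]`; the tail cases[i:] supplies the remaining vertices. A small illustration of the traversal shape, with plain strings standing in for test cases and no attempt to reproduce the framework plumbing:

package main

import "fmt"

func main() {
	cases := []string{"nil source", "correct", "fail-parse", "fail-validate"}
	for i := 1; i <= len(cases); i++ {
		first := &cases[i-1 : i][0] // identical to &cases[i-1]
		rest := cases[i:]
		for _, tc := range rest {
			// each pair is exercised in both directions, as testBothDirections does
			fmt.Printf("%s -> %s, then %s -> %s\n", *first, tc, tc, *first)
		}
	}
}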
-func setAndTestKubeletConfigState(f *framework.Framework, state *configState, expectEvent bool) { +func (tc *nodeConfigTestCase) run(f *framework.Framework, fn func(f *framework.Framework, tc *nodeConfigTestCase) error, + expectEvent bool, wait time.Duration) { // set the desired state, retry a few times in case we are competing with other editors Eventually(func() error { - if err := setNodeConfigSource(f, state.configSource); err != nil { - if len(state.apierr) == 0 { - return fmt.Errorf("case %s: expect nil error but got %q", state.desc, err.Error()) - } else if !strings.Contains(err.Error(), state.apierr) { - return fmt.Errorf("case %s: expect error to contain %q but got %q", state.desc, state.apierr, err.Error()) + if err := fn(f, tc); err != nil { + if len(tc.apierr) == 0 { + return fmt.Errorf("case %s: expect nil error but got %q", tc.desc, err.Error()) + } else if !strings.Contains(err.Error(), tc.apierr) { + return fmt.Errorf("case %s: expect error to contain %q but got %q", tc.desc, tc.apierr, err.Error()) } - } else if len(state.apierr) > 0 { - return fmt.Errorf("case %s: expect error to contain %q but got nil error", state.desc, state.apierr) + } else if len(tc.apierr) > 0 { + return fmt.Errorf("case %s: expect error to contain %q but got nil error", tc.desc, tc.apierr) } return nil }, time.Minute, time.Second).Should(BeNil()) // skip further checks if we expected an API error - if len(state.apierr) > 0 { + if len(tc.apierr) > 0 { return } - // check that config source actually got set to what we expect - checkNodeConfigSource(f, state.desc, state.configSource) + // wait for the designated duration before checking the reconciliation + time.Sleep(wait) + // check config source + tc.checkNodeConfigSource(f) // check status - checkConfigStatus(f, state.desc, state.expectConfigStatus) + tc.checkConfigStatus(f) // check expectConfig - if state.expectConfig != nil { - checkConfig(f, state.desc, state.expectConfig) + if tc.expectConfig != nil { + tc.checkConfig(f) } // check that an event was sent for the config change if expectEvent { - checkEvent(f, state.desc, state.configSource) + tc.checkEvent(f) } } +// setConfigSourceFunc sets Node.Spec.ConfigSource to tc.configSource +func setConfigSourceFunc(f *framework.Framework, tc *nodeConfigTestCase) error { + return setNodeConfigSource(f, tc.configSource) +} + +// updateConfigMapFunc updates the ConfigMap described by tc.configMap to contain matching data. +// It also updates the resourceVersion in any non-nil NodeConfigSource.ConfigMap in the expected +// status to match the resourceVersion of the updated ConfigMap. +func updateConfigMapFunc(f *framework.Framework, tc *nodeConfigTestCase) error { + // Clear ResourceVersion from the ConfigMap objects we use to initiate mutations + // so that we don't get 409 (conflict) responses. ConfigMaps always allow updates + // (with respect to concurrency control) when you omit ResourceVersion. + // We know that we won't perform concurrent updates during this test. + tc.configMap.ResourceVersion = "" + cm, err := f.ClientSet.CoreV1().ConfigMaps(tc.configMap.Namespace).Update(tc.configMap) + if err != nil { + return err + } + // update tc.configMap's ResourceVersion to match the updated ConfigMap, this makes + // sure our derived status checks have up-to-date information + tc.configMap.ResourceVersion = cm.ResourceVersion + return nil +} + +// recreateConfigMapFunc deletes and recreates the ConfigMap described by tc.configMap. +// The new ConfigMap will match tc.configMap. 
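updateConfigMapFunc clears ResourceVersion before the PUT because the API server applies its optimistic-concurrency check only when a resourceVersion is supplied; an empty value means an unconditional update. A toy model of that rule follows, with an invented server and object type; it is not the API server's logic.

package main

import (
	"fmt"
	"strconv"
)

type object struct {
	ResourceVersion string
	Data            string
}

type server struct{ stored object }

// update enforces optimistic concurrency only when the client supplies a
// resourceVersion; an empty one means "unconditional update", which is what
// the test relies on after clearing the field.
func (s *server) update(obj object) (object, error) {
	if obj.ResourceVersion != "" && obj.ResourceVersion != s.stored.ResourceVersion {
		return object{}, fmt.Errorf("conflict: have %s, got %s", s.stored.ResourceVersion, obj.ResourceVersion)
	}
	next, _ := strconv.Atoi(s.stored.ResourceVersion)
	s.stored = object{ResourceVersion: strconv.Itoa(next + 1), Data: obj.Data}
	return s.stored, nil
}

func main() {
	s := &server{stored: object{ResourceVersion: "7", Data: "old"}}

	if _, err := s.update(object{ResourceVersion: "5", Data: "stale"}); err != nil {
		fmt.Println(err) // conflict: have 7, got 5
	}

	updated, _ := s.update(object{Data: "new"}) // empty ResourceVersion: accepted
	fmt.Println(updated.ResourceVersion, updated.Data) // 8 new
}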
+func recreateConfigMapFunc(f *framework.Framework, tc *nodeConfigTestCase) error { + // need to ignore NotFound error, since there could be cases where delete + // fails during a retry because the delete in a previous attempt succeeded, + // before some other error occurred. + err := deleteConfigMapFunc(f, tc) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + return createConfigMapFunc(f, tc) +} + +// deleteConfigMapFunc simply deletes tc.configMap +func deleteConfigMapFunc(f *framework.Framework, tc *nodeConfigTestCase) error { + return f.ClientSet.CoreV1().ConfigMaps(tc.configMap.Namespace).Delete(tc.configMap.Name, &metav1.DeleteOptions{}) +} + +// createConfigMapFunc creates tc.configMap and updates the UID and ResourceVersion on tc.configMap +// to match the created configMap +func createConfigMapFunc(f *framework.Framework, tc *nodeConfigTestCase) error { + tc.configMap.ResourceVersion = "" + cm, err := f.ClientSet.CoreV1().ConfigMaps(tc.configMap.Namespace).Create(tc.configMap) + if err != nil { + return err + } + // update tc.configMap's UID and ResourceVersion to match the new ConfigMap, this makes + // sure our derived status checks have up-to-date information + tc.configMap.UID = cm.UID + tc.configMap.ResourceVersion = cm.ResourceVersion + return nil +} + // make sure the node's config source matches what we expect, after setting it -func checkNodeConfigSource(f *framework.Framework, desc string, expect *apiv1.NodeConfigSource) { +func (tc *nodeConfigTestCase) checkNodeConfigSource(f *framework.Framework) { const ( timeout = time.Minute interval = time.Second @@ -578,62 +885,74 @@ func checkNodeConfigSource(f *framework.Framework, desc string, expect *apiv1.No Eventually(func() error { node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("checkNodeConfigSource: case %s: %v", desc, err) + return fmt.Errorf("checkNodeConfigSource: case %s: %v", tc.desc, err) } actual := node.Spec.ConfigSource - if !reflect.DeepEqual(expect, actual) { - return fmt.Errorf(spew.Sprintf("checkNodeConfigSource: case %s: expected %#v but got %#v", desc, expect, actual)) + if !apiequality.Semantic.DeepEqual(tc.configSource, actual) { + return fmt.Errorf(spew.Sprintf("checkNodeConfigSource: case %s: expected %#v but got %#v", tc.desc, tc.configSource, actual)) } return nil }, timeout, interval).Should(BeNil()) } // make sure the node status eventually matches what we expect -func checkConfigStatus(f *framework.Framework, desc string, expect *configStateStatus) { +func (tc *nodeConfigTestCase) checkConfigStatus(f *framework.Framework) { const ( timeout = time.Minute interval = time.Second ) + errFmt := fmt.Sprintf("checkConfigStatus: case %s:", tc.desc) + " %v" Eventually(func() error { node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("checkConfigStatus: case %s: %v", desc, err) + return fmt.Errorf(errFmt, err) } - if err := expectConfigStatus(expect, node.Status.Config); err != nil { - return fmt.Errorf("checkConfigStatus: case %s: %v", desc, err) + if err := expectConfigStatus(tc, node.Status.Config); err != nil { + return fmt.Errorf(errFmt, err) } return nil }, timeout, interval).Should(BeNil()) } -func expectConfigStatus(expect *configStateStatus, actual *apiv1.NodeConfigStatus) error { - if expect == nil { - return fmt.Errorf("expectConfigStatus requires expect to be non-nil (possible malformed test case)") - } +func 
expectConfigStatus(tc *nodeConfigTestCase, actual *apiv1.NodeConfigStatus) error { + var errs []string if actual == nil { return fmt.Errorf("expectConfigStatus requires actual to be non-nil (possible Kubelet failed to update status)") } - var errs []string - if !expect.SkipActive && !apiequality.Semantic.DeepEqual(expect.Active, actual.Active) { - errs = append(errs, fmt.Sprintf("expected Active %#v but got %#v", expect.Active, actual.Active)) + // check Assigned matches tc.configSource, with UID and ResourceVersion from tc.configMap + expectAssigned := tc.configSource.DeepCopy() + if expectAssigned != nil && expectAssigned.ConfigMap != nil { + expectAssigned.ConfigMap.UID = tc.configMap.UID + expectAssigned.ConfigMap.ResourceVersion = tc.configMap.ResourceVersion } - if !expect.SkipAssigned && !apiequality.Semantic.DeepEqual(expect.Assigned, actual.Assigned) { - errs = append(errs, fmt.Sprintf("expected Assigned %#v but got %#v", expect.Assigned, actual.Assigned)) + if !apiequality.Semantic.DeepEqual(expectAssigned, actual.Assigned) { + errs = append(errs, spew.Sprintf("expected Assigned %#v but got %#v", expectAssigned, actual.Assigned)) } - if !expect.SkipLkg && !apiequality.Semantic.DeepEqual(expect.LastKnownGood, actual.LastKnownGood) { - errs = append(errs, fmt.Sprintf("expected LastKnownGood %#v but got %#v", expect.LastKnownGood, actual.LastKnownGood)) + // check LastKnownGood matches tc.expectConfigStatus.lastKnownGood + if !tc.expectConfigStatus.skipLkg && !apiequality.Semantic.DeepEqual(tc.expectConfigStatus.lastKnownGood, actual.LastKnownGood) { + errs = append(errs, spew.Sprintf("expected LastKnownGood %#v but got %#v", tc.expectConfigStatus.lastKnownGood, actual.LastKnownGood)) } - if expect.Error != actual.Error { - errs = append(errs, fmt.Sprintf("expected Error %q but got %q", expect.Error, actual.Error)) + // check Active matches Assigned or LastKnownGood, depending on tc.expectConfigStatus.lkgActive + expectActive := expectAssigned + if tc.expectConfigStatus.lkgActive { + expectActive = tc.expectConfigStatus.lastKnownGood } + if !apiequality.Semantic.DeepEqual(expectActive, actual.Active) { + errs = append(errs, spew.Sprintf("expected Active %#v but got %#v", expectActive, actual.Active)) + } + // check Error + if tc.expectConfigStatus.err != actual.Error { + errs = append(errs, fmt.Sprintf("expected Error %q but got %q", tc.expectConfigStatus.err, actual.Error)) + } + // format error list if len(errs) > 0 { - return fmt.Errorf("%s", strings.Join(errs, ",")) + return fmt.Errorf("%s", strings.Join(errs, ", ")) } return nil } // make sure config exposed on configz matches what we expect -func checkConfig(f *framework.Framework, desc string, expect *kubeletconfig.KubeletConfiguration) { +func (tc *nodeConfigTestCase) checkConfig(f *framework.Framework) { const ( timeout = time.Minute interval = time.Second @@ -641,10 +960,10 @@ func checkConfig(f *framework.Framework, desc string, expect *kubeletconfig.Kube Eventually(func() error { actual, err := getCurrentKubeletConfig() if err != nil { - return fmt.Errorf("checkConfig: case %s: %v", desc, err) + return fmt.Errorf("checkConfig: case %s: %v", tc.desc, err) } - if !reflect.DeepEqual(expect, actual) { - return fmt.Errorf(spew.Sprintf("checkConfig: case %s: expected %#v but got %#v", desc, expect, actual)) + if !apiequality.Semantic.DeepEqual(tc.expectConfig, actual) { + return fmt.Errorf(spew.Sprintf("checkConfig: case %s: expected %#v but got %#v", tc.desc, tc.expectConfig, actual)) } return nil }, timeout, 
interval).Should(BeNil()) @@ -652,7 +971,7 @@ func checkConfig(f *framework.Framework, desc string, expect *kubeletconfig.Kube // checkEvent makes sure an event was sent marking the Kubelet's restart to use new config, // and that it mentions the config we expect. -func checkEvent(f *framework.Framework, desc string, expect *apiv1.NodeConfigSource) { +func (tc *nodeConfigTestCase) checkEvent(f *framework.Framework) { const ( timeout = time.Minute interval = time.Second @@ -660,7 +979,7 @@ func checkEvent(f *framework.Framework, desc string, expect *apiv1.NodeConfigSou Eventually(func() error { events, err := f.ClientSet.CoreV1().Events("").List(metav1.ListOptions{}) if err != nil { - return fmt.Errorf("checkEvent: case %s: %v", desc, err) + return fmt.Errorf("checkEvent: case %s: %v", tc.desc, err) } // find config changed event with most recent timestamp var recent *apiv1.Event @@ -676,23 +995,23 @@ func checkEvent(f *framework.Framework, desc string, expect *apiv1.NodeConfigSou } } } - // we expect at least one config change event if recent == nil { - return fmt.Errorf("checkEvent: case %s: no events found with reason %s", desc, controller.KubeletConfigChangedEventReason) + return fmt.Errorf("checkEvent: case %s: no events found with reason %s", tc.desc, controller.KubeletConfigChangedEventReason) } - - // ensure the message is what we expect (including the resource path) - expectMessage := fmt.Sprintf(controller.EventMessageFmt, controller.LocalConfigMessage) - if expect != nil { - if expect.ConfigMap != nil { - expectMessage = fmt.Sprintf(controller.EventMessageFmt, fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", expect.ConfigMap.Namespace, expect.ConfigMap.Name)) + // construct expected message, based on the test case + expectMessage := controller.LocalEventMessage + if tc.configSource != nil { + if tc.configSource.ConfigMap != nil { + expectMessage = fmt.Sprintf(controller.RemoteEventMessageFmt, + fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", tc.configSource.ConfigMap.Namespace, tc.configSource.ConfigMap.Name), + tc.configMap.UID, tc.configMap.ResourceVersion, tc.configSource.ConfigMap.KubeletConfigKey) } } + // compare messages if expectMessage != recent.Message { - return fmt.Errorf("checkEvent: case %s: expected event message %q but got %q", desc, expectMessage, recent.Message) + return fmt.Errorf("checkEvent: case %s: expected event message %q but got %q", tc.desc, expectMessage, recent.Message) } - return nil }, timeout, interval).Should(BeNil()) } diff --git a/test/integration/auth/node_test.go b/test/integration/auth/node_test.go index a8d58adef1..eb6526136a 100644 --- a/test/integration/auth/node_test.go +++ b/test/integration/auth/node_test.go @@ -287,11 +287,8 @@ func TestNodeAuthorizer(t *testing.T) { } node2.Spec.ConfigSource = &api.NodeConfigSource{ ConfigMap: &api.ConfigMapNodeConfigSource{ - Namespace: "ns", - Name: "myconfigmapconfigsource", - // validation just requires UID to be non-empty and it isn't necessary for GET, - // so we just use a bogus one for the test - UID: "uid", + Namespace: "ns", + Name: "myconfigmapconfigsource", KubeletConfigKey: "kubelet", }, }
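The status checks above all share one shape: the spec source carries only Namespace, Name, and KubeletConfigKey, and the expected status source is a copy of it stamped with the UID and ResourceVersion of the ConfigMap the Kubelet actually downloaded. A stand-alone sketch of that derivation, using invented structs rather than the API types and made-up identifier values:

package main

import "fmt"

// specSource mirrors the fields a user may set in Node.Spec.ConfigSource.ConfigMap.
type specSource struct {
	Namespace, Name, KubeletConfigKey string
}

// statusSource mirrors Node.Status.Config.*.ConfigMap, where UID and
// ResourceVersion are required so the report pins an exact ConfigMap revision.
type statusSource struct {
	specSource
	UID, ResourceVersion string
}

type configMapMeta struct {
	UID, ResourceVersion string
}

// expectedStatus copies the spec source and fills in the identity of the
// ConfigMap the Kubelet actually downloaded.
func expectedStatus(spec specSource, cm configMapMeta) statusSource {
	return statusSource{
		specSource:      spec,
		UID:             cm.UID,
		ResourceVersion: cm.ResourceVersion,
	}
}

func main() {
	spec := specSource{Namespace: "kube-system", Name: "my-kubelet-config", KubeletConfigKey: "kubelet"}
	cm := configMapMeta{UID: "8fd2c9b0", ResourceVersion: "4213"}
	fmt.Printf("%+v\n", expectedStatus(spec, cm))
}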