From d5d7d6d6845cdfcb6b0a631a3e503332e799d65a Mon Sep 17 00:00:00 2001 From: Michael Taufen Date: Tue, 12 Dec 2017 10:05:03 -0800 Subject: [PATCH] Send an event just before the Kubelet restarts to use a new config --- cmd/kubelet/app/server.go | 2 +- pkg/kubelet/kubeletconfig/BUILD | 3 + .../kubeletconfig/checkpoint/download.go | 9 ++ .../kubeletconfig/checkpoint/download_test.go | 15 ++ pkg/kubelet/kubeletconfig/configsync.go | 86 ++++++++-- pkg/kubelet/kubeletconfig/controller.go | 6 +- test/e2e_node/BUILD | 1 + test/e2e_node/dynamic_kubelet_config_test.go | 151 ++++++++++++++---- 8 files changed, 224 insertions(+), 49 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 8100b44afa..1d6901b544 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -401,7 +401,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { // Alpha Dynamic Configuration Implementation; // if the kubelet config controller is available, inject the latest to start the config and status sync loops if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce { - kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, string(nodeName)) + kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)) } if kubeDeps.Auth == nil { diff --git a/pkg/kubelet/kubeletconfig/BUILD b/pkg/kubelet/kubeletconfig/BUILD index 71bbe9fb14..ba381a6fed 100644 --- a/pkg/kubelet/kubeletconfig/BUILD +++ b/pkg/kubelet/kubeletconfig/BUILD @@ -25,13 +25,16 @@ go_library( "//pkg/kubelet/kubeletconfig/util/log:go_default_library", "//pkg/kubelet/kubeletconfig/util/panic:go_default_library", "//pkg/util/filesystem:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/pkg/kubelet/kubeletconfig/checkpoint/download.go b/pkg/kubelet/kubeletconfig/checkpoint/download.go index 297d974a1a..9778f318b7 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/download.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/download.go @@ -34,6 +34,8 @@ import ( type RemoteConfigSource interface { // UID returns the UID of the remote config source object UID() string + // APIPath returns the API path to the remote resource, e.g. its SelfLink + APIPath() string // Download downloads the remote config source object returns a Checkpoint backed by the object, // or a sanitized failure reason and error if the download fails Download(client clientset.Interface) (Checkpoint, string, error) @@ -110,6 +112,13 @@ func (r *remoteConfigMap) UID() string { return string(r.source.ConfigMapRef.UID) } +const configMapAPIPathFmt = "/api/v1/namespaces/%s/configmaps/%s" + +func (r *remoteConfigMap) APIPath() string { + ref := r.source.ConfigMapRef + return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name) +} + func (r *remoteConfigMap) Download(client clientset.Interface) (Checkpoint, string, error) { var reason string uid := string(r.source.ConfigMapRef.UID) diff --git a/pkg/kubelet/kubeletconfig/checkpoint/download_test.go b/pkg/kubelet/kubeletconfig/checkpoint/download_test.go index a902b6384e..ccca6c3161 100644 --- a/pkg/kubelet/kubeletconfig/checkpoint/download_test.go +++ b/pkg/kubelet/kubeletconfig/checkpoint/download_test.go @@ -17,6 +17,7 @@ limitations under the License. package checkpoint import ( + "fmt" "testing" "github.com/davecgh/go-spew/spew" @@ -92,6 +93,20 @@ func TestRemoteConfigMapUID(t *testing.T) { } } +func TestRemoteConfigMapAPIPath(t *testing.T) { + name := "name" + namespace := "namespace" + cpt := &remoteConfigMap{ + &apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{Name: name, Namespace: namespace, UID: ""}}, + } + expect := fmt.Sprintf(configMapAPIPathFmt, cpt.source.ConfigMapRef.Namespace, cpt.source.ConfigMapRef.Name) + // APIPath() method should return the correct path to the referenced resource + path := cpt.APIPath() + if expect != path { + t.Errorf("expect APIPath() to return %q, but got %q", expect, namespace) + } +} + func TestRemoteConfigMapDownload(t *testing.T) { _, kubeletCodecs, err := kubeletscheme.NewSchemeAndCodecs() if err != nil { diff --git a/pkg/kubelet/kubeletconfig/configsync.go b/pkg/kubelet/kubeletconfig/configsync.go index ac7e40656f..66f9eb9160 100644 --- a/pkg/kubelet/kubeletconfig/configsync.go +++ b/pkg/kubelet/kubeletconfig/configsync.go @@ -19,15 +19,30 @@ package kubeletconfig import ( "fmt" "os" + "time" + + "github.com/golang/glog" apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" clientset "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status" utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log" ) +const ( + // KubeletConfigChangedEventReason identifies an event as a change of Kubelet configuration + KubeletConfigChangedEventReason = "KubeletConfigChanged" + // EventMessageFmt is the message format for Kubelet config change events + EventMessageFmt = "Kubelet will restart to use: %s" + // LocalConfigMessage is the text to apply to EventMessageFmt when the Kubelet has been configured to use its local config (init or defaults) + LocalConfigMessage = "local config" +) + // pokeConfiSourceWorker tells the worker thread that syncs config sources that work needs to be done func (cc *Controller) pokeConfigSourceWorker() { select { @@ -37,7 +52,7 @@ func (cc *Controller) pokeConfigSourceWorker() { } // syncConfigSource checks if work needs to be done to use a new configuration, and does that work if necessary -func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName string) { +func (cc *Controller) syncConfigSource(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) { select { case <-cc.pendingConfigSource: default: @@ -62,13 +77,22 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName stri } // check the Node and download any new config - if updated, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil { + if updated, cur, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil { cc.configOK.SetFailSyncCondition(reason) syncerr = fmt.Errorf("%s, error: %v", reason, err) return } else if updated { - // TODO(mtaufen): Consider adding a "currently restarting kubelet" ConfigOK message for this case - utillog.Infof("config updated, Kubelet will restart to begin using new config") + path := LocalConfigMessage + if cur != nil { + path = cur.APIPath() + } + // we directly log and send the event, instead of using the event recorder, + // because the event recorder won't flush its queue before we exit (we'd lose the event) + event := eventf(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, EventMessageFmt, path) + glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message) + if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil { + utillog.Errorf("failed to send event, error: %v", err) + } os.Exit(0) } @@ -81,31 +105,31 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName stri // doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config, // depending on the `source`, and returns whether the current config in the checkpoint store was updated as a result -func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, string, error) { +func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, checkpoint.RemoteConfigSource, string, error) { if source == nil { utillog.Infof("Node.Spec.ConfigSource is empty, will reset current and last-known-good to defaults") updated, reason, err := cc.resetConfig() if err != nil { - return false, reason, err + return false, nil, reason, err } - return updated, "", nil + return updated, nil, "", nil } // if the NodeConfigSource is non-nil, download the config utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary") remote, reason, err := checkpoint.NewRemoteConfigSource(source) if err != nil { - return false, reason, err + return false, nil, reason, err } reason, err = cc.checkpointConfigSource(client, remote) if err != nil { - return false, reason, err + return false, nil, reason, err } updated, reason, err := cc.setCurrentConfig(remote) if err != nil { - return false, reason, err + return false, nil, reason, err } - return updated, "", nil + return updated, remote, "", nil } // checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist, @@ -181,3 +205,43 @@ func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) { } return node, nil } + +// eventf constructs and returns an event containing a formatted message +// similar to k8s.io/client-go/tools/record/event.go +func eventf(nodeName, eventType, reason, messageFmt string, args ...interface{}) *apiv1.Event { + return makeEvent(nodeName, eventType, reason, fmt.Sprintf(messageFmt, args...)) +} + +// makeEvent constructs an event +// similar to makeEvent in k8s.io/client-go/tools/record/event.go +func makeEvent(nodeName, eventtype, reason, message string) *apiv1.Event { + const componentKubelet = "kubelet" + // NOTE(mtaufen): This is consistent with pkg/kubelet/kubelet.go. Even though setting the node + // name as the UID looks strange, it appears to be conventional for events sent by the Kubelet. + ref := apiv1.ObjectReference{ + Kind: "Node", + Name: nodeName, + UID: types.UID(nodeName), + Namespace: "", + } + + t := metav1.Time{Time: time.Now()} + namespace := ref.Namespace + if namespace == "" { + namespace = metav1.NamespaceDefault + } + return &apiv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), + Namespace: namespace, + }, + InvolvedObject: ref, + Reason: reason, + Message: message, + FirstTimestamp: t, + LastTimestamp: t, + Count: 1, + Type: eventtype, + Source: apiv1.EventSource{Component: componentKubelet, Host: string(nodeName)}, + } +} diff --git a/pkg/kubelet/kubeletconfig/controller.go b/pkg/kubelet/kubeletconfig/controller.go index 6ff4cd6b28..0b493ba053 100644 --- a/pkg/kubelet/kubeletconfig/controller.go +++ b/pkg/kubelet/kubeletconfig/controller.go @@ -24,10 +24,10 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation" - "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status" @@ -201,7 +201,7 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) { // StartSync launches the controller's sync loops if `client` is non-nil and `nodeName` is non-empty. // It will always start the Node condition reporting loop, and will also start the dynamic conifg sync loops // if dynamic config is enabled on the controller. If `nodeName` is empty but `client` is non-nil, an error is logged. -func (cc *Controller) StartSync(client clientset.Interface, nodeName string) { +func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) { if client == nil { utillog.Infof("nil client, will not start sync loops") return @@ -236,7 +236,7 @@ func (cc *Controller) StartSync(client clientset.Interface, nodeName string) { go utilpanic.HandlePanic(func() { utillog.Infof("starting config source sync loop") wait.JitterUntil(func() { - cc.syncConfigSource(client, nodeName) + cc.syncConfigSource(client, eventClient, nodeName) }, 10*time.Second, 0.2, true, wait.NeverStop) })() } else { diff --git a/test/e2e_node/BUILD b/test/e2e_node/BUILD index dfd6e95789..df312031ed 100644 --- a/test/e2e_node/BUILD +++ b/test/e2e_node/BUILD @@ -128,6 +128,7 @@ go_test( "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/dockershim/libdocker:go_default_library", "//pkg/kubelet/images:go_default_library", + "//pkg/kubelet/kubeletconfig:go_default_library", "//pkg/kubelet/kubeletconfig/status:go_default_library", "//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/types:go_default_library", diff --git a/test/e2e_node/dynamic_kubelet_config_test.go b/test/e2e_node/dynamic_kubelet_config_test.go index dd83b3d1bc..a0e9da8085 100644 --- a/test/e2e_node/dynamic_kubelet_config_test.go +++ b/test/e2e_node/dynamic_kubelet_config_test.go @@ -26,6 +26,7 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + controller "k8s.io/kubernetes/pkg/kubelet/kubeletconfig" "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status" "k8s.io/kubernetes/test/e2e/framework" @@ -39,6 +40,11 @@ type configState struct { configSource *apiv1.NodeConfigSource expectConfigOK *apiv1.NodeCondition expectConfig *kubeletconfig.KubeletConfiguration + // whether the state would cause a config change event as a result of the update to Node.Spec.ConfigSource, + // assuming that the current source would have also caused a config change event. + // for example, some malformed references may result in a download failure, in which case the Kubelet + // does not restart to change config, while an invalid payload will be detected upon restart + event bool } // This test is marked [Disruptive] because the Kubelet restarts several times during this test. @@ -82,7 +88,8 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionTrue, Message: fmt.Sprintf(status.CurRemoteMessageFmt, originalConfigMap.UID), Reason: status.CurRemoteOKReason}, - expectConfig: originalKC}) + expectConfig: originalKC, + }, false) }) Context("When setting new NodeConfigSources that cause transitions between ConfigOK conditions", func() { @@ -121,7 +128,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionTrue, Message: status.CurDefaultMessage, Reason: status.CurDefaultOKReason}, - expectConfig: nil}, + expectConfig: nil, + event: true, + }, // Node.Spec.ConfigSource has all nil subfields {desc: "Node.Spec.ConfigSource has all nil subfields", @@ -129,7 +138,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionFalse, Message: "", Reason: fmt.Sprintf(status.FailSyncReasonFmt, status.FailSyncReasonAllNilSubfields)}, - expectConfig: nil}, + expectConfig: nil, + event: false, + }, // Node.Spec.ConfigSource.ConfigMapRef is partial {desc: "Node.Spec.ConfigSource.ConfigMapRef is partial", @@ -140,17 +151,21 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionFalse, Message: "", Reason: fmt.Sprintf(status.FailSyncReasonFmt, status.FailSyncReasonPartialObjectReference)}, - expectConfig: nil}, + expectConfig: nil, + event: false, + }, // Node.Spec.ConfigSource's UID does not align with namespace/name - {desc: "Node.Spec.ConfigSource's UID does not align with namespace/name", + {desc: "Node.Spec.ConfigSource.ConfigMapRef.UID does not align with Namespace/Name", configSource: &apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{UID: "foo", Namespace: correctConfigMap.Namespace, Name: correctConfigMap.Name}}, expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionFalse, Message: "", Reason: fmt.Sprintf(status.FailSyncReasonFmt, fmt.Sprintf(status.FailSyncReasonUIDMismatchFmt, "foo", correctConfigMap.UID))}, - expectConfig: nil}, + expectConfig: nil, + event: false, + }, // correct {desc: "correct", @@ -161,7 +176,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionTrue, Message: fmt.Sprintf(status.CurRemoteMessageFmt, correctConfigMap.UID), Reason: status.CurRemoteOKReason}, - expectConfig: correctKC}, + expectConfig: correctKC, + event: true, + }, // fail-parse {desc: "fail-parse", @@ -172,7 +189,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionFalse, Message: status.LkgDefaultMessage, Reason: fmt.Sprintf(status.CurFailParseReasonFmt, failParseConfigMap.UID)}, - expectConfig: nil}, + expectConfig: nil, + event: true, + }, // fail-validate {desc: "fail-validate", @@ -183,7 +202,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionFalse, Message: status.LkgDefaultMessage, Reason: fmt.Sprintf(status.CurFailValidateReasonFmt, failValidateConfigMap.UID)}, - expectConfig: nil}, + expectConfig: nil, + event: true, + }, } L := len(states) @@ -194,8 +215,8 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube }) }) - Context("When a remote config becomes the new last-known-good before the Kubelet is updated to use a new, bad config", func() { - It("it should report a status and configz indicating that it rolled back to the new last-known-good", func() { + Context("When a remote config becomes the new last-known-good, and then the Kubelet is updated to use a new, bad config", func() { + It("the Kubelet should report a status and configz indicating that it rolled back to the new last-known-good", func() { var err error // we base the "lkg" configmap off of the current configuration, but set the trial // duration very low so that it quickly becomes the last-known-good @@ -225,7 +246,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionTrue, Message: fmt.Sprintf(status.CurRemoteMessageFmt, lkgConfigMap.UID), Reason: status.CurRemoteOKReason}, - expectConfig: lkgKC}, + expectConfig: lkgKC, + event: true, + }, // bad config {desc: "bad config", @@ -236,7 +259,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionFalse, Message: fmt.Sprintf(status.LkgRemoteMessageFmt, lkgConfigMap.UID), Reason: fmt.Sprintf(status.CurFailParseReasonFmt, badConfigMap.UID)}, - expectConfig: lkgKC}, + expectConfig: lkgKC, + event: true, + }, } testBothDirections(f, &states[0], states[1:]) @@ -271,7 +296,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionTrue, Message: fmt.Sprintf(status.CurRemoteMessageFmt, cm1.UID), Reason: status.CurRemoteOKReason}, - expectConfig: kc1}, + expectConfig: kc1, + event: true, + }, {desc: "cm2", configSource: &apiv1.NodeConfigSource{ConfigMapRef: &apiv1.ObjectReference{ UID: cm2.UID, @@ -280,7 +307,9 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube expectConfigOK: &apiv1.NodeCondition{Type: apiv1.NodeConfigOK, Status: apiv1.ConditionTrue, Message: fmt.Sprintf(status.CurRemoteMessageFmt, cm2.UID), Reason: status.CurRemoteOKReason}, - expectConfig: kc2}, + expectConfig: kc2, + event: true, + }, } for i := 0; i < 50; i++ { // change the config 101 times (changes 3 times in the first iteration, 2 times in each subsequent iteration) @@ -296,61 +325,68 @@ var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:DynamicKube func testBothDirections(f *framework.Framework, first *configState, states []configState) { // set to first and check that everything got set up properly By(fmt.Sprintf("setting configSource to state %q", first.desc)) - setAndTestKubeletConfigState(f, first) + // we don't always expect an event here, because setting "first" might not represent + // a change from the current configuration + setAndTestKubeletConfigState(f, first, false) // for each state, set to that state, check condition and configz, then reset to first and check again for i := range states { By(fmt.Sprintf("from %q to %q", first.desc, states[i].desc)) - setAndTestKubeletConfigState(f, &states[i]) + // from first -> states[i], states[i].event fully describes whether we should get a config change event + setAndTestKubeletConfigState(f, &states[i], states[i].event) By(fmt.Sprintf("back to %q from %q", first.desc, states[i].desc)) - setAndTestKubeletConfigState(f, first) + // whether first -> states[i] should have produced a config change event partially determines whether states[i] -> first should produce an event + setAndTestKubeletConfigState(f, first, first.event && states[i].event) } } // setAndTestKubeletConfigState tests that after setting the config source, the ConfigOK condition // and (if appropriate) configuration exposed via conifgz are as expected. // The configuration will be converted to the internal type prior to comparison. -func setAndTestKubeletConfigState(f *framework.Framework, state *configState) { +func setAndTestKubeletConfigState(f *framework.Framework, state *configState, expectEvent bool) { // set the desired state, retry a few times in case we are competing with other editors Eventually(func() error { if err := setNodeConfigSource(f, state.configSource); err != nil { - return err + return fmt.Errorf("case %s: error setting Node.Spec.ConfigSource", err) } return nil }, time.Minute, time.Second).Should(BeNil()) // check that config source actually got set to what we expect - checkNodeConfigSource(f, state.configSource) + checkNodeConfigSource(f, state.desc, state.configSource) // check condition - checkConfigOKCondition(f, state.expectConfigOK) + checkConfigOKCondition(f, state.desc, state.expectConfigOK) // check expectConfig if state.expectConfig != nil { - checkConfig(f, state.expectConfig) + checkConfig(f, state.desc, state.expectConfig) + } + // check that an event was sent for the config change + if expectEvent { + checkEvent(f, state.desc, state.configSource) } } // make sure the node's config source matches what we expect, after setting it -func checkNodeConfigSource(f *framework.Framework, expect *apiv1.NodeConfigSource) { +func checkNodeConfigSource(f *framework.Framework, desc string, expect *apiv1.NodeConfigSource) { const ( timeout = time.Minute interval = time.Second ) - Eventually(func() error { node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) if err != nil { - return err + return fmt.Errorf("checkNodeConfigSource: case %s: %v", desc, err) } actual := node.Spec.ConfigSource if !reflect.DeepEqual(expect, actual) { - return fmt.Errorf(spew.Sprintf("expected %#v but got %#v", expect, actual)) + return fmt.Errorf(spew.Sprintf("checkNodeConfigSource: case %s: expected %#v but got %#v", desc, expect, actual)) } return nil }, timeout, interval).Should(BeNil()) } // make sure the ConfigOK node condition eventually matches what we expect -func checkConfigOKCondition(f *framework.Framework, expect *apiv1.NodeCondition) { +func checkConfigOKCondition(f *framework.Framework, desc string, expect *apiv1.NodeCondition) { const ( timeout = time.Minute interval = time.Second @@ -359,14 +395,14 @@ func checkConfigOKCondition(f *framework.Framework, expect *apiv1.NodeCondition) Eventually(func() error { node, err := f.ClientSet.CoreV1().Nodes().Get(framework.TestContext.NodeName, metav1.GetOptions{}) if err != nil { - return err + return fmt.Errorf("checkConfigOKCondition: case %s: %v", desc, err) } actual := getConfigOKCondition(node.Status.Conditions) if actual == nil { - return fmt.Errorf("ConfigOK condition not found on node %q", framework.TestContext.NodeName) + return fmt.Errorf("checkConfigOKCondition: case %s: ConfigOK condition not found on node %q", desc, framework.TestContext.NodeName) } if err := expectConfigOK(expect, actual); err != nil { - return err + return fmt.Errorf("checkConfigOKCondition: case %s: %v", desc, err) } return nil }, timeout, interval).Should(BeNil()) @@ -388,7 +424,7 @@ func expectConfigOK(expect, actual *apiv1.NodeCondition) error { } // make sure config exposed on configz matches what we expect -func checkConfig(f *framework.Framework, expect *kubeletconfig.KubeletConfiguration) { +func checkConfig(f *framework.Framework, desc string, expect *kubeletconfig.KubeletConfiguration) { const ( timeout = time.Minute interval = time.Second @@ -396,11 +432,58 @@ func checkConfig(f *framework.Framework, expect *kubeletconfig.KubeletConfigurat Eventually(func() error { actual, err := getCurrentKubeletConfig() if err != nil { - return err + return fmt.Errorf("checkConfig: case %s: %v", desc, err) } if !reflect.DeepEqual(expect, actual) { - return fmt.Errorf(spew.Sprintf("expected %#v but got %#v", expect, actual)) + return fmt.Errorf(spew.Sprintf("checkConfig: case %s: expected %#v but got %#v", desc, expect, actual)) } return nil }, timeout, interval).Should(BeNil()) } + +// checkEvent makes sure an event was sent marking the Kubelet's restart to use new config, +// and that it mentions the config we expect. +func checkEvent(f *framework.Framework, desc string, expect *apiv1.NodeConfigSource) { + const ( + timeout = time.Minute + interval = time.Second + ) + Eventually(func() error { + events, err := f.ClientSet.CoreV1().Events("").List(metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("checkEvent: case %s: %v", desc, err) + } + // find config changed event with most recent timestamp + var recent *apiv1.Event + for i := range events.Items { + if events.Items[i].Reason == controller.KubeletConfigChangedEventReason { + if recent == nil { + recent = &events.Items[i] + continue + } + // for these events, first and last timestamp are always the same + if events.Items[i].FirstTimestamp.Time.After(recent.FirstTimestamp.Time) { + recent = &events.Items[i] + } + } + } + + // we expect at least one config change event + if recent == nil { + return fmt.Errorf("checkEvent: case %s: no events found with reason %s", desc, controller.KubeletConfigChangedEventReason) + } + + // ensure the message is what we expect (including the resource path) + expectMessage := fmt.Sprintf(controller.EventMessageFmt, controller.LocalConfigMessage) + if expect != nil { + if expect.ConfigMapRef != nil { + expectMessage = fmt.Sprintf(controller.EventMessageFmt, fmt.Sprintf("/api/v1/namespaces/%s/configmaps/%s", expect.ConfigMapRef.Namespace, expect.ConfigMapRef.Name)) + } + } + if expectMessage != recent.Message { + return fmt.Errorf("checkEvent: case %s: expected event message %q but got %q", desc, expectMessage, recent.Message) + } + + return nil + }, timeout, interval).Should(BeNil()) +}