From bc8bafd825816844b1a93af98b22b86a3f32a65e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez=20L=C3=B3pez?= Date: Fri, 19 Apr 2019 13:56:36 +0200 Subject: [PATCH] kubeadm: improve resiliency when conflicts arise when updating the kubeadm-config ConfigMap Add the functionality to support `CreateOrMutateConfigMap` and `MutateConfigMap`. * `CreateOrMutateConfigMap` will try to create a given ConfigMap object; if this ConfigMap already exists, a new version of the resource will be retrieved from the server and a mutator callback will be called on it. Then, an `Update` of the mutated object will be performed. If there's a conflict during this `Update` operation, retry until no conflict happens. On every retry the object is refreshed from the server to the latest version. * `MutateConfigMap` will try to get the latest version of the ConfigMap from the server, call the mutator callback and then try to `Update` the mutated object. If there's a conflict during this `Update` operation, retry until no conflict happens. On every retry the object is refreshed from the server to the latest version. Add unit tests for `MutateConfigMap` * One test checks that in case of no conflicts, the update of the given ConfigMap happens without any issues. * Another test mimics 5 consecutive CONFLICT responses when updating the given ConfigMap, whereas the sixth try it will work. --- cmd/kubeadm/app/phases/uploadconfig/BUILD | 1 + .../app/phases/uploadconfig/uploadconfig.go | 120 ++++++++---------- .../phases/uploadconfig/uploadconfig_test.go | 34 +++++ cmd/kubeadm/app/util/apiclient/BUILD | 4 + cmd/kubeadm/app/util/apiclient/idempotency.go | 45 ++++++- .../app/util/apiclient/idempotency_test.go | 100 ++++++++++++++- 6 files changed, 233 insertions(+), 71 deletions(-) diff --git a/cmd/kubeadm/app/phases/uploadconfig/BUILD b/cmd/kubeadm/app/phases/uploadconfig/BUILD index 0611f60cd7..a794b35144 100644 --- a/cmd/kubeadm/app/phases/uploadconfig/BUILD +++ b/cmd/kubeadm/app/phases/uploadconfig/BUILD @@ -48,6 +48,7 @@ go_test( "//cmd/kubeadm/app/apis/kubeadm/v1beta1:go_default_library", "//cmd/kubeadm/app/constants:go_default_library", "//cmd/kubeadm/app/util/config:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go index b2ccf4ed60..88be825558 100644 --- a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go +++ b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig.go @@ -45,60 +45,21 @@ func ResetClusterStatusForNode(nodeName string, client clientset.Interface) erro fmt.Printf("[reset] Removing info for node %q from the ConfigMap %q in the %q Namespace\n", nodeName, kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem) - // Get the kubeadm ConfigMap - configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(kubeadmconstants.KubeadmConfigConfigMap, metav1.GetOptions{}) - if err != nil { - return errors.Wrap(err, "failed to get config map") - } - - // Handle missing ClusterConfiguration in the ConfigMap. Should only happen if someone manually - // interacted with the ConfigMap. - clusterConfigurationYaml, ok := configMap.Data[kubeadmconstants.ClusterConfigurationConfigMapKey] - if !ok { - return errors.Errorf("cannot find key %q in ConfigMap %q in the %q Namespace", - kubeadmconstants.ClusterConfigurationConfigMapKey, kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem) - } - - // Obtain the existing ClusterStatus object - clusterStatus, err := configutil.UnmarshalClusterStatus(configMap.Data) - if err != nil { - return err - } - - // Handle a nil APIEndpoints map. Should only happen if someone manually - // interacted with the ConfigMap. - if clusterStatus.APIEndpoints == nil { - return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil", - kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem) - } - - // Check for existence of the nodeName key in the list of APIEndpoints. - // Return early if it's missing. - apiEndpoint, ok := clusterStatus.APIEndpoints[nodeName] - if !ok { - klog.Warningf("No APIEndpoint registered for node %q", nodeName) - return nil - } - - klog.V(2).Infof("Removing APIEndpoint %#v for node %q", apiEndpoint, nodeName) - delete(clusterStatus.APIEndpoints, nodeName) - - // Marshal the ClusterStatus back into YAML - clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus) - if err != nil { - return err - } - - // Update the ClusterStatus in the ConfigMap - return apiclient.CreateOrUpdateConfigMap(client, &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: kubeadmconstants.KubeadmConfigConfigMap, - Namespace: metav1.NamespaceSystem, - }, - Data: map[string]string{ - kubeadmconstants.ClusterConfigurationConfigMapKey: clusterConfigurationYaml, - kubeadmconstants.ClusterStatusConfigMapKey: string(clusterStatusYaml), - }, + return apiclient.MutateConfigMap(client, metav1.ObjectMeta{ + Name: kubeadmconstants.KubeadmConfigConfigMap, + Namespace: metav1.NamespaceSystem, + }, func(cm *v1.ConfigMap) error { + return mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error { + // Handle a nil APIEndpoints map. Should only happen if someone manually + // interacted with the ConfigMap. + if cs.APIEndpoints == nil { + return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil", + kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem) + } + klog.V(2).Infof("Removing APIEndpoint for Node %q", nodeName) + delete(cs.APIEndpoints, nodeName) + return nil + }) }) } @@ -119,26 +80,18 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int } // Prepare the ClusterStatus for upload - // Gets the current cluster status - // TODO: use configmap locks on this object on the get before the update. - clusterStatus, err := configutil.GetClusterStatus(client) - if err != nil { - return err + clusterStatus := &kubeadmapi.ClusterStatus{ + APIEndpoints: map[string]kubeadmapi.APIEndpoint{ + cfg.NodeRegistration.Name: cfg.LocalAPIEndpoint, + }, } - - // Updates the ClusterStatus with the current control plane instance - if clusterStatus.APIEndpoints == nil { - clusterStatus.APIEndpoints = map[string]kubeadmapi.APIEndpoint{} - } - clusterStatus.APIEndpoints[cfg.NodeRegistration.Name] = cfg.LocalAPIEndpoint - - // Marshal the ClusterStatus back into YAML + // Marshal the ClusterStatus into YAML clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus) if err != nil { return err } - err = apiclient.CreateOrUpdateConfigMap(client, &v1.ConfigMap{ + err = apiclient.CreateOrMutateConfigMap(client, &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: kubeadmconstants.KubeadmConfigConfigMap, Namespace: metav1.NamespaceSystem, @@ -147,6 +100,17 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int kubeadmconstants.ClusterConfigurationConfigMapKey: string(clusterConfigurationYaml), kubeadmconstants.ClusterStatusConfigMapKey: string(clusterStatusYaml), }, + }, func(cm *v1.ConfigMap) error { + return mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error { + // Handle a nil APIEndpoints map. Should only happen if someone manually + // interacted with the ConfigMap. + if cs.APIEndpoints == nil { + return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil", + kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem) + } + cs.APIEndpoints[cfg.NodeRegistration.Name] = cfg.LocalAPIEndpoint + return nil + }) }) if err != nil { return err @@ -191,3 +155,23 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int }, }) } + +func mutateClusterStatus(cm *v1.ConfigMap, mutator func(*kubeadmapi.ClusterStatus) error) error { + // Obtain the existing ClusterStatus object + clusterStatus, err := configutil.UnmarshalClusterStatus(cm.Data) + if err != nil { + return err + } + // Mutate the ClusterStatus + if err := mutator(clusterStatus); err != nil { + return err + } + // Marshal the ClusterStatus back into YAML + clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus) + if err != nil { + return err + } + // Write the marshaled mutated cluster status back to the ConfigMap + cm.Data[kubeadmconstants.ClusterStatusConfigMapKey] = string(clusterStatusYaml) + return nil +} diff --git a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go index ad6e3e60ee..4ffc981dba 100644 --- a/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go +++ b/cmd/kubeadm/app/phases/uploadconfig/uploadconfig_test.go @@ -20,6 +20,7 @@ import ( "reflect" "testing" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -155,3 +156,36 @@ func TestUploadConfiguration(t *testing.T) { }) } } + +func TestMutateClusterStatus(t *testing.T) { + cm := &v1.ConfigMap{ + Data: map[string]string{ + kubeadmconstants.ClusterStatusConfigMapKey: "", + }, + } + + endpoints := map[string]kubeadmapi.APIEndpoint{ + "some-node": { + AdvertiseAddress: "127.0.0.1", + BindPort: 6443, + }, + } + + err := mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error { + cs.APIEndpoints = endpoints + return nil + }) + if err != nil { + t.Fatalf("could not mutate cluster status: %v", err) + } + + // Try to unmarshal the cluster status back and compare with the original mutated structure + cs, err := configutil.UnmarshalClusterStatus(cm.Data) + if err != nil { + t.Fatalf("could not unmarshal cluster status: %v", err) + } + + if !reflect.DeepEqual(cs.APIEndpoints, endpoints) { + t.Fatalf("mutation of cluster status failed: %v", err) + } +} diff --git a/cmd/kubeadm/app/util/apiclient/BUILD b/cmd/kubeadm/app/util/apiclient/BUILD index c16c612c4f..f305b9fee7 100644 --- a/cmd/kubeadm/app/util/apiclient/BUILD +++ b/cmd/kubeadm/app/util/apiclient/BUILD @@ -40,6 +40,7 @@ go_library( "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library", + "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//vendor/github.com/pkg/errors:go_default_library", ], ) @@ -68,9 +69,12 @@ go_test( deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/rbac/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", + "//vendor/github.com/pkg/errors:go_default_library", ], ) diff --git a/cmd/kubeadm/app/util/apiclient/idempotency.go b/cmd/kubeadm/app/util/apiclient/idempotency.go index 03ba1dd476..5a72caaab8 100644 --- a/cmd/kubeadm/app/util/apiclient/idempotency.go +++ b/cmd/kubeadm/app/util/apiclient/idempotency.go @@ -19,11 +19,12 @@ package apiclient import ( "encoding/json" "fmt" + "time" "github.com/pkg/errors" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,11 +32,14 @@ import ( "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + clientsetretry "k8s.io/client-go/util/retry" "k8s.io/kubernetes/cmd/kubeadm/app/constants" ) +// ConfigMapMutator is a function that mutates the given ConfigMap and optionally returns an error +type ConfigMapMutator func(*v1.ConfigMap) error + // TODO: We should invent a dynamic mechanism for this using the dynamic client instead of hard-coding these functions per-type -// TODO: We may want to retry if .Update() fails on 409 Conflict // CreateOrUpdateConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error { @@ -51,6 +55,43 @@ func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error return nil } +// CreateOrMutateConfigMap tries to create the ConfigMap provided as cm. If the resource exists already, the latest version will be fetched from +// the cluster and mutator callback will be called on it, then an Update of the mutated ConfigMap will be performed. This function is resilient +// to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was +// taking place) +func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error { + if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(cm); err != nil { + if !apierrors.IsAlreadyExists(err) { + return errors.Wrap(err, "unable to create ConfigMap") + } + return MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator) + } + return nil +} + +// MutateConfigMap takes a ConfigMap Object Meta (namespace and name), retrieves the resource from the server and tries to mutate it +// by calling to the mutator callback, then an Update of the mutated ConfigMap will be performed. This function is resilient +// to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was +// taking place). +func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error { + return clientsetretry.RetryOnConflict(wait.Backoff{ + Steps: 20, + Duration: 500 * time.Millisecond, + Factor: 1.0, + Jitter: 0.1, + }, func() error { + configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(meta.Name, metav1.GetOptions{}) + if err != nil { + return err + } + if err = mutator(configMap); err != nil { + return errors.Wrap(err, "unable to mutate ConfigMap") + } + _, err = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(configMap) + return err + }) +} + // CreateOrRetainConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead. func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error { if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(configMapName, metav1.GetOptions{}); err != nil { diff --git a/cmd/kubeadm/app/util/apiclient/idempotency_test.go b/cmd/kubeadm/app/util/apiclient/idempotency_test.go index d306f62268..22db993b69 100644 --- a/cmd/kubeadm/app/util/apiclient/idempotency_test.go +++ b/cmd/kubeadm/app/util/apiclient/idempotency_test.go @@ -19,11 +19,17 @@ package apiclient import ( "testing" - "k8s.io/api/core/v1" + "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" ) +const configMapName = "configmap" + func TestPatchNodeNonErrorCases(t *testing.T) { testcases := []struct { name string @@ -81,3 +87,95 @@ func TestPatchNodeNonErrorCases(t *testing.T) { }) } } + +func TestCreateOrMutateConfigMap(t *testing.T) { + client := fake.NewSimpleClientset() + err := CreateOrMutateConfigMap(client, &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: configMapName, + Namespace: metav1.NamespaceSystem, + }, + Data: map[string]string{ + "key": "some-value", + }, + }, func(cm *v1.ConfigMap) error { + t.Fatal("mutate should not have been called, since the ConfigMap should have been created instead of mutated") + return nil + }) + if err != nil { + t.Fatalf("error creating ConfigMap: %v", err) + } + _, err = client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("error retrieving ConfigMap: %v", err) + } +} + +func createClientAndConfigMap(t *testing.T) *fake.Clientset { + client := fake.NewSimpleClientset() + _, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: configMapName, + Namespace: metav1.NamespaceSystem, + }, + Data: map[string]string{ + "key": "some-value", + }, + }) + if err != nil { + t.Fatalf("error creating ConfigMap: %v", err) + } + return client +} + +func TestMutateConfigMap(t *testing.T) { + client := createClientAndConfigMap(t) + + err := MutateConfigMap(client, metav1.ObjectMeta{ + Name: configMapName, + Namespace: metav1.NamespaceSystem, + }, func(cm *v1.ConfigMap) error { + cm.Data["key"] = "some-other-value" + return nil + }) + if err != nil { + t.Fatalf("error mutating regular ConfigMap: %v", err) + } + + cm, _ := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{}) + if cm.Data["key"] != "some-other-value" { + t.Fatalf("ConfigMap mutation was invalid, has: %q", cm.Data["key"]) + } +} + +func TestMutateConfigMapWithConflict(t *testing.T) { + client := createClientAndConfigMap(t) + + // Mimic that the first 5 updates of the ConfigMap returns a conflict, whereas the sixth update + // succeeds + conflict := 5 + client.PrependReactor("update", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + if conflict > 0 { + conflict-- + return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), configMapName, errors.New("Conflict")) + } + return false, update.GetObject(), nil + }) + + err := MutateConfigMap(client, metav1.ObjectMeta{ + Name: configMapName, + Namespace: metav1.NamespaceSystem, + }, func(cm *v1.ConfigMap) error { + cm.Data["key"] = "some-other-value" + return nil + }) + if err != nil { + t.Fatalf("error mutating conflicting ConfigMap: %v", err) + } + + cm, _ := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{}) + if cm.Data["key"] != "some-other-value" { + t.Fatalf("ConfigMap mutation with conflict was invalid, has: %q", cm.Data["key"]) + } +}