kubeadm: improve resiliency when conflicts arise when updating the kubeadm-config ConfigMap

Add the functionality to support `CreateOrMutateConfigMap` and `MutateConfigMap`.

* `CreateOrMutateConfigMap` will try to create a given ConfigMap object; if this ConfigMap
  already exists, a new version of the resource will be retrieved from the server and a
  mutator callback will be called on it. Then, an `Update` of the mutated object will be
  performed. If there's a conflict during this `Update` operation, retry until no conflict
  happens. On every retry the object is refreshed from the server to the latest version.

* `MutateConfigMap` will try to get the latest version of the ConfigMap from the server,
  call the mutator callback and then try to `Update` the mutated object. If there's a
  conflict during this `Update` operation, retry until no conflict happens. On every retry
  the object is refreshed from the server to the latest version.

Add unit tests for `MutateConfigMap`

* One test checks that in case of no conflicts, the update of the
  given ConfigMap happens without any issues.

* Another test mimics 5 consecutive CONFLICT responses when updating
  the given ConfigMap, whereas the sixth try it will work.
k3s-v1.15.3
Rafael Fernández López 2019-04-19 13:56:36 +02:00
parent 888b81b638
commit bc8bafd825
No known key found for this signature in database
GPG Key ID: 8902294E78418CF9
6 changed files with 233 additions and 71 deletions

View File

@ -48,6 +48,7 @@ go_test(
"//cmd/kubeadm/app/apis/kubeadm/v1beta1:go_default_library",
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -45,60 +45,21 @@ func ResetClusterStatusForNode(nodeName string, client clientset.Interface) erro
fmt.Printf("[reset] Removing info for node %q from the ConfigMap %q in the %q Namespace\n",
nodeName, kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
// Get the kubeadm ConfigMap
configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(kubeadmconstants.KubeadmConfigConfigMap, metav1.GetOptions{})
if err != nil {
return errors.Wrap(err, "failed to get config map")
}
// Handle missing ClusterConfiguration in the ConfigMap. Should only happen if someone manually
// interacted with the ConfigMap.
clusterConfigurationYaml, ok := configMap.Data[kubeadmconstants.ClusterConfigurationConfigMapKey]
if !ok {
return errors.Errorf("cannot find key %q in ConfigMap %q in the %q Namespace",
kubeadmconstants.ClusterConfigurationConfigMapKey, kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
}
// Obtain the existing ClusterStatus object
clusterStatus, err := configutil.UnmarshalClusterStatus(configMap.Data)
if err != nil {
return err
}
// Handle a nil APIEndpoints map. Should only happen if someone manually
// interacted with the ConfigMap.
if clusterStatus.APIEndpoints == nil {
return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil",
kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
}
// Check for existence of the nodeName key in the list of APIEndpoints.
// Return early if it's missing.
apiEndpoint, ok := clusterStatus.APIEndpoints[nodeName]
if !ok {
klog.Warningf("No APIEndpoint registered for node %q", nodeName)
return nil
}
klog.V(2).Infof("Removing APIEndpoint %#v for node %q", apiEndpoint, nodeName)
delete(clusterStatus.APIEndpoints, nodeName)
// Marshal the ClusterStatus back into YAML
clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus)
if err != nil {
return err
}
// Update the ClusterStatus in the ConfigMap
return apiclient.CreateOrUpdateConfigMap(client, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: kubeadmconstants.KubeadmConfigConfigMap,
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
kubeadmconstants.ClusterConfigurationConfigMapKey: clusterConfigurationYaml,
kubeadmconstants.ClusterStatusConfigMapKey: string(clusterStatusYaml),
},
return apiclient.MutateConfigMap(client, metav1.ObjectMeta{
Name: kubeadmconstants.KubeadmConfigConfigMap,
Namespace: metav1.NamespaceSystem,
}, func(cm *v1.ConfigMap) error {
return mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error {
// Handle a nil APIEndpoints map. Should only happen if someone manually
// interacted with the ConfigMap.
if cs.APIEndpoints == nil {
return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil",
kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
}
klog.V(2).Infof("Removing APIEndpoint for Node %q", nodeName)
delete(cs.APIEndpoints, nodeName)
return nil
})
})
}
@ -119,26 +80,18 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
}
// Prepare the ClusterStatus for upload
// Gets the current cluster status
// TODO: use configmap locks on this object on the get before the update.
clusterStatus, err := configutil.GetClusterStatus(client)
if err != nil {
return err
clusterStatus := &kubeadmapi.ClusterStatus{
APIEndpoints: map[string]kubeadmapi.APIEndpoint{
cfg.NodeRegistration.Name: cfg.LocalAPIEndpoint,
},
}
// Updates the ClusterStatus with the current control plane instance
if clusterStatus.APIEndpoints == nil {
clusterStatus.APIEndpoints = map[string]kubeadmapi.APIEndpoint{}
}
clusterStatus.APIEndpoints[cfg.NodeRegistration.Name] = cfg.LocalAPIEndpoint
// Marshal the ClusterStatus back into YAML
// Marshal the ClusterStatus into YAML
clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus)
if err != nil {
return err
}
err = apiclient.CreateOrUpdateConfigMap(client, &v1.ConfigMap{
err = apiclient.CreateOrMutateConfigMap(client, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: kubeadmconstants.KubeadmConfigConfigMap,
Namespace: metav1.NamespaceSystem,
@ -147,6 +100,17 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
kubeadmconstants.ClusterConfigurationConfigMapKey: string(clusterConfigurationYaml),
kubeadmconstants.ClusterStatusConfigMapKey: string(clusterStatusYaml),
},
}, func(cm *v1.ConfigMap) error {
return mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error {
// Handle a nil APIEndpoints map. Should only happen if someone manually
// interacted with the ConfigMap.
if cs.APIEndpoints == nil {
return errors.Errorf("APIEndpoints from ConfigMap %q in the %q Namespace is nil",
kubeadmconstants.KubeadmConfigConfigMap, metav1.NamespaceSystem)
}
cs.APIEndpoints[cfg.NodeRegistration.Name] = cfg.LocalAPIEndpoint
return nil
})
})
if err != nil {
return err
@ -191,3 +155,23 @@ func UploadConfiguration(cfg *kubeadmapi.InitConfiguration, client clientset.Int
},
})
}
func mutateClusterStatus(cm *v1.ConfigMap, mutator func(*kubeadmapi.ClusterStatus) error) error {
// Obtain the existing ClusterStatus object
clusterStatus, err := configutil.UnmarshalClusterStatus(cm.Data)
if err != nil {
return err
}
// Mutate the ClusterStatus
if err := mutator(clusterStatus); err != nil {
return err
}
// Marshal the ClusterStatus back into YAML
clusterStatusYaml, err := configutil.MarshalKubeadmConfigObject(clusterStatus)
if err != nil {
return err
}
// Write the marshaled mutated cluster status back to the ConfigMap
cm.Data[kubeadmconstants.ClusterStatusConfigMapKey] = string(clusterStatusYaml)
return nil
}

View File

@ -20,6 +20,7 @@ import (
"reflect"
"testing"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -155,3 +156,36 @@ func TestUploadConfiguration(t *testing.T) {
})
}
}
func TestMutateClusterStatus(t *testing.T) {
cm := &v1.ConfigMap{
Data: map[string]string{
kubeadmconstants.ClusterStatusConfigMapKey: "",
},
}
endpoints := map[string]kubeadmapi.APIEndpoint{
"some-node": {
AdvertiseAddress: "127.0.0.1",
BindPort: 6443,
},
}
err := mutateClusterStatus(cm, func(cs *kubeadmapi.ClusterStatus) error {
cs.APIEndpoints = endpoints
return nil
})
if err != nil {
t.Fatalf("could not mutate cluster status: %v", err)
}
// Try to unmarshal the cluster status back and compare with the original mutated structure
cs, err := configutil.UnmarshalClusterStatus(cm.Data)
if err != nil {
t.Fatalf("could not unmarshal cluster status: %v", err)
}
if !reflect.DeepEqual(cs.APIEndpoints, endpoints) {
t.Fatalf("mutation of cluster status failed: %v", err)
}
}

View File

@ -40,6 +40,7 @@ go_library(
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)
@ -68,9 +69,12 @@ go_test(
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/rbac/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)

View File

@ -19,11 +19,12 @@ package apiclient
import (
"encoding/json"
"fmt"
"time"
"github.com/pkg/errors"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -31,11 +32,14 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
clientsetretry "k8s.io/client-go/util/retry"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
)
// ConfigMapMutator is a function that mutates the given ConfigMap and optionally returns an error
type ConfigMapMutator func(*v1.ConfigMap) error
// TODO: We should invent a dynamic mechanism for this using the dynamic client instead of hard-coding these functions per-type
// TODO: We may want to retry if .Update() fails on 409 Conflict
// CreateOrUpdateConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error {
@ -51,6 +55,43 @@ func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error
return nil
}
// CreateOrMutateConfigMap tries to create the ConfigMap provided as cm. If the resource exists already, the latest version will be fetched from
// the cluster and mutator callback will be called on it, then an Update of the mutated ConfigMap will be performed. This function is resilient
// to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was
// taking place)
func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error {
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(cm); err != nil {
if !apierrors.IsAlreadyExists(err) {
return errors.Wrap(err, "unable to create ConfigMap")
}
return MutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
}
return nil
}
// MutateConfigMap takes a ConfigMap Object Meta (namespace and name), retrieves the resource from the server and tries to mutate it
// by calling to the mutator callback, then an Update of the mutated ConfigMap will be performed. This function is resilient
// to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was
// taking place).
func MutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error {
return clientsetretry.RetryOnConflict(wait.Backoff{
Steps: 20,
Duration: 500 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}, func() error {
configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(meta.Name, metav1.GetOptions{})
if err != nil {
return err
}
if err = mutator(configMap); err != nil {
return errors.Wrap(err, "unable to mutate ConfigMap")
}
_, err = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(configMap)
return err
})
}
// CreateOrRetainConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error {
if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(configMapName, metav1.GetOptions{}); err != nil {

View File

@ -19,11 +19,17 @@ package apiclient
import (
"testing"
"k8s.io/api/core/v1"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
)
const configMapName = "configmap"
func TestPatchNodeNonErrorCases(t *testing.T) {
testcases := []struct {
name string
@ -81,3 +87,95 @@ func TestPatchNodeNonErrorCases(t *testing.T) {
})
}
}
func TestCreateOrMutateConfigMap(t *testing.T) {
client := fake.NewSimpleClientset()
err := CreateOrMutateConfigMap(client, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
"key": "some-value",
},
}, func(cm *v1.ConfigMap) error {
t.Fatal("mutate should not have been called, since the ConfigMap should have been created instead of mutated")
return nil
})
if err != nil {
t.Fatalf("error creating ConfigMap: %v", err)
}
_, err = client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{})
if err != nil {
t.Fatalf("error retrieving ConfigMap: %v", err)
}
}
func createClientAndConfigMap(t *testing.T) *fake.Clientset {
client := fake.NewSimpleClientset()
_, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(&v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
"key": "some-value",
},
})
if err != nil {
t.Fatalf("error creating ConfigMap: %v", err)
}
return client
}
func TestMutateConfigMap(t *testing.T) {
client := createClientAndConfigMap(t)
err := MutateConfigMap(client, metav1.ObjectMeta{
Name: configMapName,
Namespace: metav1.NamespaceSystem,
}, func(cm *v1.ConfigMap) error {
cm.Data["key"] = "some-other-value"
return nil
})
if err != nil {
t.Fatalf("error mutating regular ConfigMap: %v", err)
}
cm, _ := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{})
if cm.Data["key"] != "some-other-value" {
t.Fatalf("ConfigMap mutation was invalid, has: %q", cm.Data["key"])
}
}
func TestMutateConfigMapWithConflict(t *testing.T) {
client := createClientAndConfigMap(t)
// Mimic that the first 5 updates of the ConfigMap returns a conflict, whereas the sixth update
// succeeds
conflict := 5
client.PrependReactor("update", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
if conflict > 0 {
conflict--
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), configMapName, errors.New("Conflict"))
}
return false, update.GetObject(), nil
})
err := MutateConfigMap(client, metav1.ObjectMeta{
Name: configMapName,
Namespace: metav1.NamespaceSystem,
}, func(cm *v1.ConfigMap) error {
cm.Data["key"] = "some-other-value"
return nil
})
if err != nil {
t.Fatalf("error mutating conflicting ConfigMap: %v", err)
}
cm, _ := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{})
if cm.Data["key"] != "some-other-value" {
t.Fatalf("ConfigMap mutation with conflict was invalid, has: %q", cm.Data["key"])
}
}