Merge pull request #63221 from mtaufen/dkcfg-live-configmap

Automatic merge from submit-queue (batch tested with PRs 63881, 64046, 63409, 63402, 63221). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Kubelet responds to ConfigMap mutations for dynamic Kubelet config

This PR makes dynamic Kubelet config easier to reason about by leaving less room for silent skew scenarios. The new behavior is as follows:
- ConfigMap does not exist: Kubelet reports error status due to missing source
- ConfigMap is created: Kubelet starts using it
- ConfigMap is updated: Kubelet respects the update (but we discourage this pattern, in favor of incrementally migrating to a new ConfigMap)
- ConfigMap is deleted: Kubelet keeps using the config (non-disruptive), but reports error status due to missing source
- ConfigMap is recreated: Kubelet respects any updates (but, again, we discourage this pattern)
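
For illustration, here is a minimal client-go sketch (not part of this PR) of pointing a Node at a ConfigMap under the new behavior, setting only `name`, `namespace`, and `kubeletConfigKey` in the spec. The node and ConfigMap names are placeholders, and the non-context `Get`/`Update` call signatures of client-go from this era are assumed:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (assumes admin credentials).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	// "my-node" and "my-kubelet-config" are placeholder names.
	node, err := client.CoreV1().Nodes().Get("my-node", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// Only Name, Namespace, and KubeletConfigKey go in the spec; the Kubelet now
	// resolves UID and ResourceVersion itself and reports them in Node status.
	node.Spec.ConfigSource = &corev1.NodeConfigSource{
		ConfigMap: &corev1.ConfigMapNodeConfigSource{
			Namespace:        "kube-system",
			Name:             "my-kubelet-config",
			KubeletConfigKey: "kubelet",
		},
	}
	if _, err := client.CoreV1().Nodes().Update(node); err != nil {
		panic(err)
	}
	fmt.Println("updated Node.Spec.ConfigSource")
}
```

The Kubelet resolves the ConfigMap's UID and ResourceVersion on its own and reports them in Node.Status.Config, which is what the validation changes below now enforce.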

This PR also makes a small change to the config checkpoint file tree structure, because ResourceVersion is now taken into account when saving checkpoints. The new structure is as follows:
```
- dir named by --dynamic-config-dir (root for managing dynamic config)
| - meta
  | - assigned (encoded kubeletconfig/v1beta1.SerializedNodeConfigSource object, indicating the assigned config)
  | - last-known-good (encoded kubeletconfig/v1beta1.SerializedNodeConfigSource object, indicating the last-known-good config)
| - checkpoints
  | - uid1 (dir for versions of object identified by uid1)
    | - resourceVersion1 (dir for unpacked files from resourceVersion1)
    | - ...
  | - ...
```
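
As a concrete reading of the layout, the following self-contained sketch shows how a (UID, ResourceVersion) pair maps to an on-disk checkpoint directory. The `--dynamic-config-dir` value is an example path, and the helper mirrors, but is not, the `fsStore.checkpointPath` change in this PR:

```go
package main

import (
	"fmt"
	"path/filepath"
)

const (
	// Example value for --dynamic-config-dir; the real path is operator-chosen.
	dynamicConfigDir = "/var/lib/kubelet/dynamic-config"
	checkpointsDir   = "checkpoints"
	metaDir          = "meta"
)

// checkpointPath returns the directory that holds the files unpacked from one
// version (ResourceVersion) of one source object (UID).
func checkpointPath(uid, resourceVersion string) string {
	return filepath.Join(dynamicConfigDir, checkpointsDir, uid, resourceVersion)
}

func main() {
	fmt.Println(checkpointPath("uid1", "resourceVersion1"))
	// -> /var/lib/kubelet/dynamic-config/checkpoints/uid1/resourceVersion1
	fmt.Println(filepath.Join(dynamicConfigDir, metaDir, "assigned"))
	// -> /var/lib/kubelet/dynamic-config/meta/assigned
}
```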


fixes: #61643

```release-note
The dynamic Kubelet config feature will now update config in the event of a ConfigMap mutation, which reduces the chance for silent config skew. Only name, namespace, and kubeletConfigKey may now be set in Node.Spec.ConfigSource.ConfigMap. The least disruptive pattern for config management is still to create a new ConfigMap and incrementally roll out a new Node.Spec.ConfigSource.
```
Kubernetes Submit Queue 2018-05-21 17:05:42 -07:00 committed by GitHub
commit 2a989c60ff
16 changed files with 1303 additions and 694 deletions


@ -615,7 +615,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName))
if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
return err
}
}
if kubeDeps.Auth == nil {


@ -4150,11 +4150,10 @@ func validateNodeConfigSourceSpec(source *core.NodeConfigSource, fldPath *field.
// validation specific to Node.Spec.ConfigSource.ConfigMap
func validateConfigMapNodeConfigSourceSpec(source *core.ConfigMapNodeConfigSource, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
// TODO(#61643): Prevent ref.UID from being set here when we switch from requiring UID to respecting all ConfigMap updates
if string(source.UID) == "" {
allErrs = append(allErrs, field.Required(fldPath.Child("uid"), "uid must be set in spec"))
// uid and resourceVersion must not be set in spec
if string(source.UID) != "" {
allErrs = append(allErrs, field.Forbidden(fldPath.Child("uid"), "uid must not be set in spec"))
}
// resourceVersion must not be set in spec
if source.ResourceVersion != "" {
allErrs = append(allErrs, field.Forbidden(fldPath.Child("resourceVersion"), "resourceVersion must not be set in spec"))
}
@ -4196,12 +4195,13 @@ func validateNodeConfigSourceStatus(source *core.NodeConfigSource, fldPath *fiel
// validation specific to Node.Status.Config.(Active|Assigned|LastKnownGood).ConfigMap
func validateConfigMapNodeConfigSourceStatus(source *core.ConfigMapNodeConfigSource, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
// uid and resourceVersion must be set in status
if string(source.UID) == "" {
allErrs = append(allErrs, field.Required(fldPath.Child("uid"), "uid must be set in status"))
}
// TODO(#63221): require ResourceVersion in status when we start respecting ConfigMap mutations (the Kubelet isn't tracking it internally until
// that PR, which makes it difficult to report for now).
if source.ResourceVersion == "" {
allErrs = append(allErrs, field.Required(fldPath.Child("resourceVersion"), "resourceVersion must be set in status"))
}
return append(allErrs, validateConfigMapNodeConfigSource(source, fldPath)...)
}


@ -19,7 +19,9 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
@ -40,8 +42,11 @@ go_library(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)


@ -34,9 +34,11 @@ var _ Payload = (*configMapPayload)(nil)
// NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID
func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) {
if cm == nil {
return nil, fmt.Errorf("ConfigMap must be non-nil to be a Payload")
} else if len(cm.ObjectMeta.UID) == 0 {
return nil, fmt.Errorf("ConfigMap must have a UID to be a Payload")
return nil, fmt.Errorf("ConfigMap must be non-nil")
} else if cm.ObjectMeta.UID == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty UID")
} else if cm.ObjectMeta.ResourceVersion == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty ResourceVersion")
}
return &configMapPayload{cm}, nil
@ -46,6 +48,10 @@ func (p *configMapPayload) UID() string {
return string(p.cm.UID)
}
func (p *configMapPayload) ResourceVersion() string {
return p.cm.ResourceVersion
}
func (p *configMapPayload) Files() map[string]string {
return p.cm.Data
}


@ -34,19 +34,46 @@ func TestNewConfigMapPayload(t *testing.T) {
cm *apiv1.ConfigMap
err string
}{
{"nil v1/ConfigMap", nil, "must be non-nil"},
{"empty v1/ConfigMap", &apiv1.ConfigMap{}, "must have a UID"},
{"populated v1/ConfigMap",
&apiv1.ConfigMap{
{
desc: "nil",
cm: nil,
err: "ConfigMap must be non-nil",
},
{
desc: "missing uid",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
ResourceVersion: "rv",
},
},
err: "ConfigMap must have a non-empty UID",
},
{
desc: "missing resourceVersion",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
},
},
err: "ConfigMap must have a non-empty ResourceVersion",
},
{
desc: "populated v1/ConfigMap",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
ResourceVersion: "rv",
},
Data: map[string]string{
"key1": "value1",
"key2": "value2",
},
}, ""},
},
err: "",
},
}
for _, c := range cases {
@ -66,7 +93,7 @@ func TestNewConfigMapPayload(t *testing.T) {
func TestConfigMapPayloadUID(t *testing.T) {
const expect = "uid"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect}})
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect, ResourceVersion: "rv"}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
@ -76,6 +103,18 @@ func TestConfigMapPayloadUID(t *testing.T) {
}
}
func TestConfigMapPayloadResourceVersion(t *testing.T) {
const expect = "rv"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: expect}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
resourceVersion := payload.ResourceVersion()
if expect != resourceVersion {
t.Errorf("expect %q, but got %q", expect, resourceVersion)
}
}
func TestConfigMapPayloadFiles(t *testing.T) {
cases := []struct {
desc string
@ -96,7 +135,7 @@ func TestConfigMapPayloadFiles(t *testing.T) {
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid"}, Data: c.data})
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: "rv"}, Data: c.data})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}


@ -18,12 +18,18 @@ package checkpoint
import (
"fmt"
"math/rand"
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
@ -35,8 +41,13 @@ import (
// Payload represents a local copy of a config source (payload) object
type Payload interface {
// UID returns a globally unique (space and time) identifier for the payload.
// The return value is guaranteed non-empty.
UID() string
// ResourceVersion returns a resource version for the payload.
// The return value is guaranteed non-empty.
ResourceVersion() string
// Files returns a map of filenames to file contents.
Files() map[string]string
@ -46,16 +57,29 @@ type Payload interface {
// RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
type RemoteConfigSource interface {
// UID returns a globally unique identifier of the source described by the remote config source object
UID() string
// KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files()
KubeletFilename() string
// APIPath returns the API path to the remote resource, e.g. its SelfLink
APIPath() string
// Download downloads the remote config source object returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails
Download(client clientset.Interface) (Payload, string, error)
// Encode returns a []byte representation of the NodeConfigSource behind the RemoteConfigSource
// UID returns the globally unique identifier for the most recently downloaded payload targeted by the source.
UID() string
// ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source.
ResourceVersion() string
// Download downloads the remote config source's target object and returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails.
// Download takes an optional store as an argument. If provided, Download will check this store for the
// target object prior to contacting the API server.
// Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload.
Download(client clientset.Interface, store cache.Store) (Payload, string, error)
// Informer returns an informer that can be used to detect changes to the remote config source
Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer
// Encode returns a []byte representation of the object behind the RemoteConfigSource
Encode() ([]byte, error)
// NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object.
@ -104,7 +128,11 @@ func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
// we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here
source, _, err := NewRemoteConfigSource(&cs.Source)
return source, err
if err != nil {
return nil, err
}
return source, nil
}
// EqualRemoteConfigSources is a helper for comparing remote config sources by
@ -123,10 +151,6 @@ type remoteConfigMap struct {
var _ RemoteConfigSource = (*remoteConfigMap)(nil)
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMap.UID)
}
func (r *remoteConfigMap) KubeletFilename() string {
return r.source.ConfigMap.KubeletConfigKey
}
@ -138,32 +162,82 @@ func (r *remoteConfigMap) APIPath() string {
return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
}
func (r *remoteConfigMap) Download(client clientset.Interface) (Payload, string, error) {
var reason string
uid := string(r.source.ConfigMap.UID)
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMap.UID)
}
utillog.Infof("attempting to download ConfigMap with UID %q", uid)
func (r *remoteConfigMap) ResourceVersion() string {
return r.source.ConfigMap.ResourceVersion
}
// get the ConfigMap via namespace/name, there doesn't seem to be a way to get it by UID
cm, err := client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) {
var (
cm *apiv1.ConfigMap
err error
)
// check the in-memory store for the ConfigMap, so we can skip unnecessary downloads
if store != nil {
utillog.Infof("checking in-memory store for %s", r.APIPath())
cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name)
if err != nil {
// just log the error, we'll attempt a direct download instead
utillog.Errorf("failed to check in-memory store for %s, error: %v", r.APIPath(), err)
} else if cm != nil {
utillog.Infof("found %s in in-memory store, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} else {
utillog.Infof("did not find %s in in-memory store", r.APIPath())
}
}
// if we didn't find the ConfigMap in the in-memory store, download it from the API server
if cm == nil {
utillog.Infof("attempting to download %s", r.APIPath())
cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
if err != nil {
return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err)
}
// ensure that UID matches the UID on the source
if r.source.ConfigMap.UID != cm.UID {
reason = fmt.Sprintf(status.UIDMismatchErrorFmt, r.source.ConfigMap.UID, r.APIPath(), cm.UID)
return nil, reason, fmt.Errorf(reason)
}
utillog.Infof("successfully downloaded %s, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} // Assert: Now we have a non-nil ConfigMap
// construct Payload from the ConfigMap
payload, err := NewConfigMapPayload(cm)
if err != nil {
reason = fmt.Sprintf("invalid downloaded object")
return nil, reason, fmt.Errorf("%s, error: %v", reason, err)
// We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should
// never happen on objects in the informer's store, or objects downloaded from the API server
// directly, so we report InternalError.
return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
// update internal UID and ResourceVersion based on latest ConfigMap
r.source.ConfigMap.UID = cm.UID
r.source.ConfigMap.ResourceVersion = cm.ResourceVersion
return payload, "", nil
}
func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer {
// select ConfigMap by name
fieldselector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name)
// add some randomness to resync period, which can help avoid controllers falling into lock-step
minResyncPeriod := 15 * time.Minute
factor := rand.Float64() + 1
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (kuberuntime.Object, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).List(metav1.ListOptions{
FieldSelector: fieldselector.String(),
})
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Watch(metav1.ListOptions{
FieldSelector: fieldselector.String(),
ResourceVersion: options.ResourceVersion,
})
},
}
utillog.Infof("successfully downloaded ConfigMap with UID %q", uid)
return payload, "", nil
informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod)
informer.AddEventHandler(handler)
return informer
}
func (r *remoteConfigMap) Encode() ([]byte, error) {
@ -182,3 +256,18 @@ func (r *remoteConfigMap) Encode() ([]byte, error) {
func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource {
return r.source.DeepCopy()
}
func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) {
key := fmt.Sprintf("%s/%s", namespace, name)
obj, ok, err := store.GetByKey(key)
if err != nil || !ok {
return nil, err
}
cm, ok := obj.(*apiv1.ConfigMap)
if !ok {
err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key)
utillog.Errorf(err.Error())
return nil, err
}
return cm, nil
}


@ -25,7 +25,9 @@ import (
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
@ -122,72 +124,90 @@ func TestRemoteConfigMapDownload(t *testing.T) {
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
}}
client := fakeclient.NewSimpleClientset(cm)
payload, err := NewConfigMapPayload(cm)
source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
KubeletConfigKey: "kubelet",
}}
expectPayload, err := NewConfigMapPayload(cm)
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
makeSource := func(source *apiv1.NodeConfigSource) RemoteConfigSource {
s, _, err := NewRemoteConfigSource(source)
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
return s
missingStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
hasStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
if err := hasStore.Add(cm); err != nil {
t.Fatalf("unexpected error constructing hasStore")
}
missingClient := fakeclient.NewSimpleClientset()
hasClient := fakeclient.NewSimpleClientset(cm)
cases := []struct {
desc string
source RemoteConfigSource
expect Payload
client clientset.Interface
store cache.Store
err string
}{
{
desc: "object doesn't exist",
source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "bogus",
Namespace: "namespace",
UID: "bogus",
KubeletConfigKey: "kubelet",
}}),
expect: nil,
desc: "nil store, object does not exist in API server",
client: missingClient,
err: "not found",
},
{
desc: "UID is incorrect for namespace/name",
source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "bogus",
KubeletConfigKey: "kubelet",
}}),
expect: nil,
err: "does not match",
desc: "nil store, object exists in API server",
client: hasClient,
},
{
desc: "object exists and reference is correct",
source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}}),
expect: payload,
err: "",
desc: "object exists in store and API server",
store: hasStore,
client: hasClient,
},
{
desc: "object exists in store, but does not exist in API server",
store: hasStore,
client: missingClient,
},
{
desc: "object does not exist in store, but exists in API server",
store: missingStore,
client: hasClient,
},
{
desc: "object does not exist in store or API server",
client: missingClient,
store: missingStore,
err: "not found",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, _, err := c.source.Download(client)
// deep copy so we can always check the UID/ResourceVersion are set after Download
s, _, err := NewRemoteConfigSource(source.DeepCopy())
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
// attempt download
p, _, err := s.Download(c.client, c.store)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// downloaded object should match the expected
if !apiequality.Semantic.DeepEqual(c.expect.object(), payload.object()) {
t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(payload))
if !apiequality.Semantic.DeepEqual(expectPayload.object(), p.object()) {
t.Errorf("expect Checkpoint %s but got %s", spew.Sdump(expectPayload), spew.Sdump(p))
}
// source UID and ResourceVersion should be updated by Download
if p.UID() != s.UID() {
t.Errorf("expect UID to be updated by Download to match payload: %s, but got source UID: %s", p.UID(), s.UID())
}
if p.ResourceVersion() != s.ResourceVersion() {
t.Errorf("expect ResourceVersion to be updated by Download to match payload: %s, but got source ResourceVersion: %s", p.ResourceVersion(), s.ResourceVersion())
}
})
}


@ -75,32 +75,46 @@ func (s *fsStore) Initialize() error {
return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir))
}
func (s *fsStore) Exists(c checkpoint.RemoteConfigSource) (bool, error) {
func (s *fsStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
const errfmt = "failed to determine whether checkpoint exists for source %s, UID: %s, ResourceVersion: %s exists, error: %v"
if len(source.UID()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty UID is ambiguous")
}
if len(source.ResourceVersion()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty ResourceVersion is ambiguous")
}
// we check whether the directory was created for the resource
uid := c.UID()
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(uid))
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(source.UID(), source.ResourceVersion()))
if err != nil {
return false, fmt.Errorf("failed to determine whether checkpoint %q exists, error: %v", uid, err)
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), err)
}
return ok, nil
}
func (s *fsStore) Save(c checkpoint.Payload) error {
func (s *fsStore) Save(payload checkpoint.Payload) error {
// Note: Payload interface guarantees UID() and ResourceVersion() to be non-empty
path := s.checkpointPath(payload.UID(), payload.ResourceVersion())
// ensure the parent dir (checkpoints/uid) exists, since ReplaceDir requires the parent of the replacee
// to exist, and we checkpoint as checkpoints/uid/resourceVersion/files-from-configmap
if err := utilfiles.EnsureDir(s.fs, filepath.Dir(path)); err != nil {
return err
}
// save the checkpoint's files in the appropriate checkpoint dir
return utilfiles.ReplaceDir(s.fs, s.checkpointPath(c.UID()), c.Files())
return utilfiles.ReplaceDir(s.fs, path, payload.Files())
}
func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
sourceFmt := fmt.Sprintf("%s:%s", source.APIPath(), source.UID())
sourceFmt := fmt.Sprintf("%s, UID: %s, ResourceVersion: %s", source.APIPath(), source.UID(), source.ResourceVersion())
// check if a checkpoint exists for the source
if ok, err := s.Exists(source); err != nil {
return nil, fmt.Errorf("failed to determine if a checkpoint exists for source %s", sourceFmt)
return nil, err
} else if !ok {
return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt)
}
// load the kubelet config file
utillog.Infof("loading kubelet configuration checkpoint for source %s", sourceFmt)
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID()), source.KubeletFilename()))
utillog.Infof("loading Kubelet configuration checkpoint for source %s", sourceFmt)
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID(), source.ResourceVersion()), source.KubeletFilename()))
if err != nil {
return nil, err
}
@ -140,8 +154,8 @@ func (s *fsStore) Reset() (bool, error) {
return reset(s)
}
func (s *fsStore) checkpointPath(uid string) string {
return filepath.Join(s.dir, checkpointsDir, uid)
func (s *fsStore) checkpointPath(uid, resourceVersion string) string {
return filepath.Join(s.dir, checkpointsDir, uid, resourceVersion)
}
func (s *fsStore) metaPath(name string) string {
@ -163,6 +177,14 @@ func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoin
if source == nil {
return utilfiles.ReplaceFile(fs, path, []byte{})
}
// check that UID and ResourceVersion are non-empty,
// error to save reference if the checkpoint can't be fully resolved
if source.UID() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty UID is ambiguous")
}
if source.ResourceVersion() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty ResourceVersion is ambiguous")
}
// encode the source and save it to the file
data, err := source.Encode()
if err != nil {


@ -83,8 +83,8 @@ func TestFsStoreInitialize(t *testing.T) {
}
// check that checkpoints dir exists
if _, err := store.fs.Stat(store.checkpointPath("")); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.checkpointPath(""), err)
if _, err := store.fs.Stat(filepath.Join(store.dir, checkpointsDir)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", filepath.Join(store.dir, checkpointsDir), err)
}
// check that assignedFile exists
@ -105,21 +105,29 @@ func TestFsStoreExists(t *testing.T) {
}
// checkpoint a payload
const uid = "uid"
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid}})
const (
uid = "uid"
resourceVersion = "1"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid, ResourceVersion: resourceVersion}})
if err != nil {
t.Fatalf("could not construct checkpoint, error: %v", err)
t.Fatalf("could not construct Payload, error: %v", err)
}
if err := store.Save(p); err != nil {
t.Fatalf("unexpected error: %v", err)
}
store.Save(p)
cases := []struct {
desc string
uid types.UID
resourceVersion string
expect bool
err string
}{
{"exists", uid, true, ""},
{"does not exist", "bogus-uid", false, ""},
{"exists", uid, resourceVersion, true, ""},
{"does not exist", "bogus-uid", "bogus-resourceVersion", false, ""},
{"ambiguous UID", "", "bogus-resourceVersion", false, "empty UID is ambiguous"},
{"ambiguous ResourceVersion", "bogus-uid", "", false, "empty ResourceVersion is ambiguous"},
}
for _, c := range cases {
@ -129,10 +137,11 @@ func TestFsStoreExists(t *testing.T) {
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
t.Fatalf("unexpected error: %v", err)
}
ok, err := store.Exists(source)
utiltest.ExpectError(t, err, c.err)
@ -160,38 +169,44 @@ func TestFsStoreSave(t *testing.T) {
return s
}()
const (
uid = "uid"
resourceVersion = "1"
)
cases := []struct {
desc string
uid types.UID
resourceVersion string
files map[string]string
err string
}{
{"valid payload", map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""},
{"empty key name", map[string]string{"": "foocontent"}, "must not be empty"},
{"key name is not a base file name (foo/bar)", map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"},
{"key name is not a base file name (/foo)", map[string]string{"/bar": "foocontent"}, "only base names are allowed"},
{"used .", map[string]string{".": "foocontent"}, "may not be '.' or '..'"},
{"used ..", map[string]string{"..": "foocontent"}, "may not be '.' or '..'"},
{"length violation", map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"},
{"valid payload", uid, resourceVersion, map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""},
{"empty key name", uid, resourceVersion, map[string]string{"": "foocontent"}, "must not be empty"},
{"key name is not a base file name (foo/bar)", uid, resourceVersion, map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"},
{"key name is not a base file name (/foo)", uid, resourceVersion, map[string]string{"/bar": "foocontent"}, "only base names are allowed"},
{"used .", uid, resourceVersion, map[string]string{".": "foocontent"}, "may not be '.' or '..'"},
{"used ..", uid, resourceVersion, map[string]string{"..": "foocontent"}, "may not be '.' or '..'"},
{"length violation", uid, resourceVersion, map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// construct the payload
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: "uid"},
ObjectMeta: metav1.ObjectMeta{UID: c.uid, ResourceVersion: c.resourceVersion},
Data: c.files,
})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
// save the payload
// if no error, save the payload, otherwise skip straight to error handler
if err == nil {
err = store.Save(p)
}
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// read the saved checkpoint
m, err := mapFromCheckpoint(store, p.UID())
m, err := mapFromCheckpoint(store, p.UID(), p.ResourceVersion())
if err != nil {
t.Fatalf("error loading checkpoint to map: %v", err)
}
@ -221,10 +236,11 @@ func TestFsStoreLoad(t *testing.T) {
// construct a payload that contains the kubeletconfig
const (
uid = "uid"
resourceVersion = "1"
kubeletKey = "kubelet"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)},
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), ResourceVersion: resourceVersion},
Data: map[string]string{
kubeletKey: string(data),
},
@ -241,10 +257,13 @@ func TestFsStoreLoad(t *testing.T) {
cases := []struct {
desc string
uid types.UID
resourceVersion string
err string
}{
{"checkpoint exists", uid, ""},
{"checkpoint does not exist", "bogus-uid", "no checkpoint for source"},
{"checkpoint exists", uid, resourceVersion, ""},
{"checkpoint does not exist", "bogus-uid", "bogus-resourceVersion", "no checkpoint for source"},
{"ambiguous UID", "", "bogus-resourceVersion", "empty UID is ambiguous"},
{"ambiguous ResourceVersion", "bogus-uid", "", "empty ResourceVersion is ambiguous"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
@ -253,10 +272,11 @@ func TestFsStoreLoad(t *testing.T) {
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: kubeletKey,
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
t.Fatalf("unexpected error: %v", err)
}
loaded, err := store.Load(source)
utiltest.ExpectError(t, err, c.err)
@ -389,35 +409,80 @@ func TestFsStoreSetAssigned(t *testing.T) {
t.Fatalf("error constructing store: %v", err)
}
const uid = "uid"
expect := fmt.Sprintf(`apiVersion: kubelet.config.k8s.io/v1beta1
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
uid: %s
`, uid)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: types.UID(uid),
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}})
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// save the assigned source
if err := store.SetAssigned(source); err != nil {
t.Fatalf("unexpected error: %v", err)
source = s
}
// save the assigned source
err = store.SetAssigned(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, assignedFile)
if expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", expect, string(data))
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
@ -427,35 +492,80 @@ func TestFsStoreSetLastKnownGood(t *testing.T) {
t.Fatalf("error constructing store: %v", err)
}
const uid = "uid"
expect := fmt.Sprintf(`apiVersion: kubelet.config.k8s.io/v1beta1
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
uid: %s
`, uid)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: types.UID(uid),
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}})
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// save the last known good source
if err := store.SetLastKnownGood(source); err != nil {
t.Fatalf("unexpected error: %v", err)
source = s
}
// save the last-known-good source
err = store.SetLastKnownGood(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, lastKnownGoodFile)
if expect != string(data) {
t.Errorf("expect last-known-good source file to contain %q, but got %q", expect, string(data))
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
@ -536,107 +646,8 @@ func TestFsStoreReset(t *testing.T) {
}
}
func TestFsStoreReadRemoteConfigSource(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
expect checkpoint.RemoteConfigSource
err string
}{
{"default source", nil, ""},
{"non-default source", source, ""},
}
const name = "some-source-file"
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
saveTestSourceFile(t, store, name, c.expect)
source, err := readRemoteConfigSource(store.fs, store.metaPath(name))
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreWriteRemoteConfigSource(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
source checkpoint.RemoteConfigSource
}{
{"nil source", nil},
{"non-nil source", source},
}
const name = "some-source-file"
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// set the source file
err := writeRemoteConfigSource(store.fs, store.metaPath(name), c.source)
if err != nil {
t.Fatalf("unable to set source file, error: %v", err)
}
// read back the file
data := readTestSourceFile(t, store, name)
str := string(data)
if c.source != nil {
// expect the contents to match the encoding of the source
data, err := c.source.Encode()
expect := string(data)
if err != nil {
t.Fatalf("couldn't encode source, error: %v", err)
}
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
}
} else {
// expect empty file
expect := ""
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
}
}
})
}
}
func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) {
files, err := store.fs.ReadDir(store.checkpointPath(uid))
func mapFromCheckpoint(store *fsStore, uid, resourceVersion string) (map[string]string, error) {
files, err := store.fs.ReadDir(store.checkpointPath(uid, resourceVersion))
if err != nil {
return nil, err
}
@ -647,7 +658,7 @@ func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) {
return nil, fmt.Errorf("expect only regular files in checkpoint dir %q", uid)
}
// read the file contents and build the map
data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid), f.Name()))
data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid, resourceVersion), f.Name()))
if err != nil {
return nil, err
}


@ -30,6 +30,7 @@ type Store interface {
Initialize() error
// Exists returns true if the object referenced by `source` has been checkpointed.
// The source must be unambiguous - e.g. if referencing an API object it must specify both uid and resourceVersion.
Exists(source checkpoint.RemoteConfigSource) (bool, error)
// Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration.
// The following payload types are supported:


@ -37,10 +37,10 @@ import (
const (
// KubeletConfigChangedEventReason identifies an event as a change of Kubelet configuration
KubeletConfigChangedEventReason = "KubeletConfigChanged"
// EventMessageFmt is the message format for Kubelet config change events
EventMessageFmt = "Kubelet will restart to use: %s"
// LocalConfigMessage is the text to apply to EventMessageFmt when the Kubelet has been configured to use its local config (init or defaults)
LocalConfigMessage = "local config"
// LocalEventMessage is sent when the Kubelet restarts to use local config
LocalEventMessage = "Kubelet restarting to use local config"
// RemoteEventMessageFmt is sent when the Kubelet restarts to use a remote config
RemoteEventMessageFmt = "Kubelet restarting to use %s, UID: %s, ResourceVersion: %s, KubeletConfigKey: %s"
)
// pokeConfigSourceWorker tells the worker thread that syncs config sources that work needs to be done
@ -69,111 +69,116 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, eventClient v
}
}()
node, err := latestNode(cc.informer.GetStore(), nodeName)
// get the latest Node.Spec.ConfigSource from the informer
source, err := latestNodeConfigSource(cc.nodeInformer.GetStore(), nodeName)
if err != nil {
cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, status.InternalError))
syncerr = fmt.Errorf("%s, error: %v", status.InternalError, err)
return
}
// check the Node and download any new config
if updated, cur, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil {
cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, reason))
// a nil source simply means we reset to local defaults
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset assigned and last-known-good to defaults")
if updated, reason, err := cc.resetConfig(); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
path := LocalConfigMessage
if cur != nil {
path = cur.APIPath()
restartForNewConfig(eventClient, nodeName, nil)
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := eventf(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, EventMessageFmt, path)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
return
}
os.Exit(0)
// a non-nil source means we should attempt to download the config, and checkpoint it if necessary
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
// TODO(mtaufen): It would be nice if we could check the payload's metadata before (re)downloading the whole payload
// we at least try pulling the latest configmap out of the local informer store.
// construct the interface that can dynamically dispatch the correct Download, etc. methods for the given source type
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// "download" source, either from informer's in-memory store or directly from the API server, if the informer doesn't have a copy
payload, reason, err := cc.downloadConfigPayload(client, remote)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// save a checkpoint for the payload, if one does not already exist
if reason, err := cc.saveConfigCheckpoint(remote, payload); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// update the local, persistent record of assigned config
if updated, reason, err := cc.setAssignedConfig(remote); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
restartForNewConfig(eventClient, nodeName, remote)
}
// If we get here:
// - there is no need to restart to update the current config
// - there is no need to restart to use new config
// - there was no error trying to sync configuration
// - if, previously, there was an error trying to sync configuration, we need to clear that error from the status
cc.configStatus.SetErrorOverride("")
}
// doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config,
// depending on the `source`, and returns whether the current config in the checkpoint store was updated as a result
func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, checkpoint.RemoteConfigSource, string, error) {
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset current and last-known-good to defaults")
updated, reason, err := cc.resetConfig()
if err != nil {
return false, nil, reason, err
// Note: source has up-to-date uid and resourceVersion after calling downloadConfigPayload.
func (cc *Controller) downloadConfigPayload(client clientset.Interface, source checkpoint.RemoteConfigSource) (checkpoint.Payload, string, error) {
var store cache.Store
if cc.remoteConfigSourceInformer != nil {
store = cc.remoteConfigSourceInformer.GetStore()
}
return updated, nil, "", nil
}
// if the NodeConfigSource is non-nil, download the config
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
if err != nil {
return false, nil, reason, err
}
reason, err = cc.checkpointConfigSource(client, remote)
if err != nil {
return false, nil, reason, err
}
updated, reason, err := cc.setAssignedConfig(remote)
if err != nil {
return false, nil, reason, err
}
return updated, remote, "", nil
return source.Download(client, store)
}
// checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist,
// if a failure occurs, returns a sanitized failure reason and an error
func (cc *Controller) checkpointConfigSource(client clientset.Interface, source checkpoint.RemoteConfigSource) (string, error) {
// if the checkpoint already exists, skip downloading
if ok, err := cc.checkpointStore.Exists(source); err != nil {
func (cc *Controller) saveConfigCheckpoint(source checkpoint.RemoteConfigSource, payload checkpoint.Payload) (string, error) {
ok, err := cc.checkpointStore.Exists(source)
if err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
} else if ok {
// TODO(mtaufen): update this to include ResourceVersion in #63221
utillog.Infof("checkpoint already exists for object %s with UID %s, skipping download", source.APIPath(), source.UID())
}
if ok {
utillog.Infof("checkpoint already exists for %s, UID: %s, ResourceVersion: %s", source.APIPath(), payload.UID(), payload.ResourceVersion())
return "", nil
}
// download
payload, reason, err := source.Download(client)
if err != nil {
return reason, fmt.Errorf("%s, error: %v", reason, err)
}
// save
err = cc.checkpointStore.Save(payload)
if err != nil {
if err := cc.checkpointStore.Save(payload); err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
return "", nil
}
// setAssignedConfig updates the assigned checkpoint config in the store.
// Returns whether the current config changed as a result, or a sanitized failure reason and an error.
// Returns whether the assigned config changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) setAssignedConfig(source checkpoint.RemoteConfigSource) (bool, string, error) {
current, err := cc.checkpointStore.Assigned()
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return false, status.InternalError, err
}
if err := cc.checkpointStore.SetAssigned(source); err != nil {
return false, status.InternalError, err
}
return !checkpoint.EqualRemoteConfigSources(current, source), "", nil
return !checkpoint.EqualRemoteConfigSources(assigned, source), "", nil
}
// resetConfig resets the current and last-known-good checkpoints in the checkpoint store to their default values and
// returns whether the current checkpoint changed as a result, or a sanitized failure reason and an error.
// resetConfig resets the assigned and last-known-good checkpoints in the checkpoint store to their default values and
// returns whether the assigned checkpoint changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) resetConfig() (bool, string, error) {
updated, err := cc.checkpointStore.Reset()
if err != nil {
@ -182,8 +187,26 @@ func (cc *Controller) resetConfig() (bool, string, error) {
return updated, "", nil
}
// latestNode returns the most recent Node with `nodeName` from `store`
func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) {
// restartForNewConfig presumes the Kubelet is managed by a babysitter, e.g. systemd
// It will send an event before exiting.
func restartForNewConfig(eventClient v1core.EventsGetter, nodeName string, source checkpoint.RemoteConfigSource) {
message := LocalEventMessage
if source != nil {
message = fmt.Sprintf(RemoteEventMessageFmt, source.APIPath(), source.UID(), source.ResourceVersion(), source.KubeletFilename())
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := makeEvent(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, message)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
}
utillog.Infof(message)
os.Exit(0)
}
// latestNodeConfigSource returns a copy of the most recent NodeConfigSource from the Node with `nodeName` in `store`
func latestNodeConfigSource(store cache.Store, nodeName string) (*apiv1.NodeConfigSource, error) {
obj, ok, err := store.GetByKey(nodeName)
if err != nil {
err := fmt.Errorf("failed to retrieve Node %q from informer's store, error: %v", nodeName, err)
@ -200,13 +223,11 @@ func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) {
utillog.Errorf(err.Error())
return nil, err
}
return node, nil
}
// eventf constructs and returns an event containing a formatted message
// similar to k8s.io/client-go/tools/record/event.go
func eventf(nodeName, eventType, reason, messageFmt string, args ...interface{}) *apiv1.Event {
return makeEvent(nodeName, eventType, reason, fmt.Sprintf(messageFmt, args...))
// Copy the source, so anyone who modifies it after here doesn't mess up the informer's store!
// This was previously the cause of a bug that made the Kubelet frequently resync config; Download updated
// the UID and ResourceVersion on the NodeConfigSource, but the pointer was still drilling all the way
// into the informer's copy!
return node.Spec.ConfigSource.DeepCopy(), nil
}
// makeEvent constructs an event


@ -21,6 +21,7 @@ import (
"path/filepath"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -65,8 +66,11 @@ type Controller struct {
// configStatus manages the status we report on the Node object
configStatus status.NodeConfigStatus
// informer is the informer that watches the Node object
informer cache.SharedInformer
// nodeInformer is the informer that watches the Node object
nodeInformer cache.SharedInformer
// remoteConfigSourceInformer is the informer that watches the assigned config source
remoteConfigSourceInformer cache.SharedInformer
// checkpointStore persists config source checkpoints to a storage layer
checkpointStore store.Store
@ -161,51 +165,80 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
return nil, err
}
// update the active source to the non-nil last-known-good source
// set status to indicate the active source is the non-nil last-known-good source
cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
return lastKnownGoodConfig, nil
}
// StartSync launches the controller's sync loops if `client` is non-nil and `nodeName` is non-empty.
// It will always start the Node condition reporting loop, and will also start the dynamic conifg sync loops
// if dynamic config is enabled on the controller. If `nodeName` is empty but `client` is non-nil, an error is logged.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) {
// StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
// The clients must be non-nil, and the nodeName must be non-empty.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
const errFmt = "cannot start Kubelet config sync: %s"
if client == nil {
utillog.Infof("nil client, will not start sync loops")
return
} else if len(nodeName) == 0 {
utillog.Errorf("cannot start sync loops with empty nodeName")
return
return fmt.Errorf(errFmt, "nil client")
}
if eventClient == nil {
return fmt.Errorf(errFmt, "nil event client")
}
if nodeName == "" {
return fmt.Errorf(errFmt, "empty nodeName")
}
// start the status sync loop
go utilpanic.HandlePanic(func() {
utillog.Infof("starting status sync loop")
wait.JitterUntil(func() {
cc.configStatus.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})()
cc.informer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
// start the informer loop
// Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
// we use HandlePanic to manually call the panic handlers and then crash.
// We have a better chance of recovering normal operation if we just restart the Kubelet in the event
// of a Go runtime error.
go utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer sync loop")
cc.informer.Run(wait.NeverStop)
})()
// NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
// This was EVIL to debug (difficult to see missing `()`).
// The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
// start the config source sync loop
go utilpanic.HandlePanic(func() {
utillog.Infof("starting config source sync loop")
// status sync worker
statusSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting status sync loop")
wait.JitterUntil(func() {
cc.configStatus.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})
// remote config source informer, if we have a remote source to watch
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return fmt.Errorf(errFmt, err)
} else if assignedSource == nil {
utillog.Infof("local source is assigned, will not start remote config source informer")
} else {
cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
AddFunc: cc.onAddRemoteConfigSourceEvent,
UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
},
)
}
remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
if cc.remoteConfigSourceInformer != nil {
utillog.Infof("starting remote config source informer")
cc.remoteConfigSourceInformer.Run(wait.NeverStop)
}
})
// node informer
cc.nodeInformer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
nodeInformerFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer")
cc.nodeInformer.Run(wait.NeverStop)
})
// config sync worker
configSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Kubelet config sync loop")
wait.JitterUntil(func() {
cc.syncConfigSource(client, eventClient, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})()
})
go statusSyncLoopFunc()
go remoteConfigSourceInformerFunc()
go nodeInformerFunc()
go configSyncLoopFunc()
return nil
}
// loadConfig loads Kubelet config from a checkpoint
@ -242,7 +275,6 @@ func (cc *Controller) checkTrial(duration time.Duration) {
if trial, err := cc.inTrial(duration); err != nil {
utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
} else if !trial {
utillog.Infof("assigned config passed trial period, will set as last-known-good")
if err := cc.graduateAssignedToLastKnownGood(); err != nil {
utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
}
@ -265,17 +297,28 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
// to the same value as the assigned config maintained by the checkpointStore
func (cc *Controller) graduateAssignedToLastKnownGood() error {
// get the assigned config
// get assigned
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return err
}
// update the last-known-good config
// get last-known-good
lastKnownGood, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return err
}
// if the sources are equal, no need to change
if assigned == lastKnownGood ||
assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned, lastKnownGood) {
return nil
}
// update last-known-good
err = cc.checkpointStore.SetLastKnownGood(assigned)
if err != nil {
return err
}
// update the status to reflect the new last-known-good config
cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
return nil
}


@ -80,9 +80,12 @@ type nodeConfigStatus struct {
// NewNodeConfigStatus returns a new NodeConfigStatus interface
func NewNodeConfigStatus() NodeConfigStatus {
return &nodeConfigStatus{
// channels must have capacity at least 1, since we signal with non-blocking writes
syncCh: make(chan bool, 1),
syncCh := make(chan bool, 1)
// prime new status managers to sync with the API server on the first call to Sync
syncCh <- true
return &nodeConfigStatus{
syncCh: syncCh,
}
}
@ -142,6 +145,8 @@ func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) {
return
}
utillog.Infof("updating Node.Status.Config")
// grab the lock
s.mux.Lock()
defer s.mux.Unlock()


@ -86,6 +86,7 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{})
}
if oldObj == nil {
// Node was just added, need to sync
utillog.Infof("initial Node watch event")
cc.pokeConfigSourceWorker()
return
}
@ -95,31 +96,59 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{})
return
}
if !apiequality.Semantic.DeepEqual(oldNode.Spec.ConfigSource, newNode.Spec.ConfigSource) {
utillog.Infof("Node.Spec.ConfigSource was updated")
cc.pokeConfigSourceWorker()
}
}
// onDeleteNodeEvent logs a message if the Node was deleted and may log errors
// if an unexpected DeletedFinalStateUnknown was received.
// onDeleteNodeEvent logs a message if the Node was deleted
// We allow the sync-loop to continue, because it is possible that the Kubelet detected
// a Node with unexpected externalID and is attempting to delete and re-create the Node
// (see pkg/kubelet/kubelet_node_status.go), or that someone accidentally deleted the Node
// (the Kubelet will re-create it).
func (cc *Controller) onDeleteNodeEvent(deletedObj interface{}) {
node, ok := deletedObj.(*apiv1.Node)
if !ok {
tombstone, ok := deletedObj.(cache.DeletedFinalStateUnknown)
if !ok {
utillog.Errorf("couldn't cast deleted object to DeletedFinalStateUnknown, object: %+v", deletedObj)
return
}
node, ok = tombstone.Obj.(*apiv1.Node)
if !ok {
utillog.Errorf("received DeletedFinalStateUnknown object but it did not contain a Node, object: %+v", deletedObj)
return
}
utillog.Infof("Node was deleted (DeletedFinalStateUnknown), sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node)
return
}
utillog.Infof("Node was deleted, sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node)
// For this case, we just log the event.
// We don't want to poke the worker, because a temporary deletion isn't worth reporting an error for.
// If the Node is deleted because the VM is being deleted, then the Kubelet has nothing to do.
utillog.Infof("Node was deleted")
}
// onAddRemoteConfigSourceEvent calls onUpdateRemoteConfigSourceEvent with the new object and a nil old object
func (cc *Controller) onAddRemoteConfigSourceEvent(newObj interface{}) {
cc.onUpdateRemoteConfigSourceEvent(nil, newObj)
}
// onUpdateRemoteConfigSourceEvent checks whether the configSource changed between oldObj and newObj,
// and pokes the sync worker if there was a change
func (cc *Controller) onUpdateRemoteConfigSourceEvent(oldObj interface{}, newObj interface{}) {
// since ConfigMap is currently the only source type, we handle that here
newConfigMap, ok := newObj.(*apiv1.ConfigMap)
if !ok {
utillog.Errorf("failed to cast new object to ConfigMap, couldn't handle event")
return
}
if oldObj == nil {
// ConfigMap was just added, need to sync
utillog.Infof("initial ConfigMap watch event")
cc.pokeConfigSourceWorker()
return
}
oldConfigMap, ok := oldObj.(*apiv1.ConfigMap)
if !ok {
utillog.Errorf("failed to cast old object to ConfigMap, couldn't handle event")
return
}
if !apiequality.Semantic.DeepEqual(oldConfigMap, newConfigMap) {
utillog.Infof("assigned ConfigMap was updated")
cc.pokeConfigSourceWorker()
}
}
// onDeleteRemoteConfigSourceEvent logs a message if the ConfigMap was deleted and pokes the sync worker
func (cc *Controller) onDeleteRemoteConfigSourceEvent(deletedObj interface{}) {
// If the ConfigMap we're watching is deleted, we log the event and poke the sync worker.
// This requires a sync, because if the Node is still configured to use the deleted ConfigMap,
// the Kubelet should report a DownloadError.
utillog.Infof("assigned ConfigMap was deleted")
cc.pokeConfigSourceWorker()
}

File diff suppressed because it is too large.


@ -289,9 +289,6 @@ func TestNodeAuthorizer(t *testing.T) {
ConfigMap: &api.ConfigMapNodeConfigSource{
Namespace: "ns",
Name: "myconfigmapconfigsource",
// validation just requires UID to be non-empty and it isn't necessary for GET,
// so we just use a bogus one for the test
UID: "uid",
KubeletConfigKey: "kubelet",
},
}