dynamic Kubelet config reconciles ConfigMap updates

pull/8/head
Michael Taufen 2018-04-16 15:15:03 -07:00
parent 835afe683f
commit b5648c3f61
16 changed files with 1303 additions and 694 deletions

View File

@ -619,7 +619,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName))
if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
return err
}
}
if kubeDeps.Auth == nil {

View File

@ -4150,11 +4150,10 @@ func validateNodeConfigSourceSpec(source *core.NodeConfigSource, fldPath *field.
// validation specific to Node.Spec.ConfigSource.ConfigMap
func validateConfigMapNodeConfigSourceSpec(source *core.ConfigMapNodeConfigSource, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
// TODO(#61643): Prevent ref.UID from being set here when we switch from requiring UID to respecting all ConfigMap updates
if string(source.UID) == "" {
allErrs = append(allErrs, field.Required(fldPath.Child("uid"), "uid must be set in spec"))
// uid and resourceVersion must not be set in spec
if string(source.UID) != "" {
allErrs = append(allErrs, field.Forbidden(fldPath.Child("uid"), "uid must not be set in spec"))
}
// resourceVersion must not be set in spec
if source.ResourceVersion != "" {
allErrs = append(allErrs, field.Forbidden(fldPath.Child("resourceVersion"), "resourceVersion must not be set in spec"))
}
@ -4196,12 +4195,13 @@ func validateNodeConfigSourceStatus(source *core.NodeConfigSource, fldPath *fiel
// validation specific to Node.Status.Config.(Active|Assigned|LastKnownGood).ConfigMap
func validateConfigMapNodeConfigSourceStatus(source *core.ConfigMapNodeConfigSource, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
// uid and resourceVersion must be set in status
if string(source.UID) == "" {
allErrs = append(allErrs, field.Required(fldPath.Child("uid"), "uid must be set in status"))
}
// TODO(#63221): require ResourceVersion in status when we start respecting ConfigMap mutations (the Kubelet isn't tracking it internally until
// that PR, which makes it difficult to report for now).
if source.ResourceVersion == "" {
allErrs = append(allErrs, field.Required(fldPath.Child("resourceVersion"), "resourceVersion must be set in status"))
}
return append(allErrs, validateConfigMapNodeConfigSource(source, fldPath)...)
}
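The spec/status split above is easy to misread, so here is a minimal sketch (not part of this commit; the namespace and ConfigMap name are invented) of what each reference looks like after the change: spec references carry only namespace, name, and the kubelet config key, while status references additionally pin the exact uid and resourceVersion the Kubelet is using.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
)

func main() {
	// What a user may set in Node.Spec.ConfigSource under the new validation:
	// uid and resourceVersion are forbidden here.
	spec := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
		Namespace:        "kube-system", // assumed namespace
		Name:             "node-config", // assumed ConfigMap name
		KubeletConfigKey: "kubelet",
	}}
	// What the Kubelet reports in Node.Status.Config (Active/Assigned/LastKnownGood):
	// uid and resourceVersion are required, so the status identifies an exact revision.
	status := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
		Namespace:        "kube-system",
		Name:             "node-config",
		UID:              "uid-from-api-server",
		ResourceVersion:  "1",
		KubeletConfigKey: "kubelet",
	}}
	fmt.Println(spec, status)
}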

View File

@ -19,7 +19,9 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
@ -40,8 +42,11 @@ go_library(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -34,9 +34,11 @@ var _ Payload = (*configMapPayload)(nil)
// NewConfigMapPayload constructs a Payload backed by a ConfigMap, which must have a non-empty UID and ResourceVersion
func NewConfigMapPayload(cm *apiv1.ConfigMap) (Payload, error) {
if cm == nil {
return nil, fmt.Errorf("ConfigMap must be non-nil to be a Payload")
} else if len(cm.ObjectMeta.UID) == 0 {
return nil, fmt.Errorf("ConfigMap must have a UID to be a Payload")
return nil, fmt.Errorf("ConfigMap must be non-nil")
} else if cm.ObjectMeta.UID == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty UID")
} else if cm.ObjectMeta.ResourceVersion == "" {
return nil, fmt.Errorf("ConfigMap must have a non-empty ResourceVersion")
}
return &configMapPayload{cm}, nil
@ -46,6 +48,10 @@ func (p *configMapPayload) UID() string {
return string(p.cm.UID)
}
func (p *configMapPayload) ResourceVersion() string {
return p.cm.ResourceVersion
}
func (p *configMapPayload) Files() map[string]string {
return p.cm.Data
}

View File

@ -34,19 +34,46 @@ func TestNewConfigMapPayload(t *testing.T) {
cm *apiv1.ConfigMap
err string
}{
{"nil v1/ConfigMap", nil, "must be non-nil"},
{"empty v1/ConfigMap", &apiv1.ConfigMap{}, "must have a UID"},
{"populated v1/ConfigMap",
&apiv1.ConfigMap{
{
desc: "nil",
cm: nil,
err: "ConfigMap must be non-nil",
},
{
desc: "missing uid",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
ResourceVersion: "rv",
},
},
err: "ConfigMap must have a non-empty UID",
},
{
desc: "missing resourceVersion",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
},
},
err: "ConfigMap must have a non-empty ResourceVersion",
},
{
desc: "populated v1/ConfigMap",
cm: &apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
UID: "uid",
ResourceVersion: "rv",
},
Data: map[string]string{
"key1": "value1",
"key2": "value2",
},
}, ""},
},
err: "",
},
}
for _, c := range cases {
@ -66,7 +93,7 @@ func TestNewConfigMapPayload(t *testing.T) {
func TestConfigMapPayloadUID(t *testing.T) {
const expect = "uid"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect}})
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: expect, ResourceVersion: "rv"}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
@ -76,6 +103,18 @@ func TestConfigMapPayloadUID(t *testing.T) {
}
}
func TestConfigMapPayloadResourceVersion(t *testing.T) {
const expect = "rv"
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: expect}})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
resourceVersion := payload.ResourceVersion()
if expect != resourceVersion {
t.Errorf("expect %q, but got %q", expect, resourceVersion)
}
}
func TestConfigMapPayloadFiles(t *testing.T) {
cases := []struct {
desc string
@ -96,7 +135,7 @@ func TestConfigMapPayloadFiles(t *testing.T) {
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid"}, Data: c.data})
payload, err := NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: "uid", ResourceVersion: "rv"}, Data: c.data})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}

View File

@ -18,12 +18,18 @@ package checkpoint
import (
"fmt"
"math/rand"
"time"
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/scheme"
kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/v1beta1"
@ -35,8 +41,13 @@ import (
// Payload represents a local copy of a config source (payload) object
type Payload interface {
// UID returns a globally unique (space and time) identifier for the payload.
// The return value is guaranteed non-empty.
UID() string
// ResourceVersion returns a resource version for the payload.
// The return value is guaranteed non-empty.
ResourceVersion() string
// Files returns a map of filenames to file contents.
Files() map[string]string
@ -46,16 +57,29 @@ type Payload interface {
// RemoteConfigSource represents a remote config source object that can be downloaded as a Checkpoint
type RemoteConfigSource interface {
// UID returns a globally unique identifier of the source described by the remote config source object
UID() string
// KubeletFilename returns the name of the Kubelet config file as it should appear in the keys of Payload.Files()
KubeletFilename() string
// APIPath returns the API path to the remote resource, e.g. its SelfLink
APIPath() string
// Download downloads the remote config source object and returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails
Download(client clientset.Interface) (Payload, string, error)
// Encode returns a []byte representation of the NodeConfigSource behind the RemoteConfigSource
// UID returns the globally unique identifier for the most recently downloaded payload targeted by the source.
UID() string
// ResourceVersion returns the resource version of the most recently downloaded payload targeted by the source.
ResourceVersion() string
// Download downloads the remote config source's target object and returns a Payload backed by the object,
// or a sanitized failure reason and error if the download fails.
// Download takes an optional store as an argument. If provided, Download will check this store for the
// target object prior to contacting the API server.
// Download updates the local UID and ResourceVersion tracked by this source, based on the downloaded payload.
Download(client clientset.Interface, store cache.Store) (Payload, string, error)
// Informer returns an informer that can be used to detect changes to the remote config source
Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer
// Encode returns a []byte representation of the object behind the RemoteConfigSource
Encode() ([]byte, error)
// NodeConfigSource returns a copy of the underlying apiv1.NodeConfigSource object.
@ -104,7 +128,11 @@ func DecodeRemoteConfigSource(data []byte) (RemoteConfigSource, error) {
// we use the v1.NodeConfigSource type on internal and external, so no need to convert to external here
source, _, err := NewRemoteConfigSource(&cs.Source)
return source, err
if err != nil {
return nil, err
}
return source, nil
}
// EqualRemoteConfigSources is a helper for comparing remote config sources by
@ -123,10 +151,6 @@ type remoteConfigMap struct {
var _ RemoteConfigSource = (*remoteConfigMap)(nil)
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMap.UID)
}
func (r *remoteConfigMap) KubeletFilename() string {
return r.source.ConfigMap.KubeletConfigKey
}
@ -138,32 +162,82 @@ func (r *remoteConfigMap) APIPath() string {
return fmt.Sprintf(configMapAPIPathFmt, ref.Namespace, ref.Name)
}
func (r *remoteConfigMap) Download(client clientset.Interface) (Payload, string, error) {
var reason string
uid := string(r.source.ConfigMap.UID)
func (r *remoteConfigMap) UID() string {
return string(r.source.ConfigMap.UID)
}
utillog.Infof("attempting to download ConfigMap with UID %q", uid)
func (r *remoteConfigMap) ResourceVersion() string {
return r.source.ConfigMap.ResourceVersion
}
// get the ConfigMap via namespace/name, there doesn't seem to be a way to get it by UID
cm, err := client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
func (r *remoteConfigMap) Download(client clientset.Interface, store cache.Store) (Payload, string, error) {
var (
cm *apiv1.ConfigMap
err error
)
// check the in-memory store for the ConfigMap, so we can skip unnecessary downloads
if store != nil {
utillog.Infof("checking in-memory store for %s", r.APIPath())
cm, err = getConfigMapFromStore(store, r.source.ConfigMap.Namespace, r.source.ConfigMap.Name)
if err != nil {
// just log the error, we'll attempt a direct download instead
utillog.Errorf("failed to check in-memory store for %s, error: %v", r.APIPath(), err)
} else if cm != nil {
utillog.Infof("found %s in in-memory store, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} else {
utillog.Infof("did not find %s in in-memory store", r.APIPath())
}
}
// if we didn't find the ConfigMap in the in-memory store, download it from the API server
if cm == nil {
utillog.Infof("attempting to download %s", r.APIPath())
cm, err = client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Get(r.source.ConfigMap.Name, metav1.GetOptions{})
if err != nil {
return nil, status.DownloadError, fmt.Errorf("%s, error: %v", status.DownloadError, err)
}
// ensure that UID matches the UID on the source
if r.source.ConfigMap.UID != cm.UID {
reason = fmt.Sprintf(status.UIDMismatchErrorFmt, r.source.ConfigMap.UID, r.APIPath(), cm.UID)
return nil, reason, fmt.Errorf(reason)
}
utillog.Infof("successfully downloaded %s, UID: %s, ResourceVersion: %s", r.APIPath(), cm.UID, cm.ResourceVersion)
} // Assert: Now we have a non-nil ConfigMap
// construct Payload from the ConfigMap
payload, err := NewConfigMapPayload(cm)
if err != nil {
reason = fmt.Sprintf("invalid downloaded object")
return nil, reason, fmt.Errorf("%s, error: %v", reason, err)
// We only expect an error here if ObjectMeta is lacking UID or ResourceVersion. This should
// never happen on objects in the informer's store, or objects downloaded from the API server
// directly, so we report InternalError.
return nil, status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
// update internal UID and ResourceVersion based on latest ConfigMap
r.source.ConfigMap.UID = cm.UID
r.source.ConfigMap.ResourceVersion = cm.ResourceVersion
return payload, "", nil
}
utillog.Infof("successfully downloaded ConfigMap with UID %q", uid)
return payload, "", nil
func (r *remoteConfigMap) Informer(client clientset.Interface, handler cache.ResourceEventHandlerFuncs) cache.SharedInformer {
// select ConfigMap by name
fieldselector := fields.OneTermEqualSelector("metadata.name", r.source.ConfigMap.Name)
// add some randomness to resync period, which can help avoid controllers falling into lock-step
minResyncPeriod := 15 * time.Minute
factor := rand.Float64() + 1
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (kuberuntime.Object, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).List(metav1.ListOptions{
FieldSelector: fieldselector.String(),
})
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return client.CoreV1().ConfigMaps(r.source.ConfigMap.Namespace).Watch(metav1.ListOptions{
FieldSelector: fieldselector.String(),
ResourceVersion: options.ResourceVersion,
})
},
}
informer := cache.NewSharedInformer(lw, &apiv1.ConfigMap{}, resyncPeriod)
informer.AddEventHandler(handler)
return informer
}
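For reference, a tiny standalone sketch (not code from this commit) of the resync jitter computed above: with factor drawn from [1, 2), the effective resync period lands uniformly between 15 and 30 minutes, so Kubelets watching the same ConfigMap do not resync in lock-step.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	minResyncPeriod := 15 * time.Minute
	factor := rand.Float64() + 1 // uniform in [1, 2)
	resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
	fmt.Println(resyncPeriod) // somewhere in [15m, 30m)
}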
func (r *remoteConfigMap) Encode() ([]byte, error) {
@ -182,3 +256,18 @@ func (r *remoteConfigMap) Encode() ([]byte, error) {
func (r *remoteConfigMap) NodeConfigSource() *apiv1.NodeConfigSource {
return r.source.DeepCopy()
}
func getConfigMapFromStore(store cache.Store, namespace, name string) (*apiv1.ConfigMap, error) {
key := fmt.Sprintf("%s/%s", namespace, name)
obj, ok, err := store.GetByKey(key)
if err != nil || !ok {
return nil, err
}
cm, ok := obj.(*apiv1.ConfigMap)
if !ok {
err := fmt.Errorf("failed to cast object %s from informer's store to ConfigMap", key)
utillog.Errorf(err.Error())
return nil, err
}
return cm, nil
}
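To make the new Download(client, store) contract concrete, here is a hedged usage sketch built on the fake clientset (the namespace and name are invented; this is not code from the commit): the source starts with only namespace, name, and key, Download consults the store before the API server, and afterwards the source carries the UID and ResourceVersion of whatever was actually fetched.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	fakeclient "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint"
)

func main() {
	cm := &apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{
		Namespace:       "kube-system", // assumed namespace
		Name:            "node-config", // assumed ConfigMap name
		UID:             "uid",
		ResourceVersion: "1",
	}}
	// a store standing in for the informer's cache; Download checks it before the API server
	store := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
	_ = store.Add(cm)
	client := fakeclient.NewSimpleClientset(cm)

	// the source only needs namespace/name/key; uid and resourceVersion are filled in by Download
	source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
		Namespace:        "kube-system",
		Name:             "node-config",
		KubeletConfigKey: "kubelet",
	}})
	if err != nil {
		panic(err)
	}
	payload, _, err := source.Download(client, store)
	if err != nil {
		panic(err)
	}
	fmt.Println(payload.UID(), source.UID(), source.ResourceVersion()) // uid uid 1
}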

View File

@ -25,7 +25,9 @@ import (
apiv1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
@ -122,72 +124,90 @@ func TestRemoteConfigMapDownload(t *testing.T) {
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
}}
client := fakeclient.NewSimpleClientset(cm)
payload, err := NewConfigMapPayload(cm)
source := &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
KubeletConfigKey: "kubelet",
}}
expectPayload, err := NewConfigMapPayload(cm)
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
makeSource := func(source *apiv1.NodeConfigSource) RemoteConfigSource {
s, _, err := NewRemoteConfigSource(source)
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
return s
missingStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
hasStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
if err := hasStore.Add(cm); err != nil {
t.Fatalf("unexpected error constructing hasStore")
}
missingClient := fakeclient.NewSimpleClientset()
hasClient := fakeclient.NewSimpleClientset(cm)
cases := []struct {
desc string
source RemoteConfigSource
expect Payload
client clientset.Interface
store cache.Store
err string
}{
{
desc: "object doesn't exist",
source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "bogus",
Namespace: "namespace",
UID: "bogus",
KubeletConfigKey: "kubelet",
}}),
expect: nil,
desc: "nil store, object does not exist in API server",
client: missingClient,
err: "not found",
},
{
desc: "UID is incorrect for namespace/name",
source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "bogus",
KubeletConfigKey: "kubelet",
}}),
expect: nil,
err: "does not match",
desc: "nil store, object exists in API server",
client: hasClient,
},
{
desc: "object exists and reference is correct",
source: makeSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}}),
expect: payload,
err: "",
desc: "object exists in store and API server",
store: hasStore,
client: hasClient,
},
{
desc: "object exists in store, but does not exist in API server",
store: hasStore,
client: missingClient,
},
{
desc: "object does not exist in store, but exists in API server",
store: missingStore,
client: hasClient,
},
{
desc: "object does not exist in store or API server",
client: missingClient,
store: missingStore,
err: "not found",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
payload, _, err := c.source.Download(client)
// deep copy so we can always check the UID/ResourceVersion are set after Download
s, _, err := NewRemoteConfigSource(source.DeepCopy())
if err != nil {
t.Fatalf("error constructing remote config source %v", err)
}
// attempt download
p, _, err := s.Download(c.client, c.store)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// downloaded object should match the expected
if !apiequality.Semantic.DeepEqual(c.expect.object(), payload.object()) {
t.Errorf("case %q, expect Checkpoint %s but got %s", c.desc, spew.Sdump(c.expect), spew.Sdump(payload))
if !apiequality.Semantic.DeepEqual(expectPayload.object(), p.object()) {
t.Errorf("expect Checkpoint %s but got %s", spew.Sdump(expectPayload), spew.Sdump(p))
}
// source UID and ResourceVersion should be updated by Download
if p.UID() != s.UID() {
t.Errorf("expect UID to be updated by Download to match payload: %s, but got source UID: %s", p.UID(), s.UID())
}
if p.ResourceVersion() != s.ResourceVersion() {
t.Errorf("expect ResourceVersion to be updated by Download to match payload: %s, but got source ResourceVersion: %s", p.ResourceVersion(), s.ResourceVersion())
}
})
}

View File

@ -75,32 +75,46 @@ func (s *fsStore) Initialize() error {
return utilfiles.EnsureDir(s.fs, filepath.Join(s.dir, checkpointsDir))
}
func (s *fsStore) Exists(c checkpoint.RemoteConfigSource) (bool, error) {
func (s *fsStore) Exists(source checkpoint.RemoteConfigSource) (bool, error) {
const errfmt = "failed to determine whether checkpoint exists for source %s, UID: %s, ResourceVersion: %s exists, error: %v"
if len(source.UID()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty UID is ambiguous")
}
if len(source.ResourceVersion()) == 0 {
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), "empty ResourceVersion is ambiguous")
}
// we check whether the directory was created for the resource
uid := c.UID()
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(uid))
ok, err := utilfiles.DirExists(s.fs, s.checkpointPath(source.UID(), source.ResourceVersion()))
if err != nil {
return false, fmt.Errorf("failed to determine whether checkpoint %q exists, error: %v", uid, err)
return false, fmt.Errorf(errfmt, source.APIPath(), source.UID(), source.ResourceVersion(), err)
}
return ok, nil
}
func (s *fsStore) Save(c checkpoint.Payload) error {
func (s *fsStore) Save(payload checkpoint.Payload) error {
// Note: Payload interface guarantees UID() and ResourceVersion() to be non-empty
path := s.checkpointPath(payload.UID(), payload.ResourceVersion())
// ensure the parent dir (checkpoints/uid) exists, since ReplaceDir requires the parent of the directory it replaces
// to exist, and we checkpoint as checkpoints/uid/resourceVersion/files-from-configmap
if err := utilfiles.EnsureDir(s.fs, filepath.Dir(path)); err != nil {
return err
}
// save the checkpoint's files in the appropriate checkpoint dir
return utilfiles.ReplaceDir(s.fs, s.checkpointPath(c.UID()), c.Files())
return utilfiles.ReplaceDir(s.fs, path, payload.Files())
}
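As a small illustration of the layout change described in the comments above (the root path here is a stand-in for the fsStore's base directory s.dir, not something this commit sets), checkpoints move from a per-UID directory to a per-UID, per-ResourceVersion directory:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	root := "/path/to/fsstore-root" // stands for s.dir; the actual location is configured elsewhere
	// before this change: <root>/checkpoints/<uid>/<configmap key>
	fmt.Println(filepath.Join(root, "checkpoints", "uid", "kubelet"))
	// after this change:  <root>/checkpoints/<uid>/<resourceVersion>/<configmap key>
	fmt.Println(filepath.Join(root, "checkpoints", "uid", "1", "kubelet"))
}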
func (s *fsStore) Load(source checkpoint.RemoteConfigSource) (*kubeletconfig.KubeletConfiguration, error) {
sourceFmt := fmt.Sprintf("%s:%s", source.APIPath(), source.UID())
sourceFmt := fmt.Sprintf("%s, UID: %s, ResourceVersion: %s", source.APIPath(), source.UID(), source.ResourceVersion())
// check if a checkpoint exists for the source
if ok, err := s.Exists(source); err != nil {
return nil, fmt.Errorf("failed to determine if a checkpoint exists for source %s", sourceFmt)
return nil, err
} else if !ok {
return nil, fmt.Errorf("no checkpoint for source %s", sourceFmt)
}
// load the kubelet config file
utillog.Infof("loading kubelet configuration checkpoint for source %s", sourceFmt)
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID()), source.KubeletFilename()))
utillog.Infof("loading Kubelet configuration checkpoint for source %s", sourceFmt)
loader, err := configfiles.NewFsLoader(s.fs, filepath.Join(s.checkpointPath(source.UID(), source.ResourceVersion()), source.KubeletFilename()))
if err != nil {
return nil, err
}
@ -140,8 +154,8 @@ func (s *fsStore) Reset() (bool, error) {
return reset(s)
}
func (s *fsStore) checkpointPath(uid string) string {
return filepath.Join(s.dir, checkpointsDir, uid)
func (s *fsStore) checkpointPath(uid, resourceVersion string) string {
return filepath.Join(s.dir, checkpointsDir, uid, resourceVersion)
}
func (s *fsStore) metaPath(name string) string {
@ -163,6 +177,14 @@ func writeRemoteConfigSource(fs utilfs.Filesystem, path string, source checkpoin
if source == nil {
return utilfiles.ReplaceFile(fs, path, []byte{})
}
// check that UID and ResourceVersion are non-empty,
// and refuse to save the reference if the checkpoint cannot be fully resolved
if source.UID() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty UID is ambiguous")
}
if source.ResourceVersion() == "" {
return fmt.Errorf("failed to write RemoteConfigSource, empty ResourceVersion is ambiguous")
}
// encode the source and save it to the file
data, err := source.Encode()
if err != nil {

View File

@ -83,8 +83,8 @@ func TestFsStoreInitialize(t *testing.T) {
}
// check that checkpoints dir exists
if _, err := store.fs.Stat(store.checkpointPath("")); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", store.checkpointPath(""), err)
if _, err := store.fs.Stat(filepath.Join(store.dir, checkpointsDir)); err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", filepath.Join(store.dir, checkpointsDir), err)
}
// check that assignedFile exists
@ -105,21 +105,29 @@ func TestFsStoreExists(t *testing.T) {
}
// checkpoint a payload
const uid = "uid"
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid}})
const (
uid = "uid"
resourceVersion = "1"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{UID: uid, ResourceVersion: resourceVersion}})
if err != nil {
t.Fatalf("could not construct checkpoint, error: %v", err)
t.Fatalf("could not construct Payload, error: %v", err)
}
if err := store.Save(p); err != nil {
t.Fatalf("unexpected error: %v", err)
}
store.Save(p)
cases := []struct {
desc string
uid types.UID
resourceVersion string
expect bool
err string
}{
{"exists", uid, true, ""},
{"does not exist", "bogus-uid", false, ""},
{"exists", uid, resourceVersion, true, ""},
{"does not exist", "bogus-uid", "bogus-resourceVersion", false, ""},
{"ambiguous UID", "", "bogus-resourceVersion", false, "empty UID is ambiguous"},
{"ambiguous ResourceVersion", "bogus-uid", "", false, "empty ResourceVersion is ambiguous"},
}
for _, c := range cases {
@ -129,10 +137,11 @@ func TestFsStoreExists(t *testing.T) {
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
t.Fatalf("unexpected error: %v", err)
}
ok, err := store.Exists(source)
utiltest.ExpectError(t, err, c.err)
@ -160,38 +169,44 @@ func TestFsStoreSave(t *testing.T) {
return s
}()
const (
uid = "uid"
resourceVersion = "1"
)
cases := []struct {
desc string
uid types.UID
resourceVersion string
files map[string]string
err string
}{
{"valid payload", map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""},
{"empty key name", map[string]string{"": "foocontent"}, "must not be empty"},
{"key name is not a base file name (foo/bar)", map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"},
{"key name is not a base file name (/foo)", map[string]string{"/bar": "foocontent"}, "only base names are allowed"},
{"used .", map[string]string{".": "foocontent"}, "may not be '.' or '..'"},
{"used ..", map[string]string{"..": "foocontent"}, "may not be '.' or '..'"},
{"length violation", map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"},
{"valid payload", uid, resourceVersion, map[string]string{"foo": "foocontent", "bar": "barcontent"}, ""},
{"empty key name", uid, resourceVersion, map[string]string{"": "foocontent"}, "must not be empty"},
{"key name is not a base file name (foo/bar)", uid, resourceVersion, map[string]string{"foo/bar": "foocontent"}, "only base names are allowed"},
{"key name is not a base file name (/foo)", uid, resourceVersion, map[string]string{"/bar": "foocontent"}, "only base names are allowed"},
{"used .", uid, resourceVersion, map[string]string{".": "foocontent"}, "may not be '.' or '..'"},
{"used ..", uid, resourceVersion, map[string]string{"..": "foocontent"}, "may not be '.' or '..'"},
{"length violation", uid, resourceVersion, map[string]string{nameTooLong: "foocontent"}, "must be less than 255 characters"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// construct the payload
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: "uid"},
ObjectMeta: metav1.ObjectMeta{UID: c.uid, ResourceVersion: c.resourceVersion},
Data: c.files,
})
if err != nil {
t.Fatalf("error constructing payload: %v", err)
}
// save the payload
// if no error, save the payload, otherwise skip straight to error handler
if err == nil {
err = store.Save(p)
}
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// read the saved checkpoint
m, err := mapFromCheckpoint(store, p.UID())
m, err := mapFromCheckpoint(store, p.UID(), p.ResourceVersion())
if err != nil {
t.Fatalf("error loading checkpoint to map: %v", err)
}
@ -221,10 +236,11 @@ func TestFsStoreLoad(t *testing.T) {
// construct a payload that contains the kubeletconfig
const (
uid = "uid"
resourceVersion = "1"
kubeletKey = "kubelet"
)
p, err := checkpoint.NewConfigMapPayload(&apiv1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid)},
ObjectMeta: metav1.ObjectMeta{UID: types.UID(uid), ResourceVersion: resourceVersion},
Data: map[string]string{
kubeletKey: string(data),
},
@ -241,10 +257,13 @@ func TestFsStoreLoad(t *testing.T) {
cases := []struct {
desc string
uid types.UID
resourceVersion string
err string
}{
{"checkpoint exists", uid, ""},
{"checkpoint does not exist", "bogus-uid", "no checkpoint for source"},
{"checkpoint exists", uid, resourceVersion, ""},
{"checkpoint does not exist", "bogus-uid", "bogus-resourceVersion", "no checkpoint for source"},
{"ambiguous UID", "", "bogus-resourceVersion", "empty UID is ambiguous"},
{"ambiguous ResourceVersion", "bogus-uid", "", "empty ResourceVersion is ambiguous"},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
@ -253,10 +272,11 @@ func TestFsStoreLoad(t *testing.T) {
Name: "name",
Namespace: "namespace",
UID: c.uid,
ResourceVersion: c.resourceVersion,
KubeletConfigKey: kubeletKey,
}})
if err != nil {
t.Fatalf("error constructing remote config source: %v", err)
t.Fatalf("unexpected error: %v", err)
}
loaded, err := store.Load(source)
utiltest.ExpectError(t, err, c.err)
@ -389,35 +409,80 @@ func TestFsStoreSetAssigned(t *testing.T) {
t.Fatalf("error constructing store: %v", err)
}
const uid = "uid"
expect := fmt.Sprintf(`apiVersion: kubelet.config.k8s.io/v1beta1
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
uid: %s
`, uid)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: types.UID(uid),
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}})
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// save the assigned source
if err := store.SetAssigned(source); err != nil {
t.Fatalf("unexpected error: %v", err)
source = s
}
// save the assigned source
err = store.SetAssigned(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, assignedFile)
if expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", expect, string(data))
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
@ -427,35 +492,80 @@ func TestFsStoreSetLastKnownGood(t *testing.T) {
t.Fatalf("error constructing store: %v", err)
}
const uid = "uid"
expect := fmt.Sprintf(`apiVersion: kubelet.config.k8s.io/v1beta1
cases := []struct {
desc string
source *apiv1.NodeConfigSource
expect string
err string
}{
{
desc: "nil source",
expect: "", // empty file
},
{
desc: "non-nil source",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}},
expect: `apiVersion: kubelet.config.k8s.io/v1beta1
kind: SerializedNodeConfigSource
source:
configMap:
kubeletConfigKey: kubelet
name: name
namespace: namespace
uid: %s
`, uid)
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
resourceVersion: "1"
uid: uid
`,
},
{
desc: "missing UID",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: types.UID(uid),
ResourceVersion: "1",
KubeletConfigKey: "kubelet",
}})
}},
err: "failed to write RemoteConfigSource, empty UID is ambiguous",
},
{
desc: "missing ResourceVersion",
source: &apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}},
err: "failed to write RemoteConfigSource, empty ResourceVersion is ambiguous",
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
var source checkpoint.RemoteConfigSource
if c.source != nil {
s, _, err := checkpoint.NewRemoteConfigSource(c.source)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// save the last known good source
if err := store.SetLastKnownGood(source); err != nil {
t.Fatalf("unexpected error: %v", err)
source = s
}
// save the last-known-good source
err = store.SetLastKnownGood(source)
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
// check that the source saved as we would expect
data := readTestSourceFile(t, store, lastKnownGoodFile)
if expect != string(data) {
t.Errorf("expect last-known-good source file to contain %q, but got %q", expect, string(data))
if c.expect != string(data) {
t.Errorf("expect assigned source file to contain %q, but got %q", c.expect, string(data))
}
})
}
}
@ -536,107 +646,8 @@ func TestFsStoreReset(t *testing.T) {
}
}
func TestFsStoreReadRemoteConfigSource(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{
ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
expect checkpoint.RemoteConfigSource
err string
}{
{"default source", nil, ""},
{"non-default source", source, ""},
}
const name = "some-source-file"
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
saveTestSourceFile(t, store, name, c.expect)
source, err := readRemoteConfigSource(store.fs, store.metaPath(name))
utiltest.ExpectError(t, err, c.err)
if err != nil {
return
}
if !checkpoint.EqualRemoteConfigSources(c.expect, source) {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.expect), spew.Sdump(c.expect), spew.Sdump(source))
}
})
}
}
func TestFsStoreWriteRemoteConfigSource(t *testing.T) {
store, err := newInitializedFakeFsStore()
if err != nil {
t.Fatalf("error constructing store: %v", err)
}
source, _, err := checkpoint.NewRemoteConfigSource(&apiv1.NodeConfigSource{ConfigMap: &apiv1.ConfigMapNodeConfigSource{
Name: "name",
Namespace: "namespace",
UID: "uid",
KubeletConfigKey: "kubelet",
}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
source checkpoint.RemoteConfigSource
}{
{"nil source", nil},
{"non-nil source", source},
}
const name = "some-source-file"
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
// set the source file
err := writeRemoteConfigSource(store.fs, store.metaPath(name), c.source)
if err != nil {
t.Fatalf("unable to set source file, error: %v", err)
}
// read back the file
data := readTestSourceFile(t, store, name)
str := string(data)
if c.source != nil {
// expect the contents to match the encoding of the source
data, err := c.source.Encode()
expect := string(data)
if err != nil {
t.Fatalf("couldn't encode source, error: %v", err)
}
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
}
} else {
// expect empty file
expect := ""
if expect != str {
t.Errorf("case %q, expect %q but got %q", spew.Sdump(c.source), expect, str)
}
}
})
}
}
func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) {
files, err := store.fs.ReadDir(store.checkpointPath(uid))
func mapFromCheckpoint(store *fsStore, uid, resourceVersion string) (map[string]string, error) {
files, err := store.fs.ReadDir(store.checkpointPath(uid, resourceVersion))
if err != nil {
return nil, err
}
@ -647,7 +658,7 @@ func mapFromCheckpoint(store *fsStore, uid string) (map[string]string, error) {
return nil, fmt.Errorf("expect only regular files in checkpoint dir %q", uid)
}
// read the file contents and build the map
data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid), f.Name()))
data, err := store.fs.ReadFile(filepath.Join(store.checkpointPath(uid, resourceVersion), f.Name()))
if err != nil {
return nil, err
}

View File

@ -30,6 +30,7 @@ type Store interface {
Initialize() error
// Exists returns true if the object referenced by `source` has been checkpointed.
// The source must be unambiguous - e.g. if referencing an API object it must specify both uid and resourceVersion.
Exists(source checkpoint.RemoteConfigSource) (bool, error)
// Save Kubelet config payloads to the storage layer. It must be possible to unmarshal the payload to a KubeletConfiguration.
// The following payload types are supported:

View File

@ -37,10 +37,10 @@ import (
const (
// KubeletConfigChangedEventReason identifies an event as a change of Kubelet configuration
KubeletConfigChangedEventReason = "KubeletConfigChanged"
// EventMessageFmt is the message format for Kubelet config change events
EventMessageFmt = "Kubelet will restart to use: %s"
// LocalConfigMessage is the text to apply to EventMessageFmt when the Kubelet has been configured to use its local config (init or defaults)
LocalConfigMessage = "local config"
// LocalEventMessage is sent when the Kubelet restarts to use local config
LocalEventMessage = "Kubelet restarting to use local config"
// RemoteEventMessageFmt is sent when the Kubelet restarts to use a remote config
RemoteEventMessageFmt = "Kubelet restarting to use %s, UID: %s, ResourceVersion: %s, KubeletConfigKey: %s"
)
// pokeConfigSourceWorker tells the worker thread that syncs config sources that work needs to be done
@ -69,111 +69,116 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, eventClient v
}
}()
node, err := latestNode(cc.informer.GetStore(), nodeName)
// get the latest Node.Spec.ConfigSource from the informer
source, err := latestNodeConfigSource(cc.nodeInformer.GetStore(), nodeName)
if err != nil {
cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, status.InternalError))
syncerr = fmt.Errorf("%s, error: %v", status.InternalError, err)
return
}
// check the Node and download any new config
if updated, cur, reason, err := cc.doSyncConfigSource(client, node.Spec.ConfigSource); err != nil {
cc.configStatus.SetErrorOverride(fmt.Sprintf(status.SyncErrorFmt, reason))
// a nil source simply means we reset to local defaults
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset assigned and last-known-good to defaults")
if updated, reason, err := cc.resetConfig(); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
path := LocalConfigMessage
if cur != nil {
path = cur.APIPath()
restartForNewConfig(eventClient, nodeName, nil)
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := eventf(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, EventMessageFmt, path)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
return
}
os.Exit(0)
// a non-nil source means we should attempt to download the config, and checkpoint it if necessary
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
// TODO(mtaufen): It would be nice if we could check the payload's metadata before (re)downloading the whole payload;
// for now, we at least try pulling the latest ConfigMap out of the local informer store.
// construct the interface that can dynamically dispatch the correct Download, etc. methods for the given source type
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// "download" source, either from informer's in-memory store or directly from the API server, if the informer doesn't have a copy
payload, reason, err := cc.downloadConfigPayload(client, remote)
if err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// save a checkpoint for the payload, if one does not already exist
if reason, err := cc.saveConfigCheckpoint(remote, payload); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
}
// update the local, persistent record of assigned config
if updated, reason, err := cc.setAssignedConfig(remote); err != nil {
reason = fmt.Sprintf(status.SyncErrorFmt, reason)
cc.configStatus.SetErrorOverride(reason)
syncerr = fmt.Errorf("%s, error: %v", reason, err)
return
} else if updated {
restartForNewConfig(eventClient, nodeName, remote)
}
// If we get here:
// - there is no need to restart to update the current config
// - there is no need to restart to use new config
// - there was no error trying to sync configuration
// - if, previously, there was an error trying to sync configuration, we need to clear that error from the status
cc.configStatus.SetErrorOverride("")
}
// doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config,
// depending on the `source`, and returns whether the current config in the checkpoint store was updated as a result
func (cc *Controller) doSyncConfigSource(client clientset.Interface, source *apiv1.NodeConfigSource) (bool, checkpoint.RemoteConfigSource, string, error) {
if source == nil {
utillog.Infof("Node.Spec.ConfigSource is empty, will reset current and last-known-good to defaults")
updated, reason, err := cc.resetConfig()
if err != nil {
return false, nil, reason, err
// Note: source has up-to-date uid and resourceVersion after calling downloadConfigPayload.
func (cc *Controller) downloadConfigPayload(client clientset.Interface, source checkpoint.RemoteConfigSource) (checkpoint.Payload, string, error) {
var store cache.Store
if cc.remoteConfigSourceInformer != nil {
store = cc.remoteConfigSourceInformer.GetStore()
}
return updated, nil, "", nil
return source.Download(client, store)
}
// if the NodeConfigSource is non-nil, download the config
utillog.Infof("Node.Spec.ConfigSource is non-empty, will checkpoint source and update config if necessary")
remote, reason, err := checkpoint.NewRemoteConfigSource(source)
func (cc *Controller) saveConfigCheckpoint(source checkpoint.RemoteConfigSource, payload checkpoint.Payload) (string, error) {
ok, err := cc.checkpointStore.Exists(source)
if err != nil {
return false, nil, reason, err
}
reason, err = cc.checkpointConfigSource(client, remote)
if err != nil {
return false, nil, reason, err
}
updated, reason, err := cc.setAssignedConfig(remote)
if err != nil {
return false, nil, reason, err
}
return updated, remote, "", nil
}
// checkpointConfigSource downloads and checkpoints the object referred to by `source` if the checkpoint does not already exist,
// if a failure occurs, returns a sanitized failure reason and an error
func (cc *Controller) checkpointConfigSource(client clientset.Interface, source checkpoint.RemoteConfigSource) (string, error) {
// if the checkpoint already exists, skip downloading
if ok, err := cc.checkpointStore.Exists(source); err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
} else if ok {
// TODO(mtaufen): update this to include ResourceVersion in #63221
utillog.Infof("checkpoint already exists for object %s with UID %s, skipping download", source.APIPath(), source.UID())
}
if ok {
utillog.Infof("checkpoint already exists for %s, UID: %s, ResourceVersion: %s", source.APIPath(), payload.UID(), payload.ResourceVersion())
return "", nil
}
// download
payload, reason, err := source.Download(client)
if err != nil {
return reason, fmt.Errorf("%s, error: %v", reason, err)
}
// save
err = cc.checkpointStore.Save(payload)
if err != nil {
if err := cc.checkpointStore.Save(payload); err != nil {
return status.InternalError, fmt.Errorf("%s, error: %v", status.InternalError, err)
}
return "", nil
}
// setAssignedConfig updates the assigned checkpoint config in the store.
// Returns whether the current config changed as a result, or a sanitized failure reason and an error.
// Returns whether the assigned config changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) setAssignedConfig(source checkpoint.RemoteConfigSource) (bool, string, error) {
current, err := cc.checkpointStore.Assigned()
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return false, status.InternalError, err
}
if err := cc.checkpointStore.SetAssigned(source); err != nil {
return false, status.InternalError, err
}
return !checkpoint.EqualRemoteConfigSources(current, source), "", nil
return !checkpoint.EqualRemoteConfigSources(assigned, source), "", nil
}
// resetConfig resets the current and last-known-good checkpoints in the checkpoint store to their default values and
// returns whether the current checkpoint changed as a result, or a sanitized failure reason and an error.
// resetConfig resets the assigned and last-known-good checkpoints in the checkpoint store to their default values and
// returns whether the assigned checkpoint changed as a result, or a sanitized failure reason and an error.
func (cc *Controller) resetConfig() (bool, string, error) {
updated, err := cc.checkpointStore.Reset()
if err != nil {
@ -182,8 +187,26 @@ func (cc *Controller) resetConfig() (bool, string, error) {
return updated, "", nil
}
// latestNode returns the most recent Node with `nodeName` from `store`
func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) {
// restartForNewConfig presumes the Kubelet is managed by a babysitter, e.g. systemd
// It will send an event before exiting.
func restartForNewConfig(eventClient v1core.EventsGetter, nodeName string, source checkpoint.RemoteConfigSource) {
message := LocalEventMessage
if source != nil {
message = fmt.Sprintf(RemoteEventMessageFmt, source.APIPath(), source.UID(), source.ResourceVersion(), source.KubeletFilename())
}
// we directly log and send the event, instead of using the event recorder,
// because the event recorder won't flush its queue before we exit (we'd lose the event)
event := makeEvent(nodeName, apiv1.EventTypeNormal, KubeletConfigChangedEventReason, message)
glog.V(3).Infof("Event(%#v): type: '%v' reason: '%v' %v", event.InvolvedObject, event.Type, event.Reason, event.Message)
if _, err := eventClient.Events(apiv1.NamespaceDefault).Create(event); err != nil {
utillog.Errorf("failed to send event, error: %v", err)
}
utillog.Infof(message)
os.Exit(0)
}
// latestNodeConfigSource returns a copy of the most recent NodeConfigSource from the Node with `nodeName` in `store`
func latestNodeConfigSource(store cache.Store, nodeName string) (*apiv1.NodeConfigSource, error) {
obj, ok, err := store.GetByKey(nodeName)
if err != nil {
err := fmt.Errorf("failed to retrieve Node %q from informer's store, error: %v", nodeName, err)
@ -200,13 +223,11 @@ func latestNode(store cache.Store, nodeName string) (*apiv1.Node, error) {
utillog.Errorf(err.Error())
return nil, err
}
return node, nil
}
// eventf constructs and returns an event containing a formatted message
// similar to k8s.io/client-go/tools/record/event.go
func eventf(nodeName, eventType, reason, messageFmt string, args ...interface{}) *apiv1.Event {
return makeEvent(nodeName, eventType, reason, fmt.Sprintf(messageFmt, args...))
// Copy the source, so anyone who modifies it after here doesn't mess up the informer's store!
// This was previously the cause of a bug that made the Kubelet frequently resync config; Download updated
// the UID and ResourceVersion on the NodeConfigSource, but the pointer was still drilling all the way
// into the informer's copy!
return node.Spec.ConfigSource.DeepCopy(), nil
}
// makeEvent constructs an event

View File

@ -21,6 +21,7 @@ import (
"path/filepath"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -52,8 +53,11 @@ type Controller struct {
// configStatus manages the status we report on the Node object
configStatus status.NodeConfigStatus
// informer is the informer that watches the Node object
informer cache.SharedInformer
// nodeInformer is the informer that watches the Node object
nodeInformer cache.SharedInformer
// remoteConfigSourceInformer is the informer that watches the assigned config source
remoteConfigSourceInformer cache.SharedInformer
// checkpointStore persists config source checkpoints to a storage layer
checkpointStore store.Store
@ -139,51 +143,80 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
return nil, err
}
// update the active source to the non-nil last-known-good source
// set status to indicate the active source is the non-nil last-known-good source
cc.configStatus.SetActive(lastKnownGoodSource.NodeConfigSource())
return lastKnownGoodConfig, nil
}
// StartSync launches the controller's sync loops if `client` is non-nil and `nodeName` is non-empty.
// It will always start the Node condition reporting loop, and will also start the dynamic config sync loops
// if dynamic config is enabled on the controller. If `nodeName` is empty but `client` is non-nil, an error is logged.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) {
// StartSync tells the controller to start the goroutines that sync status/config to/from the API server.
// The clients must be non-nil, and the nodeName must be non-empty.
func (cc *Controller) StartSync(client clientset.Interface, eventClient v1core.EventsGetter, nodeName string) error {
const errFmt = "cannot start Kubelet config sync: %s"
if client == nil {
utillog.Infof("nil client, will not start sync loops")
return
} else if len(nodeName) == 0 {
utillog.Errorf("cannot start sync loops with empty nodeName")
return
return fmt.Errorf(errFmt, "nil client")
}
if eventClient == nil {
return fmt.Errorf(errFmt, "nil event client")
}
if nodeName == "" {
return fmt.Errorf(errFmt, "empty nodeName")
}
// start the status sync loop
go utilpanic.HandlePanic(func() {
utillog.Infof("starting status sync loop")
wait.JitterUntil(func() {
cc.configStatus.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})()
cc.informer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
// start the informer loop
// Rather than use utilruntime.HandleCrash, which doesn't actually crash in the Kubelet,
// we use HandlePanic to manually call the panic handlers and then crash.
// We have a better chance of recovering normal operation if we just restart the Kubelet in the event
// of a Go runtime error.
go utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer sync loop")
cc.informer.Run(wait.NeverStop)
})()
// NOTE(mtaufen): utilpanic.HandlePanic returns a function and you have to call it for your thing to run!
// This was EVIL to debug (difficult to see missing `()`).
// The code now uses `go name()` instead of `go utilpanic.HandlePanic(func(){...})()` to avoid confusion.
// start the config source sync loop
go utilpanic.HandlePanic(func() {
utillog.Infof("starting config source sync loop")
// status sync worker
statusSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting status sync loop")
wait.JitterUntil(func() {
cc.configStatus.Sync(client, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})
// remote config source informer, if we have a remote source to watch
assignedSource, err := cc.checkpointStore.Assigned()
if err != nil {
return fmt.Errorf(errFmt, err)
} else if assignedSource == nil {
utillog.Infof("local source is assigned, will not start remote config source informer")
} else {
cc.remoteConfigSourceInformer = assignedSource.Informer(client, cache.ResourceEventHandlerFuncs{
AddFunc: cc.onAddRemoteConfigSourceEvent,
UpdateFunc: cc.onUpdateRemoteConfigSourceEvent,
DeleteFunc: cc.onDeleteRemoteConfigSourceEvent,
},
)
}
remoteConfigSourceInformerFunc := utilpanic.HandlePanic(func() {
if cc.remoteConfigSourceInformer != nil {
utillog.Infof("starting remote config source informer")
cc.remoteConfigSourceInformer.Run(wait.NeverStop)
}
})
// node informer
cc.nodeInformer = newSharedNodeInformer(client, nodeName,
cc.onAddNodeEvent, cc.onUpdateNodeEvent, cc.onDeleteNodeEvent)
nodeInformerFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Node informer")
cc.nodeInformer.Run(wait.NeverStop)
})
// config sync worker
configSyncLoopFunc := utilpanic.HandlePanic(func() {
utillog.Infof("starting Kubelet config sync loop")
wait.JitterUntil(func() {
cc.syncConfigSource(client, eventClient, nodeName)
}, 10*time.Second, 0.2, true, wait.NeverStop)
})()
})
go statusSyncLoopFunc()
go remoteConfigSourceInformerFunc()
go nodeInformerFunc()
go configSyncLoopFunc()
return nil
}
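The NOTE above about HandlePanic returning a function is the kind of gotcha a generic sketch makes obvious; the handlePanic below is a stand-in written for illustration, not the Kubelet's utilpanic package:

package main

import "fmt"

// handlePanic is a stand-in for a wrapper that returns a function rather than running one.
func handlePanic(fn func()) func() {
	return func() {
		defer func() {
			if r := recover(); r != nil {
				fmt.Println("panic:", r) // run any panic handlers...
				panic(r)                 // ...then crash, so the process restarts cleanly
			}
		}()
		fn()
	}
}

func main() {
	worker := handlePanic(func() { fmt.Println("working") })
	worker() // the trailing () matters: without it, nothing runs
}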
// loadConfig loads Kubelet config from a checkpoint
@ -213,7 +246,6 @@ func (cc *Controller) checkTrial(duration time.Duration) {
if trial, err := cc.inTrial(duration); err != nil {
utillog.Errorf("failed to check trial period for assigned config, error: %v", err)
} else if !trial {
utillog.Infof("assigned config passed trial period, will set as last-known-good")
if err := cc.graduateAssignedToLastKnownGood(); err != nil {
utillog.Errorf("failed to set last-known-good to assigned config, error: %v", err)
}
@ -236,17 +268,28 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
// graduateAssignedToLastKnownGood sets the last-known-good in the checkpointStore
// to the same value as the assigned config maintained by the checkpointStore
func (cc *Controller) graduateAssignedToLastKnownGood() error {
// get the assigned config
// get assigned
assigned, err := cc.checkpointStore.Assigned()
if err != nil {
return err
}
// update the last-known-good config
// get last-known-good
lastKnownGood, err := cc.checkpointStore.LastKnownGood()
if err != nil {
return err
}
// if the sources are equal, no need to change
if assigned == lastKnownGood ||
assigned != nil && lastKnownGood != nil && apiequality.Semantic.DeepEqual(assigned, lastKnownGood) {
return nil
}
// update last-known-good
err = cc.checkpointStore.SetLastKnownGood(assigned)
if err != nil {
return err
}
// update the status to reflect the new last-known-good config
cc.configStatus.SetLastKnownGood(assigned.NodeConfigSource())
utillog.Infof("updated last-known-good config to %s, UID: %s, ResourceVersion: %s", assigned.APIPath(), assigned.UID(), assigned.ResourceVersion())
return nil
}

View File

@ -80,9 +80,12 @@ type nodeConfigStatus struct {
// NewNodeConfigStatus returns a new NodeConfigStatus interface
func NewNodeConfigStatus() NodeConfigStatus {
return &nodeConfigStatus{
// channels must have capacity at least 1, since we signal with non-blocking writes
syncCh: make(chan bool, 1),
syncCh := make(chan bool, 1)
// prime new status managers to sync with the API server on the first call to Sync
syncCh <- true
return &nodeConfigStatus{
syncCh: syncCh,
}
}
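The capacity-1 channel above supports a non-blocking signal pattern; this small standalone sketch (not code from the commit) shows why a single buffered slot is enough: repeated signals collapse into one pending sync and writers never block.

package main

import "fmt"

func main() {
	syncCh := make(chan bool, 1)

	signal := func() {
		select {
		case syncCh <- true: // the first signal occupies the buffer
		default: // a sync is already pending; drop the duplicate
		}
	}

	signal()
	signal() // collapses into the already-pending signal

	select {
	case <-syncCh:
		fmt.Println("sync needed")
	default:
		fmt.Println("nothing pending")
	}
}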
@ -142,6 +145,8 @@ func (s *nodeConfigStatus) Sync(client clientset.Interface, nodeName string) {
return
}
utillog.Infof("updating Node.Status.Config")
// grab the lock
s.mux.Lock()
defer s.mux.Unlock()

View File

@ -86,6 +86,7 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{})
}
if oldObj == nil {
// Node was just added, need to sync
utillog.Infof("initial Node watch event")
cc.pokeConfigSourceWorker()
return
}
@ -95,31 +96,59 @@ func (cc *Controller) onUpdateNodeEvent(oldObj interface{}, newObj interface{})
return
}
if !apiequality.Semantic.DeepEqual(oldNode.Spec.ConfigSource, newNode.Spec.ConfigSource) {
utillog.Infof("Node.Spec.ConfigSource was updated")
cc.pokeConfigSourceWorker()
}
}
// onDeleteNodeEvent logs a message if the Node was deleted and may log errors
// if an unexpected DeletedFinalStateUnknown was received.
// onDeleteNodeEvent logs a message if the Node was deleted
// We allow the sync-loop to continue, because it is possible that the Kubelet detected
// a Node with unexpected externalID and is attempting to delete and re-create the Node
// (see pkg/kubelet/kubelet_node_status.go), or that someone accidentally deleted the Node
// (the Kubelet will re-create it).
func (cc *Controller) onDeleteNodeEvent(deletedObj interface{}) {
node, ok := deletedObj.(*apiv1.Node)
// For this case, we just log the event.
// We don't want to poke the worker, because a temporary deletion isn't worth reporting an error for.
// If the Node is deleted because the VM is being deleted, then the Kubelet has nothing to do.
utillog.Infof("Node was deleted")
}
// onAddRemoteConfigSourceEvent calls onUpdateRemoteConfigSourceEvent with the new object and a nil old object
func (cc *Controller) onAddRemoteConfigSourceEvent(newObj interface{}) {
cc.onUpdateRemoteConfigSourceEvent(nil, newObj)
}
// onUpdateRemoteConfigSourceEvent checks whether the configSource changed between oldObj and newObj,
// and pokes the sync worker if there was a change
func (cc *Controller) onUpdateRemoteConfigSourceEvent(oldObj interface{}, newObj interface{}) {
// since ConfigMap is currently the only source type, we handle that here
newConfigMap, ok := newObj.(*apiv1.ConfigMap)
if !ok {
tombstone, ok := deletedObj.(cache.DeletedFinalStateUnknown)
if !ok {
utillog.Errorf("couldn't cast deleted object to DeletedFinalStateUnknown, object: %+v", deletedObj)
utillog.Errorf("failed to cast new object to ConfigMap, couldn't handle event")
return
}
node, ok = tombstone.Obj.(*apiv1.Node)
if oldObj == nil {
// ConfigMap was just added, need to sync
utillog.Infof("initial ConfigMap watch event")
cc.pokeConfigSourceWorker()
return
}
oldConfigMap, ok := oldObj.(*apiv1.ConfigMap)
if !ok {
utillog.Errorf("received DeletedFinalStateUnknown object but it did not contain a Node, object: %+v", deletedObj)
utillog.Errorf("failed to cast old object to ConfigMap, couldn't handle event")
return
}
utillog.Infof("Node was deleted (DeletedFinalStateUnknown), sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node)
return
if !apiequality.Semantic.DeepEqual(oldConfigMap, newConfigMap) {
utillog.Infof("assigned ConfigMap was updated")
cc.pokeConfigSourceWorker()
}
utillog.Infof("Node was deleted, sync-loop will continue because the Kubelet might recreate the Node, node: %+v", node)
}
// onDeleteRemoteConfigSourceEvent logs a message if the ConfigMap was deleted and pokes the sync worker
func (cc *Controller) onDeleteRemoteConfigSourceEvent(deletedObj interface{}) {
// If the ConfigMap we're watching is deleted, we log the event and poke the sync worker.
// This requires a sync, because if the Node is still configured to use the deleted ConfigMap,
// the Kubelet should report a DownloadError.
utillog.Infof("assigned ConfigMap was deleted")
cc.pokeConfigSourceWorker()
}

File diff suppressed because it is too large

View File

@ -289,9 +289,6 @@ func TestNodeAuthorizer(t *testing.T) {
ConfigMap: &api.ConfigMapNodeConfigSource{
Namespace: "ns",
Name: "myconfigmapconfigsource",
// validation just requires UID to be non-empty and it isn't necessary for GET,
// so we just use a bogus one for the test
UID: "uid",
KubeletConfigKey: "kubelet",
},
}