diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 4b9736ad19..3d0a9d1565 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -40,7 +40,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/apis/componentconfig" - kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" + v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" "k8s.io/kubernetes/pkg/capabilities" "k8s.io/kubernetes/pkg/client/chaosclient" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -64,6 +64,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/server" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/runtime" utilconfig "k8s.io/kubernetes/pkg/util/config" "k8s.io/kubernetes/pkg/util/configz" "k8s.io/kubernetes/pkg/util/crypto" @@ -73,7 +74,7 @@ import ( nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/rlimit" - "k8s.io/kubernetes/pkg/util/runtime" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" @@ -306,6 +307,117 @@ func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) { }, nil } +func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) { + clientConfig, err := CreateAPIServerClientConfig(s) + if err == nil { + kubeClient, err := clientset.NewForConfig(clientConfig) + if err != nil { + return nil, err + } + return kubeClient, nil + } + return nil, err +} + +// Tries to download the kubelet- configmap from "kube-system" namespace via the API server and returns a JSON string or error +func getRemoteKubeletConfig(s *options.KubeletServer, kcfg *KubeletConfig) (string, error) { + // TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request + kubeClient, err := getKubeClient(s) + if err != nil { + return "", err + } + + configmap, err := func() (*api.ConfigMap, error) { + var nodename string + hostname := nodeutil.GetHostname(s.HostnameOverride) + + if kcfg != nil && kcfg.Cloud != nil { + instances, ok := kcfg.Cloud.Instances() + if !ok { + err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename.") + return nil, err + } + nodename, err = instances.CurrentNodeName(hostname) + if err != nil { + err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err) + return nil, err + } + // look for kubelet- configmap from "kube-system" + configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename)) + if err != nil { + return nil, err + } + return configmap, nil + } + // No cloud provider yet, so can't get the nodename via Cloud.Instances().CurrentNodeName(hostname), try just using the hostname + configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname)) + if err != nil { + return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err) + } + return configmap, nil + }() + if err != nil { + return "", err + } + + // When we create the KubeletConfiguration configmap, we put a json string + // representation of the config in a `kubelet.config` key. + jsonstr, ok := configmap.Data["kubelet.config"] + if !ok { + return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`") + } + + return jsonstr, nil +} + +func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) { + glog.Infof("Starting Kubelet configuration sync loop") + go func() { + wait.PollInfinite(30*time.Second, func() (bool, error) { + glog.Infof("Checking API server for new Kubelet configuration.") + remoteKC, err := getRemoteKubeletConfig(s, nil) + if err == nil { + // Detect new config by comparing with the last JSON string we extracted. + if remoteKC != currentKC { + glog.Info("Found new Kubelet configuration via API server, restarting!") + os.Exit(0) + } + } else { + glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err) + } + return false, nil // Always return (false, nil) so we poll forever. + }) + }() +} + +// Try to check for config on the API server, return that config if we get it, and start +// a background thread that checks for updates to configs. +func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) { + jsonstr, err := getRemoteKubeletConfig(s, nil) + if err == nil { + // We will compare future API server config against the config we just got (jsonstr): + startKubeletConfigSyncLoop(s, jsonstr) + + // Convert json from API server to external type struct, and convert that to internal type struct + extKC := v1alpha1.KubeletConfiguration{} + err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC) + if err != nil { + return nil, err + } + kc := componentconfig.KubeletConfiguration{} + err = api.Scheme.Convert(&extKC, &kc, nil) + if err != nil { + return nil, err + } + return &kc, nil + } else { + // Couldn't get a configuration from the API server yet. + // Restart as soon as anything comes back from the API server. + startKubeletConfigSyncLoop(s, "") + return nil, err + } +} + // Run runs the specified KubeletServer for the given KubeletConfig. This should never exit. // The kcfg argument may be nil - if so, it is initialized from the settings on KubeletServer. // Otherwise, the caller is assumed to have set up the KubeletConfig object and all defaults @@ -326,6 +438,22 @@ func checkPermissions() error { return nil } +func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) { + tmp := v1alpha1.KubeletConfiguration{} + api.Scheme.Convert(kc, &tmp, nil) + cz.Set(tmp) +} + +func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) { + cz, err := configz.New("componentconfig") + if err == nil { + setConfigz(cz, kc) + } else { + glog.Errorf("unable to register configz: %s", err) + } + return cz, err +} + func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { if s.ExitOnLockContention && s.LockFilePath == "" { return errors.New("cannot exit on lock file contention: no lock file specified") @@ -344,18 +472,38 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { } } } - if c, err := configz.New("componentconfig"); err == nil { - c.Set(s.KubeletConfiguration) - } else { - glog.Errorf("unable to register configz: %s", err) - } + + // Register current configuration with /configz endpoint + cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration) if kcfg == nil { + if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() { + // Look for config on the API server. If it exists, replace s.KubeletConfiguration + // with it and continue. initKubeletConfigSync also starts the background thread that checks for new config. + + // Don't do dynamic Kubelet configuration in runonce mode + if s.RunOnce == false { + // For now we only do dynamic config when kcfg is passed as nil, because we don't want to disturb + // any special relationship between the values in s (KubeletServer) and kcfg (KubeletConfig). + remoteKC, err := initKubeletConfigSync(s) + if err == nil { + // Update s (KubeletServer) with new config from API server + s.KubeletConfiguration = *remoteKC + // Ensure that /configz is up to date with the new config + if cfgzErr != nil { + glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr) + } else { + setConfigz(cfgz, &s.KubeletConfiguration) + } + } + } + } + var kubeClient, eventClient *clientset.Clientset var autoDetectCloudProvider bool var cloud cloudprovider.Interface - if s.CloudProvider == kubeExternal.AutoDetectCloudProvider { + if s.CloudProvider == v1alpha1.AutoDetectCloudProvider { autoDetectCloudProvider = true } else { cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) @@ -440,7 +588,8 @@ func run(s *options.KubeletServer, kcfg *KubeletConfig) (err error) { glog.Error(err) } - runtime.ReallyCrash = s.ReallyCrashForTesting + utilruntime.ReallyCrash = s.ReallyCrashForTesting + rand.Seed(time.Now().UTC().UnixNano()) // TODO(vmarmol): Do this through container config. diff --git a/pkg/util/config/feature_gate.go b/pkg/util/config/feature_gate.go index f8777d58c9..25aef393d9 100644 --- a/pkg/util/config/feature_gate.go +++ b/pkg/util/config/feature_gate.go @@ -38,14 +38,16 @@ const ( // specification of gates. Examples: // AllAlpha=false,NewFeature=true will result in newFeature=true // AllAlpha=true,NewFeature=false will result in newFeature=false - allAlphaGate = "AllAlpha" + allAlphaGate = "AllAlpha" + dynamicKubeletConfig = "DynamicKubeletConfig" ) var ( // Default values for recorded features. Every new feature gate should be // represented here. knownFeatures = map[string]featureSpec{ - allAlphaGate: {false, alpha}, + allAlphaGate: {false, alpha}, + dynamicKubeletConfig: {false, alpha}, } // Special handling for a few gates. @@ -86,6 +88,7 @@ type FeatureGate interface { // MyFeature() bool // TODO: Define accessors for each non-API alpha feature. + DynamicKubeletConfig() bool } // featureGate implements FeatureGate as well as pflag.Value for flag parsing. @@ -154,6 +157,11 @@ func (f *featureGate) Type() string { return "mapStringBool" } +// DynamicKubeletConfig returns value for dynamicKubeletConfig +func (f *featureGate) DynamicKubeletConfig() bool { + return f.lookup(dynamicKubeletConfig) +} + func (f *featureGate) lookup(key string) bool { defaultValue := f.known[key].enabled if f.enabled != nil {