diff --git a/cmd/kubeadm/app/cmd/upgrade/apply.go b/cmd/kubeadm/app/cmd/upgrade/apply.go index 319029ce07..92dbcef58e 100644 --- a/cmd/kubeadm/app/cmd/upgrade/apply.go +++ b/cmd/kubeadm/app/cmd/upgrade/apply.go @@ -18,6 +18,7 @@ package upgrade import ( "fmt" + "os" "strings" "time" @@ -26,12 +27,20 @@ import ( clientset "k8s.io/client-go/kubernetes" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" + "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" "k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" + "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" + dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/util/version" ) +const ( + upgradeManifestTimeout = 1 * time.Minute +) + // applyFlags holds the information about the flags that can be passed to apply type applyFlags struct { nonInteractiveMode bool @@ -102,7 +111,7 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command { func RunApply(flags *applyFlags) error { // Start with the basics, verify that the cluster is healthy and get the configuration from the cluster (using the ConfigMap) - upgradeVars, err := enforceRequirements(flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig) + upgradeVars, err := enforceRequirements(flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig, flags.dryRun) if err != nil { return err } @@ -126,16 +135,24 @@ func RunApply(flags *applyFlags) error { } } - // TODO: Implement a prepulling mechanism here + // Use a prepuller implementation based on creating DaemonSets + // and block until all DaemonSets are ready; then we know for sure that all control plane images are cached locally + prepuller := upgrade.NewDaemonSetPrepuller(upgradeVars.client, upgradeVars.waiter, internalcfg) + upgrade.PrepullImagesInParallel(prepuller, flags.imagePullTimeout) // Now; perform the upgrade procedure - if err := PerformControlPlaneUpgrade(flags, upgradeVars.client, internalcfg); err != nil { + if err := PerformControlPlaneUpgrade(flags, upgradeVars.client, upgradeVars.waiter, internalcfg); err != nil { return fmt.Errorf("[upgrade/apply] FATAL: %v", err) } // Upgrade RBAC rules and addons. Optionally, if needed, perform some extra task for a specific version if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion); err != nil { - return fmt.Errorf("[upgrade/postupgrade] FATAL: %v", err) + return fmt.Errorf("[upgrade/postupgrade] FATAL post-upgrade error: %v", err) + } + + if flags.dryRun { + fmt.Println("[dryrun] Finished dryrunning successfully!") + return nil } fmt.Println("") @@ -182,7 +199,7 @@ func EnforceVersionPolicies(flags *applyFlags, versionGetter upgrade.VersionGett if len(versionSkewErrs.Skippable) > 0 { // Return the error if the user hasn't specified the --force flag if !flags.force { - return fmt.Errorf("The --version argument is invalid due to these errors: %v. Can be bypassed if you pass the --force flag", versionSkewErrs.Mandatory) + return fmt.Errorf("The --version argument is invalid due to these errors: %v. Can be bypassed if you pass the --force flag", versionSkewErrs.Skippable) } // Soft errors found, but --force was specified fmt.Printf("[upgrade/version] Found %d potential version compatibility errors but skipping since the --force flag is set: %v\n", len(versionSkewErrs.Skippable), versionSkewErrs.Skippable) @@ -192,21 +209,60 @@ func EnforceVersionPolicies(flags *applyFlags, versionGetter upgrade.VersionGett } // PerformControlPlaneUpgrade actually performs the upgrade procedure for the cluster of your type (self-hosted or static-pod-hosted) -func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, internalcfg *kubeadmapi.MasterConfiguration) error { +func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error { // Check if the cluster is self-hosted and act accordingly if upgrade.IsControlPlaneSelfHosted(client) { fmt.Printf("[upgrade/apply] Upgrading your Self-Hosted control plane to version %q...\n", flags.newK8sVersionStr) - // Upgrade a self-hosted cluster - // TODO(luxas): Implement this later when we have the new upgrade strategy - return fmt.Errorf("not implemented") + // Upgrade the self-hosted cluster + return upgrade.SelfHostedControlPlane(client, waiter, internalcfg, flags.newK8sVersion) } // OK, the cluster is hosted using static pods. Upgrade a static-pod hosted cluster fmt.Printf("[upgrade/apply] Upgrading your Static Pod-hosted control plane to version %q...\n", flags.newK8sVersionStr) - if err := upgrade.PerformStaticPodControlPlaneUpgrade(client, internalcfg, flags.newK8sVersion); err != nil { + if flags.dryRun { + return DryRunStaticPodUpgrade(internalcfg) + } + return PerformStaticPodUpgrade(client, waiter, internalcfg) +} + +// PerformStaticPodUpgrade performs the upgrade of the control plane components for a static pod hosted cluster +func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error { + pathManager, err := upgrade.NewKubeStaticPodPathManagerUsingTempDirs(constants.GetStaticPodDirectory()) + if err != nil { + return err + } + + if err := upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg); err != nil { + return err + } + return nil +} + +// DryRunStaticPodUpgrade fakes an upgrade of the control plane +func DryRunStaticPodUpgrade(internalcfg *kubeadmapi.MasterConfiguration) error { + + dryRunManifestDir, err := constants.CreateTempDirForKubeadm("kubeadm-upgrade-dryrun") + if err != nil { + return err + } + defer os.RemoveAll(dryRunManifestDir) + + if err := controlplane.CreateInitStaticPodManifestFiles(dryRunManifestDir, internalcfg); err != nil { + return err + } + + // Print the contents of the upgraded manifests and pretend like they were in /etc/kubernetes/manifests + files := []dryrunutil.FileToPrint{} + for _, component := range constants.MasterComponents { + realPath := constants.GetStaticPodFilepath(component, dryRunManifestDir) + outputPath := constants.GetStaticPodFilepath(component, constants.GetStaticPodDirectory()) + files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath)) + } + + if err := dryrunutil.PrintDryRunFiles(files, os.Stdout); err != nil { return err } return nil diff --git a/cmd/kubeadm/app/cmd/upgrade/common.go b/cmd/kubeadm/app/cmd/upgrade/common.go index 4a64d37011..4755cc1140 100644 --- a/cmd/kubeadm/app/cmd/upgrade/common.go +++ b/cmd/kubeadm/app/cmd/upgrade/common.go @@ -26,10 +26,13 @@ import ( "github.com/ghodss/yaml" + fakediscovery "k8s.io/client-go/discovery/fake" clientset "k8s.io/client-go/kubernetes" kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" "k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade" "k8s.io/kubernetes/cmd/kubeadm/app/preflight" + "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" + dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig" ) @@ -39,11 +42,12 @@ type upgradeVariables struct { client clientset.Interface cfg *kubeadmapiext.MasterConfiguration versionGetter upgrade.VersionGetter + waiter apiclient.Waiter } // enforceRequirements verifies that it's okay to upgrade and then returns the variables needed for the rest of the procedure -func enforceRequirements(kubeConfigPath, cfgPath string, printConfig bool) (*upgradeVariables, error) { - client, err := kubeconfigutil.ClientSetFromFile(kubeConfigPath) +func enforceRequirements(kubeConfigPath, cfgPath string, printConfig, dryRun bool) (*upgradeVariables, error) { + client, err := getClient(kubeConfigPath, dryRun) if err != nil { return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", kubeConfigPath, err) } @@ -69,6 +73,8 @@ func enforceRequirements(kubeConfigPath, cfgPath string, printConfig bool) (*upg cfg: cfg, // Use a real version getter interface that queries the API server, the kubeadm client and the Kubernetes CI system for latest versions versionGetter: upgrade.NewKubeVersionGetter(client, os.Stdout), + // Use the waiter conditionally based on the dryrunning variable + waiter: getWaiter(dryRun, client), }, nil } @@ -101,6 +107,46 @@ func runPreflightChecks(skipPreFlight bool) error { return preflight.RunRootCheckOnly() } +// getClient gets a real or fake client depending on whether the user is dry-running or not +func getClient(file string, dryRun bool) (clientset.Interface, error) { + if dryRun { + dryRunGetter, err := apiclient.NewClientBackedDryRunGetterFromKubeconfig(file) + if err != nil { + return nil, err + } + + // In order for fakeclient.Discovery().ServerVersion() to return the backing API Server's + // real version; we have to do some clever API machinery tricks. First, we get the real + // API Server's version + realServerVersion, err := dryRunGetter.Client().Discovery().ServerVersion() + if err != nil { + return nil, fmt.Errorf("failed to get server version: %v", err) + } + + // Get the fake clientset + fakeclient := apiclient.NewDryRunClient(dryRunGetter, os.Stdout) + // As we know the return of Discovery() of the fake clientset is of type *fakediscovery.FakeDiscovery + // we can convert it to that struct. + fakeclientDiscovery, ok := fakeclient.Discovery().(*fakediscovery.FakeDiscovery) + if !ok { + return nil, fmt.Errorf("couldn't set fake discovery's server version") + } + // Lastly, set the right server version to be used + fakeclientDiscovery.FakedServerVersion = realServerVersion + // return the fake clientset used for dry-running + return fakeclient, nil + } + return kubeconfigutil.ClientSetFromFile(file) +} + +// getWaiter gets the right waiter implementation +func getWaiter(dryRun bool, client clientset.Interface) apiclient.Waiter { + if dryRun { + return dryrunutil.NewWaiter() + } + return apiclient.NewKubeWaiter(client, upgradeManifestTimeout, os.Stdout) +} + // InteractivelyConfirmUpgrade asks the user whether they _really_ want to upgrade. func InteractivelyConfirmUpgrade(question string) error { diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index 545b362ae8..4f9a9186d0 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -50,8 +50,8 @@ func NewCmdPlan(parentFlags *cmdUpgradeFlags) *cobra.Command { // RunPlan takes care of outputting available versions to upgrade to for the user func RunPlan(parentFlags *cmdUpgradeFlags) error { - // Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. - upgradeVars, err := enforceRequirements(parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig) + // Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. Never set dry-run for plan. + upgradeVars, err := enforceRequirements(parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig, false) if err != nil { return err } @@ -59,7 +59,7 @@ func RunPlan(parentFlags *cmdUpgradeFlags) error { // Compute which upgrade possibilities there are availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades) if err != nil { - return err + return fmt.Errorf("[upgrade/versions] FATAL: %v", err) } // Tell the user which upgrades are available diff --git a/cmd/kubeadm/app/constants/constants.go b/cmd/kubeadm/app/constants/constants.go index 6c93720085..ea0ccd705e 100644 --- a/cmd/kubeadm/app/constants/constants.go +++ b/cmd/kubeadm/app/constants/constants.go @@ -18,6 +18,8 @@ package constants import ( "fmt" + "io/ioutil" + "os" "path/filepath" "time" @@ -31,6 +33,7 @@ var KubernetesDir = "/etc/kubernetes" const ( ManifestsSubDirName = "manifests" + TempDirForKubeadm = "/etc/kubernetes/tmp" CACertAndKeyBaseName = "ca" CACertName = "ca.crt" @@ -181,3 +184,17 @@ func GetAdminKubeConfigPath() string { func AddSelfHostedPrefix(componentName string) string { return fmt.Sprintf("%s%s", SelfHostingPrefix, componentName) } + +// CreateTempDirForKubeadm is a function that creates a temporary directory under /etc/kubernetes/tmp (not using /tmp as that would potentially be dangerous) +func CreateTempDirForKubeadm(dirName string) (string, error) { + // creates target folder if not already exists + if err := os.MkdirAll(TempDirForKubeadm, 0700); err != nil { + return "", fmt.Errorf("failed to create directory %q: %v", TempDirForKubeadm, err) + } + + tempDir, err := ioutil.TempDir(TempDirForKubeadm, dirName) + if err != nil { + return "", fmt.Errorf("couldn't create a temporary directory: %v", err) + } + return tempDir, nil +} diff --git a/cmd/kubeadm/app/phases/selfhosting/podspec_mutation.go b/cmd/kubeadm/app/phases/selfhosting/podspec_mutation.go index 65575bb27b..78c54fd51b 100644 --- a/cmd/kubeadm/app/phases/selfhosting/podspec_mutation.go +++ b/cmd/kubeadm/app/phases/selfhosting/podspec_mutation.go @@ -21,6 +21,7 @@ import ( "k8s.io/api/core/v1" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/features" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" ) @@ -34,8 +35,8 @@ const ( // PodSpecMutatorFunc is a function capable of mutating a PodSpec type PodSpecMutatorFunc func(*v1.PodSpec) -// getDefaultMutators gets the mutator functions that alwasy should be used -func getDefaultMutators() map[string][]PodSpecMutatorFunc { +// GetDefaultMutators gets the mutator functions that alwasy should be used +func GetDefaultMutators() map[string][]PodSpecMutatorFunc { return map[string][]PodSpecMutatorFunc{ kubeadmconstants.KubeAPIServer: { addNodeSelectorToPodSpec, @@ -55,6 +56,22 @@ func getDefaultMutators() map[string][]PodSpecMutatorFunc { } } +// GetMutatorsFromFeatureGates returns all mutators needed based on the feature gates passed +func GetMutatorsFromFeatureGates(featureGates map[string]bool) map[string][]PodSpecMutatorFunc { + // Here the map of different mutators to use for the control plane's podspec is stored + mutators := GetDefaultMutators() + + // Some extra work to be done if we should store the control plane certificates in Secrets + if features.Enabled(featureGates, features.StoreCertsInSecrets) { + + // Add the store-certs-in-secrets-specific mutators here so that the self-hosted component starts using them + mutators[kubeadmconstants.KubeAPIServer] = append(mutators[kubeadmconstants.KubeAPIServer], setSelfHostedVolumesForAPIServer) + mutators[kubeadmconstants.KubeControllerManager] = append(mutators[kubeadmconstants.KubeControllerManager], setSelfHostedVolumesForControllerManager) + mutators[kubeadmconstants.KubeScheduler] = append(mutators[kubeadmconstants.KubeScheduler], setSelfHostedVolumesForScheduler) + } + return mutators +} + // mutatePodSpec makes a Static Pod-hosted PodSpec suitable for self-hosting func mutatePodSpec(mutators map[string][]PodSpecMutatorFunc, name string, podSpec *v1.PodSpec) { // Get the mutator functions for the component in question, then loop through and execute them diff --git a/cmd/kubeadm/app/phases/selfhosting/selfhosting.go b/cmd/kubeadm/app/phases/selfhosting/selfhosting.go index acae768abd..fc19d07edb 100644 --- a/cmd/kubeadm/app/phases/selfhosting/selfhosting.go +++ b/cmd/kubeadm/app/phases/selfhosting/selfhosting.go @@ -60,7 +60,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea waiter.SetTimeout(selfHostingWaitTimeout) // Here the map of different mutators to use for the control plane's podspec is stored - mutators := getDefaultMutators() + mutators := GetMutatorsFromFeatureGates(cfg.FeatureGates) // Some extra work to be done if we should store the control plane certificates in Secrets if features.Enabled(cfg.FeatureGates, features.StoreCertsInSecrets) { @@ -72,10 +72,6 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea if err := uploadKubeConfigSecrets(client, kubeConfigDir); err != nil { return err } - // Add the store-certs-in-secrets-specific mutators here so that the self-hosted component starts using them - mutators[kubeadmconstants.KubeAPIServer] = append(mutators[kubeadmconstants.KubeAPIServer], setSelfHostedVolumesForAPIServer) - mutators[kubeadmconstants.KubeControllerManager] = append(mutators[kubeadmconstants.KubeControllerManager], setSelfHostedVolumesForControllerManager) - mutators[kubeadmconstants.KubeScheduler] = append(mutators[kubeadmconstants.KubeScheduler], setSelfHostedVolumesForScheduler) } for _, componentName := range kubeadmconstants.MasterComponents { @@ -95,7 +91,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea } // Build a DaemonSet object from the loaded PodSpec - ds := buildDaemonSet(componentName, podSpec, mutators) + ds := BuildDaemonSet(componentName, podSpec, mutators) // Create or update the DaemonSet in the API Server, and retry selfHostingFailureThreshold times if it errors out if err := apiclient.TryRunCommand(func() error { @@ -105,7 +101,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea } // Wait for the self-hosted component to come up - if err := waiter.WaitForPodsWithLabel(buildSelfHostedWorkloadLabelQuery(componentName)); err != nil { + if err := waiter.WaitForPodsWithLabel(BuildSelfHostedComponentLabelQuery(componentName)); err != nil { return err } @@ -132,8 +128,8 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea return nil } -// buildDaemonSet is responsible for mutating the PodSpec and return a DaemonSet which is suitable for the self-hosting purporse -func buildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodSpecMutatorFunc) *extensions.DaemonSet { +// BuildDaemonSet is responsible for mutating the PodSpec and return a DaemonSet which is suitable for the self-hosting purporse +func BuildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodSpecMutatorFunc) *extensions.DaemonSet { // Mutate the PodSpec so it's suitable for self-hosting mutatePodSpec(mutators, name, podSpec) @@ -143,19 +139,19 @@ func buildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodS ObjectMeta: metav1.ObjectMeta{ Name: kubeadmconstants.AddSelfHostedPrefix(name), Namespace: metav1.NamespaceSystem, - Labels: map[string]string{ - "k8s-app": kubeadmconstants.AddSelfHostedPrefix(name), - }, + Labels: BuildSelfhostedComponentLabels(name), }, Spec: extensions.DaemonSetSpec{ Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - "k8s-app": kubeadmconstants.AddSelfHostedPrefix(name), - }, + Labels: BuildSelfhostedComponentLabels(name), }, Spec: *podSpec, }, + UpdateStrategy: extensions.DaemonSetUpdateStrategy{ + // Make the DaemonSet utilize the RollingUpdate rollout strategy + Type: extensions.RollingUpdateDaemonSetStrategyType, + }, }, } } @@ -176,7 +172,14 @@ func loadPodSpecFromFile(manifestPath string) (*v1.PodSpec, error) { return &staticPod.Spec, nil } -// buildSelfHostedWorkloadLabelQuery creates the right query for matching a self-hosted Pod -func buildSelfHostedWorkloadLabelQuery(componentName string) string { +// BuildSelfhostedComponentLabels returns the labels for a self-hosted component +func BuildSelfhostedComponentLabels(component string) map[string]string { + return map[string]string{ + "k8s-app": kubeadmconstants.AddSelfHostedPrefix(component), + } +} + +// BuildSelfHostedComponentLabelQuery creates the right query for matching a self-hosted Pod +func BuildSelfHostedComponentLabelQuery(componentName string) string { return fmt.Sprintf("k8s-app=%s", kubeadmconstants.AddSelfHostedPrefix(componentName)) } diff --git a/cmd/kubeadm/app/phases/upgrade/compute.go b/cmd/kubeadm/app/phases/upgrade/compute.go index e81eb93b9d..2b78af9f69 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute.go +++ b/cmd/kubeadm/app/phases/upgrade/compute.go @@ -18,6 +18,10 @@ package upgrade import ( "fmt" + "strings" + + "k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns" + "k8s.io/kubernetes/pkg/util/version" ) // Upgrade defines an upgrade possibility to upgrade from a current version to a new one @@ -57,7 +61,196 @@ type ClusterState struct { // GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which // kinds of upgrades can be performed -func GetAvailableUpgrades(_ VersionGetter, _, _ bool) ([]Upgrade, error) { +func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool) ([]Upgrade, error) { fmt.Println("[upgrade] Fetching available versions to upgrade to:") - return []Upgrade{}, nil + + // Collect the upgrades kubeadm can do in this list + upgrades := []Upgrade{} + + // Get the cluster version + clusterVersionStr, clusterVersion, err := versionGetterImpl.ClusterVersion() + if err != nil { + return nil, err + } + + // Get current kubeadm CLI version + kubeadmVersionStr, kubeadmVersion, err := versionGetterImpl.KubeadmVersion() + if err != nil { + return nil, err + } + + // Get and output the current latest stable version + stableVersionStr, stableVersion, err := versionGetterImpl.VersionFromCILabel("stable", "stable version") + if err != nil { + return nil, err + } + + // Get the kubelet versions in the cluster + kubeletVersions, err := versionGetterImpl.KubeletVersions() + if err != nil { + return nil, err + } + + // Construct a descriptor for the current state of the world + beforeState := ClusterState{ + KubeVersion: clusterVersionStr, + DNSVersion: dns.GetKubeDNSVersion(clusterVersion), + KubeadmVersion: kubeadmVersionStr, + KubeletVersions: kubeletVersions, + } + + // Do a "dumb guess" that a new minor upgrade is available just because the latest stable version is higher than the cluster version + // This guess will be corrected once we know if there is a patch version available + canDoMinorUpgrade := clusterVersion.LessThan(stableVersion) + + // A patch version doesn't exist if the cluster version is higher than or equal to the current stable version + // in the case that a user is trying to upgrade from, let's say, v1.8.0-beta.2 to v1.8.0-rc.1 (given we support such upgrades experimentally) + // a stable-1.8 branch doesn't exist yet. Hence this check. + if patchVersionBranchExists(clusterVersion, stableVersion) { + + currentBranch := getBranchFromVersion(clusterVersionStr) + versionLabel := fmt.Sprintf("stable-%s", currentBranch) + description := fmt.Sprintf("version in the v%s series", currentBranch) + + // Get and output the latest patch version for the cluster branch + patchVersionStr, patchVersion, err := versionGetterImpl.VersionFromCILabel(versionLabel, description) + if err != nil { + return nil, err + } + + // Check if a minor version upgrade is possible when a patch release exists + // It's only possible if the latest patch version is higher than the current patch version + // If that's the case, they must be on different branches => a newer minor version can be upgraded to + canDoMinorUpgrade = minorUpgradePossibleWithPatchRelease(stableVersion, patchVersion) + + // If the cluster version is lower than the newest patch version, we should inform about the possible upgrade + if patchUpgradePossible(clusterVersion, patchVersion) { + + // The kubeadm version has to be upgraded to the latest patch version + newKubeadmVer := patchVersionStr + if kubeadmVersion.AtLeast(patchVersion) { + // In this case, the kubeadm CLI version is new enough. Don't display an update suggestion for kubeadm by making .NewKubeadmVersion equal .CurrentKubeadmVersion + newKubeadmVer = kubeadmVersionStr + } + + upgrades = append(upgrades, Upgrade{ + Description: description, + Before: beforeState, + After: ClusterState{ + KubeVersion: patchVersionStr, + DNSVersion: dns.GetKubeDNSVersion(patchVersion), + KubeadmVersion: newKubeadmVer, + // KubeletVersions is unset here as it is not used anywhere in .After + }, + }) + } + } + + if canDoMinorUpgrade { + upgrades = append(upgrades, Upgrade{ + Description: "stable version", + Before: beforeState, + After: ClusterState{ + KubeVersion: stableVersionStr, + DNSVersion: dns.GetKubeDNSVersion(stableVersion), + KubeadmVersion: stableVersionStr, + // KubeletVersions is unset here as it is not used anywhere in .After + }, + }) + } + + if experimentalUpgradesAllowed || rcUpgradesAllowed { + // dl.k8s.io/release/latest.txt is ALWAYS an alpha.X version + // dl.k8s.io/release/latest-1.X.txt is first v1.X.0-alpha.0 -> v1.X.0-alpha.Y, then v1.X.0-beta.0 to v1.X.0-beta.Z, then v1.X.0-rc.1 to v1.X.0-rc.W. + // After the v1.X.0 release, latest-1.X.txt is always a beta.0 version. Let's say the latest stable version on the v1.7 branch is v1.7.3, then the + // latest-1.7 version is v1.7.4-beta.0 + + // Worth noticing is that when the release-1.X branch is cut; there are two versions tagged: v1.X.0-beta.0 AND v1.(X+1).alpha.0 + // The v1.(X+1).alpha.0 is pretty much useless and should just be ignored, as more betas may be released that have more features than the initial v1.(X+1).alpha.0 + + // So what we do below is getting the latest overall version, always an v1.X.0-alpha.Y version. Then we get latest-1.(X-1) version. This version may be anything + // between v1.(X-1).0-beta.0 and v1.(X-1).Z-beta.0. At some point in time, latest-1.(X-1) will point to v1.(X-1).0-rc.1. Then we should show it. + + // The flow looks like this (with time on the X axis): + // v1.8.0-alpha.1 -> v1.8.0-alpha.2 -> v1.8.0-alpha.3 | release-1.8 branch | v1.8.0-beta.0 -> v1.8.0-beta.1 -> v1.8.0-beta.2 -> v1.8.0-rc.1 -> v1.8.0 -> v1.8.1 + // v1.9.0-alpha.0 -> v1.9.0-alpha.1 -> v1.9.0-alpha.2 + + // Get and output the current latest unstable version + latestVersionStr, latestVersion, err := versionGetterImpl.VersionFromCILabel("latest", "experimental version") + if err != nil { + return nil, err + } + + minorUnstable := latestVersion.Components()[1] + // Get and output the current latest unstable version + previousBranch := fmt.Sprintf("latest-1.%d", minorUnstable-1) + previousBranchLatestVersionStr, previousBranchLatestVersion, err := versionGetterImpl.VersionFromCILabel(previousBranch, "") + if err != nil { + return nil, err + } + + // If that previous latest version is an RC, RCs are allowed and the cluster version is lower than the RC version, show the upgrade + if rcUpgradesAllowed && rcUpgradePossible(clusterVersion, previousBranchLatestVersion) { + upgrades = append(upgrades, Upgrade{ + Description: "release candidate version", + Before: beforeState, + After: ClusterState{ + KubeVersion: previousBranchLatestVersionStr, + DNSVersion: dns.GetKubeDNSVersion(previousBranchLatestVersion), + KubeadmVersion: previousBranchLatestVersionStr, + // KubeletVersions is unset here as it is not used anywhere in .After + }, + }) + } + + // Show the possibility if experimental upgrades are allowed + if experimentalUpgradesAllowed && clusterVersion.LessThan(latestVersion) { + + // Default to assume that the experimental version to show is the unstable one + unstableKubeVersion := latestVersionStr + unstableKubeDNSVersion := dns.GetKubeDNSVersion(latestVersion) + + // Ẃe should not display alpha.0. The previous branch's beta/rc versions are more relevant due how the kube branching process works. + if latestVersion.PreRelease() == "alpha.0" { + unstableKubeVersion = previousBranchLatestVersionStr + unstableKubeDNSVersion = dns.GetKubeDNSVersion(previousBranchLatestVersion) + } + + upgrades = append(upgrades, Upgrade{ + Description: "experimental version", + Before: beforeState, + After: ClusterState{ + KubeVersion: unstableKubeVersion, + DNSVersion: unstableKubeDNSVersion, + KubeadmVersion: unstableKubeVersion, + // KubeletVersions is unset here as it is not used anywhere in .After + }, + }) + } + } + + // Add a newline in the end of this output to leave some space to the next output section + fmt.Println("") + + return upgrades, nil +} + +func getBranchFromVersion(version string) string { + return strings.TrimPrefix(version, "v")[:3] +} + +func patchVersionBranchExists(clusterVersion, stableVersion *version.Version) bool { + return stableVersion.AtLeast(clusterVersion) +} + +func patchUpgradePossible(clusterVersion, patchVersion *version.Version) bool { + return clusterVersion.LessThan(patchVersion) +} + +func rcUpgradePossible(clusterVersion, previousBranchLatestVersion *version.Version) bool { + return strings.HasPrefix(previousBranchLatestVersion.PreRelease(), "rc") && clusterVersion.LessThan(previousBranchLatestVersion) +} + +func minorUpgradePossibleWithPatchRelease(stableVersion, patchVersion *version.Version) bool { + return patchVersion.LessThan(stableVersion) } diff --git a/cmd/kubeadm/app/phases/upgrade/configuration.go b/cmd/kubeadm/app/phases/upgrade/configuration.go index a91e4140a8..a00dafd1a9 100644 --- a/cmd/kubeadm/app/phases/upgrade/configuration.go +++ b/cmd/kubeadm/app/phases/upgrade/configuration.go @@ -19,18 +19,89 @@ package upgrade import ( "fmt" "io" + "io/ioutil" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" clientset "k8s.io/client-go/kubernetes" + kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" + "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation" + "k8s.io/kubernetes/cmd/kubeadm/app/constants" + configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config" "k8s.io/kubernetes/pkg/api" ) // FetchConfiguration fetches configuration required for upgrading your cluster from a file (which has precedence) or a ConfigMap in the cluster -func FetchConfiguration(_ clientset.Interface, _ io.Writer, _ string) (*kubeadmapiext.MasterConfiguration, error) { +func FetchConfiguration(client clientset.Interface, w io.Writer, cfgPath string) (*kubeadmapiext.MasterConfiguration, error) { fmt.Println("[upgrade/config] Making sure the configuration is correct:") - cfg := &kubeadmapiext.MasterConfiguration{} - api.Scheme.Default(cfg) + // Load the configuration from a file or the cluster + configBytes, err := loadConfigurationBytes(client, w, cfgPath) + if err != nil { + return nil, err + } - return cfg, nil + // Take the versioned configuration populated from the configmap, default it and validate + // Return the internal version of the API object + versionedcfg, err := bytesToValidatedMasterConfig(configBytes) + if err != nil { + return nil, fmt.Errorf("could not decode configuration: %v", err) + } + return versionedcfg, nil +} + +// loadConfigurationBytes loads the configuration byte slice from either a file or the cluster ConfigMap +func loadConfigurationBytes(client clientset.Interface, w io.Writer, cfgPath string) ([]byte, error) { + if cfgPath != "" { + fmt.Printf("[upgrade/config] Reading configuration options from a file: %s\n", cfgPath) + return ioutil.ReadFile(cfgPath) + } + + fmt.Println("[upgrade/config] Reading configuration from the cluster...") + + configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(constants.MasterConfigurationConfigMap, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + fmt.Printf("[upgrade/config] In order to upgrade, a ConfigMap called %q in the %s namespace must exist.\n", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem) + fmt.Println("[upgrade/config] Without this information, 'kubeadm upgrade' don't how to configure your upgraded cluster.") + fmt.Println("") + fmt.Println("[upgrade/config] Next steps:") + fmt.Printf("\t- OPTION 1: Run 'kubeadm config upload from-flags' and specify the same CLI arguments you passed to 'kubeadm init' when you created your master.\n") + fmt.Printf("\t- OPTION 2: Run 'kubeadm config upload from-file' and specify the same config file you passed to 'kubeadm init' when you created your master.\n") + fmt.Printf("\t- OPTION 3: Pass a config file to 'kubeadm upgrade' using the --config flag.\n") + fmt.Println("") + return []byte{}, fmt.Errorf("the ConfigMap %q in the %s namespace used for getting configuration information was not found", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem) + } else if err != nil { + return []byte{}, fmt.Errorf("an unexpected error happened when trying to get the ConfigMap %q in the %s namespace: %v", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem, err) + } + + fmt.Printf("[upgrade/config] FYI: You can look at this config file with 'kubectl -n %s get cm %s -oyaml'\n", metav1.NamespaceSystem, constants.MasterConfigurationConfigMap) + return []byte(configMap.Data[constants.MasterConfigurationConfigMapKey]), nil +} + +// bytesToValidatedMasterConfig converts a byte array to an external, defaulted and validated configuration object +func bytesToValidatedMasterConfig(b []byte) (*kubeadmapiext.MasterConfiguration, error) { + cfg := &kubeadmapiext.MasterConfiguration{} + finalCfg := &kubeadmapiext.MasterConfiguration{} + internalcfg := &kubeadmapi.MasterConfiguration{} + + if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), b, cfg); err != nil { + return nil, fmt.Errorf("unable to decode config from bytes: %v", err) + } + // Default and convert to the internal version + api.Scheme.Default(cfg) + api.Scheme.Convert(cfg, internalcfg, nil) + + // Applies dynamic defaults to settings not provided with flags + if err := configutil.SetInitDynamicDefaults(internalcfg); err != nil { + return nil, err + } + // Validates cfg (flags/configs + defaults + dynamic defaults) + if err := validation.ValidateMasterConfiguration(internalcfg).ToAggregate(); err != nil { + return nil, err + } + // Finally converts back to the external version + api.Scheme.Convert(internalcfg, finalCfg, nil) + return finalCfg, nil } diff --git a/cmd/kubeadm/app/phases/upgrade/health.go b/cmd/kubeadm/app/phases/upgrade/health.go index 1beac03999..466c720af3 100644 --- a/cmd/kubeadm/app/phases/upgrade/health.go +++ b/cmd/kubeadm/app/phases/upgrade/health.go @@ -17,20 +17,190 @@ limitations under the License. package upgrade import ( + "fmt" + "net/http" + "os" + + "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/cmd/kubeadm/app/constants" ) +// healthCheck is a helper struct for easily performing healthchecks against the cluster and printing the output +type healthCheck struct { + description, okMessage, failMessage string + // f is invoked with a k8s client passed to it. Should return an optional warning and/or an error + f func(clientset.Interface) error +} + // CheckClusterHealth makes sure: // - the API /healthz endpoint is healthy // - all Nodes are Ready // - (if self-hosted) that there are DaemonSets with at least one Pod for all control plane components // - (if static pod-hosted) that all required Static Pod manifests exist on disk -func CheckClusterHealth(_ clientset.Interface) error { +func CheckClusterHealth(client clientset.Interface) error { + fmt.Println("[upgrade] Making sure the cluster is healthy:") + + healthChecks := []healthCheck{ + { + description: "API Server health", + okMessage: "Healthy", + failMessage: "Unhealthy", + f: apiServerHealthy, + }, + { + description: "Node health", + okMessage: "All Nodes are healthy", + failMessage: "More than one Node unhealthy", + f: nodesHealthy, + }, + // TODO: Add a check for ComponentStatuses here? + } + + // Run slightly different health checks depending on control plane hosting type + if IsControlPlaneSelfHosted(client) { + healthChecks = append(healthChecks, healthCheck{ + description: "Control plane DaemonSet health", + okMessage: "All control plane DaemonSets are healthy", + failMessage: "More than one control plane DaemonSet unhealthy", + f: controlPlaneHealth, + }) + } else { + healthChecks = append(healthChecks, healthCheck{ + description: "Static Pod manifests exists on disk", + okMessage: "All manifests exist on disk", + failMessage: "Some manifests don't exist on disk", + f: staticPodManifestHealth, + }) + } + + return runHealthChecks(client, healthChecks) +} + +// runHealthChecks runs a set of health checks against the cluster +func runHealthChecks(client clientset.Interface, healthChecks []healthCheck) error { + for _, check := range healthChecks { + + err := check.f(client) + if err != nil { + fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.failMessage) + return fmt.Errorf("The cluster is not in an upgradeable state due to: %v", err) + } + fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.okMessage) + } return nil } -// IsControlPlaneSelfHosted returns whether the control plane is self hosted or not -func IsControlPlaneSelfHosted(_ clientset.Interface) bool { - // No-op for now - return false +// apiServerHealthy checks whether the API server's /healthz endpoint is healthy +func apiServerHealthy(client clientset.Interface) error { + healthStatus := 0 + + // If client.Discovery().RESTClient() is nil, the fake client is used, and that means we are dry-running. Just proceed + if client.Discovery().RESTClient() == nil { + return nil + } + client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus) + if healthStatus != http.StatusOK { + return fmt.Errorf("the API Server is unhealthy; /healthz didn't return %q", "ok") + } + return nil +} + +// nodesHealthy checks whether all Nodes in the cluster are in the Running state +func nodesHealthy(client clientset.Interface) error { + nodes, err := client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("couldn't list all nodes in cluster: %v", err) + } + + notReadyNodes := getNotReadyNodes(nodes.Items) + if len(notReadyNodes) != 0 { + return fmt.Errorf("there are NotReady Nodes in the cluster: %v", notReadyNodes) + } + return nil +} + +// controlPlaneHealth ensures all control plane DaemonSets are healthy +func controlPlaneHealth(client clientset.Interface) error { + notReadyDaemonSets, err := getNotReadyDaemonSets(client) + if err != nil { + return err + } + + if len(notReadyDaemonSets) != 0 { + return fmt.Errorf("there are control plane DaemonSets in the cluster that are not ready: %v", notReadyDaemonSets) + } + return nil +} + +// staticPodManifestHealth makes sure the required static pods are presents +func staticPodManifestHealth(_ clientset.Interface) error { + nonExistentManifests := []string{} + for _, component := range constants.MasterComponents { + manifestFile := constants.GetStaticPodFilepath(component, constants.GetStaticPodDirectory()) + if _, err := os.Stat(manifestFile); os.IsNotExist(err) { + nonExistentManifests = append(nonExistentManifests, manifestFile) + } + } + if len(nonExistentManifests) == 0 { + return nil + } + return fmt.Errorf("The control plane seems to be Static Pod-hosted, but some of the manifests don't seem to exist on disk. This probably means you're running 'kubeadm upgrade' on a remote machine, which is not supported for a Static Pod-hosted cluster. Manifest files not found: %v", nonExistentManifests) +} + +// IsControlPlaneSelfHosted returns whether the control plane is self hosted or not +func IsControlPlaneSelfHosted(client clientset.Interface) bool { + notReadyDaemonSets, err := getNotReadyDaemonSets(client) + if err != nil { + return false + } + + // If there are no NotReady DaemonSets, we are using self-hosting + return len(notReadyDaemonSets) == 0 +} + +// getNotReadyDaemonSets gets the amount of Ready control plane DaemonSets +func getNotReadyDaemonSets(client clientset.Interface) ([]error, error) { + notReadyDaemonSets := []error{} + for _, component := range constants.MasterComponents { + dsName := constants.AddSelfHostedPrefix(component) + ds, err := client.ExtensionsV1beta1().DaemonSets(metav1.NamespaceSystem).Get(dsName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("couldn't get daemonset %q in the %s namespace", dsName, metav1.NamespaceSystem) + } + + if err := daemonSetHealth(&ds.Status); err != nil { + notReadyDaemonSets = append(notReadyDaemonSets, fmt.Errorf("DaemonSet %q not healthy: %v", dsName, err)) + } + } + return notReadyDaemonSets, nil +} + +// daemonSetHealth is a helper function for getting the health of a DaemonSet's status +func daemonSetHealth(dsStatus *extensions.DaemonSetStatus) error { + if dsStatus.CurrentNumberScheduled != dsStatus.DesiredNumberScheduled { + return fmt.Errorf("current number of scheduled Pods ('%d') doesn't match the amount of desired Pods ('%d')", dsStatus.CurrentNumberScheduled, dsStatus.DesiredNumberScheduled) + } + if dsStatus.NumberAvailable == 0 { + return fmt.Errorf("no available Pods for DaemonSet") + } + if dsStatus.NumberReady == 0 { + return fmt.Errorf("no ready Pods for DaemonSet") + } + return nil +} + +// getNotReadyNodes returns a string slice of nodes in the cluster that are NotReady +func getNotReadyNodes(nodes []v1.Node) []string { + notReadyNodes := []string{} + for _, node := range nodes { + for _, condition := range node.Status.Conditions { + if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue { + notReadyNodes = append(notReadyNodes, node.ObjectMeta.Name) + } + } + } + return notReadyNodes } diff --git a/cmd/kubeadm/app/phases/upgrade/policy.go b/cmd/kubeadm/app/phases/upgrade/policy.go index 6b06a4116e..0e3b13223c 100644 --- a/cmd/kubeadm/app/phases/upgrade/policy.go +++ b/cmd/kubeadm/app/phases/upgrade/policy.go @@ -17,9 +17,21 @@ limitations under the License. package upgrade import ( + "fmt" + "strings" + + "k8s.io/kubernetes/cmd/kubeadm/app/constants" "k8s.io/kubernetes/pkg/util/version" ) +const ( + // MaximumAllowedMinorVersionUpgradeSkew describes how many minor versions kubeadm can upgrade the control plane version in one go + MaximumAllowedMinorVersionUpgradeSkew = 1 + + // MaximumAllowedMinorVersionKubeletSkew describes how many minor versions the control plane version and the kubelet can skew in a kubeadm cluster + MaximumAllowedMinorVersionKubeletSkew = 1 +) + // VersionSkewPolicyErrors describes version skew errors that might be seen during the validation process in EnforceVersionPolicies type VersionSkewPolicyErrors struct { Mandatory []error @@ -27,7 +39,120 @@ type VersionSkewPolicyErrors struct { } // EnforceVersionPolicies enforces that the proposed new version is compatible with all the different version skew policies -func EnforceVersionPolicies(_ VersionGetter, _ string, _ *version.Version, _, _ bool) *VersionSkewPolicyErrors { - // No-op now and return no skew errors - return nil +func EnforceVersionPolicies(versionGetter VersionGetter, newK8sVersionStr string, newK8sVersion *version.Version, allowExperimentalUpgrades, allowRCUpgrades bool) *VersionSkewPolicyErrors { + + skewErrors := &VersionSkewPolicyErrors{ + Mandatory: []error{}, + Skippable: []error{}, + } + + clusterVersionStr, clusterVersion, err := versionGetter.ClusterVersion() + if err != nil { + // This case can't be forced: kubeadm has to be able to lookup cluster version for upgrades to work + skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Unable to fetch cluster version: %v", err)) + return skewErrors + } + + kubeadmVersionStr, kubeadmVersion, err := versionGetter.KubeadmVersion() + if err != nil { + // This case can't be forced: kubeadm has to be able to lookup its version for upgrades to work + skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Unable to fetch kubeadm version: %v", err)) + return skewErrors + } + + kubeletVersions, err := versionGetter.KubeletVersions() + if err != nil { + // This is a non-critical error; continue although kubeadm couldn't look this up + skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Unable to fetch kubeadm version: %v", err)) + } + + // Make sure the new version is a supported version (higher than the minimum one supported) + if constants.MinimumControlPlaneVersion.AtLeast(newK8sVersion) { + // This must not happen, kubeadm always supports a minimum version; and we can't go below that + skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the minimum supported version %q. Please specify a higher version to upgrade to", newK8sVersionStr, clusterVersionStr)) + } + + // Make sure new version is higher than the current Kubernetes version + if clusterVersion.AtLeast(newK8sVersion) { + // Even though we don't officially support downgrades, it "should work", and if user(s) need it and are willing to try; they can do so with --force + skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the cluster version %q. Downgrades are not supported yet", newK8sVersionStr, clusterVersionStr)) + } else { + // If this code path runs, it's an upgrade (this code will run most of the time) + // kubeadm doesn't support upgrades between two minor versions; e.g. a v1.7 -> v1.9 upgrade is not supported. Enforce that here + if newK8sVersion.Minor() > clusterVersion.Minor()+MaximumAllowedMinorVersionUpgradeSkew { + skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is too high; kubeadm can upgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionUpgradeSkew)) + } + } + + // If the kubeadm version is lower than what we want to upgrade to; error + if kubeadmVersion.LessThan(newK8sVersion) { + if newK8sVersion.Minor() > kubeadmVersion.Minor() { + // This is totally unsupported; kubeadm has no idea how it should handle a newer minor release than itself + skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is one minor release higher than the kubeadm minor release (%d > %d). Such an upgrade is not supported", newK8sVersionStr, newK8sVersion.Minor(), kubeadmVersion.Minor())) + } else { + // Upgrading to a higher patch version than kubeadm is ok if the user specifies --force. Not recommended, but possible. + skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is higher than the kubeadm version %q. Upgrade kubeadm first using the tool you used to install kubeadm", newK8sVersionStr, kubeadmVersionStr)) + } + } + + // Detect if the version is unstable and the user didn't allow that + if err = detectUnstableVersionError(newK8sVersion, newK8sVersionStr, allowExperimentalUpgrades, allowRCUpgrades); err != nil { + skewErrors.Skippable = append(skewErrors.Skippable, err) + } + + // Detect if there are too old kubelets in the cluster + // Check for nil here since this is the only case where kubeletVersions can be nil; if KubeletVersions() returned an error + // However, it's okay to skip that check + if kubeletVersions != nil { + if err = detectTooOldKubelets(newK8sVersion, kubeletVersions); err != nil { + skewErrors.Skippable = append(skewErrors.Skippable, err) + } + } + + // If we did not see any errors, return nil + if len(skewErrors.Skippable) == 0 && len(skewErrors.Mandatory) == 0 { + return nil + } + + // Uh oh, we encountered one or more errors, return them + return skewErrors +} + +// detectUnstableVersionError is a helper function for detecting if the unstable version (if specified) is allowed to be used +func detectUnstableVersionError(newK8sVersion *version.Version, newK8sVersionStr string, allowExperimentalUpgrades, allowRCUpgrades bool) error { + // Short-circuit quickly if this is not an unstable version + if len(newK8sVersion.PreRelease()) == 0 { + return nil + } + // If the user has specified that unstable versions are fine, then no error should be returned + if allowExperimentalUpgrades { + return nil + } + // If this is a release candidate and we allow such ones, everything's fine + if strings.HasPrefix(newK8sVersion.PreRelease(), "rc") && allowRCUpgrades { + return nil + } + + return fmt.Errorf("Specified version to upgrade to %q is an unstable version and such upgrades weren't allowed via setting the --allow-*-upgrades flags", newK8sVersionStr) +} + +// detectTooOldKubelets errors out if the kubelet versions are so old that an unsupported skew would happen if the cluster was upgraded +func detectTooOldKubelets(newK8sVersion *version.Version, kubeletVersions map[string]uint16) error { + tooOldKubeletVersions := []string{} + for versionStr := range kubeletVersions { + + kubeletVersion, err := version.ParseSemantic(versionStr) + if err != nil { + return fmt.Errorf("couldn't parse kubelet version %s", versionStr) + } + + if newK8sVersion.Minor() > kubeletVersion.Minor()+MaximumAllowedMinorVersionKubeletSkew { + tooOldKubeletVersions = append(tooOldKubeletVersions, versionStr) + } + } + if len(tooOldKubeletVersions) == 0 { + return nil + } + + return fmt.Errorf("There are kubelets in this cluster that are too old that have these versions %v", tooOldKubeletVersions) } diff --git a/cmd/kubeadm/app/phases/upgrade/postupgrade.go b/cmd/kubeadm/app/phases/upgrade/postupgrade.go index 3510caa57a..1b19d74925 100644 --- a/cmd/kubeadm/app/phases/upgrade/postupgrade.go +++ b/cmd/kubeadm/app/phases/upgrade/postupgrade.go @@ -17,14 +17,60 @@ limitations under the License. package upgrade import ( + "k8s.io/apimachinery/pkg/util/errors" clientset "k8s.io/client-go/kubernetes" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/proxy" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/apiconfig" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/clusterinfo" + nodebootstraptoken "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/node" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig" "k8s.io/kubernetes/pkg/util/version" ) // PerformPostUpgradeTasks runs nearly the same functions as 'kubeadm init' would do // Note that the markmaster phase is left out, not needed, and no token is created as that doesn't belong to the upgrade -func PerformPostUpgradeTasks(_ clientset.Interface, _ *kubeadmapi.MasterConfiguration, _ *version.Version) error { - // No-op; don't do anything here yet - return nil +func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) error { + errs := []error{} + + // Upload currently used configuration to the cluster + // Note: This is done right in the beginning of cluster initialization; as we might want to make other phases + // depend on centralized information from this source in the future + if err := uploadconfig.UploadConfiguration(cfg, client); err != nil { + errs = append(errs, err) + } + + // Create/update RBAC rules that makes the bootstrap tokens able to post CSRs + if err := nodebootstraptoken.AllowBootstrapTokensToPostCSRs(client); err != nil { + errs = append(errs, err) + } + // Create/update RBAC rules that makes the bootstrap tokens able to get their CSRs approved automatically + if err := nodebootstraptoken.AutoApproveNodeBootstrapTokens(client, k8sVersion); err != nil { + errs = append(errs, err) + } + + // TODO: Is this needed to do here? I think that updating cluster info should probably be separate from a normal upgrade + // Create the cluster-info ConfigMap with the associated RBAC rules + // if err := clusterinfo.CreateBootstrapConfigMapIfNotExists(client, kubeadmconstants.GetAdminKubeConfigPath()); err != nil { + // return err + //} + // Create/update RBAC rules that makes the cluster-info ConfigMap reachable + if err := clusterinfo.CreateClusterInfoRBACRules(client); err != nil { + errs = append(errs, err) + } + + // TODO: This call is deprecated + if err := apiconfig.CreateRBACRules(client, k8sVersion); err != nil { + errs = append(errs, err) + } + + // Upgrade kube-dns and kube-proxy + if err := dns.EnsureDNSAddon(cfg, client, k8sVersion); err != nil { + errs = append(errs, err) + } + if err := proxy.EnsureProxyAddon(cfg, client); err != nil { + errs = append(errs, err) + } + return errors.NewAggregate(errs) } diff --git a/cmd/kubeadm/app/phases/upgrade/prepull.go b/cmd/kubeadm/app/phases/upgrade/prepull.go new file mode 100644 index 0000000000..30d9f7d3c5 --- /dev/null +++ b/cmd/kubeadm/app/phases/upgrade/prepull.go @@ -0,0 +1,180 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "fmt" + "time" + + "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" + "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/images" + "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" +) + +const ( + prepullPrefix = "upgrade-prepull-" +) + +// Prepuller defines an interface for performing a prepull operation in a create-wait-delete fashion in parallel +type Prepuller interface { + CreateFunc(string) error + WaitFunc(string) + DeleteFunc(string) error +} + +// DaemonSetPrepuller makes sure the control plane images are availble on all masters +type DaemonSetPrepuller struct { + client clientset.Interface + cfg *kubeadmapi.MasterConfiguration + waiter apiclient.Waiter +} + +// NewDaemonSetPrepuller creates a new instance of the DaemonSetPrepuller struct +func NewDaemonSetPrepuller(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration) *DaemonSetPrepuller { + return &DaemonSetPrepuller{ + client: client, + cfg: cfg, + waiter: waiter, + } +} + +// CreateFunc creates a DaemonSet for making the image available on every relevant node +func (d *DaemonSetPrepuller) CreateFunc(component string) error { + image := images.GetCoreImage(component, d.cfg.ImageRepository, d.cfg.KubernetesVersion, d.cfg.UnifiedControlPlaneImage) + ds := buildPrePullDaemonSet(component, image) + + // Create the DaemonSet in the API Server + if err := apiclient.CreateOrUpdateDaemonSet(d.client, ds); err != nil { + return fmt.Errorf("unable to create a DaemonSet for prepulling the component %q: %v", component, err) + } + return nil +} + +// WaitFunc waits for all Pods in the specified DaemonSet to be in the Running state +func (d *DaemonSetPrepuller) WaitFunc(component string) { + fmt.Printf("[upgrade/prepull] Prepulling image for component %s.\n", component) + d.waiter.WaitForPodsWithLabel("k8s-app=upgrade-prepull-" + component) +} + +// DeleteFunc deletes the DaemonSet used for making the image available on every relevant node +func (d *DaemonSetPrepuller) DeleteFunc(component string) error { + dsName := addPrepullPrefix(component) + if err := apiclient.DeleteDaemonSetForeground(d.client, metav1.NamespaceSystem, dsName); err != nil { + return fmt.Errorf("unable to cleanup the DaemonSet used for prepulling %s: %v", component, err) + } + fmt.Printf("[upgrade/prepull] Prepulled image for component %s.\n", component) + return nil +} + +// PrepullImagesInParallel creates DaemonSets synchronously but waits in parallel for the images to pull +func PrepullImagesInParallel(kubePrepuller Prepuller, timeout time.Duration) error { + componentsToPrepull := constants.MasterComponents + fmt.Printf("[upgrade/prepull] Will prepull images for components %v\n", componentsToPrepull) + + timeoutChan := time.After(timeout) + + // Synchronously create the DaemonSets + for _, component := range componentsToPrepull { + if err := kubePrepuller.CreateFunc(component); err != nil { + return err + } + } + + // Create a channel for streaming data from goroutines that run in parallell to a blocking for loop that cleans up + prePulledChan := make(chan string, len(componentsToPrepull)) + for _, component := range componentsToPrepull { + go func(c string) { + // Wait as long as needed. This WaitFunc call should be blocking until completetion + kubePrepuller.WaitFunc(c) + // When the task is done, go ahead and cleanup by sending the name to the channel + prePulledChan <- c + }(component) + } + + // This call blocks until all expected messages are received from the channel or errors out if timeoutChan fires. + // For every successful wait, kubePrepuller.DeleteFunc is executed + if err := waitForItemsFromChan(timeoutChan, prePulledChan, len(componentsToPrepull), kubePrepuller.DeleteFunc); err != nil { + return err + } + + fmt.Println("[upgrade/prepull] Successfully prepulled the images for all the control plane components") + return nil +} + +// waitForItemsFromChan waits for n elements from stringChan with a timeout. For every item received from stringChan, cleanupFunc is executed +func waitForItemsFromChan(timeoutChan <-chan time.Time, stringChan chan string, n int, cleanupFunc func(string) error) error { + i := 0 + for { + select { + case <-timeoutChan: + return fmt.Errorf("The prepull operation timed out") + case result := <-stringChan: + i++ + // If the cleanup function errors; error here as well + if err := cleanupFunc(result); err != nil { + return err + } + if i == n { + return nil + } + } + } +} + +// addPrepullPrefix adds the prepull prefix for this functionality; can be used in names, labels, etc. +func addPrepullPrefix(component string) string { + return fmt.Sprintf("%s%s", prepullPrefix, component) +} + +// buildPrePullDaemonSet builds the DaemonSet that ensures the control plane image is available +func buildPrePullDaemonSet(component, image string) *extensions.DaemonSet { + var gracePeriodSecs int64 + return &extensions.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: addPrepullPrefix(component), + Namespace: metav1.NamespaceSystem, + }, + Spec: extensions.DaemonSetSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "k8s-app": addPrepullPrefix(component), + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: component, + Image: image, + Command: []string{"/bin/sleep", "3600"}, + }, + }, + NodeSelector: map[string]string{ + constants.LabelNodeRoleMaster: "", + }, + Tolerations: []v1.Toleration{constants.MasterToleration}, + TerminationGracePeriodSeconds: &gracePeriodSecs, + }, + }, + }, + } +} diff --git a/cmd/kubeadm/app/phases/upgrade/selfhosted.go b/cmd/kubeadm/app/phases/upgrade/selfhosted.go new file mode 100644 index 0000000000..8f849dff17 --- /dev/null +++ b/cmd/kubeadm/app/phases/upgrade/selfhosted.go @@ -0,0 +1,272 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package upgrade + +import ( + "fmt" + "time" + + "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" + "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/selfhosting" + "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" + "k8s.io/kubernetes/pkg/util/version" +) + +const ( + // upgradeTempDSPrefix is the prefix added to the temporary DaemonSet's name used during the upgrade + upgradeTempDSPrefix = "temp-upgrade-" + + // upgradeTempLabel is the label key used for identifying the temporary component's DaemonSet + upgradeTempLabel = "temp-upgrade-component" + + // selfHostingWaitTimeout describes the maximum amount of time a self-hosting wait process should wait before timing out + selfHostingWaitTimeout = 2 * time.Minute + + // selfHostingFailureThreshold describes how many times kubeadm will retry creating the DaemonSets + selfHostingFailureThreshold uint8 = 10 +) + +// controlPlaneComponentResources holds the relevant Pod and DaemonSet associated with a control plane component +type controlPlaneComponentResources struct { + pod *v1.Pod + daemonSet *extensions.DaemonSet +} + +// SelfHostedControlPlane upgrades a self-hosted control plane +// It works as follows: +// - The client gets the currently running DaemonSets and their associated Pods used for self-hosting the control plane +// - A temporary DaemonSet for the component in question is created; but nearly identical to the DaemonSet for the self-hosted component running right now +// - Why use this temporary DaemonSet? Because, the RollingUpdate strategy for upgrading DaemonSets first kills the old Pod, and then adds the new one +// - This doesn't work for self-hosted upgrades, as if you remove the only API server for instance you have in the cluster, the cluster essentially goes down +// - So instead, a nearly identical copy of the pre-upgrade DaemonSet is created and applied to the cluster. In the beginning, this duplicate DS is just idle +// - kubeadm waits for the temporary DaemonSet's Pod to become Running +// - kubeadm updates the real, self-hosted component. This will result in the pre-upgrade component Pod being removed from the cluster +// - Luckily, the temporary, backup DaemonSet now kicks in and takes over and acts as the control plane. It recognizes that a new Pod should be created, +// - as the "real" DaemonSet is being updated. +// - kubeadm waits for the pre-upgrade Pod to become deleted. It now takes advantage of the backup/temporary component +// - kubeadm waits for the new, upgraded DaemonSet to become Running. +// - Now that the new, upgraded DaemonSet is Running, we can delete the backup/temporary DaemonSet +// - Lastly, make sure the API /healthz endpoint still is reachable +// +// TL;DR; This is what the flow looks like in pseudo-code: +// for [kube-apiserver, kube-controller-manager, kube-scheduler], do: +// 1. Self-Hosted component v1 Running +// -> Duplicate the DaemonSet manifest +// 2. Self-Hosted component v1 Running (active). Backup component v1 Running (passive) +// -> Upgrade the Self-Hosted component v1 to v2. +// -> Self-Hosted component v1 is Deleted from the cluster +// 3. Backup component v1 Running becomes active and completes the upgrade by creating the Self-Hosted component v2 Pod (passive) +// -> Wait for Self-Hosted component v2 to become Running +// 4. Backup component v1 Running (active). Self-Hosted component v2 Running (passive) +// -> Backup component v1 is Deleted +// 5. Wait for Self-Hosted component v2 Running to become active +// 6. Repeat for all control plane components +func SelfHostedControlPlane(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) error { + + // Adjust the timeout slightly to something self-hosting specific + waiter.SetTimeout(selfHostingWaitTimeout) + + // This function returns a map of DaemonSet objects ready to post to the API server + newControlPlaneDaemonSets := BuildUpgradedDaemonSetsFromConfig(cfg, k8sVersion) + + controlPlaneResources, err := getCurrentControlPlaneComponentResources(client) + if err != nil { + return err + } + + for _, component := range constants.MasterComponents { + // Make a shallow copy of the current DaemonSet in order to create a new, temporary one + tempDS := *controlPlaneResources[component].daemonSet + + // Mutate the temp daemonset a little to be suitable for this usage (change label selectors, etc) + mutateTempDaemonSet(&tempDS, component) + + // Create or update the DaemonSet in the API Server, and retry selfHostingFailureThreshold times if it errors out + if err := apiclient.TryRunCommand(func() error { + return apiclient.CreateOrUpdateDaemonSet(client, &tempDS) + }, selfHostingFailureThreshold); err != nil { + return err + } + + // Wait for the temporary/backup self-hosted component to come up + if err := waiter.WaitForPodsWithLabel(buildTempUpgradeDSLabelQuery(component)); err != nil { + return err + } + + newDS := newControlPlaneDaemonSets[component] + + // Upgrade the component's self-hosted resource + // During this upgrade; the temporary/backup component will take over + if err := apiclient.TryRunCommand(func() error { + + if _, err := client.ExtensionsV1beta1().DaemonSets(newDS.ObjectMeta.Namespace).Update(newDS); err != nil { + return fmt.Errorf("couldn't update self-hosted component's DaemonSet: %v", err) + } + return nil + }, selfHostingFailureThreshold); err != nil { + return err + } + + // Wait for the component's old Pod to disappear + oldPod := controlPlaneResources[component].pod + if err := waiter.WaitForPodToDisappear(oldPod.ObjectMeta.Name); err != nil { + return err + } + + // Wait for the main, upgraded self-hosted component to come up + // Here we're talking to the temporary/backup component; the upgraded component is in the process of starting up + if err := waiter.WaitForPodsWithLabel(selfhosting.BuildSelfHostedComponentLabelQuery(component)); err != nil { + return err + } + + // Delete the temporary DaemonSet, and retry selfHostingFailureThreshold times if it errors out + // In order to pivot back to the upgraded API server, we kill the temporary/backup component + if err := apiclient.TryRunCommand(func() error { + return apiclient.DeleteDaemonSetForeground(client, tempDS.ObjectMeta.Namespace, tempDS.ObjectMeta.Name) + }, selfHostingFailureThreshold); err != nil { + return err + } + + // Just as an extra safety check; make sure the API server is returning ok at the /healthz endpoint + if err := waiter.WaitForAPI(); err != nil { + return err + } + + fmt.Printf("[upgrade/apply] Self-hosted component %q upgraded successfully!\n", component) + } + return nil +} + +// BuildUpgradedDaemonSetsFromConfig takes a config object and the current version and returns the DaemonSet objects to post to the master +func BuildUpgradedDaemonSetsFromConfig(cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) map[string]*extensions.DaemonSet { + // Here the map of different mutators to use for the control plane's podspec is stored + mutators := selfhosting.GetMutatorsFromFeatureGates(cfg.FeatureGates) + // Get the new PodSpecs to use + controlPlanePods := controlplane.GetStaticPodSpecs(cfg, k8sVersion) + // Store the created DaemonSets in this map + controlPlaneDaemonSets := map[string]*extensions.DaemonSet{} + + for _, component := range constants.MasterComponents { + podSpec := controlPlanePods[component].Spec + + // Build the full DaemonSet object from the PodSpec generated from the control plane phase and + // using the self-hosting mutators available from the selfhosting phase + ds := selfhosting.BuildDaemonSet(component, &podSpec, mutators) + controlPlaneDaemonSets[component] = ds + } + return controlPlaneDaemonSets +} + +// addTempUpgradeDSPrefix adds the upgradeTempDSPrefix to the specified DaemonSet name +func addTempUpgradeDSPrefix(currentName string) string { + return fmt.Sprintf("%s%s", upgradeTempDSPrefix, currentName) +} + +// buildTempUpgradeLabels returns the label string-string map for identifying the temporary +func buildTempUpgradeLabels(component string) map[string]string { + return map[string]string{ + upgradeTempLabel: component, + } +} + +// buildTempUpgradeDSLabelQuery creates the right query for matching +func buildTempUpgradeDSLabelQuery(component string) string { + return fmt.Sprintf("%s=%s", upgradeTempLabel, component) +} + +// mutateTempDaemonSet mutates the specified self-hosted DaemonSet for the specified component +// in a way that makes it possible to post a nearly identical, temporary DaemonSet as a backup +func mutateTempDaemonSet(tempDS *extensions.DaemonSet, component string) { + // Prefix the name of the temporary DaemonSet with upgradeTempDSPrefix + tempDS.ObjectMeta.Name = addTempUpgradeDSPrefix(tempDS.ObjectMeta.Name) + // Set .Labels to something else than the "real" self-hosted components have + tempDS.ObjectMeta.Labels = buildTempUpgradeLabels(component) + tempDS.Spec.Selector.MatchLabels = buildTempUpgradeLabels(component) + tempDS.Spec.Template.ObjectMeta.Labels = buildTempUpgradeLabels(component) + // Clean all unnecessary ObjectMeta fields + tempDS.ObjectMeta = extractRelevantObjectMeta(tempDS.ObjectMeta) + // Reset .Status as we're posting a new object + tempDS.Status = extensions.DaemonSetStatus{} +} + +// extractRelevantObjectMeta returns only the relevant parts of ObjectMeta required when creating +// a new, identical resource. We should not POST ResourceVersion, UUIDs, etc., only the name, labels, +// namespace and annotations should be preserved. +func extractRelevantObjectMeta(ob metav1.ObjectMeta) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Name: ob.Name, + Namespace: ob.Namespace, + Labels: ob.Labels, + Annotations: ob.Annotations, + } +} + +// listPodsWithLabelSelector returns the relevant Pods for the given LabelSelector +func listPodsWithLabelSelector(client clientset.Interface, kvLabel string) (*v1.PodList, error) { + return client.CoreV1().Pods(metav1.NamespaceSystem).List(metav1.ListOptions{ + LabelSelector: kvLabel, + }) +} + +// getCurrentControlPlaneComponentResources returns a string-(Pod|DaemonSet) map for later use +func getCurrentControlPlaneComponentResources(client clientset.Interface) (map[string]controlPlaneComponentResources, error) { + controlPlaneResources := map[string]controlPlaneComponentResources{} + + for _, component := range constants.MasterComponents { + var podList *v1.PodList + var currentDS *extensions.DaemonSet + + // Get the self-hosted pod associated with the component + podLabelSelector := selfhosting.BuildSelfHostedComponentLabelQuery(component) + if err := apiclient.TryRunCommand(func() error { + var tryrunerr error + podList, tryrunerr = listPodsWithLabelSelector(client, podLabelSelector) + return tryrunerr // note that tryrunerr is most likely nil here (in successful cases) + }, selfHostingFailureThreshold); err != nil { + return nil, err + } + + // Make sure that there are only one Pod with this label selector; otherwise unexpected things can happen + if len(podList.Items) > 1 { + return nil, fmt.Errorf("too many pods with label selector %q found in the %s namespace", podLabelSelector, metav1.NamespaceSystem) + } + + // Get the component's DaemonSet object + dsName := constants.AddSelfHostedPrefix(component) + if err := apiclient.TryRunCommand(func() error { + var tryrunerr error + // Try to get the current self-hosted component + currentDS, tryrunerr = client.ExtensionsV1beta1().DaemonSets(metav1.NamespaceSystem).Get(dsName, metav1.GetOptions{}) + return tryrunerr // note that tryrunerr is most likely nil here (in successful cases) + }, selfHostingFailureThreshold); err != nil { + return nil, err + } + + // Add the associated resources to the map to return later + controlPlaneResources[component] = controlPlaneComponentResources{ + pod: &podList.Items[0], + daemonSet: currentDS, + } + } + return controlPlaneResources, nil +} diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods.go b/cmd/kubeadm/app/phases/upgrade/staticpods.go index 6970560f54..f368a66950 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods.go @@ -17,13 +17,176 @@ limitations under the License. package upgrade import ( - clientset "k8s.io/client-go/kubernetes" + "fmt" + "os" + kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" - "k8s.io/kubernetes/pkg/util/version" + "k8s.io/kubernetes/cmd/kubeadm/app/constants" + "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" + "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" ) -// PerformStaticPodControlPlaneUpgrade upgrades a static pod-hosted control plane -func PerformStaticPodControlPlaneUpgrade(_ clientset.Interface, _ *kubeadmapi.MasterConfiguration, _ *version.Version) error { - // No-op for now; doesn't do anything yet +// StaticPodPathManager is responsible for tracking the directories used in the static pod upgrade transition +type StaticPodPathManager interface { + // MoveFile should move a file from oldPath to newPath + MoveFile(oldPath, newPath string) error + // RealManifestPath gets the file path for the component in the "real" static pod manifest directory used by the kubelet + RealManifestPath(component string) string + // RealManifestDir should point to the static pod manifest directory used by the kubelet + RealManifestDir() string + // TempManifestPath gets the file path for the component in the temporary directory created for generating new manifests for the upgrade + TempManifestPath(component string) string + // TempManifestDir should point to the temporary directory created for generating new manifests for the upgrade + TempManifestDir() string + // BackupManifestPath gets the file path for the component in the backup directory used for backuping manifests during the transition + BackupManifestPath(component string) string + // BackupManifestDir should point to the backup directory used for backuping manifests during the transition + BackupManifestDir() string +} + +// KubeStaticPodPathManager is a real implementation of StaticPodPathManager that is used when upgrading a static pod cluster +type KubeStaticPodPathManager struct { + realManifestDir string + tempManifestDir string + backupManifestDir string +} + +// NewKubeStaticPodPathManager creates a new instance of KubeStaticPodPathManager +func NewKubeStaticPodPathManager(realDir, tempDir, backupDir string) StaticPodPathManager { + return &KubeStaticPodPathManager{ + realManifestDir: realDir, + tempManifestDir: tempDir, + backupManifestDir: backupDir, + } +} + +// NewKubeStaticPodPathManagerUsingTempDirs creates a new instance of KubeStaticPodPathManager with temporary directories backing it +func NewKubeStaticPodPathManagerUsingTempDirs(realManifestDir string) (StaticPodPathManager, error) { + upgradedManifestsDir, err := constants.CreateTempDirForKubeadm("kubeadm-upgraded-manifests") + if err != nil { + return nil, err + } + backupManifestsDir, err := constants.CreateTempDirForKubeadm("kubeadm-backup-manifests") + if err != nil { + return nil, err + } + + return NewKubeStaticPodPathManager(realManifestDir, upgradedManifestsDir, backupManifestsDir), nil +} + +// MoveFile should move a file from oldPath to newPath +func (spm *KubeStaticPodPathManager) MoveFile(oldPath, newPath string) error { + return os.Rename(oldPath, newPath) +} + +// RealManifestPath gets the file path for the component in the "real" static pod manifest directory used by the kubelet +func (spm *KubeStaticPodPathManager) RealManifestPath(component string) string { + return constants.GetStaticPodFilepath(component, spm.realManifestDir) +} + +// RealManifestDir should point to the static pod manifest directory used by the kubelet +func (spm *KubeStaticPodPathManager) RealManifestDir() string { + return spm.realManifestDir +} + +// TempManifestPath gets the file path for the component in the temporary directory created for generating new manifests for the upgrade +func (spm *KubeStaticPodPathManager) TempManifestPath(component string) string { + return constants.GetStaticPodFilepath(component, spm.tempManifestDir) +} + +// TempManifestDir should point to the temporary directory created for generating new manifests for the upgrade +func (spm *KubeStaticPodPathManager) TempManifestDir() string { + return spm.tempManifestDir +} + +// BackupManifestPath gets the file path for the component in the backup directory used for backuping manifests during the transition +func (spm *KubeStaticPodPathManager) BackupManifestPath(component string) string { + return constants.GetStaticPodFilepath(component, spm.backupManifestDir) +} + +// BackupManifestDir should point to the backup directory used for backuping manifests during the transition +func (spm *KubeStaticPodPathManager) BackupManifestDir() string { + return spm.backupManifestDir +} + +// StaticPodControlPlane upgrades a static pod-hosted control plane +func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration) error { + + // This string-string map stores the component name and backup filepath (if a rollback is needed). + // If a rollback is needed, + recoverManifests := map[string]string{} + + beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName) + if err != nil { + return err + } + + // Write the updated static Pod manifests into the temporary directory + fmt.Printf("[upgrade/staticpods] Writing upgraded Static Pod manifests to %q\n", pathMgr.TempManifestDir()) + err = controlplane.CreateInitStaticPodManifestFiles(pathMgr.TempManifestDir(), cfg) + + for _, component := range constants.MasterComponents { + // The old manifest is here; in the /etc/kubernetes/manifests/ + currentManifestPath := pathMgr.RealManifestPath(component) + // The new, upgraded manifest will be written here + newManifestPath := pathMgr.TempManifestPath(component) + // The old manifest will be moved here; into a subfolder of the temporary directory + // If a rollback is needed, these manifests will be put back to where they where initially + backupManifestPath := pathMgr.BackupManifestPath(component) + + // Store the backup path in the recover list. If something goes wrong now, this component will be rolled back. + recoverManifests[component] = backupManifestPath + + // Move the old manifest into the old-manifests directory + if err := pathMgr.MoveFile(currentManifestPath, backupManifestPath); err != nil { + return rollbackOldManifests(recoverManifests, err, pathMgr) + } + + // Move the new manifest into the manifests directory + if err := pathMgr.MoveFile(newManifestPath, currentManifestPath); err != nil { + return rollbackOldManifests(recoverManifests, err, pathMgr) + } + + fmt.Printf("[upgrade/staticpods] Moved upgraded manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath) + fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component") + + // Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to + // notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy + // If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the + // API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results. + if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHashMap[component]); err != nil { + return rollbackOldManifests(recoverManifests, err, pathMgr) + } + + // Wait for the static pod component to come up and register itself as a mirror pod + if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil { + return rollbackOldManifests(recoverManifests, err, pathMgr) + } + + fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component) + } + // Remove the temporary directories used on a best-effort (don't fail if the calls error out) + // The calls are set here by design; we should _not_ use "defer" above as that would remove the directories + // even in the "fail and rollback" case, where we want the directories preserved for the user. + os.RemoveAll(pathMgr.TempManifestDir()) + os.RemoveAll(pathMgr.BackupManifestDir()) + return nil } + +// rollbackOldManifests rolls back the backuped manifests if something went wrong +func rollbackOldManifests(oldManifests map[string]string, origErr error, pathMgr StaticPodPathManager) error { + errs := []error{origErr} + for component, backupPath := range oldManifests { + // Where we should put back the backed up manifest + realManifestPath := pathMgr.RealManifestPath(component) + + // Move the backup manifest back into the manifests directory + err := pathMgr.MoveFile(backupPath, realManifestPath) + if err != nil { + errs = append(errs, err) + } + } + // Let the user know there we're problems, but we tried to reçover + return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs) +} diff --git a/cmd/kubeadm/app/phases/upgrade/versiongetter.go b/cmd/kubeadm/app/phases/upgrade/versiongetter.go index 879ca11276..e0289176c5 100644 --- a/cmd/kubeadm/app/phases/upgrade/versiongetter.go +++ b/cmd/kubeadm/app/phases/upgrade/versiongetter.go @@ -20,8 +20,12 @@ import ( "fmt" "io" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" versionutil "k8s.io/kubernetes/pkg/util/version" + "k8s.io/kubernetes/pkg/version" ) // VersionGetter defines an interface for fetching different versions. @@ -43,11 +47,8 @@ type KubeVersionGetter struct { w io.Writer } -// Make sure KubeVersionGetter implements the VersionGetter interface -var _ VersionGetter = &KubeVersionGetter{} - // NewKubeVersionGetter returns a new instance of KubeVersionGetter -func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) *KubeVersionGetter { +func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) VersionGetter { return &KubeVersionGetter{ client: client, w: writer, @@ -56,28 +57,68 @@ func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) *KubeVer // ClusterVersion gets API server version func (g *KubeVersionGetter) ClusterVersion() (string, *versionutil.Version, error) { - fmt.Fprintf(g.w, "[upgrade/versions] Cluster version: ") - fmt.Fprintln(g.w, "v1.7.0") + clusterVersionInfo, err := g.client.Discovery().ServerVersion() + if err != nil { + return "", nil, fmt.Errorf("Couldn't fetch cluster version from the API Server: %v", err) + } + fmt.Fprintf(g.w, "[upgrade/versions] Cluster version: %s\n", clusterVersionInfo.String()) - return "v1.7.0", versionutil.MustParseSemantic("v1.7.0"), nil + clusterVersion, err := versionutil.ParseSemantic(clusterVersionInfo.String()) + if err != nil { + return "", nil, fmt.Errorf("Couldn't parse cluster version: %v", err) + } + return clusterVersionInfo.String(), clusterVersion, nil } // KubeadmVersion gets kubeadm version func (g *KubeVersionGetter) KubeadmVersion() (string, *versionutil.Version, error) { - fmt.Fprintf(g.w, "[upgrade/versions] kubeadm version: %s\n", "v1.8.0") + kubeadmVersionInfo := version.Get() + fmt.Fprintf(g.w, "[upgrade/versions] kubeadm version: %s\n", kubeadmVersionInfo.String()) - return "v1.8.0", versionutil.MustParseSemantic("v1.8.0"), nil + kubeadmVersion, err := versionutil.ParseSemantic(kubeadmVersionInfo.String()) + if err != nil { + return "", nil, fmt.Errorf("Couldn't parse kubeadm version: %v", err) + } + return kubeadmVersionInfo.String(), kubeadmVersion, nil } -// VersionFromCILabel resolves different labels like "stable" to action semver versions using the Kubernetes CI uploads to GCS -func (g *KubeVersionGetter) VersionFromCILabel(_, _ string) (string, *versionutil.Version, error) { - return "v1.8.1", versionutil.MustParseSemantic("v1.8.0"), nil +// VersionFromCILabel resolves a version label like "latest" or "stable" to an actual version using the public Kubernetes CI uploads +func (g *KubeVersionGetter) VersionFromCILabel(ciVersionLabel, description string) (string, *versionutil.Version, error) { + versionStr, err := kubeadmutil.KubernetesReleaseVersion(ciVersionLabel) + if err != nil { + return "", nil, fmt.Errorf("Couldn't fetch latest %s version from the internet: %v", description, err) + } + + if description != "" { + fmt.Fprintf(g.w, "[upgrade/versions] Latest %s: %s\n", description, versionStr) + } + + ver, err := versionutil.ParseSemantic(versionStr) + if err != nil { + return "", nil, fmt.Errorf("Couldn't parse latest %s version: %v", description, err) + } + return versionStr, ver, nil } // KubeletVersions gets the versions of the kubelets in the cluster func (g *KubeVersionGetter) KubeletVersions() (map[string]uint16, error) { - // This tells kubeadm that there are two nodes in the cluster; both on the v1.7.1 version currently - return map[string]uint16{ - "v1.7.1": 2, - }, nil + nodes, err := g.client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("couldn't list all nodes in cluster") + } + return computeKubeletVersions(nodes.Items), nil +} + +// computeKubeletVersions returns a string-int map that describes how many nodes are of a specific version +func computeKubeletVersions(nodes []v1.Node) map[string]uint16 { + kubeletVersions := map[string]uint16{} + for _, node := range nodes { + kver := node.Status.NodeInfo.KubeletVersion + if _, found := kubeletVersions[kver]; !found { + kubeletVersions[kver] = 1 + continue + } + kubeletVersions[kver]++ + } + return kubeletVersions } diff --git a/cmd/kubeadm/app/util/apiclient/clientbacked_dryrun.go b/cmd/kubeadm/app/util/apiclient/clientbacked_dryrun.go index 6d9266276e..2fbf95bdbb 100644 --- a/cmd/kubeadm/app/util/apiclient/clientbacked_dryrun.go +++ b/cmd/kubeadm/app/util/apiclient/clientbacked_dryrun.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" kuberuntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" + clientset "k8s.io/client-go/kubernetes" clientsetscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" core "k8s.io/client-go/testing" @@ -33,7 +34,7 @@ import ( // ClientBackedDryRunGetter implements the DryRunGetter interface for use in NewDryRunClient() and proxies all GET and LIST requests to the backing API server reachable via rest.Config type ClientBackedDryRunGetter struct { - baseConfig *rest.Config + client clientset.Interface dynClientPool dynamic.ClientPool } @@ -41,11 +42,16 @@ type ClientBackedDryRunGetter struct { var _ DryRunGetter = &ClientBackedDryRunGetter{} // NewClientBackedDryRunGetter creates a new ClientBackedDryRunGetter instance based on the rest.Config object -func NewClientBackedDryRunGetter(config *rest.Config) *ClientBackedDryRunGetter { - return &ClientBackedDryRunGetter{ - baseConfig: config, - dynClientPool: dynamic.NewDynamicClientPool(config), +func NewClientBackedDryRunGetter(config *rest.Config) (*ClientBackedDryRunGetter, error) { + client, err := clientset.NewForConfig(config) + if err != nil { + return nil, err } + + return &ClientBackedDryRunGetter{ + client: client, + dynClientPool: dynamic.NewDynamicClientPool(config), + }, nil } // NewClientBackedDryRunGetterFromKubeconfig creates a new ClientBackedDryRunGetter instance from the given KubeConfig file @@ -58,7 +64,7 @@ func NewClientBackedDryRunGetterFromKubeconfig(file string) (*ClientBackedDryRun if err != nil { return nil, fmt.Errorf("failed to create API client configuration from kubeconfig: %v", err) } - return NewClientBackedDryRunGetter(clientConfig), nil + return NewClientBackedDryRunGetter(clientConfig) } // HandleGetAction handles GET actions to the dryrun clientset this interface supports @@ -106,6 +112,11 @@ func (clg *ClientBackedDryRunGetter) HandleListAction(action core.ListAction) (b return true, newObj, err } +// Client gets the backing clientset.Interface +func (clg *ClientBackedDryRunGetter) Client() clientset.Interface { + return clg.client +} + // actionToResourceClient returns the ResourceInterface for the given action // First; the function gets the right API group interface from the resource type. The API group struct behind the interface // returned may be cached in the dynamic client pool. Then, an APIResource object is constructed so that it can be passed to diff --git a/cmd/kubeadm/app/util/apiclient/idempotency.go b/cmd/kubeadm/app/util/apiclient/idempotency.go index ae5886bf4d..78147d5124 100644 --- a/cmd/kubeadm/app/util/apiclient/idempotency.go +++ b/cmd/kubeadm/app/util/apiclient/idempotency.go @@ -23,6 +23,7 @@ import ( extensions "k8s.io/api/extensions/v1beta1" rbac "k8s.io/api/rbac/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" ) @@ -97,6 +98,15 @@ func CreateOrUpdateDaemonSet(client clientset.Interface, ds *extensions.DaemonSe return nil } +// DeleteDaemonSetForeground deletes the specified DaemonSet in foreground mode; i.e. it blocks until/makes sure all the managed Pods are deleted +func DeleteDaemonSetForeground(client clientset.Interface, namespace, name string) error { + foregroundDelete := metav1.DeletePropagationForeground + deleteOptions := &metav1.DeleteOptions{ + PropagationPolicy: &foregroundDelete, + } + return client.ExtensionsV1beta1().DaemonSets(namespace).Delete(name, deleteOptions) +} + // CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead. func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error { if _, err := client.RbacV1beta1().Roles(role.ObjectMeta.Namespace).Create(role); err != nil { diff --git a/cmd/kubeadm/app/util/apiclient/wait.go b/cmd/kubeadm/app/util/apiclient/wait.go index 59eb1599f7..5690572b18 100644 --- a/cmd/kubeadm/app/util/apiclient/wait.go +++ b/cmd/kubeadm/app/util/apiclient/wait.go @@ -17,6 +17,8 @@ limitations under the License. package apiclient import ( + "crypto/sha256" + "encoding/json" "fmt" "io" "net/http" @@ -38,6 +40,10 @@ type Waiter interface { WaitForPodsWithLabel(kvLabel string) error // WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted WaitForPodToDisappear(staticPodName string) error + // WaitForStaticPodControlPlaneHashes + WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) + // WaitForStaticPodControlPlaneHashChange + WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error // SetTimeout adjusts the timeout to the specified duration SetTimeout(timeout time.Duration) } @@ -110,7 +116,7 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error { return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) { _, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { - fmt.Printf("[apiclient] The Static Pod %q is now removed\n", podName) + fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName) return true, nil } return false, nil @@ -122,6 +128,61 @@ func (w *KubeWaiter) SetTimeout(timeout time.Duration) { w.timeout = timeout } +// WaitForStaticPodControlPlaneHashes blocks until it timeouts or gets a hash map for all components and their Static Pods +func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) { + + var mirrorPodHashes map[string]string + err := wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) { + + hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName) + if err != nil { + return false, nil + } + mirrorPodHashes = hashes + return true, nil + }) + return mirrorPodHashes, err +} + +// WaitForStaticPodControlPlaneHashChange blocks until it timeouts or notices that the Mirror Pod (for the Static Pod, respectively) has changed +// This implicitely means this function blocks until the kubelet has restarted the Static Pod in question +func (w *KubeWaiter) WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error { + return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) { + + hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName) + if err != nil { + return false, nil + } + // We should continue polling until the UID changes + if hashes[component] == previousHash { + return false, nil + } + + return true, nil + }) +} + +// getStaticPodControlPlaneHashes computes hashes for all the control plane's Static Pod resources +func getStaticPodControlPlaneHashes(client clientset.Interface, nodeName string) (map[string]string, error) { + + mirrorPodHashes := map[string]string{} + for _, component := range constants.MasterComponents { + staticPodName := fmt.Sprintf("%s-%s", component, nodeName) + staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(staticPodName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + podBytes, err := json.Marshal(staticPod) + if err != nil { + return nil, err + } + + mirrorPodHashes[component] = fmt.Sprintf("%x", sha256.Sum256(podBytes)) + } + return mirrorPodHashes, nil +} + // TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned func TryRunCommand(f func() error, failureThreshold uint8) error { var numFailures uint8 diff --git a/cmd/kubeadm/app/util/dryrun/dryrun.go b/cmd/kubeadm/app/util/dryrun/dryrun.go index dbba795d9e..41ed0fc406 100644 --- a/cmd/kubeadm/app/util/dryrun/dryrun.go +++ b/cmd/kubeadm/app/util/dryrun/dryrun.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/kubernetes/cmd/kubeadm/app/constants" "k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" ) @@ -98,3 +99,18 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error { // SetTimeout is a no-op; we don't wait in this implementation func (w *Waiter) SetTimeout(_ time.Duration) {} + +// WaitForStaticPodControlPlaneHashes returns an empty hash for all control plane images; WaitForStaticPodControlPlaneHashChange won't block in any case +// but the empty strings there are needed +func (w *Waiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]string, error) { + return map[string]string{ + constants.KubeAPIServer: "", + constants.KubeControllerManager: "", + constants.KubeScheduler: "", + }, nil +} + +// WaitForStaticPodControlPlaneHashChange returns a dummy nil error in order for the flow to just continue as we're dryrunning +func (w *Waiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error { + return nil +}