mirror of https://github.com/k3s-io/k3s
Fully implement the kubeadm upgrade functionality
parent
6b39b017b4
commit
c237ff5bc0
|
@ -18,6 +18,7 @@ package upgrade
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -26,12 +27,20 @@ import (
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
||||||
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
|
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
|
||||||
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
|
||||||
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
|
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
|
||||||
|
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/util/version"
|
"k8s.io/kubernetes/pkg/util/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
upgradeManifestTimeout = 1 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
// applyFlags holds the information about the flags that can be passed to apply
|
// applyFlags holds the information about the flags that can be passed to apply
|
||||||
type applyFlags struct {
|
type applyFlags struct {
|
||||||
nonInteractiveMode bool
|
nonInteractiveMode bool
|
||||||
|
@ -102,7 +111,7 @@ func NewCmdApply(parentFlags *cmdUpgradeFlags) *cobra.Command {
|
||||||
func RunApply(flags *applyFlags) error {
|
func RunApply(flags *applyFlags) error {
|
||||||
|
|
||||||
// Start with the basics, verify that the cluster is healthy and get the configuration from the cluster (using the ConfigMap)
|
// Start with the basics, verify that the cluster is healthy and get the configuration from the cluster (using the ConfigMap)
|
||||||
upgradeVars, err := enforceRequirements(flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig)
|
upgradeVars, err := enforceRequirements(flags.parent.kubeConfigPath, flags.parent.cfgPath, flags.parent.printConfig, flags.dryRun)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -126,16 +135,24 @@ func RunApply(flags *applyFlags) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Implement a prepulling mechanism here
|
// Use a prepuller implementation based on creating DaemonSets
|
||||||
|
// and block until all DaemonSets are ready; then we know for sure that all control plane images are cached locally
|
||||||
|
prepuller := upgrade.NewDaemonSetPrepuller(upgradeVars.client, upgradeVars.waiter, internalcfg)
|
||||||
|
upgrade.PrepullImagesInParallel(prepuller, flags.imagePullTimeout)
|
||||||
|
|
||||||
// Now; perform the upgrade procedure
|
// Now; perform the upgrade procedure
|
||||||
if err := PerformControlPlaneUpgrade(flags, upgradeVars.client, internalcfg); err != nil {
|
if err := PerformControlPlaneUpgrade(flags, upgradeVars.client, upgradeVars.waiter, internalcfg); err != nil {
|
||||||
return fmt.Errorf("[upgrade/apply] FATAL: %v", err)
|
return fmt.Errorf("[upgrade/apply] FATAL: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upgrade RBAC rules and addons. Optionally, if needed, perform some extra task for a specific version
|
// Upgrade RBAC rules and addons. Optionally, if needed, perform some extra task for a specific version
|
||||||
if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion); err != nil {
|
if err := upgrade.PerformPostUpgradeTasks(upgradeVars.client, internalcfg, flags.newK8sVersion); err != nil {
|
||||||
return fmt.Errorf("[upgrade/postupgrade] FATAL: %v", err)
|
return fmt.Errorf("[upgrade/postupgrade] FATAL post-upgrade error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if flags.dryRun {
|
||||||
|
fmt.Println("[dryrun] Finished dryrunning successfully!")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("")
|
fmt.Println("")
|
||||||
|
@ -182,7 +199,7 @@ func EnforceVersionPolicies(flags *applyFlags, versionGetter upgrade.VersionGett
|
||||||
if len(versionSkewErrs.Skippable) > 0 {
|
if len(versionSkewErrs.Skippable) > 0 {
|
||||||
// Return the error if the user hasn't specified the --force flag
|
// Return the error if the user hasn't specified the --force flag
|
||||||
if !flags.force {
|
if !flags.force {
|
||||||
return fmt.Errorf("The --version argument is invalid due to these errors: %v. Can be bypassed if you pass the --force flag", versionSkewErrs.Mandatory)
|
return fmt.Errorf("The --version argument is invalid due to these errors: %v. Can be bypassed if you pass the --force flag", versionSkewErrs.Skippable)
|
||||||
}
|
}
|
||||||
// Soft errors found, but --force was specified
|
// Soft errors found, but --force was specified
|
||||||
fmt.Printf("[upgrade/version] Found %d potential version compatibility errors but skipping since the --force flag is set: %v\n", len(versionSkewErrs.Skippable), versionSkewErrs.Skippable)
|
fmt.Printf("[upgrade/version] Found %d potential version compatibility errors but skipping since the --force flag is set: %v\n", len(versionSkewErrs.Skippable), versionSkewErrs.Skippable)
|
||||||
|
@ -192,21 +209,60 @@ func EnforceVersionPolicies(flags *applyFlags, versionGetter upgrade.VersionGett
|
||||||
}
|
}
|
||||||
|
|
||||||
// PerformControlPlaneUpgrade actually performs the upgrade procedure for the cluster of your type (self-hosted or static-pod-hosted)
|
// PerformControlPlaneUpgrade actually performs the upgrade procedure for the cluster of your type (self-hosted or static-pod-hosted)
|
||||||
func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, internalcfg *kubeadmapi.MasterConfiguration) error {
|
func PerformControlPlaneUpgrade(flags *applyFlags, client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error {
|
||||||
|
|
||||||
// Check if the cluster is self-hosted and act accordingly
|
// Check if the cluster is self-hosted and act accordingly
|
||||||
if upgrade.IsControlPlaneSelfHosted(client) {
|
if upgrade.IsControlPlaneSelfHosted(client) {
|
||||||
fmt.Printf("[upgrade/apply] Upgrading your Self-Hosted control plane to version %q...\n", flags.newK8sVersionStr)
|
fmt.Printf("[upgrade/apply] Upgrading your Self-Hosted control plane to version %q...\n", flags.newK8sVersionStr)
|
||||||
|
|
||||||
// Upgrade a self-hosted cluster
|
// Upgrade the self-hosted cluster
|
||||||
// TODO(luxas): Implement this later when we have the new upgrade strategy
|
return upgrade.SelfHostedControlPlane(client, waiter, internalcfg, flags.newK8sVersion)
|
||||||
return fmt.Errorf("not implemented")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OK, the cluster is hosted using static pods. Upgrade a static-pod hosted cluster
|
// OK, the cluster is hosted using static pods. Upgrade a static-pod hosted cluster
|
||||||
fmt.Printf("[upgrade/apply] Upgrading your Static Pod-hosted control plane to version %q...\n", flags.newK8sVersionStr)
|
fmt.Printf("[upgrade/apply] Upgrading your Static Pod-hosted control plane to version %q...\n", flags.newK8sVersionStr)
|
||||||
|
|
||||||
if err := upgrade.PerformStaticPodControlPlaneUpgrade(client, internalcfg, flags.newK8sVersion); err != nil {
|
if flags.dryRun {
|
||||||
|
return DryRunStaticPodUpgrade(internalcfg)
|
||||||
|
}
|
||||||
|
return PerformStaticPodUpgrade(client, waiter, internalcfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PerformStaticPodUpgrade performs the upgrade of the control plane components for a static pod hosted cluster
|
||||||
|
func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter, internalcfg *kubeadmapi.MasterConfiguration) error {
|
||||||
|
pathManager, err := upgrade.NewKubeStaticPodPathManagerUsingTempDirs(constants.GetStaticPodDirectory())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DryRunStaticPodUpgrade fakes an upgrade of the control plane
|
||||||
|
func DryRunStaticPodUpgrade(internalcfg *kubeadmapi.MasterConfiguration) error {
|
||||||
|
|
||||||
|
dryRunManifestDir, err := constants.CreateTempDirForKubeadm("kubeadm-upgrade-dryrun")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dryRunManifestDir)
|
||||||
|
|
||||||
|
if err := controlplane.CreateInitStaticPodManifestFiles(dryRunManifestDir, internalcfg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Print the contents of the upgraded manifests and pretend like they were in /etc/kubernetes/manifests
|
||||||
|
files := []dryrunutil.FileToPrint{}
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
realPath := constants.GetStaticPodFilepath(component, dryRunManifestDir)
|
||||||
|
outputPath := constants.GetStaticPodFilepath(component, constants.GetStaticPodDirectory())
|
||||||
|
files = append(files, dryrunutil.NewFileToPrint(realPath, outputPath))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := dryrunutil.PrintDryRunFiles(files, os.Stdout); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -26,10 +26,13 @@ import (
|
||||||
|
|
||||||
"github.com/ghodss/yaml"
|
"github.com/ghodss/yaml"
|
||||||
|
|
||||||
|
fakediscovery "k8s.io/client-go/discovery/fake"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
|
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
|
||||||
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
|
||||||
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
|
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
|
||||||
|
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
|
||||||
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
|
kubeconfigutil "k8s.io/kubernetes/cmd/kubeadm/app/util/kubeconfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -39,11 +42,12 @@ type upgradeVariables struct {
|
||||||
client clientset.Interface
|
client clientset.Interface
|
||||||
cfg *kubeadmapiext.MasterConfiguration
|
cfg *kubeadmapiext.MasterConfiguration
|
||||||
versionGetter upgrade.VersionGetter
|
versionGetter upgrade.VersionGetter
|
||||||
|
waiter apiclient.Waiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// enforceRequirements verifies that it's okay to upgrade and then returns the variables needed for the rest of the procedure
|
// enforceRequirements verifies that it's okay to upgrade and then returns the variables needed for the rest of the procedure
|
||||||
func enforceRequirements(kubeConfigPath, cfgPath string, printConfig bool) (*upgradeVariables, error) {
|
func enforceRequirements(kubeConfigPath, cfgPath string, printConfig, dryRun bool) (*upgradeVariables, error) {
|
||||||
client, err := kubeconfigutil.ClientSetFromFile(kubeConfigPath)
|
client, err := getClient(kubeConfigPath, dryRun)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", kubeConfigPath, err)
|
return nil, fmt.Errorf("couldn't create a Kubernetes client from file %q: %v", kubeConfigPath, err)
|
||||||
}
|
}
|
||||||
|
@ -69,6 +73,8 @@ func enforceRequirements(kubeConfigPath, cfgPath string, printConfig bool) (*upg
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
// Use a real version getter interface that queries the API server, the kubeadm client and the Kubernetes CI system for latest versions
|
// Use a real version getter interface that queries the API server, the kubeadm client and the Kubernetes CI system for latest versions
|
||||||
versionGetter: upgrade.NewKubeVersionGetter(client, os.Stdout),
|
versionGetter: upgrade.NewKubeVersionGetter(client, os.Stdout),
|
||||||
|
// Use the waiter conditionally based on the dryrunning variable
|
||||||
|
waiter: getWaiter(dryRun, client),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,6 +107,46 @@ func runPreflightChecks(skipPreFlight bool) error {
|
||||||
return preflight.RunRootCheckOnly()
|
return preflight.RunRootCheckOnly()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getClient gets a real or fake client depending on whether the user is dry-running or not
|
||||||
|
func getClient(file string, dryRun bool) (clientset.Interface, error) {
|
||||||
|
if dryRun {
|
||||||
|
dryRunGetter, err := apiclient.NewClientBackedDryRunGetterFromKubeconfig(file)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// In order for fakeclient.Discovery().ServerVersion() to return the backing API Server's
|
||||||
|
// real version; we have to do some clever API machinery tricks. First, we get the real
|
||||||
|
// API Server's version
|
||||||
|
realServerVersion, err := dryRunGetter.Client().Discovery().ServerVersion()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get server version: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the fake clientset
|
||||||
|
fakeclient := apiclient.NewDryRunClient(dryRunGetter, os.Stdout)
|
||||||
|
// As we know the return of Discovery() of the fake clientset is of type *fakediscovery.FakeDiscovery
|
||||||
|
// we can convert it to that struct.
|
||||||
|
fakeclientDiscovery, ok := fakeclient.Discovery().(*fakediscovery.FakeDiscovery)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("couldn't set fake discovery's server version")
|
||||||
|
}
|
||||||
|
// Lastly, set the right server version to be used
|
||||||
|
fakeclientDiscovery.FakedServerVersion = realServerVersion
|
||||||
|
// return the fake clientset used for dry-running
|
||||||
|
return fakeclient, nil
|
||||||
|
}
|
||||||
|
return kubeconfigutil.ClientSetFromFile(file)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getWaiter gets the right waiter implementation
|
||||||
|
func getWaiter(dryRun bool, client clientset.Interface) apiclient.Waiter {
|
||||||
|
if dryRun {
|
||||||
|
return dryrunutil.NewWaiter()
|
||||||
|
}
|
||||||
|
return apiclient.NewKubeWaiter(client, upgradeManifestTimeout, os.Stdout)
|
||||||
|
}
|
||||||
|
|
||||||
// InteractivelyConfirmUpgrade asks the user whether they _really_ want to upgrade.
|
// InteractivelyConfirmUpgrade asks the user whether they _really_ want to upgrade.
|
||||||
func InteractivelyConfirmUpgrade(question string) error {
|
func InteractivelyConfirmUpgrade(question string) error {
|
||||||
|
|
||||||
|
|
|
@ -50,8 +50,8 @@ func NewCmdPlan(parentFlags *cmdUpgradeFlags) *cobra.Command {
|
||||||
// RunPlan takes care of outputting available versions to upgrade to for the user
|
// RunPlan takes care of outputting available versions to upgrade to for the user
|
||||||
func RunPlan(parentFlags *cmdUpgradeFlags) error {
|
func RunPlan(parentFlags *cmdUpgradeFlags) error {
|
||||||
|
|
||||||
// Start with the basics, verify that the cluster is healthy, build a client and a versionGetter.
|
// Start with the basics, verify that the cluster is healthy, build a client and a versionGetter. Never set dry-run for plan.
|
||||||
upgradeVars, err := enforceRequirements(parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig)
|
upgradeVars, err := enforceRequirements(parentFlags.kubeConfigPath, parentFlags.cfgPath, parentFlags.printConfig, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ func RunPlan(parentFlags *cmdUpgradeFlags) error {
|
||||||
// Compute which upgrade possibilities there are
|
// Compute which upgrade possibilities there are
|
||||||
availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades)
|
availUpgrades, err := upgrade.GetAvailableUpgrades(upgradeVars.versionGetter, parentFlags.allowExperimentalUpgrades, parentFlags.allowRCUpgrades)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("[upgrade/versions] FATAL: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tell the user which upgrades are available
|
// Tell the user which upgrades are available
|
||||||
|
|
|
@ -18,6 +18,8 @@ package constants
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -31,6 +33,7 @@ var KubernetesDir = "/etc/kubernetes"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ManifestsSubDirName = "manifests"
|
ManifestsSubDirName = "manifests"
|
||||||
|
TempDirForKubeadm = "/etc/kubernetes/tmp"
|
||||||
|
|
||||||
CACertAndKeyBaseName = "ca"
|
CACertAndKeyBaseName = "ca"
|
||||||
CACertName = "ca.crt"
|
CACertName = "ca.crt"
|
||||||
|
@ -181,3 +184,17 @@ func GetAdminKubeConfigPath() string {
|
||||||
func AddSelfHostedPrefix(componentName string) string {
|
func AddSelfHostedPrefix(componentName string) string {
|
||||||
return fmt.Sprintf("%s%s", SelfHostingPrefix, componentName)
|
return fmt.Sprintf("%s%s", SelfHostingPrefix, componentName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateTempDirForKubeadm is a function that creates a temporary directory under /etc/kubernetes/tmp (not using /tmp as that would potentially be dangerous)
|
||||||
|
func CreateTempDirForKubeadm(dirName string) (string, error) {
|
||||||
|
// creates target folder if not already exists
|
||||||
|
if err := os.MkdirAll(TempDirForKubeadm, 0700); err != nil {
|
||||||
|
return "", fmt.Errorf("failed to create directory %q: %v", TempDirForKubeadm, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tempDir, err := ioutil.TempDir(TempDirForKubeadm, dirName)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("couldn't create a temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
return tempDir, nil
|
||||||
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/features"
|
||||||
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
|
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,8 +35,8 @@ const (
|
||||||
// PodSpecMutatorFunc is a function capable of mutating a PodSpec
|
// PodSpecMutatorFunc is a function capable of mutating a PodSpec
|
||||||
type PodSpecMutatorFunc func(*v1.PodSpec)
|
type PodSpecMutatorFunc func(*v1.PodSpec)
|
||||||
|
|
||||||
// getDefaultMutators gets the mutator functions that alwasy should be used
|
// GetDefaultMutators gets the mutator functions that alwasy should be used
|
||||||
func getDefaultMutators() map[string][]PodSpecMutatorFunc {
|
func GetDefaultMutators() map[string][]PodSpecMutatorFunc {
|
||||||
return map[string][]PodSpecMutatorFunc{
|
return map[string][]PodSpecMutatorFunc{
|
||||||
kubeadmconstants.KubeAPIServer: {
|
kubeadmconstants.KubeAPIServer: {
|
||||||
addNodeSelectorToPodSpec,
|
addNodeSelectorToPodSpec,
|
||||||
|
@ -55,6 +56,22 @@ func getDefaultMutators() map[string][]PodSpecMutatorFunc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetMutatorsFromFeatureGates returns all mutators needed based on the feature gates passed
|
||||||
|
func GetMutatorsFromFeatureGates(featureGates map[string]bool) map[string][]PodSpecMutatorFunc {
|
||||||
|
// Here the map of different mutators to use for the control plane's podspec is stored
|
||||||
|
mutators := GetDefaultMutators()
|
||||||
|
|
||||||
|
// Some extra work to be done if we should store the control plane certificates in Secrets
|
||||||
|
if features.Enabled(featureGates, features.StoreCertsInSecrets) {
|
||||||
|
|
||||||
|
// Add the store-certs-in-secrets-specific mutators here so that the self-hosted component starts using them
|
||||||
|
mutators[kubeadmconstants.KubeAPIServer] = append(mutators[kubeadmconstants.KubeAPIServer], setSelfHostedVolumesForAPIServer)
|
||||||
|
mutators[kubeadmconstants.KubeControllerManager] = append(mutators[kubeadmconstants.KubeControllerManager], setSelfHostedVolumesForControllerManager)
|
||||||
|
mutators[kubeadmconstants.KubeScheduler] = append(mutators[kubeadmconstants.KubeScheduler], setSelfHostedVolumesForScheduler)
|
||||||
|
}
|
||||||
|
return mutators
|
||||||
|
}
|
||||||
|
|
||||||
// mutatePodSpec makes a Static Pod-hosted PodSpec suitable for self-hosting
|
// mutatePodSpec makes a Static Pod-hosted PodSpec suitable for self-hosting
|
||||||
func mutatePodSpec(mutators map[string][]PodSpecMutatorFunc, name string, podSpec *v1.PodSpec) {
|
func mutatePodSpec(mutators map[string][]PodSpecMutatorFunc, name string, podSpec *v1.PodSpec) {
|
||||||
// Get the mutator functions for the component in question, then loop through and execute them
|
// Get the mutator functions for the component in question, then loop through and execute them
|
||||||
|
|
|
@ -60,7 +60,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
|
||||||
waiter.SetTimeout(selfHostingWaitTimeout)
|
waiter.SetTimeout(selfHostingWaitTimeout)
|
||||||
|
|
||||||
// Here the map of different mutators to use for the control plane's podspec is stored
|
// Here the map of different mutators to use for the control plane's podspec is stored
|
||||||
mutators := getDefaultMutators()
|
mutators := GetMutatorsFromFeatureGates(cfg.FeatureGates)
|
||||||
|
|
||||||
// Some extra work to be done if we should store the control plane certificates in Secrets
|
// Some extra work to be done if we should store the control plane certificates in Secrets
|
||||||
if features.Enabled(cfg.FeatureGates, features.StoreCertsInSecrets) {
|
if features.Enabled(cfg.FeatureGates, features.StoreCertsInSecrets) {
|
||||||
|
@ -72,10 +72,6 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
|
||||||
if err := uploadKubeConfigSecrets(client, kubeConfigDir); err != nil {
|
if err := uploadKubeConfigSecrets(client, kubeConfigDir); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Add the store-certs-in-secrets-specific mutators here so that the self-hosted component starts using them
|
|
||||||
mutators[kubeadmconstants.KubeAPIServer] = append(mutators[kubeadmconstants.KubeAPIServer], setSelfHostedVolumesForAPIServer)
|
|
||||||
mutators[kubeadmconstants.KubeControllerManager] = append(mutators[kubeadmconstants.KubeControllerManager], setSelfHostedVolumesForControllerManager)
|
|
||||||
mutators[kubeadmconstants.KubeScheduler] = append(mutators[kubeadmconstants.KubeScheduler], setSelfHostedVolumesForScheduler)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, componentName := range kubeadmconstants.MasterComponents {
|
for _, componentName := range kubeadmconstants.MasterComponents {
|
||||||
|
@ -95,7 +91,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build a DaemonSet object from the loaded PodSpec
|
// Build a DaemonSet object from the loaded PodSpec
|
||||||
ds := buildDaemonSet(componentName, podSpec, mutators)
|
ds := BuildDaemonSet(componentName, podSpec, mutators)
|
||||||
|
|
||||||
// Create or update the DaemonSet in the API Server, and retry selfHostingFailureThreshold times if it errors out
|
// Create or update the DaemonSet in the API Server, and retry selfHostingFailureThreshold times if it errors out
|
||||||
if err := apiclient.TryRunCommand(func() error {
|
if err := apiclient.TryRunCommand(func() error {
|
||||||
|
@ -105,7 +101,7 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the self-hosted component to come up
|
// Wait for the self-hosted component to come up
|
||||||
if err := waiter.WaitForPodsWithLabel(buildSelfHostedWorkloadLabelQuery(componentName)); err != nil {
|
if err := waiter.WaitForPodsWithLabel(BuildSelfHostedComponentLabelQuery(componentName)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,8 +128,8 @@ func CreateSelfHostedControlPlane(manifestsDir, kubeConfigDir string, cfg *kubea
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildDaemonSet is responsible for mutating the PodSpec and return a DaemonSet which is suitable for the self-hosting purporse
|
// BuildDaemonSet is responsible for mutating the PodSpec and return a DaemonSet which is suitable for the self-hosting purporse
|
||||||
func buildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodSpecMutatorFunc) *extensions.DaemonSet {
|
func BuildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodSpecMutatorFunc) *extensions.DaemonSet {
|
||||||
|
|
||||||
// Mutate the PodSpec so it's suitable for self-hosting
|
// Mutate the PodSpec so it's suitable for self-hosting
|
||||||
mutatePodSpec(mutators, name, podSpec)
|
mutatePodSpec(mutators, name, podSpec)
|
||||||
|
@ -143,19 +139,19 @@ func buildDaemonSet(name string, podSpec *v1.PodSpec, mutators map[string][]PodS
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: kubeadmconstants.AddSelfHostedPrefix(name),
|
Name: kubeadmconstants.AddSelfHostedPrefix(name),
|
||||||
Namespace: metav1.NamespaceSystem,
|
Namespace: metav1.NamespaceSystem,
|
||||||
Labels: map[string]string{
|
Labels: BuildSelfhostedComponentLabels(name),
|
||||||
"k8s-app": kubeadmconstants.AddSelfHostedPrefix(name),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Spec: extensions.DaemonSetSpec{
|
Spec: extensions.DaemonSetSpec{
|
||||||
Template: v1.PodTemplateSpec{
|
Template: v1.PodTemplateSpec{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Labels: map[string]string{
|
Labels: BuildSelfhostedComponentLabels(name),
|
||||||
"k8s-app": kubeadmconstants.AddSelfHostedPrefix(name),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Spec: *podSpec,
|
Spec: *podSpec,
|
||||||
},
|
},
|
||||||
|
UpdateStrategy: extensions.DaemonSetUpdateStrategy{
|
||||||
|
// Make the DaemonSet utilize the RollingUpdate rollout strategy
|
||||||
|
Type: extensions.RollingUpdateDaemonSetStrategyType,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,7 +172,14 @@ func loadPodSpecFromFile(manifestPath string) (*v1.PodSpec, error) {
|
||||||
return &staticPod.Spec, nil
|
return &staticPod.Spec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildSelfHostedWorkloadLabelQuery creates the right query for matching a self-hosted Pod
|
// BuildSelfhostedComponentLabels returns the labels for a self-hosted component
|
||||||
func buildSelfHostedWorkloadLabelQuery(componentName string) string {
|
func BuildSelfhostedComponentLabels(component string) map[string]string {
|
||||||
|
return map[string]string{
|
||||||
|
"k8s-app": kubeadmconstants.AddSelfHostedPrefix(component),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildSelfHostedComponentLabelQuery creates the right query for matching a self-hosted Pod
|
||||||
|
func BuildSelfHostedComponentLabelQuery(componentName string) string {
|
||||||
return fmt.Sprintf("k8s-app=%s", kubeadmconstants.AddSelfHostedPrefix(componentName))
|
return fmt.Sprintf("k8s-app=%s", kubeadmconstants.AddSelfHostedPrefix(componentName))
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,10 @@ package upgrade
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns"
|
||||||
|
"k8s.io/kubernetes/pkg/util/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Upgrade defines an upgrade possibility to upgrade from a current version to a new one
|
// Upgrade defines an upgrade possibility to upgrade from a current version to a new one
|
||||||
|
@ -57,7 +61,196 @@ type ClusterState struct {
|
||||||
|
|
||||||
// GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which
|
// GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which
|
||||||
// kinds of upgrades can be performed
|
// kinds of upgrades can be performed
|
||||||
func GetAvailableUpgrades(_ VersionGetter, _, _ bool) ([]Upgrade, error) {
|
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool) ([]Upgrade, error) {
|
||||||
fmt.Println("[upgrade] Fetching available versions to upgrade to:")
|
fmt.Println("[upgrade] Fetching available versions to upgrade to:")
|
||||||
return []Upgrade{}, nil
|
|
||||||
|
// Collect the upgrades kubeadm can do in this list
|
||||||
|
upgrades := []Upgrade{}
|
||||||
|
|
||||||
|
// Get the cluster version
|
||||||
|
clusterVersionStr, clusterVersion, err := versionGetterImpl.ClusterVersion()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get current kubeadm CLI version
|
||||||
|
kubeadmVersionStr, kubeadmVersion, err := versionGetterImpl.KubeadmVersion()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get and output the current latest stable version
|
||||||
|
stableVersionStr, stableVersion, err := versionGetterImpl.VersionFromCILabel("stable", "stable version")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the kubelet versions in the cluster
|
||||||
|
kubeletVersions, err := versionGetterImpl.KubeletVersions()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct a descriptor for the current state of the world
|
||||||
|
beforeState := ClusterState{
|
||||||
|
KubeVersion: clusterVersionStr,
|
||||||
|
DNSVersion: dns.GetKubeDNSVersion(clusterVersion),
|
||||||
|
KubeadmVersion: kubeadmVersionStr,
|
||||||
|
KubeletVersions: kubeletVersions,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do a "dumb guess" that a new minor upgrade is available just because the latest stable version is higher than the cluster version
|
||||||
|
// This guess will be corrected once we know if there is a patch version available
|
||||||
|
canDoMinorUpgrade := clusterVersion.LessThan(stableVersion)
|
||||||
|
|
||||||
|
// A patch version doesn't exist if the cluster version is higher than or equal to the current stable version
|
||||||
|
// in the case that a user is trying to upgrade from, let's say, v1.8.0-beta.2 to v1.8.0-rc.1 (given we support such upgrades experimentally)
|
||||||
|
// a stable-1.8 branch doesn't exist yet. Hence this check.
|
||||||
|
if patchVersionBranchExists(clusterVersion, stableVersion) {
|
||||||
|
|
||||||
|
currentBranch := getBranchFromVersion(clusterVersionStr)
|
||||||
|
versionLabel := fmt.Sprintf("stable-%s", currentBranch)
|
||||||
|
description := fmt.Sprintf("version in the v%s series", currentBranch)
|
||||||
|
|
||||||
|
// Get and output the latest patch version for the cluster branch
|
||||||
|
patchVersionStr, patchVersion, err := versionGetterImpl.VersionFromCILabel(versionLabel, description)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if a minor version upgrade is possible when a patch release exists
|
||||||
|
// It's only possible if the latest patch version is higher than the current patch version
|
||||||
|
// If that's the case, they must be on different branches => a newer minor version can be upgraded to
|
||||||
|
canDoMinorUpgrade = minorUpgradePossibleWithPatchRelease(stableVersion, patchVersion)
|
||||||
|
|
||||||
|
// If the cluster version is lower than the newest patch version, we should inform about the possible upgrade
|
||||||
|
if patchUpgradePossible(clusterVersion, patchVersion) {
|
||||||
|
|
||||||
|
// The kubeadm version has to be upgraded to the latest patch version
|
||||||
|
newKubeadmVer := patchVersionStr
|
||||||
|
if kubeadmVersion.AtLeast(patchVersion) {
|
||||||
|
// In this case, the kubeadm CLI version is new enough. Don't display an update suggestion for kubeadm by making .NewKubeadmVersion equal .CurrentKubeadmVersion
|
||||||
|
newKubeadmVer = kubeadmVersionStr
|
||||||
|
}
|
||||||
|
|
||||||
|
upgrades = append(upgrades, Upgrade{
|
||||||
|
Description: description,
|
||||||
|
Before: beforeState,
|
||||||
|
After: ClusterState{
|
||||||
|
KubeVersion: patchVersionStr,
|
||||||
|
DNSVersion: dns.GetKubeDNSVersion(patchVersion),
|
||||||
|
KubeadmVersion: newKubeadmVer,
|
||||||
|
// KubeletVersions is unset here as it is not used anywhere in .After
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if canDoMinorUpgrade {
|
||||||
|
upgrades = append(upgrades, Upgrade{
|
||||||
|
Description: "stable version",
|
||||||
|
Before: beforeState,
|
||||||
|
After: ClusterState{
|
||||||
|
KubeVersion: stableVersionStr,
|
||||||
|
DNSVersion: dns.GetKubeDNSVersion(stableVersion),
|
||||||
|
KubeadmVersion: stableVersionStr,
|
||||||
|
// KubeletVersions is unset here as it is not used anywhere in .After
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if experimentalUpgradesAllowed || rcUpgradesAllowed {
|
||||||
|
// dl.k8s.io/release/latest.txt is ALWAYS an alpha.X version
|
||||||
|
// dl.k8s.io/release/latest-1.X.txt is first v1.X.0-alpha.0 -> v1.X.0-alpha.Y, then v1.X.0-beta.0 to v1.X.0-beta.Z, then v1.X.0-rc.1 to v1.X.0-rc.W.
|
||||||
|
// After the v1.X.0 release, latest-1.X.txt is always a beta.0 version. Let's say the latest stable version on the v1.7 branch is v1.7.3, then the
|
||||||
|
// latest-1.7 version is v1.7.4-beta.0
|
||||||
|
|
||||||
|
// Worth noticing is that when the release-1.X branch is cut; there are two versions tagged: v1.X.0-beta.0 AND v1.(X+1).alpha.0
|
||||||
|
// The v1.(X+1).alpha.0 is pretty much useless and should just be ignored, as more betas may be released that have more features than the initial v1.(X+1).alpha.0
|
||||||
|
|
||||||
|
// So what we do below is getting the latest overall version, always an v1.X.0-alpha.Y version. Then we get latest-1.(X-1) version. This version may be anything
|
||||||
|
// between v1.(X-1).0-beta.0 and v1.(X-1).Z-beta.0. At some point in time, latest-1.(X-1) will point to v1.(X-1).0-rc.1. Then we should show it.
|
||||||
|
|
||||||
|
// The flow looks like this (with time on the X axis):
|
||||||
|
// v1.8.0-alpha.1 -> v1.8.0-alpha.2 -> v1.8.0-alpha.3 | release-1.8 branch | v1.8.0-beta.0 -> v1.8.0-beta.1 -> v1.8.0-beta.2 -> v1.8.0-rc.1 -> v1.8.0 -> v1.8.1
|
||||||
|
// v1.9.0-alpha.0 -> v1.9.0-alpha.1 -> v1.9.0-alpha.2
|
||||||
|
|
||||||
|
// Get and output the current latest unstable version
|
||||||
|
latestVersionStr, latestVersion, err := versionGetterImpl.VersionFromCILabel("latest", "experimental version")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
minorUnstable := latestVersion.Components()[1]
|
||||||
|
// Get and output the current latest unstable version
|
||||||
|
previousBranch := fmt.Sprintf("latest-1.%d", minorUnstable-1)
|
||||||
|
previousBranchLatestVersionStr, previousBranchLatestVersion, err := versionGetterImpl.VersionFromCILabel(previousBranch, "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If that previous latest version is an RC, RCs are allowed and the cluster version is lower than the RC version, show the upgrade
|
||||||
|
if rcUpgradesAllowed && rcUpgradePossible(clusterVersion, previousBranchLatestVersion) {
|
||||||
|
upgrades = append(upgrades, Upgrade{
|
||||||
|
Description: "release candidate version",
|
||||||
|
Before: beforeState,
|
||||||
|
After: ClusterState{
|
||||||
|
KubeVersion: previousBranchLatestVersionStr,
|
||||||
|
DNSVersion: dns.GetKubeDNSVersion(previousBranchLatestVersion),
|
||||||
|
KubeadmVersion: previousBranchLatestVersionStr,
|
||||||
|
// KubeletVersions is unset here as it is not used anywhere in .After
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Show the possibility if experimental upgrades are allowed
|
||||||
|
if experimentalUpgradesAllowed && clusterVersion.LessThan(latestVersion) {
|
||||||
|
|
||||||
|
// Default to assume that the experimental version to show is the unstable one
|
||||||
|
unstableKubeVersion := latestVersionStr
|
||||||
|
unstableKubeDNSVersion := dns.GetKubeDNSVersion(latestVersion)
|
||||||
|
|
||||||
|
// Ẃe should not display alpha.0. The previous branch's beta/rc versions are more relevant due how the kube branching process works.
|
||||||
|
if latestVersion.PreRelease() == "alpha.0" {
|
||||||
|
unstableKubeVersion = previousBranchLatestVersionStr
|
||||||
|
unstableKubeDNSVersion = dns.GetKubeDNSVersion(previousBranchLatestVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
upgrades = append(upgrades, Upgrade{
|
||||||
|
Description: "experimental version",
|
||||||
|
Before: beforeState,
|
||||||
|
After: ClusterState{
|
||||||
|
KubeVersion: unstableKubeVersion,
|
||||||
|
DNSVersion: unstableKubeDNSVersion,
|
||||||
|
KubeadmVersion: unstableKubeVersion,
|
||||||
|
// KubeletVersions is unset here as it is not used anywhere in .After
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a newline in the end of this output to leave some space to the next output section
|
||||||
|
fmt.Println("")
|
||||||
|
|
||||||
|
return upgrades, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getBranchFromVersion(version string) string {
|
||||||
|
return strings.TrimPrefix(version, "v")[:3]
|
||||||
|
}
|
||||||
|
|
||||||
|
func patchVersionBranchExists(clusterVersion, stableVersion *version.Version) bool {
|
||||||
|
return stableVersion.AtLeast(clusterVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func patchUpgradePossible(clusterVersion, patchVersion *version.Version) bool {
|
||||||
|
return clusterVersion.LessThan(patchVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func rcUpgradePossible(clusterVersion, previousBranchLatestVersion *version.Version) bool {
|
||||||
|
return strings.HasPrefix(previousBranchLatestVersion.PreRelease(), "rc") && clusterVersion.LessThan(previousBranchLatestVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func minorUpgradePossibleWithPatchRelease(stableVersion, patchVersion *version.Version) bool {
|
||||||
|
return patchVersion.LessThan(stableVersion)
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,18 +19,89 @@ package upgrade
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
||||||
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
|
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
|
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FetchConfiguration fetches configuration required for upgrading your cluster from a file (which has precedence) or a ConfigMap in the cluster
|
// FetchConfiguration fetches configuration required for upgrading your cluster from a file (which has precedence) or a ConfigMap in the cluster
|
||||||
func FetchConfiguration(_ clientset.Interface, _ io.Writer, _ string) (*kubeadmapiext.MasterConfiguration, error) {
|
func FetchConfiguration(client clientset.Interface, w io.Writer, cfgPath string) (*kubeadmapiext.MasterConfiguration, error) {
|
||||||
fmt.Println("[upgrade/config] Making sure the configuration is correct:")
|
fmt.Println("[upgrade/config] Making sure the configuration is correct:")
|
||||||
|
|
||||||
cfg := &kubeadmapiext.MasterConfiguration{}
|
// Load the configuration from a file or the cluster
|
||||||
api.Scheme.Default(cfg)
|
configBytes, err := loadConfigurationBytes(client, w, cfgPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return cfg, nil
|
// Take the versioned configuration populated from the configmap, default it and validate
|
||||||
|
// Return the internal version of the API object
|
||||||
|
versionedcfg, err := bytesToValidatedMasterConfig(configBytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not decode configuration: %v", err)
|
||||||
|
}
|
||||||
|
return versionedcfg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadConfigurationBytes loads the configuration byte slice from either a file or the cluster ConfigMap
|
||||||
|
func loadConfigurationBytes(client clientset.Interface, w io.Writer, cfgPath string) ([]byte, error) {
|
||||||
|
if cfgPath != "" {
|
||||||
|
fmt.Printf("[upgrade/config] Reading configuration options from a file: %s\n", cfgPath)
|
||||||
|
return ioutil.ReadFile(cfgPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("[upgrade/config] Reading configuration from the cluster...")
|
||||||
|
|
||||||
|
configMap, err := client.CoreV1().ConfigMaps(metav1.NamespaceSystem).Get(constants.MasterConfigurationConfigMap, metav1.GetOptions{})
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
fmt.Printf("[upgrade/config] In order to upgrade, a ConfigMap called %q in the %s namespace must exist.\n", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem)
|
||||||
|
fmt.Println("[upgrade/config] Without this information, 'kubeadm upgrade' don't how to configure your upgraded cluster.")
|
||||||
|
fmt.Println("")
|
||||||
|
fmt.Println("[upgrade/config] Next steps:")
|
||||||
|
fmt.Printf("\t- OPTION 1: Run 'kubeadm config upload from-flags' and specify the same CLI arguments you passed to 'kubeadm init' when you created your master.\n")
|
||||||
|
fmt.Printf("\t- OPTION 2: Run 'kubeadm config upload from-file' and specify the same config file you passed to 'kubeadm init' when you created your master.\n")
|
||||||
|
fmt.Printf("\t- OPTION 3: Pass a config file to 'kubeadm upgrade' using the --config flag.\n")
|
||||||
|
fmt.Println("")
|
||||||
|
return []byte{}, fmt.Errorf("the ConfigMap %q in the %s namespace used for getting configuration information was not found", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem)
|
||||||
|
} else if err != nil {
|
||||||
|
return []byte{}, fmt.Errorf("an unexpected error happened when trying to get the ConfigMap %q in the %s namespace: %v", constants.MasterConfigurationConfigMap, metav1.NamespaceSystem, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("[upgrade/config] FYI: You can look at this config file with 'kubectl -n %s get cm %s -oyaml'\n", metav1.NamespaceSystem, constants.MasterConfigurationConfigMap)
|
||||||
|
return []byte(configMap.Data[constants.MasterConfigurationConfigMapKey]), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// bytesToValidatedMasterConfig converts a byte array to an external, defaulted and validated configuration object
|
||||||
|
func bytesToValidatedMasterConfig(b []byte) (*kubeadmapiext.MasterConfiguration, error) {
|
||||||
|
cfg := &kubeadmapiext.MasterConfiguration{}
|
||||||
|
finalCfg := &kubeadmapiext.MasterConfiguration{}
|
||||||
|
internalcfg := &kubeadmapi.MasterConfiguration{}
|
||||||
|
|
||||||
|
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), b, cfg); err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to decode config from bytes: %v", err)
|
||||||
|
}
|
||||||
|
// Default and convert to the internal version
|
||||||
|
api.Scheme.Default(cfg)
|
||||||
|
api.Scheme.Convert(cfg, internalcfg, nil)
|
||||||
|
|
||||||
|
// Applies dynamic defaults to settings not provided with flags
|
||||||
|
if err := configutil.SetInitDynamicDefaults(internalcfg); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Validates cfg (flags/configs + defaults + dynamic defaults)
|
||||||
|
if err := validation.ValidateMasterConfiguration(internalcfg).ToAggregate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Finally converts back to the external version
|
||||||
|
api.Scheme.Convert(internalcfg, finalCfg, nil)
|
||||||
|
return finalCfg, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,20 +17,190 @@ limitations under the License.
|
||||||
package upgrade
|
package upgrade
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// healthCheck is a helper struct for easily performing healthchecks against the cluster and printing the output
|
||||||
|
type healthCheck struct {
|
||||||
|
description, okMessage, failMessage string
|
||||||
|
// f is invoked with a k8s client passed to it. Should return an optional warning and/or an error
|
||||||
|
f func(clientset.Interface) error
|
||||||
|
}
|
||||||
|
|
||||||
// CheckClusterHealth makes sure:
|
// CheckClusterHealth makes sure:
|
||||||
// - the API /healthz endpoint is healthy
|
// - the API /healthz endpoint is healthy
|
||||||
// - all Nodes are Ready
|
// - all Nodes are Ready
|
||||||
// - (if self-hosted) that there are DaemonSets with at least one Pod for all control plane components
|
// - (if self-hosted) that there are DaemonSets with at least one Pod for all control plane components
|
||||||
// - (if static pod-hosted) that all required Static Pod manifests exist on disk
|
// - (if static pod-hosted) that all required Static Pod manifests exist on disk
|
||||||
func CheckClusterHealth(_ clientset.Interface) error {
|
func CheckClusterHealth(client clientset.Interface) error {
|
||||||
|
fmt.Println("[upgrade] Making sure the cluster is healthy:")
|
||||||
|
|
||||||
|
healthChecks := []healthCheck{
|
||||||
|
{
|
||||||
|
description: "API Server health",
|
||||||
|
okMessage: "Healthy",
|
||||||
|
failMessage: "Unhealthy",
|
||||||
|
f: apiServerHealthy,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Node health",
|
||||||
|
okMessage: "All Nodes are healthy",
|
||||||
|
failMessage: "More than one Node unhealthy",
|
||||||
|
f: nodesHealthy,
|
||||||
|
},
|
||||||
|
// TODO: Add a check for ComponentStatuses here?
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run slightly different health checks depending on control plane hosting type
|
||||||
|
if IsControlPlaneSelfHosted(client) {
|
||||||
|
healthChecks = append(healthChecks, healthCheck{
|
||||||
|
description: "Control plane DaemonSet health",
|
||||||
|
okMessage: "All control plane DaemonSets are healthy",
|
||||||
|
failMessage: "More than one control plane DaemonSet unhealthy",
|
||||||
|
f: controlPlaneHealth,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
healthChecks = append(healthChecks, healthCheck{
|
||||||
|
description: "Static Pod manifests exists on disk",
|
||||||
|
okMessage: "All manifests exist on disk",
|
||||||
|
failMessage: "Some manifests don't exist on disk",
|
||||||
|
f: staticPodManifestHealth,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return runHealthChecks(client, healthChecks)
|
||||||
|
}
|
||||||
|
|
||||||
|
// runHealthChecks runs a set of health checks against the cluster
|
||||||
|
func runHealthChecks(client clientset.Interface, healthChecks []healthCheck) error {
|
||||||
|
for _, check := range healthChecks {
|
||||||
|
|
||||||
|
err := check.f(client)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.failMessage)
|
||||||
|
return fmt.Errorf("The cluster is not in an upgradeable state due to: %v", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("[upgrade/health] Checking %s: %s\n", check.description, check.okMessage)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsControlPlaneSelfHosted returns whether the control plane is self hosted or not
|
// apiServerHealthy checks whether the API server's /healthz endpoint is healthy
|
||||||
func IsControlPlaneSelfHosted(_ clientset.Interface) bool {
|
func apiServerHealthy(client clientset.Interface) error {
|
||||||
// No-op for now
|
healthStatus := 0
|
||||||
return false
|
|
||||||
|
// If client.Discovery().RESTClient() is nil, the fake client is used, and that means we are dry-running. Just proceed
|
||||||
|
if client.Discovery().RESTClient() == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().StatusCode(&healthStatus)
|
||||||
|
if healthStatus != http.StatusOK {
|
||||||
|
return fmt.Errorf("the API Server is unhealthy; /healthz didn't return %q", "ok")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// nodesHealthy checks whether all Nodes in the cluster are in the Running state
|
||||||
|
func nodesHealthy(client clientset.Interface) error {
|
||||||
|
nodes, err := client.CoreV1().Nodes().List(metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't list all nodes in cluster: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
notReadyNodes := getNotReadyNodes(nodes.Items)
|
||||||
|
if len(notReadyNodes) != 0 {
|
||||||
|
return fmt.Errorf("there are NotReady Nodes in the cluster: %v", notReadyNodes)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// controlPlaneHealth ensures all control plane DaemonSets are healthy
|
||||||
|
func controlPlaneHealth(client clientset.Interface) error {
|
||||||
|
notReadyDaemonSets, err := getNotReadyDaemonSets(client)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(notReadyDaemonSets) != 0 {
|
||||||
|
return fmt.Errorf("there are control plane DaemonSets in the cluster that are not ready: %v", notReadyDaemonSets)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// staticPodManifestHealth makes sure the required static pods are presents
|
||||||
|
func staticPodManifestHealth(_ clientset.Interface) error {
|
||||||
|
nonExistentManifests := []string{}
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
manifestFile := constants.GetStaticPodFilepath(component, constants.GetStaticPodDirectory())
|
||||||
|
if _, err := os.Stat(manifestFile); os.IsNotExist(err) {
|
||||||
|
nonExistentManifests = append(nonExistentManifests, manifestFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(nonExistentManifests) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("The control plane seems to be Static Pod-hosted, but some of the manifests don't seem to exist on disk. This probably means you're running 'kubeadm upgrade' on a remote machine, which is not supported for a Static Pod-hosted cluster. Manifest files not found: %v", nonExistentManifests)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsControlPlaneSelfHosted returns whether the control plane is self hosted or not
|
||||||
|
func IsControlPlaneSelfHosted(client clientset.Interface) bool {
|
||||||
|
notReadyDaemonSets, err := getNotReadyDaemonSets(client)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there are no NotReady DaemonSets, we are using self-hosting
|
||||||
|
return len(notReadyDaemonSets) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// getNotReadyDaemonSets gets the amount of Ready control plane DaemonSets
|
||||||
|
func getNotReadyDaemonSets(client clientset.Interface) ([]error, error) {
|
||||||
|
notReadyDaemonSets := []error{}
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
dsName := constants.AddSelfHostedPrefix(component)
|
||||||
|
ds, err := client.ExtensionsV1beta1().DaemonSets(metav1.NamespaceSystem).Get(dsName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't get daemonset %q in the %s namespace", dsName, metav1.NamespaceSystem)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := daemonSetHealth(&ds.Status); err != nil {
|
||||||
|
notReadyDaemonSets = append(notReadyDaemonSets, fmt.Errorf("DaemonSet %q not healthy: %v", dsName, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return notReadyDaemonSets, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// daemonSetHealth is a helper function for getting the health of a DaemonSet's status
|
||||||
|
func daemonSetHealth(dsStatus *extensions.DaemonSetStatus) error {
|
||||||
|
if dsStatus.CurrentNumberScheduled != dsStatus.DesiredNumberScheduled {
|
||||||
|
return fmt.Errorf("current number of scheduled Pods ('%d') doesn't match the amount of desired Pods ('%d')", dsStatus.CurrentNumberScheduled, dsStatus.DesiredNumberScheduled)
|
||||||
|
}
|
||||||
|
if dsStatus.NumberAvailable == 0 {
|
||||||
|
return fmt.Errorf("no available Pods for DaemonSet")
|
||||||
|
}
|
||||||
|
if dsStatus.NumberReady == 0 {
|
||||||
|
return fmt.Errorf("no ready Pods for DaemonSet")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getNotReadyNodes returns a string slice of nodes in the cluster that are NotReady
|
||||||
|
func getNotReadyNodes(nodes []v1.Node) []string {
|
||||||
|
notReadyNodes := []string{}
|
||||||
|
for _, node := range nodes {
|
||||||
|
for _, condition := range node.Status.Conditions {
|
||||||
|
if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
|
||||||
|
notReadyNodes = append(notReadyNodes, node.ObjectMeta.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return notReadyNodes
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,21 @@ limitations under the License.
|
||||||
package upgrade
|
package upgrade
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
"k8s.io/kubernetes/pkg/util/version"
|
"k8s.io/kubernetes/pkg/util/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MaximumAllowedMinorVersionUpgradeSkew describes how many minor versions kubeadm can upgrade the control plane version in one go
|
||||||
|
MaximumAllowedMinorVersionUpgradeSkew = 1
|
||||||
|
|
||||||
|
// MaximumAllowedMinorVersionKubeletSkew describes how many minor versions the control plane version and the kubelet can skew in a kubeadm cluster
|
||||||
|
MaximumAllowedMinorVersionKubeletSkew = 1
|
||||||
|
)
|
||||||
|
|
||||||
// VersionSkewPolicyErrors describes version skew errors that might be seen during the validation process in EnforceVersionPolicies
|
// VersionSkewPolicyErrors describes version skew errors that might be seen during the validation process in EnforceVersionPolicies
|
||||||
type VersionSkewPolicyErrors struct {
|
type VersionSkewPolicyErrors struct {
|
||||||
Mandatory []error
|
Mandatory []error
|
||||||
|
@ -27,7 +39,120 @@ type VersionSkewPolicyErrors struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnforceVersionPolicies enforces that the proposed new version is compatible with all the different version skew policies
|
// EnforceVersionPolicies enforces that the proposed new version is compatible with all the different version skew policies
|
||||||
func EnforceVersionPolicies(_ VersionGetter, _ string, _ *version.Version, _, _ bool) *VersionSkewPolicyErrors {
|
func EnforceVersionPolicies(versionGetter VersionGetter, newK8sVersionStr string, newK8sVersion *version.Version, allowExperimentalUpgrades, allowRCUpgrades bool) *VersionSkewPolicyErrors {
|
||||||
// No-op now and return no skew errors
|
|
||||||
return nil
|
skewErrors := &VersionSkewPolicyErrors{
|
||||||
|
Mandatory: []error{},
|
||||||
|
Skippable: []error{},
|
||||||
|
}
|
||||||
|
|
||||||
|
clusterVersionStr, clusterVersion, err := versionGetter.ClusterVersion()
|
||||||
|
if err != nil {
|
||||||
|
// This case can't be forced: kubeadm has to be able to lookup cluster version for upgrades to work
|
||||||
|
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Unable to fetch cluster version: %v", err))
|
||||||
|
return skewErrors
|
||||||
|
}
|
||||||
|
|
||||||
|
kubeadmVersionStr, kubeadmVersion, err := versionGetter.KubeadmVersion()
|
||||||
|
if err != nil {
|
||||||
|
// This case can't be forced: kubeadm has to be able to lookup its version for upgrades to work
|
||||||
|
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Unable to fetch kubeadm version: %v", err))
|
||||||
|
return skewErrors
|
||||||
|
}
|
||||||
|
|
||||||
|
kubeletVersions, err := versionGetter.KubeletVersions()
|
||||||
|
if err != nil {
|
||||||
|
// This is a non-critical error; continue although kubeadm couldn't look this up
|
||||||
|
skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Unable to fetch kubeadm version: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the new version is a supported version (higher than the minimum one supported)
|
||||||
|
if constants.MinimumControlPlaneVersion.AtLeast(newK8sVersion) {
|
||||||
|
// This must not happen, kubeadm always supports a minimum version; and we can't go below that
|
||||||
|
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the minimum supported version %q. Please specify a higher version to upgrade to", newK8sVersionStr, clusterVersionStr))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure new version is higher than the current Kubernetes version
|
||||||
|
if clusterVersion.AtLeast(newK8sVersion) {
|
||||||
|
// Even though we don't officially support downgrades, it "should work", and if user(s) need it and are willing to try; they can do so with --force
|
||||||
|
skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is equal to or lower than the cluster version %q. Downgrades are not supported yet", newK8sVersionStr, clusterVersionStr))
|
||||||
|
} else {
|
||||||
|
// If this code path runs, it's an upgrade (this code will run most of the time)
|
||||||
|
// kubeadm doesn't support upgrades between two minor versions; e.g. a v1.7 -> v1.9 upgrade is not supported. Enforce that here
|
||||||
|
if newK8sVersion.Minor() > clusterVersion.Minor()+MaximumAllowedMinorVersionUpgradeSkew {
|
||||||
|
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is too high; kubeadm can upgrade only %d minor version at a time", newK8sVersionStr, MaximumAllowedMinorVersionUpgradeSkew))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the kubeadm version is lower than what we want to upgrade to; error
|
||||||
|
if kubeadmVersion.LessThan(newK8sVersion) {
|
||||||
|
if newK8sVersion.Minor() > kubeadmVersion.Minor() {
|
||||||
|
// This is totally unsupported; kubeadm has no idea how it should handle a newer minor release than itself
|
||||||
|
skewErrors.Mandatory = append(skewErrors.Mandatory, fmt.Errorf("Specified version to upgrade to %q is one minor release higher than the kubeadm minor release (%d > %d). Such an upgrade is not supported", newK8sVersionStr, newK8sVersion.Minor(), kubeadmVersion.Minor()))
|
||||||
|
} else {
|
||||||
|
// Upgrading to a higher patch version than kubeadm is ok if the user specifies --force. Not recommended, but possible.
|
||||||
|
skewErrors.Skippable = append(skewErrors.Skippable, fmt.Errorf("Specified version to upgrade to %q is higher than the kubeadm version %q. Upgrade kubeadm first using the tool you used to install kubeadm", newK8sVersionStr, kubeadmVersionStr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Detect if the version is unstable and the user didn't allow that
|
||||||
|
if err = detectUnstableVersionError(newK8sVersion, newK8sVersionStr, allowExperimentalUpgrades, allowRCUpgrades); err != nil {
|
||||||
|
skewErrors.Skippable = append(skewErrors.Skippable, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Detect if there are too old kubelets in the cluster
|
||||||
|
// Check for nil here since this is the only case where kubeletVersions can be nil; if KubeletVersions() returned an error
|
||||||
|
// However, it's okay to skip that check
|
||||||
|
if kubeletVersions != nil {
|
||||||
|
if err = detectTooOldKubelets(newK8sVersion, kubeletVersions); err != nil {
|
||||||
|
skewErrors.Skippable = append(skewErrors.Skippable, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we did not see any errors, return nil
|
||||||
|
if len(skewErrors.Skippable) == 0 && len(skewErrors.Mandatory) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Uh oh, we encountered one or more errors, return them
|
||||||
|
return skewErrors
|
||||||
|
}
|
||||||
|
|
||||||
|
// detectUnstableVersionError is a helper function for detecting if the unstable version (if specified) is allowed to be used
|
||||||
|
func detectUnstableVersionError(newK8sVersion *version.Version, newK8sVersionStr string, allowExperimentalUpgrades, allowRCUpgrades bool) error {
|
||||||
|
// Short-circuit quickly if this is not an unstable version
|
||||||
|
if len(newK8sVersion.PreRelease()) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// If the user has specified that unstable versions are fine, then no error should be returned
|
||||||
|
if allowExperimentalUpgrades {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// If this is a release candidate and we allow such ones, everything's fine
|
||||||
|
if strings.HasPrefix(newK8sVersion.PreRelease(), "rc") && allowRCUpgrades {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("Specified version to upgrade to %q is an unstable version and such upgrades weren't allowed via setting the --allow-*-upgrades flags", newK8sVersionStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// detectTooOldKubelets errors out if the kubelet versions are so old that an unsupported skew would happen if the cluster was upgraded
|
||||||
|
func detectTooOldKubelets(newK8sVersion *version.Version, kubeletVersions map[string]uint16) error {
|
||||||
|
tooOldKubeletVersions := []string{}
|
||||||
|
for versionStr := range kubeletVersions {
|
||||||
|
|
||||||
|
kubeletVersion, err := version.ParseSemantic(versionStr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't parse kubelet version %s", versionStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if newK8sVersion.Minor() > kubeletVersion.Minor()+MaximumAllowedMinorVersionKubeletSkew {
|
||||||
|
tooOldKubeletVersions = append(tooOldKubeletVersions, versionStr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(tooOldKubeletVersions) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("There are kubelets in this cluster that are too old that have these versions %v", tooOldKubeletVersions)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,14 +17,60 @@ limitations under the License.
|
||||||
package upgrade
|
package upgrade
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"k8s.io/apimachinery/pkg/util/errors"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/proxy"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/apiconfig"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/clusterinfo"
|
||||||
|
nodebootstraptoken "k8s.io/kubernetes/cmd/kubeadm/app/phases/bootstraptoken/node"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/uploadconfig"
|
||||||
"k8s.io/kubernetes/pkg/util/version"
|
"k8s.io/kubernetes/pkg/util/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PerformPostUpgradeTasks runs nearly the same functions as 'kubeadm init' would do
|
// PerformPostUpgradeTasks runs nearly the same functions as 'kubeadm init' would do
|
||||||
// Note that the markmaster phase is left out, not needed, and no token is created as that doesn't belong to the upgrade
|
// Note that the markmaster phase is left out, not needed, and no token is created as that doesn't belong to the upgrade
|
||||||
func PerformPostUpgradeTasks(_ clientset.Interface, _ *kubeadmapi.MasterConfiguration, _ *version.Version) error {
|
func PerformPostUpgradeTasks(client clientset.Interface, cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) error {
|
||||||
// No-op; don't do anything here yet
|
errs := []error{}
|
||||||
return nil
|
|
||||||
|
// Upload currently used configuration to the cluster
|
||||||
|
// Note: This is done right in the beginning of cluster initialization; as we might want to make other phases
|
||||||
|
// depend on centralized information from this source in the future
|
||||||
|
if err := uploadconfig.UploadConfiguration(cfg, client); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create/update RBAC rules that makes the bootstrap tokens able to post CSRs
|
||||||
|
if err := nodebootstraptoken.AllowBootstrapTokensToPostCSRs(client); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
// Create/update RBAC rules that makes the bootstrap tokens able to get their CSRs approved automatically
|
||||||
|
if err := nodebootstraptoken.AutoApproveNodeBootstrapTokens(client, k8sVersion); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Is this needed to do here? I think that updating cluster info should probably be separate from a normal upgrade
|
||||||
|
// Create the cluster-info ConfigMap with the associated RBAC rules
|
||||||
|
// if err := clusterinfo.CreateBootstrapConfigMapIfNotExists(client, kubeadmconstants.GetAdminKubeConfigPath()); err != nil {
|
||||||
|
// return err
|
||||||
|
//}
|
||||||
|
// Create/update RBAC rules that makes the cluster-info ConfigMap reachable
|
||||||
|
if err := clusterinfo.CreateClusterInfoRBACRules(client); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: This call is deprecated
|
||||||
|
if err := apiconfig.CreateRBACRules(client, k8sVersion); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upgrade kube-dns and kube-proxy
|
||||||
|
if err := dns.EnsureDNSAddon(cfg, client, k8sVersion); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
if err := proxy.EnsureProxyAddon(cfg, client); err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
return errors.NewAggregate(errs)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,180 @@
|
||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package upgrade
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/images"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
prepullPrefix = "upgrade-prepull-"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Prepuller defines an interface for performing a prepull operation in a create-wait-delete fashion in parallel
|
||||||
|
type Prepuller interface {
|
||||||
|
CreateFunc(string) error
|
||||||
|
WaitFunc(string)
|
||||||
|
DeleteFunc(string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// DaemonSetPrepuller makes sure the control plane images are availble on all masters
|
||||||
|
type DaemonSetPrepuller struct {
|
||||||
|
client clientset.Interface
|
||||||
|
cfg *kubeadmapi.MasterConfiguration
|
||||||
|
waiter apiclient.Waiter
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDaemonSetPrepuller creates a new instance of the DaemonSetPrepuller struct
|
||||||
|
func NewDaemonSetPrepuller(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration) *DaemonSetPrepuller {
|
||||||
|
return &DaemonSetPrepuller{
|
||||||
|
client: client,
|
||||||
|
cfg: cfg,
|
||||||
|
waiter: waiter,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateFunc creates a DaemonSet for making the image available on every relevant node
|
||||||
|
func (d *DaemonSetPrepuller) CreateFunc(component string) error {
|
||||||
|
image := images.GetCoreImage(component, d.cfg.ImageRepository, d.cfg.KubernetesVersion, d.cfg.UnifiedControlPlaneImage)
|
||||||
|
ds := buildPrePullDaemonSet(component, image)
|
||||||
|
|
||||||
|
// Create the DaemonSet in the API Server
|
||||||
|
if err := apiclient.CreateOrUpdateDaemonSet(d.client, ds); err != nil {
|
||||||
|
return fmt.Errorf("unable to create a DaemonSet for prepulling the component %q: %v", component, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitFunc waits for all Pods in the specified DaemonSet to be in the Running state
|
||||||
|
func (d *DaemonSetPrepuller) WaitFunc(component string) {
|
||||||
|
fmt.Printf("[upgrade/prepull] Prepulling image for component %s.\n", component)
|
||||||
|
d.waiter.WaitForPodsWithLabel("k8s-app=upgrade-prepull-" + component)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteFunc deletes the DaemonSet used for making the image available on every relevant node
|
||||||
|
func (d *DaemonSetPrepuller) DeleteFunc(component string) error {
|
||||||
|
dsName := addPrepullPrefix(component)
|
||||||
|
if err := apiclient.DeleteDaemonSetForeground(d.client, metav1.NamespaceSystem, dsName); err != nil {
|
||||||
|
return fmt.Errorf("unable to cleanup the DaemonSet used for prepulling %s: %v", component, err)
|
||||||
|
}
|
||||||
|
fmt.Printf("[upgrade/prepull] Prepulled image for component %s.\n", component)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrepullImagesInParallel creates DaemonSets synchronously but waits in parallel for the images to pull
|
||||||
|
func PrepullImagesInParallel(kubePrepuller Prepuller, timeout time.Duration) error {
|
||||||
|
componentsToPrepull := constants.MasterComponents
|
||||||
|
fmt.Printf("[upgrade/prepull] Will prepull images for components %v\n", componentsToPrepull)
|
||||||
|
|
||||||
|
timeoutChan := time.After(timeout)
|
||||||
|
|
||||||
|
// Synchronously create the DaemonSets
|
||||||
|
for _, component := range componentsToPrepull {
|
||||||
|
if err := kubePrepuller.CreateFunc(component); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a channel for streaming data from goroutines that run in parallell to a blocking for loop that cleans up
|
||||||
|
prePulledChan := make(chan string, len(componentsToPrepull))
|
||||||
|
for _, component := range componentsToPrepull {
|
||||||
|
go func(c string) {
|
||||||
|
// Wait as long as needed. This WaitFunc call should be blocking until completetion
|
||||||
|
kubePrepuller.WaitFunc(c)
|
||||||
|
// When the task is done, go ahead and cleanup by sending the name to the channel
|
||||||
|
prePulledChan <- c
|
||||||
|
}(component)
|
||||||
|
}
|
||||||
|
|
||||||
|
// This call blocks until all expected messages are received from the channel or errors out if timeoutChan fires.
|
||||||
|
// For every successful wait, kubePrepuller.DeleteFunc is executed
|
||||||
|
if err := waitForItemsFromChan(timeoutChan, prePulledChan, len(componentsToPrepull), kubePrepuller.DeleteFunc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("[upgrade/prepull] Successfully prepulled the images for all the control plane components")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForItemsFromChan waits for n elements from stringChan with a timeout. For every item received from stringChan, cleanupFunc is executed
|
||||||
|
func waitForItemsFromChan(timeoutChan <-chan time.Time, stringChan chan string, n int, cleanupFunc func(string) error) error {
|
||||||
|
i := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-timeoutChan:
|
||||||
|
return fmt.Errorf("The prepull operation timed out")
|
||||||
|
case result := <-stringChan:
|
||||||
|
i++
|
||||||
|
// If the cleanup function errors; error here as well
|
||||||
|
if err := cleanupFunc(result); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if i == n {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// addPrepullPrefix adds the prepull prefix for this functionality; can be used in names, labels, etc.
|
||||||
|
func addPrepullPrefix(component string) string {
|
||||||
|
return fmt.Sprintf("%s%s", prepullPrefix, component)
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildPrePullDaemonSet builds the DaemonSet that ensures the control plane image is available
|
||||||
|
func buildPrePullDaemonSet(component, image string) *extensions.DaemonSet {
|
||||||
|
var gracePeriodSecs int64
|
||||||
|
return &extensions.DaemonSet{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: addPrepullPrefix(component),
|
||||||
|
Namespace: metav1.NamespaceSystem,
|
||||||
|
},
|
||||||
|
Spec: extensions.DaemonSetSpec{
|
||||||
|
Template: v1.PodTemplateSpec{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Labels: map[string]string{
|
||||||
|
"k8s-app": addPrepullPrefix(component),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Spec: v1.PodSpec{
|
||||||
|
Containers: []v1.Container{
|
||||||
|
{
|
||||||
|
Name: component,
|
||||||
|
Image: image,
|
||||||
|
Command: []string{"/bin/sleep", "3600"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
NodeSelector: map[string]string{
|
||||||
|
constants.LabelNodeRoleMaster: "",
|
||||||
|
},
|
||||||
|
Tolerations: []v1.Toleration{constants.MasterToleration},
|
||||||
|
TerminationGracePeriodSeconds: &gracePeriodSecs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,272 @@
|
||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package upgrade
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/selfhosting"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
|
||||||
|
"k8s.io/kubernetes/pkg/util/version"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// upgradeTempDSPrefix is the prefix added to the temporary DaemonSet's name used during the upgrade
|
||||||
|
upgradeTempDSPrefix = "temp-upgrade-"
|
||||||
|
|
||||||
|
// upgradeTempLabel is the label key used for identifying the temporary component's DaemonSet
|
||||||
|
upgradeTempLabel = "temp-upgrade-component"
|
||||||
|
|
||||||
|
// selfHostingWaitTimeout describes the maximum amount of time a self-hosting wait process should wait before timing out
|
||||||
|
selfHostingWaitTimeout = 2 * time.Minute
|
||||||
|
|
||||||
|
// selfHostingFailureThreshold describes how many times kubeadm will retry creating the DaemonSets
|
||||||
|
selfHostingFailureThreshold uint8 = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
// controlPlaneComponentResources holds the relevant Pod and DaemonSet associated with a control plane component
|
||||||
|
type controlPlaneComponentResources struct {
|
||||||
|
pod *v1.Pod
|
||||||
|
daemonSet *extensions.DaemonSet
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelfHostedControlPlane upgrades a self-hosted control plane
|
||||||
|
// It works as follows:
|
||||||
|
// - The client gets the currently running DaemonSets and their associated Pods used for self-hosting the control plane
|
||||||
|
// - A temporary DaemonSet for the component in question is created; but nearly identical to the DaemonSet for the self-hosted component running right now
|
||||||
|
// - Why use this temporary DaemonSet? Because, the RollingUpdate strategy for upgrading DaemonSets first kills the old Pod, and then adds the new one
|
||||||
|
// - This doesn't work for self-hosted upgrades, as if you remove the only API server for instance you have in the cluster, the cluster essentially goes down
|
||||||
|
// - So instead, a nearly identical copy of the pre-upgrade DaemonSet is created and applied to the cluster. In the beginning, this duplicate DS is just idle
|
||||||
|
// - kubeadm waits for the temporary DaemonSet's Pod to become Running
|
||||||
|
// - kubeadm updates the real, self-hosted component. This will result in the pre-upgrade component Pod being removed from the cluster
|
||||||
|
// - Luckily, the temporary, backup DaemonSet now kicks in and takes over and acts as the control plane. It recognizes that a new Pod should be created,
|
||||||
|
// - as the "real" DaemonSet is being updated.
|
||||||
|
// - kubeadm waits for the pre-upgrade Pod to become deleted. It now takes advantage of the backup/temporary component
|
||||||
|
// - kubeadm waits for the new, upgraded DaemonSet to become Running.
|
||||||
|
// - Now that the new, upgraded DaemonSet is Running, we can delete the backup/temporary DaemonSet
|
||||||
|
// - Lastly, make sure the API /healthz endpoint still is reachable
|
||||||
|
//
|
||||||
|
// TL;DR; This is what the flow looks like in pseudo-code:
|
||||||
|
// for [kube-apiserver, kube-controller-manager, kube-scheduler], do:
|
||||||
|
// 1. Self-Hosted component v1 Running
|
||||||
|
// -> Duplicate the DaemonSet manifest
|
||||||
|
// 2. Self-Hosted component v1 Running (active). Backup component v1 Running (passive)
|
||||||
|
// -> Upgrade the Self-Hosted component v1 to v2.
|
||||||
|
// -> Self-Hosted component v1 is Deleted from the cluster
|
||||||
|
// 3. Backup component v1 Running becomes active and completes the upgrade by creating the Self-Hosted component v2 Pod (passive)
|
||||||
|
// -> Wait for Self-Hosted component v2 to become Running
|
||||||
|
// 4. Backup component v1 Running (active). Self-Hosted component v2 Running (passive)
|
||||||
|
// -> Backup component v1 is Deleted
|
||||||
|
// 5. Wait for Self-Hosted component v2 Running to become active
|
||||||
|
// 6. Repeat for all control plane components
|
||||||
|
func SelfHostedControlPlane(client clientset.Interface, waiter apiclient.Waiter, cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) error {
|
||||||
|
|
||||||
|
// Adjust the timeout slightly to something self-hosting specific
|
||||||
|
waiter.SetTimeout(selfHostingWaitTimeout)
|
||||||
|
|
||||||
|
// This function returns a map of DaemonSet objects ready to post to the API server
|
||||||
|
newControlPlaneDaemonSets := BuildUpgradedDaemonSetsFromConfig(cfg, k8sVersion)
|
||||||
|
|
||||||
|
controlPlaneResources, err := getCurrentControlPlaneComponentResources(client)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
// Make a shallow copy of the current DaemonSet in order to create a new, temporary one
|
||||||
|
tempDS := *controlPlaneResources[component].daemonSet
|
||||||
|
|
||||||
|
// Mutate the temp daemonset a little to be suitable for this usage (change label selectors, etc)
|
||||||
|
mutateTempDaemonSet(&tempDS, component)
|
||||||
|
|
||||||
|
// Create or update the DaemonSet in the API Server, and retry selfHostingFailureThreshold times if it errors out
|
||||||
|
if err := apiclient.TryRunCommand(func() error {
|
||||||
|
return apiclient.CreateOrUpdateDaemonSet(client, &tempDS)
|
||||||
|
}, selfHostingFailureThreshold); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the temporary/backup self-hosted component to come up
|
||||||
|
if err := waiter.WaitForPodsWithLabel(buildTempUpgradeDSLabelQuery(component)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newDS := newControlPlaneDaemonSets[component]
|
||||||
|
|
||||||
|
// Upgrade the component's self-hosted resource
|
||||||
|
// During this upgrade; the temporary/backup component will take over
|
||||||
|
if err := apiclient.TryRunCommand(func() error {
|
||||||
|
|
||||||
|
if _, err := client.ExtensionsV1beta1().DaemonSets(newDS.ObjectMeta.Namespace).Update(newDS); err != nil {
|
||||||
|
return fmt.Errorf("couldn't update self-hosted component's DaemonSet: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, selfHostingFailureThreshold); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the component's old Pod to disappear
|
||||||
|
oldPod := controlPlaneResources[component].pod
|
||||||
|
if err := waiter.WaitForPodToDisappear(oldPod.ObjectMeta.Name); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the main, upgraded self-hosted component to come up
|
||||||
|
// Here we're talking to the temporary/backup component; the upgraded component is in the process of starting up
|
||||||
|
if err := waiter.WaitForPodsWithLabel(selfhosting.BuildSelfHostedComponentLabelQuery(component)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the temporary DaemonSet, and retry selfHostingFailureThreshold times if it errors out
|
||||||
|
// In order to pivot back to the upgraded API server, we kill the temporary/backup component
|
||||||
|
if err := apiclient.TryRunCommand(func() error {
|
||||||
|
return apiclient.DeleteDaemonSetForeground(client, tempDS.ObjectMeta.Namespace, tempDS.ObjectMeta.Name)
|
||||||
|
}, selfHostingFailureThreshold); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just as an extra safety check; make sure the API server is returning ok at the /healthz endpoint
|
||||||
|
if err := waiter.WaitForAPI(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("[upgrade/apply] Self-hosted component %q upgraded successfully!\n", component)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildUpgradedDaemonSetsFromConfig takes a config object and the current version and returns the DaemonSet objects to post to the master
|
||||||
|
func BuildUpgradedDaemonSetsFromConfig(cfg *kubeadmapi.MasterConfiguration, k8sVersion *version.Version) map[string]*extensions.DaemonSet {
|
||||||
|
// Here the map of different mutators to use for the control plane's podspec is stored
|
||||||
|
mutators := selfhosting.GetMutatorsFromFeatureGates(cfg.FeatureGates)
|
||||||
|
// Get the new PodSpecs to use
|
||||||
|
controlPlanePods := controlplane.GetStaticPodSpecs(cfg, k8sVersion)
|
||||||
|
// Store the created DaemonSets in this map
|
||||||
|
controlPlaneDaemonSets := map[string]*extensions.DaemonSet{}
|
||||||
|
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
podSpec := controlPlanePods[component].Spec
|
||||||
|
|
||||||
|
// Build the full DaemonSet object from the PodSpec generated from the control plane phase and
|
||||||
|
// using the self-hosting mutators available from the selfhosting phase
|
||||||
|
ds := selfhosting.BuildDaemonSet(component, &podSpec, mutators)
|
||||||
|
controlPlaneDaemonSets[component] = ds
|
||||||
|
}
|
||||||
|
return controlPlaneDaemonSets
|
||||||
|
}
|
||||||
|
|
||||||
|
// addTempUpgradeDSPrefix adds the upgradeTempDSPrefix to the specified DaemonSet name
|
||||||
|
func addTempUpgradeDSPrefix(currentName string) string {
|
||||||
|
return fmt.Sprintf("%s%s", upgradeTempDSPrefix, currentName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildTempUpgradeLabels returns the label string-string map for identifying the temporary
|
||||||
|
func buildTempUpgradeLabels(component string) map[string]string {
|
||||||
|
return map[string]string{
|
||||||
|
upgradeTempLabel: component,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildTempUpgradeDSLabelQuery creates the right query for matching
|
||||||
|
func buildTempUpgradeDSLabelQuery(component string) string {
|
||||||
|
return fmt.Sprintf("%s=%s", upgradeTempLabel, component)
|
||||||
|
}
|
||||||
|
|
||||||
|
// mutateTempDaemonSet mutates the specified self-hosted DaemonSet for the specified component
|
||||||
|
// in a way that makes it possible to post a nearly identical, temporary DaemonSet as a backup
|
||||||
|
func mutateTempDaemonSet(tempDS *extensions.DaemonSet, component string) {
|
||||||
|
// Prefix the name of the temporary DaemonSet with upgradeTempDSPrefix
|
||||||
|
tempDS.ObjectMeta.Name = addTempUpgradeDSPrefix(tempDS.ObjectMeta.Name)
|
||||||
|
// Set .Labels to something else than the "real" self-hosted components have
|
||||||
|
tempDS.ObjectMeta.Labels = buildTempUpgradeLabels(component)
|
||||||
|
tempDS.Spec.Selector.MatchLabels = buildTempUpgradeLabels(component)
|
||||||
|
tempDS.Spec.Template.ObjectMeta.Labels = buildTempUpgradeLabels(component)
|
||||||
|
// Clean all unnecessary ObjectMeta fields
|
||||||
|
tempDS.ObjectMeta = extractRelevantObjectMeta(tempDS.ObjectMeta)
|
||||||
|
// Reset .Status as we're posting a new object
|
||||||
|
tempDS.Status = extensions.DaemonSetStatus{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractRelevantObjectMeta returns only the relevant parts of ObjectMeta required when creating
|
||||||
|
// a new, identical resource. We should not POST ResourceVersion, UUIDs, etc., only the name, labels,
|
||||||
|
// namespace and annotations should be preserved.
|
||||||
|
func extractRelevantObjectMeta(ob metav1.ObjectMeta) metav1.ObjectMeta {
|
||||||
|
return metav1.ObjectMeta{
|
||||||
|
Name: ob.Name,
|
||||||
|
Namespace: ob.Namespace,
|
||||||
|
Labels: ob.Labels,
|
||||||
|
Annotations: ob.Annotations,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// listPodsWithLabelSelector returns the relevant Pods for the given LabelSelector
|
||||||
|
func listPodsWithLabelSelector(client clientset.Interface, kvLabel string) (*v1.PodList, error) {
|
||||||
|
return client.CoreV1().Pods(metav1.NamespaceSystem).List(metav1.ListOptions{
|
||||||
|
LabelSelector: kvLabel,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// getCurrentControlPlaneComponentResources returns a string-(Pod|DaemonSet) map for later use
|
||||||
|
func getCurrentControlPlaneComponentResources(client clientset.Interface) (map[string]controlPlaneComponentResources, error) {
|
||||||
|
controlPlaneResources := map[string]controlPlaneComponentResources{}
|
||||||
|
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
var podList *v1.PodList
|
||||||
|
var currentDS *extensions.DaemonSet
|
||||||
|
|
||||||
|
// Get the self-hosted pod associated with the component
|
||||||
|
podLabelSelector := selfhosting.BuildSelfHostedComponentLabelQuery(component)
|
||||||
|
if err := apiclient.TryRunCommand(func() error {
|
||||||
|
var tryrunerr error
|
||||||
|
podList, tryrunerr = listPodsWithLabelSelector(client, podLabelSelector)
|
||||||
|
return tryrunerr // note that tryrunerr is most likely nil here (in successful cases)
|
||||||
|
}, selfHostingFailureThreshold); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that there are only one Pod with this label selector; otherwise unexpected things can happen
|
||||||
|
if len(podList.Items) > 1 {
|
||||||
|
return nil, fmt.Errorf("too many pods with label selector %q found in the %s namespace", podLabelSelector, metav1.NamespaceSystem)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the component's DaemonSet object
|
||||||
|
dsName := constants.AddSelfHostedPrefix(component)
|
||||||
|
if err := apiclient.TryRunCommand(func() error {
|
||||||
|
var tryrunerr error
|
||||||
|
// Try to get the current self-hosted component
|
||||||
|
currentDS, tryrunerr = client.ExtensionsV1beta1().DaemonSets(metav1.NamespaceSystem).Get(dsName, metav1.GetOptions{})
|
||||||
|
return tryrunerr // note that tryrunerr is most likely nil here (in successful cases)
|
||||||
|
}, selfHostingFailureThreshold); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the associated resources to the map to return later
|
||||||
|
controlPlaneResources[component] = controlPlaneComponentResources{
|
||||||
|
pod: &podList.Items[0],
|
||||||
|
daemonSet: currentDS,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return controlPlaneResources, nil
|
||||||
|
}
|
|
@ -17,13 +17,176 @@ limitations under the License.
|
||||||
package upgrade
|
package upgrade
|
||||||
|
|
||||||
import (
|
import (
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
|
||||||
"k8s.io/kubernetes/pkg/util/version"
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PerformStaticPodControlPlaneUpgrade upgrades a static pod-hosted control plane
|
// StaticPodPathManager is responsible for tracking the directories used in the static pod upgrade transition
|
||||||
func PerformStaticPodControlPlaneUpgrade(_ clientset.Interface, _ *kubeadmapi.MasterConfiguration, _ *version.Version) error {
|
type StaticPodPathManager interface {
|
||||||
// No-op for now; doesn't do anything yet
|
// MoveFile should move a file from oldPath to newPath
|
||||||
|
MoveFile(oldPath, newPath string) error
|
||||||
|
// RealManifestPath gets the file path for the component in the "real" static pod manifest directory used by the kubelet
|
||||||
|
RealManifestPath(component string) string
|
||||||
|
// RealManifestDir should point to the static pod manifest directory used by the kubelet
|
||||||
|
RealManifestDir() string
|
||||||
|
// TempManifestPath gets the file path for the component in the temporary directory created for generating new manifests for the upgrade
|
||||||
|
TempManifestPath(component string) string
|
||||||
|
// TempManifestDir should point to the temporary directory created for generating new manifests for the upgrade
|
||||||
|
TempManifestDir() string
|
||||||
|
// BackupManifestPath gets the file path for the component in the backup directory used for backuping manifests during the transition
|
||||||
|
BackupManifestPath(component string) string
|
||||||
|
// BackupManifestDir should point to the backup directory used for backuping manifests during the transition
|
||||||
|
BackupManifestDir() string
|
||||||
|
}
|
||||||
|
|
||||||
|
// KubeStaticPodPathManager is a real implementation of StaticPodPathManager that is used when upgrading a static pod cluster
|
||||||
|
type KubeStaticPodPathManager struct {
|
||||||
|
realManifestDir string
|
||||||
|
tempManifestDir string
|
||||||
|
backupManifestDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewKubeStaticPodPathManager creates a new instance of KubeStaticPodPathManager
|
||||||
|
func NewKubeStaticPodPathManager(realDir, tempDir, backupDir string) StaticPodPathManager {
|
||||||
|
return &KubeStaticPodPathManager{
|
||||||
|
realManifestDir: realDir,
|
||||||
|
tempManifestDir: tempDir,
|
||||||
|
backupManifestDir: backupDir,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewKubeStaticPodPathManagerUsingTempDirs creates a new instance of KubeStaticPodPathManager with temporary directories backing it
|
||||||
|
func NewKubeStaticPodPathManagerUsingTempDirs(realManifestDir string) (StaticPodPathManager, error) {
|
||||||
|
upgradedManifestsDir, err := constants.CreateTempDirForKubeadm("kubeadm-upgraded-manifests")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
backupManifestsDir, err := constants.CreateTempDirForKubeadm("kubeadm-backup-manifests")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewKubeStaticPodPathManager(realManifestDir, upgradedManifestsDir, backupManifestsDir), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MoveFile should move a file from oldPath to newPath
|
||||||
|
func (spm *KubeStaticPodPathManager) MoveFile(oldPath, newPath string) error {
|
||||||
|
return os.Rename(oldPath, newPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RealManifestPath gets the file path for the component in the "real" static pod manifest directory used by the kubelet
|
||||||
|
func (spm *KubeStaticPodPathManager) RealManifestPath(component string) string {
|
||||||
|
return constants.GetStaticPodFilepath(component, spm.realManifestDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RealManifestDir should point to the static pod manifest directory used by the kubelet
|
||||||
|
func (spm *KubeStaticPodPathManager) RealManifestDir() string {
|
||||||
|
return spm.realManifestDir
|
||||||
|
}
|
||||||
|
|
||||||
|
// TempManifestPath gets the file path for the component in the temporary directory created for generating new manifests for the upgrade
|
||||||
|
func (spm *KubeStaticPodPathManager) TempManifestPath(component string) string {
|
||||||
|
return constants.GetStaticPodFilepath(component, spm.tempManifestDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TempManifestDir should point to the temporary directory created for generating new manifests for the upgrade
|
||||||
|
func (spm *KubeStaticPodPathManager) TempManifestDir() string {
|
||||||
|
return spm.tempManifestDir
|
||||||
|
}
|
||||||
|
|
||||||
|
// BackupManifestPath gets the file path for the component in the backup directory used for backuping manifests during the transition
|
||||||
|
func (spm *KubeStaticPodPathManager) BackupManifestPath(component string) string {
|
||||||
|
return constants.GetStaticPodFilepath(component, spm.backupManifestDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BackupManifestDir should point to the backup directory used for backuping manifests during the transition
|
||||||
|
func (spm *KubeStaticPodPathManager) BackupManifestDir() string {
|
||||||
|
return spm.backupManifestDir
|
||||||
|
}
|
||||||
|
|
||||||
|
// StaticPodControlPlane upgrades a static pod-hosted control plane
|
||||||
|
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration) error {
|
||||||
|
|
||||||
|
// This string-string map stores the component name and backup filepath (if a rollback is needed).
|
||||||
|
// If a rollback is needed,
|
||||||
|
recoverManifests := map[string]string{}
|
||||||
|
|
||||||
|
beforePodHashMap, err := waiter.WaitForStaticPodControlPlaneHashes(cfg.NodeName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the updated static Pod manifests into the temporary directory
|
||||||
|
fmt.Printf("[upgrade/staticpods] Writing upgraded Static Pod manifests to %q\n", pathMgr.TempManifestDir())
|
||||||
|
err = controlplane.CreateInitStaticPodManifestFiles(pathMgr.TempManifestDir(), cfg)
|
||||||
|
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
// The old manifest is here; in the /etc/kubernetes/manifests/
|
||||||
|
currentManifestPath := pathMgr.RealManifestPath(component)
|
||||||
|
// The new, upgraded manifest will be written here
|
||||||
|
newManifestPath := pathMgr.TempManifestPath(component)
|
||||||
|
// The old manifest will be moved here; into a subfolder of the temporary directory
|
||||||
|
// If a rollback is needed, these manifests will be put back to where they where initially
|
||||||
|
backupManifestPath := pathMgr.BackupManifestPath(component)
|
||||||
|
|
||||||
|
// Store the backup path in the recover list. If something goes wrong now, this component will be rolled back.
|
||||||
|
recoverManifests[component] = backupManifestPath
|
||||||
|
|
||||||
|
// Move the old manifest into the old-manifests directory
|
||||||
|
if err := pathMgr.MoveFile(currentManifestPath, backupManifestPath); err != nil {
|
||||||
|
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move the new manifest into the manifests directory
|
||||||
|
if err := pathMgr.MoveFile(newManifestPath, currentManifestPath); err != nil {
|
||||||
|
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("[upgrade/staticpods] Moved upgraded manifest to %q and backed up old manifest to %q\n", currentManifestPath, backupManifestPath)
|
||||||
|
fmt.Println("[upgrade/staticpods] Waiting for the kubelet to restart the component")
|
||||||
|
|
||||||
|
// Wait for the mirror Pod hash to change; otherwise we'll run into race conditions here when the kubelet hasn't had time to
|
||||||
|
// notice the removal of the Static Pod, leading to a false positive below where we check that the API endpoint is healthy
|
||||||
|
// If we don't do this, there is a case where we remove the Static Pod manifest, kubelet is slow to react, kubeadm checks the
|
||||||
|
// API endpoint below of the OLD Static Pod component and proceeds quickly enough, which might lead to unexpected results.
|
||||||
|
if err := waiter.WaitForStaticPodControlPlaneHashChange(cfg.NodeName, component, beforePodHashMap[component]); err != nil {
|
||||||
|
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the static pod component to come up and register itself as a mirror pod
|
||||||
|
if err := waiter.WaitForPodsWithLabel("component=" + component); err != nil {
|
||||||
|
return rollbackOldManifests(recoverManifests, err, pathMgr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("[upgrade/staticpods] Component %q upgraded successfully!\n", component)
|
||||||
|
}
|
||||||
|
// Remove the temporary directories used on a best-effort (don't fail if the calls error out)
|
||||||
|
// The calls are set here by design; we should _not_ use "defer" above as that would remove the directories
|
||||||
|
// even in the "fail and rollback" case, where we want the directories preserved for the user.
|
||||||
|
os.RemoveAll(pathMgr.TempManifestDir())
|
||||||
|
os.RemoveAll(pathMgr.BackupManifestDir())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rollbackOldManifests rolls back the backuped manifests if something went wrong
|
||||||
|
func rollbackOldManifests(oldManifests map[string]string, origErr error, pathMgr StaticPodPathManager) error {
|
||||||
|
errs := []error{origErr}
|
||||||
|
for component, backupPath := range oldManifests {
|
||||||
|
// Where we should put back the backed up manifest
|
||||||
|
realManifestPath := pathMgr.RealManifestPath(component)
|
||||||
|
|
||||||
|
// Move the backup manifest back into the manifests directory
|
||||||
|
err := pathMgr.MoveFile(backupPath, realManifestPath)
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Let the user know there we're problems, but we tried to reçover
|
||||||
|
return fmt.Errorf("couldn't upgrade control plane. kubeadm has tried to recover everything into the earlier state. Errors faced: %v", errs)
|
||||||
|
}
|
||||||
|
|
|
@ -20,8 +20,12 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
|
||||||
versionutil "k8s.io/kubernetes/pkg/util/version"
|
versionutil "k8s.io/kubernetes/pkg/util/version"
|
||||||
|
"k8s.io/kubernetes/pkg/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
// VersionGetter defines an interface for fetching different versions.
|
// VersionGetter defines an interface for fetching different versions.
|
||||||
|
@ -43,11 +47,8 @@ type KubeVersionGetter struct {
|
||||||
w io.Writer
|
w io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure KubeVersionGetter implements the VersionGetter interface
|
|
||||||
var _ VersionGetter = &KubeVersionGetter{}
|
|
||||||
|
|
||||||
// NewKubeVersionGetter returns a new instance of KubeVersionGetter
|
// NewKubeVersionGetter returns a new instance of KubeVersionGetter
|
||||||
func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) *KubeVersionGetter {
|
func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) VersionGetter {
|
||||||
return &KubeVersionGetter{
|
return &KubeVersionGetter{
|
||||||
client: client,
|
client: client,
|
||||||
w: writer,
|
w: writer,
|
||||||
|
@ -56,28 +57,68 @@ func NewKubeVersionGetter(client clientset.Interface, writer io.Writer) *KubeVer
|
||||||
|
|
||||||
// ClusterVersion gets API server version
|
// ClusterVersion gets API server version
|
||||||
func (g *KubeVersionGetter) ClusterVersion() (string, *versionutil.Version, error) {
|
func (g *KubeVersionGetter) ClusterVersion() (string, *versionutil.Version, error) {
|
||||||
fmt.Fprintf(g.w, "[upgrade/versions] Cluster version: ")
|
clusterVersionInfo, err := g.client.Discovery().ServerVersion()
|
||||||
fmt.Fprintln(g.w, "v1.7.0")
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf("Couldn't fetch cluster version from the API Server: %v", err)
|
||||||
|
}
|
||||||
|
fmt.Fprintf(g.w, "[upgrade/versions] Cluster version: %s\n", clusterVersionInfo.String())
|
||||||
|
|
||||||
return "v1.7.0", versionutil.MustParseSemantic("v1.7.0"), nil
|
clusterVersion, err := versionutil.ParseSemantic(clusterVersionInfo.String())
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf("Couldn't parse cluster version: %v", err)
|
||||||
|
}
|
||||||
|
return clusterVersionInfo.String(), clusterVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KubeadmVersion gets kubeadm version
|
// KubeadmVersion gets kubeadm version
|
||||||
func (g *KubeVersionGetter) KubeadmVersion() (string, *versionutil.Version, error) {
|
func (g *KubeVersionGetter) KubeadmVersion() (string, *versionutil.Version, error) {
|
||||||
fmt.Fprintf(g.w, "[upgrade/versions] kubeadm version: %s\n", "v1.8.0")
|
kubeadmVersionInfo := version.Get()
|
||||||
|
fmt.Fprintf(g.w, "[upgrade/versions] kubeadm version: %s\n", kubeadmVersionInfo.String())
|
||||||
|
|
||||||
return "v1.8.0", versionutil.MustParseSemantic("v1.8.0"), nil
|
kubeadmVersion, err := versionutil.ParseSemantic(kubeadmVersionInfo.String())
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf("Couldn't parse kubeadm version: %v", err)
|
||||||
|
}
|
||||||
|
return kubeadmVersionInfo.String(), kubeadmVersion, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// VersionFromCILabel resolves different labels like "stable" to action semver versions using the Kubernetes CI uploads to GCS
|
// VersionFromCILabel resolves a version label like "latest" or "stable" to an actual version using the public Kubernetes CI uploads
|
||||||
func (g *KubeVersionGetter) VersionFromCILabel(_, _ string) (string, *versionutil.Version, error) {
|
func (g *KubeVersionGetter) VersionFromCILabel(ciVersionLabel, description string) (string, *versionutil.Version, error) {
|
||||||
return "v1.8.1", versionutil.MustParseSemantic("v1.8.0"), nil
|
versionStr, err := kubeadmutil.KubernetesReleaseVersion(ciVersionLabel)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf("Couldn't fetch latest %s version from the internet: %v", description, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if description != "" {
|
||||||
|
fmt.Fprintf(g.w, "[upgrade/versions] Latest %s: %s\n", description, versionStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
ver, err := versionutil.ParseSemantic(versionStr)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, fmt.Errorf("Couldn't parse latest %s version: %v", description, err)
|
||||||
|
}
|
||||||
|
return versionStr, ver, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KubeletVersions gets the versions of the kubelets in the cluster
|
// KubeletVersions gets the versions of the kubelets in the cluster
|
||||||
func (g *KubeVersionGetter) KubeletVersions() (map[string]uint16, error) {
|
func (g *KubeVersionGetter) KubeletVersions() (map[string]uint16, error) {
|
||||||
// This tells kubeadm that there are two nodes in the cluster; both on the v1.7.1 version currently
|
nodes, err := g.client.CoreV1().Nodes().List(metav1.ListOptions{})
|
||||||
return map[string]uint16{
|
if err != nil {
|
||||||
"v1.7.1": 2,
|
return nil, fmt.Errorf("couldn't list all nodes in cluster")
|
||||||
}, nil
|
}
|
||||||
|
return computeKubeletVersions(nodes.Items), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// computeKubeletVersions returns a string-int map that describes how many nodes are of a specific version
|
||||||
|
func computeKubeletVersions(nodes []v1.Node) map[string]uint16 {
|
||||||
|
kubeletVersions := map[string]uint16{}
|
||||||
|
for _, node := range nodes {
|
||||||
|
kver := node.Status.NodeInfo.KubeletVersion
|
||||||
|
if _, found := kubeletVersions[kver]; !found {
|
||||||
|
kubeletVersions[kver] = 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
kubeletVersions[kver]++
|
||||||
|
}
|
||||||
|
return kubeletVersions
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
kuberuntime "k8s.io/apimachinery/pkg/runtime"
|
kuberuntime "k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
|
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
|
@ -33,7 +34,7 @@ import (
|
||||||
|
|
||||||
// ClientBackedDryRunGetter implements the DryRunGetter interface for use in NewDryRunClient() and proxies all GET and LIST requests to the backing API server reachable via rest.Config
|
// ClientBackedDryRunGetter implements the DryRunGetter interface for use in NewDryRunClient() and proxies all GET and LIST requests to the backing API server reachable via rest.Config
|
||||||
type ClientBackedDryRunGetter struct {
|
type ClientBackedDryRunGetter struct {
|
||||||
baseConfig *rest.Config
|
client clientset.Interface
|
||||||
dynClientPool dynamic.ClientPool
|
dynClientPool dynamic.ClientPool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,11 +42,16 @@ type ClientBackedDryRunGetter struct {
|
||||||
var _ DryRunGetter = &ClientBackedDryRunGetter{}
|
var _ DryRunGetter = &ClientBackedDryRunGetter{}
|
||||||
|
|
||||||
// NewClientBackedDryRunGetter creates a new ClientBackedDryRunGetter instance based on the rest.Config object
|
// NewClientBackedDryRunGetter creates a new ClientBackedDryRunGetter instance based on the rest.Config object
|
||||||
func NewClientBackedDryRunGetter(config *rest.Config) *ClientBackedDryRunGetter {
|
func NewClientBackedDryRunGetter(config *rest.Config) (*ClientBackedDryRunGetter, error) {
|
||||||
return &ClientBackedDryRunGetter{
|
client, err := clientset.NewForConfig(config)
|
||||||
baseConfig: config,
|
if err != nil {
|
||||||
dynClientPool: dynamic.NewDynamicClientPool(config),
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return &ClientBackedDryRunGetter{
|
||||||
|
client: client,
|
||||||
|
dynClientPool: dynamic.NewDynamicClientPool(config),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClientBackedDryRunGetterFromKubeconfig creates a new ClientBackedDryRunGetter instance from the given KubeConfig file
|
// NewClientBackedDryRunGetterFromKubeconfig creates a new ClientBackedDryRunGetter instance from the given KubeConfig file
|
||||||
|
@ -58,7 +64,7 @@ func NewClientBackedDryRunGetterFromKubeconfig(file string) (*ClientBackedDryRun
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create API client configuration from kubeconfig: %v", err)
|
return nil, fmt.Errorf("failed to create API client configuration from kubeconfig: %v", err)
|
||||||
}
|
}
|
||||||
return NewClientBackedDryRunGetter(clientConfig), nil
|
return NewClientBackedDryRunGetter(clientConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleGetAction handles GET actions to the dryrun clientset this interface supports
|
// HandleGetAction handles GET actions to the dryrun clientset this interface supports
|
||||||
|
@ -106,6 +112,11 @@ func (clg *ClientBackedDryRunGetter) HandleListAction(action core.ListAction) (b
|
||||||
return true, newObj, err
|
return true, newObj, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Client gets the backing clientset.Interface
|
||||||
|
func (clg *ClientBackedDryRunGetter) Client() clientset.Interface {
|
||||||
|
return clg.client
|
||||||
|
}
|
||||||
|
|
||||||
// actionToResourceClient returns the ResourceInterface for the given action
|
// actionToResourceClient returns the ResourceInterface for the given action
|
||||||
// First; the function gets the right API group interface from the resource type. The API group struct behind the interface
|
// First; the function gets the right API group interface from the resource type. The API group struct behind the interface
|
||||||
// returned may be cached in the dynamic client pool. Then, an APIResource object is constructed so that it can be passed to
|
// returned may be cached in the dynamic client pool. Then, an APIResource object is constructed so that it can be passed to
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
extensions "k8s.io/api/extensions/v1beta1"
|
extensions "k8s.io/api/extensions/v1beta1"
|
||||||
rbac "k8s.io/api/rbac/v1beta1"
|
rbac "k8s.io/api/rbac/v1beta1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -97,6 +98,15 @@ func CreateOrUpdateDaemonSet(client clientset.Interface, ds *extensions.DaemonSe
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteDaemonSetForeground deletes the specified DaemonSet in foreground mode; i.e. it blocks until/makes sure all the managed Pods are deleted
|
||||||
|
func DeleteDaemonSetForeground(client clientset.Interface, namespace, name string) error {
|
||||||
|
foregroundDelete := metav1.DeletePropagationForeground
|
||||||
|
deleteOptions := &metav1.DeleteOptions{
|
||||||
|
PropagationPolicy: &foregroundDelete,
|
||||||
|
}
|
||||||
|
return client.ExtensionsV1beta1().DaemonSets(namespace).Delete(name, deleteOptions)
|
||||||
|
}
|
||||||
|
|
||||||
// CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
// CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
|
||||||
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
|
func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
|
||||||
if _, err := client.RbacV1beta1().Roles(role.ObjectMeta.Namespace).Create(role); err != nil {
|
if _, err := client.RbacV1beta1().Roles(role.ObjectMeta.Namespace).Create(role); err != nil {
|
||||||
|
|
|
@ -17,6 +17,8 @@ limitations under the License.
|
||||||
package apiclient
|
package apiclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -38,6 +40,10 @@ type Waiter interface {
|
||||||
WaitForPodsWithLabel(kvLabel string) error
|
WaitForPodsWithLabel(kvLabel string) error
|
||||||
// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
|
// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
|
||||||
WaitForPodToDisappear(staticPodName string) error
|
WaitForPodToDisappear(staticPodName string) error
|
||||||
|
// WaitForStaticPodControlPlaneHashes
|
||||||
|
WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
|
||||||
|
// WaitForStaticPodControlPlaneHashChange
|
||||||
|
WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error
|
||||||
// SetTimeout adjusts the timeout to the specified duration
|
// SetTimeout adjusts the timeout to the specified duration
|
||||||
SetTimeout(timeout time.Duration)
|
SetTimeout(timeout time.Duration)
|
||||||
}
|
}
|
||||||
|
@ -110,7 +116,7 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
|
||||||
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
|
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
|
||||||
_, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{})
|
_, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(podName, metav1.GetOptions{})
|
||||||
if apierrors.IsNotFound(err) {
|
if apierrors.IsNotFound(err) {
|
||||||
fmt.Printf("[apiclient] The Static Pod %q is now removed\n", podName)
|
fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName)
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -122,6 +128,61 @@ func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
|
||||||
w.timeout = timeout
|
w.timeout = timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForStaticPodControlPlaneHashes blocks until it timeouts or gets a hash map for all components and their Static Pods
|
||||||
|
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) {
|
||||||
|
|
||||||
|
var mirrorPodHashes map[string]string
|
||||||
|
err := wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
|
||||||
|
|
||||||
|
hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
mirrorPodHashes = hashes
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
return mirrorPodHashes, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForStaticPodControlPlaneHashChange blocks until it timeouts or notices that the Mirror Pod (for the Static Pod, respectively) has changed
|
||||||
|
// This implicitely means this function blocks until the kubelet has restarted the Static Pod in question
|
||||||
|
func (w *KubeWaiter) WaitForStaticPodControlPlaneHashChange(nodeName, component, previousHash string) error {
|
||||||
|
return wait.PollImmediate(constants.APICallRetryInterval, w.timeout, func() (bool, error) {
|
||||||
|
|
||||||
|
hashes, err := getStaticPodControlPlaneHashes(w.client, nodeName)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
// We should continue polling until the UID changes
|
||||||
|
if hashes[component] == previousHash {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// getStaticPodControlPlaneHashes computes hashes for all the control plane's Static Pod resources
|
||||||
|
func getStaticPodControlPlaneHashes(client clientset.Interface, nodeName string) (map[string]string, error) {
|
||||||
|
|
||||||
|
mirrorPodHashes := map[string]string{}
|
||||||
|
for _, component := range constants.MasterComponents {
|
||||||
|
staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
|
||||||
|
staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(staticPodName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
podBytes, err := json.Marshal(staticPod)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mirrorPodHashes[component] = fmt.Sprintf("%x", sha256.Sum256(podBytes))
|
||||||
|
}
|
||||||
|
return mirrorPodHashes, nil
|
||||||
|
}
|
||||||
|
|
||||||
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
|
// TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit; the last error is returned
|
||||||
func TryRunCommand(f func() error, failureThreshold uint8) error {
|
func TryRunCommand(f func() error, failureThreshold uint8) error {
|
||||||
var numFailures uint8
|
var numFailures uint8
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/errors"
|
"k8s.io/apimachinery/pkg/util/errors"
|
||||||
|
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||||
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
|
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -98,3 +99,18 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error {
|
||||||
|
|
||||||
// SetTimeout is a no-op; we don't wait in this implementation
|
// SetTimeout is a no-op; we don't wait in this implementation
|
||||||
func (w *Waiter) SetTimeout(_ time.Duration) {}
|
func (w *Waiter) SetTimeout(_ time.Duration) {}
|
||||||
|
|
||||||
|
// WaitForStaticPodControlPlaneHashes returns an empty hash for all control plane images; WaitForStaticPodControlPlaneHashChange won't block in any case
|
||||||
|
// but the empty strings there are needed
|
||||||
|
func (w *Waiter) WaitForStaticPodControlPlaneHashes(_ string) (map[string]string, error) {
|
||||||
|
return map[string]string{
|
||||||
|
constants.KubeAPIServer: "",
|
||||||
|
constants.KubeControllerManager: "",
|
||||||
|
constants.KubeScheduler: "",
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// WaitForStaticPodControlPlaneHashChange returns a dummy nil error in order for the flow to just continue as we're dryrunning
|
||||||
|
func (w *Waiter) WaitForStaticPodControlPlaneHashChange(_, _, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue