Merge pull request #73029 from neolit123/join-phases

kubeadm: include a phase runner for `join`
pull/564/head
Kubernetes Prow Robot 2019-01-20 10:53:47 -08:00 committed by GitHub
commit 3ec18a5aed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 25 deletions

View File

@ -38,6 +38,7 @@ import (
kubeadmapiv1beta1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta1"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/discovery"
certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs"
@ -157,6 +158,7 @@ var (
// Please note that this structure includes the public kubeadm config API, but only a subset of the options
// supported by this api will be exposed as a flag.
type joinOptions struct {
args *[]string
cfgPath string
token string
controlPlane bool
@ -168,6 +170,8 @@ type joinOptions struct {
// this data is shared across all the phases that are included in the workflow.
type joinData struct {
cfg *kubeadmapi.JoinConfiguration
initCfg *kubeadmapi.InitConfiguration
tlsBootstrapCfg *clientcmdapi.Config
ignorePreflightErrors sets.String
outputWriter io.Writer
}
@ -179,16 +183,25 @@ func NewCmdJoin(out io.Writer, joinOptions *joinOptions) *cobra.Command {
if joinOptions == nil {
joinOptions = newJoinOptions()
}
joinRunner := workflow.NewRunner()
cmd := &cobra.Command{
Use: "join",
Short: "Run this on any machine you wish to join an existing cluster",
Long: joinLongDescription,
Run: func(cmd *cobra.Command, args []string) {
joinData, err := newJoinData(cmd, args, joinOptions, out)
joinOptions.args = &args
c, err := joinRunner.InitData()
kubeadmutil.CheckErr(err)
err = joinData.Run()
err = joinRunner.Run()
kubeadmutil.CheckErr(err)
// TODO: remove this once we have all phases in place.
// the method joinData.Run() itself should be removed too.
data := c.(joinData)
err = data.Run()
kubeadmutil.CheckErr(err)
},
}
@ -196,6 +209,20 @@ func NewCmdJoin(out io.Writer, joinOptions *joinOptions) *cobra.Command {
AddJoinConfigFlags(cmd.Flags(), joinOptions.externalcfg)
AddJoinOtherFlags(cmd.Flags(), &joinOptions.cfgPath, &joinOptions.ignorePreflightErrors, &joinOptions.controlPlane, &joinOptions.token)
// initialize the workflow runner with the list of phases
// TODO: append phases here like so:
// joinRunner.AppendPhase(phases.NewPreflightMasterPhase())
// sets the data builder function, that will be used by the runner
// both when running the entire workflow or single phases
joinRunner.SetDataInitializer(func(cmd *cobra.Command) (workflow.RunData, error) {
return newJoinData(cmd, joinOptions, out)
})
// binds the Runner to kubeadm join command by altering
// command help, adding --skip-phases flag and by adding phases subcommands
joinRunner.BindToCommand(cmd)
return cmd
}
@ -296,7 +323,7 @@ func newJoinOptions() *joinOptions {
// newJoinData returns a new joinData struct to be used for the execution of the kubeadm join workflow.
// This func takes care of validating joinOptions passed to the command, and then it converts
// options into the internal JoinConfiguration type that is used as input all the phases in the kubeadm join workflow
func newJoinData(cmd *cobra.Command, args []string, options *joinOptions, out io.Writer) (joinData, error) {
func newJoinData(cmd *cobra.Command, options *joinOptions, out io.Writer) (joinData, error) {
// Re-apply defaults to the public kubeadm API (this will set only values not exposed/not set as a flags)
kubeadmscheme.Scheme.Default(options.externalcfg)
@ -319,13 +346,13 @@ func newJoinData(cmd *cobra.Command, args []string, options *joinOptions, out io
}
// if an APIServerEndpoint from which to retrive cluster information was not provided, unset the Discovery.BootstrapToken object
if len(args) == 0 {
if len(*options.args) == 0 {
options.externalcfg.Discovery.BootstrapToken = nil
} else {
if len(options.cfgPath) == 0 && len(args) > 1 {
klog.Warningf("[join] WARNING: More than one API server endpoint supplied on command line %v. Using the first one.", args)
if len(options.cfgPath) == 0 && len(*options.args) > 1 {
klog.Warningf("[join] WARNING: More than one API server endpoint supplied on command line %v. Using the first one.", *options.args)
}
options.externalcfg.Discovery.BootstrapToken.APIServerEndpoint = args[0]
options.externalcfg.Discovery.BootstrapToken.APIServerEndpoint = (*options.args)[0]
}
// if not joining a control plane, unset the ControlPlane object
@ -378,6 +405,46 @@ func newJoinData(cmd *cobra.Command, args []string, options *joinOptions, out io
}, nil
}
// Cfg returns the JoinConfiguration.
func (j *joinData) Cfg() *kubeadmapi.JoinConfiguration {
return j.cfg
}
// TLSBootstrapCfg returns the cluster-info (kubeconfig).
func (j *joinData) TLSBootstrapCfg() (*clientcmdapi.Config, error) {
if j.tlsBootstrapCfg != nil {
return j.tlsBootstrapCfg, nil
}
klog.V(1).Infoln("[join] Discovering cluster-info")
tlsBootstrapCfg, err := discovery.For(j.cfg)
j.tlsBootstrapCfg = tlsBootstrapCfg
return tlsBootstrapCfg, err
}
// InitCfg returns the InitConfiguration.
func (j *joinData) InitCfg() (*kubeadmapi.InitConfiguration, error) {
if j.initCfg != nil {
return j.initCfg, nil
}
if _, err := j.TLSBootstrapCfg(); err != nil {
return nil, err
}
klog.V(1).Infoln("[join] Fetching init configuration")
initCfg, err := fetchInitConfigurationFromJoinConfiguration(j.cfg, j.tlsBootstrapCfg)
j.initCfg = initCfg
return initCfg, err
}
// IgnorePreflightErrors returns the list of preflight errors to ignore.
func (j *joinData) IgnorePreflightErrors() sets.String {
return j.ignorePreflightErrors
}
// OutputWriter returns the io.Writer used to write messages such as the "join done" message.
func (j *joinData) OutputWriter() io.Writer {
return j.outputWriter
}
// Run executes worker node provisioning and tries to join an existing cluster.
func (j *joinData) Run() error {
fmt.Println("[preflight] Running pre-flight checks")
@ -388,9 +455,15 @@ func (j *joinData) Run() error {
return err
}
// Fetch the init configuration based on the join configuration
klog.V(1).Infoln("[preflight] Fetching init configuration")
initCfg, tlsBootstrapCfg, err := fetchInitConfigurationFromJoinConfiguration(j.cfg)
// Fetch the init configuration based on the join configuration.
// TODO: individual phases should call these:
// - phases that need initCfg should call joinData.InitCfg().
// - phases that need tlsBootstrapCfg should call joinData.TLSBootstrapCfg().
tlsBootstrapCfg, err := j.TLSBootstrapCfg()
if err != nil {
return err
}
initCfg, err := j.InitCfg()
if err != nil {
return err
}
@ -654,20 +727,12 @@ func waitForTLSBootstrappedClient() error {
}
// fetchInitConfigurationFromJoinConfiguration retrieves the init configuration from a join configuration, performing the discovery
func fetchInitConfigurationFromJoinConfiguration(cfg *kubeadmapi.JoinConfiguration) (*kubeadmapi.InitConfiguration, *clientcmdapi.Config, error) {
// Perform the Discovery, which turns a Bootstrap Token and optionally (and preferably) a CA cert hash into a KubeConfig
// file that may be used for the TLS Bootstrapping process the kubelet performs using the Certificates API.
klog.V(1).Infoln("[join] Discovering cluster-info")
tlsBootstrapCfg, err := discovery.For(cfg)
if err != nil {
return nil, nil, err
}
func fetchInitConfigurationFromJoinConfiguration(cfg *kubeadmapi.JoinConfiguration, tlsBootstrapCfg *clientcmdapi.Config) (*kubeadmapi.InitConfiguration, error) {
// Retrieves the kubeadm configuration
klog.V(1).Infoln("[join] Retrieving KubeConfig objects")
initConfiguration, err := fetchInitConfiguration(tlsBootstrapCfg)
if err != nil {
return nil, nil, err
return nil, err
}
// Create the final KubeConfig file with the cluster name discovered after fetching the cluster configuration
@ -683,7 +748,7 @@ func fetchInitConfigurationFromJoinConfiguration(cfg *kubeadmapi.JoinConfigurati
initConfiguration.LocalAPIEndpoint = cfg.ControlPlane.LocalAPIEndpoint
}
return initConfiguration, tlsBootstrapCfg, nil
return initConfiguration, nil
}
// fetchInitConfiguration reads the cluster configuration from the kubeadm-admin configMap

View File

@ -220,6 +220,7 @@ func TestNewJoinData(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// initialize an external join option and inject it to the join cmd
joinOptions := newJoinOptions()
joinOptions.args = &tc.args
cmd := NewCmdJoin(nil, joinOptions)
// sets cmd flags (that will be reflected on the join options)
@ -228,7 +229,7 @@ func TestNewJoinData(t *testing.T) {
}
// test newJoinData method
data, err := newJoinData(cmd, tc.args, joinOptions, nil)
data, err := newJoinData(cmd, joinOptions, nil)
if err != nil && !tc.expectError {
t.Fatalf("newJoinData returned unexpected error: %v", err)
}

View File

@ -295,6 +295,10 @@ func (e *Runner) SetAdditionalFlags(fn func(*pflag.FlagSet)) {
// command help, adding phase related flags and by adding phases subcommands
// Please note that this command needs to be done once all the phases are added to the Runner.
func (e *Runner) BindToCommand(cmd *cobra.Command) {
// keep track of the command triggering the runner
e.runCmd = cmd
// return early if no phases were added
if len(e.Phases) == 0 {
return
}
@ -387,9 +391,6 @@ func (e *Runner) BindToCommand(cmd *cobra.Command) {
// adds phase related flags to the main command
cmd.Flags().StringSliceVar(&e.Options.SkipPhases, "skip-phases", nil, "List of phases to be skipped")
// keep tracks of the command triggering the runner
e.runCmd = cmd
}
func inheritsFlags(sourceFlags, targetFlags *pflag.FlagSet, cmdFlags []string) {