diff --git a/cmd/kubeadm/app/phases/etcd/local.go b/cmd/kubeadm/app/phases/etcd/local.go index c1b7eb565a..bde58eb61a 100644 --- a/cmd/kubeadm/app/phases/etcd/local.go +++ b/cmd/kubeadm/app/phases/etcd/local.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/pkg/errors" "k8s.io/klog" @@ -36,8 +37,10 @@ import ( ) const ( - etcdVolumeName = "etcd-data" - certsVolumeName = "etcd-certs" + etcdVolumeName = "etcd-data" + certsVolumeName = "etcd-certs" + etcdHealthyCheckInterval = 5 * time.Second + etcdHealthyCheckRetries = 20 ) // CreateLocalEtcdStaticPodManifestFile will write local etcd static pod manifest file. @@ -61,13 +64,13 @@ func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.In return err } - klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) + klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) return nil } // CheckLocalEtcdClusterStatus verifies health state of local/stacked etcd cluster before installing a new etcd member func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error { - fmt.Println("[etcd] Checking Etcd cluster health") + fmt.Println("[etcd] Checking etcd cluster health") // creates an etcd client that connects to all the local/stacked etcd members klog.V(1).Info("creating etcd client that connects to etcd pods") @@ -120,7 +123,13 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest return err } - fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) + fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) + + fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries) + if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil { + return err + } + return nil } @@ -172,7 +181,7 @@ func getEtcdCommand(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil if len(initialCluster) == 0 { defaultArguments["initial-cluster"] = fmt.Sprintf("%s=%s", cfg.GetNodeName(), etcdutil.GetPeerURL(cfg)) } else { - // NB. the joining etcd instance should be part of the initialCluster list + // NB. the joining etcd member should be part of the initialCluster list endpoints := []string{} for _, member := range initialCluster { endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL)) diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index ef0ac4bd7c..f13d621ee6 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "fmt" "net" + "net/url" "path/filepath" "strconv" "strings" @@ -73,7 +74,7 @@ func New(endpoints []string, ca, cert, key string) (*Client, error) { return &client, nil } -// NewFromCluster creates an etcd client for the the etcd endpoints defined in the ClusterStatus value stored in +// NewFromCluster creates an etcd client for the etcd endpoints defined in the ClusterStatus value stored in // the kubeadm-config ConfigMap in kube-system namespace. // Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check). func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) { @@ -146,7 +147,15 @@ type Member struct { } // AddMember notifies an existing etcd cluster that a new member is joining -func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) { +func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { + // Parse the peer address, required to add the client URL later to the list + // of endpoints for this client. Parsing as a first operation to make sure that + // if this fails no member addition is performed on the etcd cluster. + parsedPeerAddrs, err := url.Parse(peerAddrs) + if err != nil { + return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs) + } + cli, err := clientv3.New(clientv3.Config{ Endpoints: c.Endpoints, DialTimeout: 20 * time.Second, @@ -176,6 +185,9 @@ func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) { } } + // Add the new member client address to the list of endpoints + c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname())) + return ret, nil } @@ -255,7 +267,7 @@ func (c Client) WaitForClusterAvailable(retries int, retryInterval time.Duration fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval) time.Sleep(retryInterval) } - fmt.Printf("[util/etcd] Attempting to see if all cluster endpoints are available %d/%d\n", i+1, retries) + klog.V(2).Infof("attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries) resp, err := c.ClusterAvailable() if err != nil { switch err {