From b4cb3fd37c8f36c653b551ea5f962c2cac1aba9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez=20L=C3=B3pez?= Date: Wed, 16 Jan 2019 20:40:18 +0100 Subject: [PATCH] kubeadm: wait for the etcd cluster to be available when growing it When the etcd cluster grows we need to explicitly wait for it to be available. This ensures that we are not implicitly doing this in following steps when they try to access the apiserver. --- cmd/kubeadm/app/phases/etcd/local.go | 21 +++++++++++++++------ cmd/kubeadm/app/util/etcd/etcd.go | 18 +++++++++++++++--- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/cmd/kubeadm/app/phases/etcd/local.go b/cmd/kubeadm/app/phases/etcd/local.go index c1b7eb565a..bde58eb61a 100644 --- a/cmd/kubeadm/app/phases/etcd/local.go +++ b/cmd/kubeadm/app/phases/etcd/local.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/pkg/errors" "k8s.io/klog" @@ -36,8 +37,10 @@ import ( ) const ( - etcdVolumeName = "etcd-data" - certsVolumeName = "etcd-certs" + etcdVolumeName = "etcd-data" + certsVolumeName = "etcd-certs" + etcdHealthyCheckInterval = 5 * time.Second + etcdHealthyCheckRetries = 20 ) // CreateLocalEtcdStaticPodManifestFile will write local etcd static pod manifest file. @@ -61,13 +64,13 @@ func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.In return err } - klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) + klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) return nil } // CheckLocalEtcdClusterStatus verifies health state of local/stacked etcd cluster before installing a new etcd member func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error { - fmt.Println("[etcd] Checking Etcd cluster health") + fmt.Println("[etcd] Checking etcd cluster health") // creates an etcd client that connects to all the local/stacked etcd members klog.V(1).Info("creating etcd client that connects to etcd pods") @@ -120,7 +123,13 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest return err } - fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) + fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir)) + + fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries) + if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil { + return err + } + return nil } @@ -172,7 +181,7 @@ func getEtcdCommand(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil if len(initialCluster) == 0 { defaultArguments["initial-cluster"] = fmt.Sprintf("%s=%s", cfg.GetNodeName(), etcdutil.GetPeerURL(cfg)) } else { - // NB. the joining etcd instance should be part of the initialCluster list + // NB. the joining etcd member should be part of the initialCluster list endpoints := []string{} for _, member := range initialCluster { endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL)) diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index ef0ac4bd7c..f13d621ee6 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -21,6 +21,7 @@ import ( "crypto/tls" "fmt" "net" + "net/url" "path/filepath" "strconv" "strings" @@ -73,7 +74,7 @@ func New(endpoints []string, ca, cert, key string) (*Client, error) { return &client, nil } -// NewFromCluster creates an etcd client for the the etcd endpoints defined in the ClusterStatus value stored in +// NewFromCluster creates an etcd client for the etcd endpoints defined in the ClusterStatus value stored in // the kubeadm-config ConfigMap in kube-system namespace. // Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check). func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) { @@ -146,7 +147,15 @@ type Member struct { } // AddMember notifies an existing etcd cluster that a new member is joining -func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) { +func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { + // Parse the peer address, required to add the client URL later to the list + // of endpoints for this client. Parsing as a first operation to make sure that + // if this fails no member addition is performed on the etcd cluster. + parsedPeerAddrs, err := url.Parse(peerAddrs) + if err != nil { + return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs) + } + cli, err := clientv3.New(clientv3.Config{ Endpoints: c.Endpoints, DialTimeout: 20 * time.Second, @@ -176,6 +185,9 @@ func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) { } } + // Add the new member client address to the list of endpoints + c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname())) + return ret, nil } @@ -255,7 +267,7 @@ func (c Client) WaitForClusterAvailable(retries int, retryInterval time.Duration fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval) time.Sleep(retryInterval) } - fmt.Printf("[util/etcd] Attempting to see if all cluster endpoints are available %d/%d\n", i+1, retries) + klog.V(2).Infof("attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries) resp, err := c.ClusterAvailable() if err != nil { switch err {