kubeadm: wait for the etcd cluster to be available when growing it

When the etcd cluster grows, we need to wait explicitly for it to become
available. This ensures that later steps do not have to do so implicitly
when they try to access the apiserver.
Rafael Fernández López 2019-01-16 20:40:18 +01:00
parent 3ed638b233
commit b4cb3fd37c
2 changed files with 30 additions and 9 deletions
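
In outline, the change makes CreateStackedEtcdStaticPodManifestFile block once the new member's static pod manifest has been written, until the grown etcd cluster reports itself available. A condensed sketch using the names from the diff below (not the verbatim code):

// Condensed sketch of the new wait step; see the full diff below for context.
fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries)
if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
	return err
}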


@ -21,6 +21,7 @@ import (
"os"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"k8s.io/klog"
@ -36,8 +37,10 @@ import (
)
const (
etcdVolumeName = "etcd-data"
certsVolumeName = "etcd-certs"
etcdVolumeName = "etcd-data"
certsVolumeName = "etcd-certs"
etcdHealthyCheckInterval = 5 * time.Second
etcdHealthyCheckRetries = 20
)
// CreateLocalEtcdStaticPodManifestFile will write local etcd static pod manifest file.
@ -61,13 +64,13 @@ func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.In
return err
}
klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
return nil
}
// CheckLocalEtcdClusterStatus verifies health state of local/stacked etcd cluster before installing a new etcd member
func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
fmt.Println("[etcd] Checking Etcd cluster health")
fmt.Println("[etcd] Checking etcd cluster health")
// creates an etcd client that connects to all the local/stacked etcd members
klog.V(1).Info("creating etcd client that connects to etcd pods")
@ -120,7 +123,13 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
return err
}
fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries)
if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
return err
}
return nil
}
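
For reference, the advertised maximum wait above works out to etcdHealthyCheckRetries × etcdHealthyCheckInterval = 20 × 5s = 100s, which the %v verb prints as 1m40s.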
@ -172,7 +181,7 @@ func getEtcdCommand(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil
if len(initialCluster) == 0 {
defaultArguments["initial-cluster"] = fmt.Sprintf("%s=%s", cfg.GetNodeName(), etcdutil.GetPeerURL(cfg))
} else {
// NB. the joining etcd instance should be part of the initialCluster list
// NB. the joining etcd member should be part of the initialCluster list
endpoints := []string{}
for _, member := range initialCluster {
endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL))


@ -21,6 +21,7 @@ import (
"crypto/tls"
"fmt"
"net"
"net/url"
"path/filepath"
"strconv"
"strings"
@ -73,7 +74,7 @@ func New(endpoints []string, ca, cert, key string) (*Client, error) {
return &client, nil
}
// NewFromCluster creates an etcd client for the the etcd endpoints defined in the ClusterStatus value stored in
// NewFromCluster creates an etcd client for the etcd endpoints defined in the ClusterStatus value stored in
// the kubeadm-config ConfigMap in kube-system namespace.
// Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check).
func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
@ -146,7 +147,15 @@ type Member struct {
}
// AddMember notifies an existing etcd cluster that a new member is joining
func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) {
func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
// Parse the peer address, required to add the client URL later to the list
// of endpoints for this client. Parsing as a first operation to make sure that
// if this fails no member addition is performed on the etcd cluster.
parsedPeerAddrs, err := url.Parse(peerAddrs)
if err != nil {
return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 20 * time.Second,
@ -176,6 +185,9 @@ func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) {
}
}
// Add the new member client address to the list of endpoints
c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
return ret, nil
}
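
AddMember is switched to a pointer receiver so that appending the new member's client URL to c.Endpoints persists after the call. A hypothetical caller-side sketch (the actual call site is not part of this diff) of why that matters:

// Hypothetical sketch: because AddMember now mutates etcdClient.Endpoints,
// the later availability check also dials the member that was just added.
initialCluster, err := etcdClient.AddMember(cfg.GetNodeName(), etcdutil.GetPeerURL(cfg))
if err != nil {
	return err
}
// ... write the static pod manifest for the joining member from initialCluster ...
if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
	return err
}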
@ -255,7 +267,7 @@ func (c Client) WaitForClusterAvailable(retries int, retryInterval time.Duration
fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval)
time.Sleep(retryInterval)
}
fmt.Printf("[util/etcd] Attempting to see if all cluster endpoints are available %d/%d\n", i+1, retries)
klog.V(2).Infof("attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
resp, err := c.ClusterAvailable()
if err != nil {
switch err {
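
The hunk above shows only the top of the retry loop. A plausible reconstruction of its overall shape, assuming ClusterAvailable returns (bool, error), relying on the file's existing imports (fmt, time, klog, errors), and simplifying the error handling that the diff truncates:

// Reconstruction, not the original source: poll the cluster up to `retries`
// times, sleeping `retryInterval` between attempts.
func (c Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
	for i := 0; i < retries; i++ {
		if i > 0 {
			fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval)
			time.Sleep(retryInterval)
		}
		klog.V(2).Infof("attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
		resp, err := c.ClusterAvailable()
		if err != nil {
			// The real code switches on specific error values here; in this
			// sketch any error simply triggers another attempt.
			continue
		}
		return resp, nil
	}
	return false, errors.New("timeout waiting for etcd cluster to be available")
}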