From 83f5296a147183ba9471ddfeb73c76cab17471bc Mon Sep 17 00:00:00 2001 From: pytimer Date: Tue, 11 Dec 2018 11:29:42 +0800 Subject: [PATCH] kubeadm: Remove etcd member from the etcd cluster when reset the control plane node --- cmd/kubeadm/app/cmd/BUILD | 1 + cmd/kubeadm/app/cmd/reset.go | 12 ++++ cmd/kubeadm/app/phases/etcd/local.go | 29 ++++++++++ .../app/phases/upgrade/compute_test.go | 8 +++ .../app/phases/upgrade/staticpods_test.go | 16 +++++ cmd/kubeadm/app/util/etcd/etcd.go | 58 +++++++++++++++++++ 6 files changed, 124 insertions(+) diff --git a/cmd/kubeadm/app/cmd/BUILD b/cmd/kubeadm/app/cmd/BUILD index 772da2d909..e22ef05bbd 100644 --- a/cmd/kubeadm/app/cmd/BUILD +++ b/cmd/kubeadm/app/cmd/BUILD @@ -40,6 +40,7 @@ go_library( "//cmd/kubeadm/app/images:go_default_library", "//cmd/kubeadm/app/phases/bootstraptoken/node:go_default_library", "//cmd/kubeadm/app/phases/certs:go_default_library", + "//cmd/kubeadm/app/phases/etcd:go_default_library", "//cmd/kubeadm/app/phases/kubeconfig:go_default_library", "//cmd/kubeadm/app/phases/uploadconfig:go_default_library", "//cmd/kubeadm/app/preflight:go_default_library", diff --git a/cmd/kubeadm/app/cmd/reset.go b/cmd/kubeadm/app/cmd/reset.go index eeb8f8c5e3..b8e2b919eb 100644 --- a/cmd/kubeadm/app/cmd/reset.go +++ b/cmd/kubeadm/app/cmd/reset.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/cmd/options" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" + etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd" "k8s.io/kubernetes/cmd/kubeadm/app/preflight" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config" @@ -139,6 +140,9 @@ func (r *Reset) Run(out io.Writer, client clientset.Interface) error { etcdDataDir, err := getEtcdDataDir(etcdManifestPath, client) if err == nil { dirsToClean = append(dirsToClean, etcdDataDir) + if err := removeEtcdMember(client); err != nil { + klog.Warningf("[reset] failed to remove etcd member: %v\n.Please manually remove this etcd member using etcdctl", err) + } } else { fmt.Println("[reset] no etcd config found. Assuming external etcd") fmt.Println("[reset] please manually reset etcd to prevent further issues") @@ -205,6 +209,14 @@ func (r *Reset) Run(out io.Writer, client clientset.Interface) error { return nil } +func removeEtcdMember(client clientset.Interface) error { + cfg, err := configutil.FetchInitConfigurationFromCluster(client, os.Stdout, "reset", false) + if err != nil { + return err + } + return etcdphase.RemoveStackedEtcdMemberFromCluster(client, cfg) +} + func getEtcdDataDir(manifestPath string, client clientset.Interface) (string, error) { const etcdVolumeName = "etcd-data" var dataDir string diff --git a/cmd/kubeadm/app/phases/etcd/local.go b/cmd/kubeadm/app/phases/etcd/local.go index 5d5f5e4a6d..d522a16425 100644 --- a/cmd/kubeadm/app/phases/etcd/local.go +++ b/cmd/kubeadm/app/phases/etcd/local.go @@ -81,6 +81,35 @@ func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.Clu return nil } +// RemoveStackedEtcdMemberFromCluster will remove a local etcd member from etcd cluster, +// when reset the control plane node. +func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error { + // creates an etcd client that connects to all the local/stacked etcd members + klog.V(1).Info("[etcd] creating etcd client that connects to etcd pods") + etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir) + if err != nil { + return err + } + + // notifies the other members of the etcd cluster about the removing member + etcdPeerAddress := etcdutil.GetPeerURL(&cfg.LocalAPIEndpoint) + + klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress) + id, err := etcdClient.GetMemberID(etcdPeerAddress) + if err != nil { + return err + } + + klog.V(1).Infof("[etcd] removing etcd member: %s, id: %d", etcdPeerAddress, id) + members, err := etcdClient.RemoveMember(id) + if err != nil { + return err + } + klog.V(1).Infof("[etcd] Updated etcd member list: %v", members) + + return nil +} + // CreateStackedEtcdStaticPodManifestFile will write local etcd static pod manifest file // for an additional etcd member that is joining an existing local/stacked etcd cluster. // Other members of the etcd cluster will be notified of the joining node in beforehand as well. diff --git a/cmd/kubeadm/app/phases/upgrade/compute_test.go b/cmd/kubeadm/app/phases/upgrade/compute_test.go index 24c1f5b32b..e9480f739c 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute_test.go +++ b/cmd/kubeadm/app/phases/upgrade/compute_test.go @@ -113,6 +113,14 @@ func (f fakeEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Mem return []etcdutil.Member{}, nil } +func (f fakeEtcdClient) GetMemberID(peerURL string) (uint64, error) { + return 0, nil +} + +func (f fakeEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) { + return []etcdutil.Member{}, nil +} + func TestGetAvailableUpgrades(t *testing.T) { etcdClient := fakeEtcdClient{} tests := []struct { diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index b98b2b9a65..1a3b1ba87a 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -256,6 +256,14 @@ func (c fakeTLSEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil. return []etcdutil.Member{}, nil } +func (c fakeTLSEtcdClient) GetMemberID(peerURL string) (uint64, error) { + return 0, nil +} + +func (c fakeTLSEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) { + return []etcdutil.Member{}, nil +} + type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string } func (c fakePodManifestEtcdClient) ClusterAvailable() (bool, error) { return true, nil } @@ -297,6 +305,14 @@ func (c fakePodManifestEtcdClient) AddMember(name string, peerAddrs string) ([]e return []etcdutil.Member{}, nil } +func (c fakePodManifestEtcdClient) GetMemberID(peerURL string) (uint64, error) { + return 0, nil +} + +func (c fakePodManifestEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) { + return []etcdutil.Member{}, nil +} + func TestStaticPodControlPlane(t *testing.T) { tests := []struct { description string diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index 3851a5ff22..e7772f6dda 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -46,6 +46,8 @@ type ClusterInterrogator interface { WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) Sync() error AddMember(name string, peerAddrs string) ([]Member, error) + GetMemberID(peerURL string) (uint64, error) + RemoveMember(id uint64) ([]Member, error) } // Client provides connection parameters for an etcd cluster @@ -146,6 +148,62 @@ type Member struct { PeerURL string } +// GetMemberID returns the member ID of the given peer URL +func (c Client) GetMemberID(peerURL string) (uint64, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: 30 * time.Second, + TLS: c.TLS, + }) + if err != nil { + return 0, err + } + defer cli.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + resp, err := cli.MemberList(ctx) + cancel() + if err != nil { + return 0, err + } + + for _, member := range resp.Members { + if member.GetPeerURLs()[0] == peerURL { + return member.GetID(), nil + } + } + return 0, nil +} + +// RemoveMember notifies an etcd cluster to remove an existing member +func (c Client) RemoveMember(id uint64) ([]Member, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: 30 * time.Second, + TLS: c.TLS, + }) + if err != nil { + return nil, err + } + defer cli.Close() + + // Remove an existing member from the cluster + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + resp, err := cli.MemberRemove(ctx, id) + cancel() + if err != nil { + return nil, err + } + + // Returns the updated list of etcd members + ret := []Member{} + for _, m := range resp.Members { + ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]}) + } + + return ret, nil +} + // AddMember notifies an existing etcd cluster that a new member is joining func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { // Parse the peer address, required to add the client URL later to the list