kubeadm: Remove etcd member from the etcd cluster when reset the control plane node

pull/564/head
pytimer 2018-12-11 11:29:42 +08:00
parent 50328e0ba7
commit 83f5296a14
6 changed files with 124 additions and 0 deletions

View File

@ -40,6 +40,7 @@ go_library(
"//cmd/kubeadm/app/images:go_default_library", "//cmd/kubeadm/app/images:go_default_library",
"//cmd/kubeadm/app/phases/bootstraptoken/node:go_default_library", "//cmd/kubeadm/app/phases/bootstraptoken/node:go_default_library",
"//cmd/kubeadm/app/phases/certs:go_default_library", "//cmd/kubeadm/app/phases/certs:go_default_library",
"//cmd/kubeadm/app/phases/etcd:go_default_library",
"//cmd/kubeadm/app/phases/kubeconfig:go_default_library", "//cmd/kubeadm/app/phases/kubeconfig:go_default_library",
"//cmd/kubeadm/app/phases/uploadconfig:go_default_library", "//cmd/kubeadm/app/phases/uploadconfig:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library", "//cmd/kubeadm/app/preflight:go_default_library",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
"k8s.io/kubernetes/cmd/kubeadm/app/preflight" "k8s.io/kubernetes/cmd/kubeadm/app/preflight"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util" kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config" configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
@ -139,6 +140,9 @@ func (r *Reset) Run(out io.Writer, client clientset.Interface) error {
etcdDataDir, err := getEtcdDataDir(etcdManifestPath, client) etcdDataDir, err := getEtcdDataDir(etcdManifestPath, client)
if err == nil { if err == nil {
dirsToClean = append(dirsToClean, etcdDataDir) dirsToClean = append(dirsToClean, etcdDataDir)
if err := removeEtcdMember(client); err != nil {
klog.Warningf("[reset] failed to remove etcd member: %v\n.Please manually remove this etcd member using etcdctl", err)
}
} else { } else {
fmt.Println("[reset] no etcd config found. Assuming external etcd") fmt.Println("[reset] no etcd config found. Assuming external etcd")
fmt.Println("[reset] please manually reset etcd to prevent further issues") fmt.Println("[reset] please manually reset etcd to prevent further issues")
@ -205,6 +209,14 @@ func (r *Reset) Run(out io.Writer, client clientset.Interface) error {
return nil return nil
} }
func removeEtcdMember(client clientset.Interface) error {
cfg, err := configutil.FetchInitConfigurationFromCluster(client, os.Stdout, "reset", false)
if err != nil {
return err
}
return etcdphase.RemoveStackedEtcdMemberFromCluster(client, cfg)
}
func getEtcdDataDir(manifestPath string, client clientset.Interface) (string, error) { func getEtcdDataDir(manifestPath string, client clientset.Interface) (string, error) {
const etcdVolumeName = "etcd-data" const etcdVolumeName = "etcd-data"
var dataDir string var dataDir string

View File

@ -81,6 +81,35 @@ func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.Clu
return nil return nil
} }
// RemoveStackedEtcdMemberFromCluster will remove a local etcd member from etcd cluster,
// when reset the control plane node.
func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
// creates an etcd client that connects to all the local/stacked etcd members
klog.V(1).Info("[etcd] creating etcd client that connects to etcd pods")
etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
if err != nil {
return err
}
// notifies the other members of the etcd cluster about the removing member
etcdPeerAddress := etcdutil.GetPeerURL(&cfg.LocalAPIEndpoint)
klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress)
id, err := etcdClient.GetMemberID(etcdPeerAddress)
if err != nil {
return err
}
klog.V(1).Infof("[etcd] removing etcd member: %s, id: %d", etcdPeerAddress, id)
members, err := etcdClient.RemoveMember(id)
if err != nil {
return err
}
klog.V(1).Infof("[etcd] Updated etcd member list: %v", members)
return nil
}
// CreateStackedEtcdStaticPodManifestFile will write local etcd static pod manifest file // CreateStackedEtcdStaticPodManifestFile will write local etcd static pod manifest file
// for an additional etcd member that is joining an existing local/stacked etcd cluster. // for an additional etcd member that is joining an existing local/stacked etcd cluster.
// Other members of the etcd cluster will be notified of the joining node in beforehand as well. // Other members of the etcd cluster will be notified of the joining node in beforehand as well.

View File

@ -113,6 +113,14 @@ func (f fakeEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Mem
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
func (f fakeEtcdClient) GetMemberID(peerURL string) (uint64, error) {
return 0, nil
}
func (f fakeEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}
func TestGetAvailableUpgrades(t *testing.T) { func TestGetAvailableUpgrades(t *testing.T) {
etcdClient := fakeEtcdClient{} etcdClient := fakeEtcdClient{}
tests := []struct { tests := []struct {

View File

@ -256,6 +256,14 @@ func (c fakeTLSEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
func (c fakeTLSEtcdClient) GetMemberID(peerURL string) (uint64, error) {
return 0, nil
}
func (c fakeTLSEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}
type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string } type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string }
func (c fakePodManifestEtcdClient) ClusterAvailable() (bool, error) { return true, nil } func (c fakePodManifestEtcdClient) ClusterAvailable() (bool, error) { return true, nil }
@ -297,6 +305,14 @@ func (c fakePodManifestEtcdClient) AddMember(name string, peerAddrs string) ([]e
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
func (c fakePodManifestEtcdClient) GetMemberID(peerURL string) (uint64, error) {
return 0, nil
}
func (c fakePodManifestEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil
}
func TestStaticPodControlPlane(t *testing.T) { func TestStaticPodControlPlane(t *testing.T) {
tests := []struct { tests := []struct {
description string description string

View File

@ -46,6 +46,8 @@ type ClusterInterrogator interface {
WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
Sync() error Sync() error
AddMember(name string, peerAddrs string) ([]Member, error) AddMember(name string, peerAddrs string) ([]Member, error)
GetMemberID(peerURL string) (uint64, error)
RemoveMember(id uint64) ([]Member, error)
} }
// Client provides connection parameters for an etcd cluster // Client provides connection parameters for an etcd cluster
@ -146,6 +148,62 @@ type Member struct {
PeerURL string PeerURL string
} }
// GetMemberID returns the member ID of the given peer URL
func (c Client) GetMemberID(peerURL string) (uint64, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 30 * time.Second,
TLS: c.TLS,
})
if err != nil {
return 0, err
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := cli.MemberList(ctx)
cancel()
if err != nil {
return 0, err
}
for _, member := range resp.Members {
if member.GetPeerURLs()[0] == peerURL {
return member.GetID(), nil
}
}
return 0, nil
}
// RemoveMember notifies an etcd cluster to remove an existing member
func (c Client) RemoveMember(id uint64) ([]Member, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: 30 * time.Second,
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()
// Remove an existing member from the cluster
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := cli.MemberRemove(ctx, id)
cancel()
if err != nil {
return nil, err
}
// Returns the updated list of etcd members
ret := []Member{}
for _, m := range resp.Members {
ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
}
return ret, nil
}
// AddMember notifies an existing etcd cluster that a new member is joining // AddMember notifies an existing etcd cluster that a new member is joining
func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
// Parse the peer address, required to add the client URL later to the list // Parse the peer address, required to add the client URL later to the list