mirror of https://github.com/k3s-io/k3s
Merge pull request #74112 from pytimer/kubeadm-reset
kubeadm: Remove etcd members from the etcd cluster when reset the nodespull/564/head
commit
9e53b85d28
|
@ -40,6 +40,7 @@ go_library(
|
|||
"//cmd/kubeadm/app/images:go_default_library",
|
||||
"//cmd/kubeadm/app/phases/bootstraptoken/node:go_default_library",
|
||||
"//cmd/kubeadm/app/phases/certs:go_default_library",
|
||||
"//cmd/kubeadm/app/phases/etcd:go_default_library",
|
||||
"//cmd/kubeadm/app/phases/kubeconfig:go_default_library",
|
||||
"//cmd/kubeadm/app/phases/uploadconfig:go_default_library",
|
||||
"//cmd/kubeadm/app/preflight:go_default_library",
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
|
||||
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
|
||||
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
|
||||
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
|
||||
"k8s.io/kubernetes/cmd/kubeadm/app/preflight"
|
||||
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
|
||||
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
|
||||
|
@ -139,6 +140,9 @@ func (r *Reset) Run(out io.Writer, client clientset.Interface) error {
|
|||
etcdDataDir, err := getEtcdDataDir(etcdManifestPath, client)
|
||||
if err == nil {
|
||||
dirsToClean = append(dirsToClean, etcdDataDir)
|
||||
if err := removeEtcdMember(client); err != nil {
|
||||
klog.Warningf("[reset] failed to remove etcd member: %v\n.Please manually remove this etcd member using etcdctl", err)
|
||||
}
|
||||
} else {
|
||||
fmt.Println("[reset] no etcd config found. Assuming external etcd")
|
||||
fmt.Println("[reset] please manually reset etcd to prevent further issues")
|
||||
|
@ -205,6 +209,14 @@ func (r *Reset) Run(out io.Writer, client clientset.Interface) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func removeEtcdMember(client clientset.Interface) error {
|
||||
cfg, err := configutil.FetchInitConfigurationFromCluster(client, os.Stdout, "reset", false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return etcdphase.RemoveStackedEtcdMemberFromCluster(client, cfg)
|
||||
}
|
||||
|
||||
func getEtcdDataDir(manifestPath string, client clientset.Interface) (string, error) {
|
||||
const etcdVolumeName = "etcd-data"
|
||||
var dataDir string
|
||||
|
|
|
@ -81,6 +81,35 @@ func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.Clu
|
|||
return nil
|
||||
}
|
||||
|
||||
// RemoveStackedEtcdMemberFromCluster will remove a local etcd member from etcd cluster,
|
||||
// when reset the control plane node.
|
||||
func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
|
||||
// creates an etcd client that connects to all the local/stacked etcd members
|
||||
klog.V(1).Info("[etcd] creating etcd client that connects to etcd pods")
|
||||
etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// notifies the other members of the etcd cluster about the removing member
|
||||
etcdPeerAddress := etcdutil.GetPeerURL(&cfg.LocalAPIEndpoint)
|
||||
|
||||
klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress)
|
||||
id, err := etcdClient.GetMemberID(etcdPeerAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
klog.V(1).Infof("[etcd] removing etcd member: %s, id: %d", etcdPeerAddress, id)
|
||||
members, err := etcdClient.RemoveMember(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
klog.V(1).Infof("[etcd] Updated etcd member list: %v", members)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateStackedEtcdStaticPodManifestFile will write local etcd static pod manifest file
|
||||
// for an additional etcd member that is joining an existing local/stacked etcd cluster.
|
||||
// Other members of the etcd cluster will be notified of the joining node in beforehand as well.
|
||||
|
|
|
@ -113,6 +113,14 @@ func (f fakeEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.Mem
|
|||
return []etcdutil.Member{}, nil
|
||||
}
|
||||
|
||||
func (f fakeEtcdClient) GetMemberID(peerURL string) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (f fakeEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
|
||||
return []etcdutil.Member{}, nil
|
||||
}
|
||||
|
||||
func TestGetAvailableUpgrades(t *testing.T) {
|
||||
etcdClient := fakeEtcdClient{}
|
||||
tests := []struct {
|
||||
|
|
|
@ -256,6 +256,14 @@ func (c fakeTLSEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.
|
|||
return []etcdutil.Member{}, nil
|
||||
}
|
||||
|
||||
func (c fakeTLSEtcdClient) GetMemberID(peerURL string) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (c fakeTLSEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
|
||||
return []etcdutil.Member{}, nil
|
||||
}
|
||||
|
||||
type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string }
|
||||
|
||||
func (c fakePodManifestEtcdClient) ClusterAvailable() (bool, error) { return true, nil }
|
||||
|
@ -297,6 +305,14 @@ func (c fakePodManifestEtcdClient) AddMember(name string, peerAddrs string) ([]e
|
|||
return []etcdutil.Member{}, nil
|
||||
}
|
||||
|
||||
func (c fakePodManifestEtcdClient) GetMemberID(peerURL string) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (c fakePodManifestEtcdClient) RemoveMember(id uint64) ([]etcdutil.Member, error) {
|
||||
return []etcdutil.Member{}, nil
|
||||
}
|
||||
|
||||
func TestStaticPodControlPlane(t *testing.T) {
|
||||
tests := []struct {
|
||||
description string
|
||||
|
|
|
@ -46,6 +46,8 @@ type ClusterInterrogator interface {
|
|||
WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
|
||||
Sync() error
|
||||
AddMember(name string, peerAddrs string) ([]Member, error)
|
||||
GetMemberID(peerURL string) (uint64, error)
|
||||
RemoveMember(id uint64) ([]Member, error)
|
||||
}
|
||||
|
||||
// Client provides connection parameters for an etcd cluster
|
||||
|
@ -146,6 +148,62 @@ type Member struct {
|
|||
PeerURL string
|
||||
}
|
||||
|
||||
// GetMemberID returns the member ID of the given peer URL
|
||||
func (c Client) GetMemberID(peerURL string) (uint64, error) {
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: c.Endpoints,
|
||||
DialTimeout: 30 * time.Second,
|
||||
TLS: c.TLS,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
resp, err := cli.MemberList(ctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, member := range resp.Members {
|
||||
if member.GetPeerURLs()[0] == peerURL {
|
||||
return member.GetID(), nil
|
||||
}
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// RemoveMember notifies an etcd cluster to remove an existing member
|
||||
func (c Client) RemoveMember(id uint64) ([]Member, error) {
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: c.Endpoints,
|
||||
DialTimeout: 30 * time.Second,
|
||||
TLS: c.TLS,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
// Remove an existing member from the cluster
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
resp, err := cli.MemberRemove(ctx, id)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Returns the updated list of etcd members
|
||||
ret := []Member{}
|
||||
for _, m := range resp.Members {
|
||||
ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// AddMember notifies an existing etcd cluster that a new member is joining
|
||||
func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
|
||||
// Parse the peer address, required to add the client URL later to the list
|
||||
|
|
Loading…
Reference in New Issue