From 34de12087531ebe8b8010c6ac27f4dfc56769384 Mon Sep 17 00:00:00 2001 From: Chris Kim <30601846+Oats87@users.noreply.github.com> Date: Tue, 14 Sep 2021 07:19:38 -1000 Subject: [PATCH] Initial leader elected etcd member management controller (#4011) Signed-off-by: Chris Kim --- pkg/daemons/config/types.go | 9 +-- pkg/etcd/controller.go | 91 ------------------------- pkg/etcd/etcd.go | 62 ++++++++++++++---- pkg/etcd/member_controller.go | 113 ++++++++++++++++++++++++++++++++ pkg/etcd/metadata_controller.go | 68 +++++++++++++++++++ pkg/server/etcd.go | 4 +- pkg/server/server.go | 5 ++ 7 files changed, 241 insertions(+), 111 deletions(-) delete mode 100644 pkg/etcd/controller.go create mode 100644 pkg/etcd/member_controller.go create mode 100644 pkg/etcd/metadata_controller.go diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 71db71cc0e..d5055eb148 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -193,10 +193,11 @@ type ControlRuntimeBootstrap struct { type ControlRuntime struct { ControlRuntimeBootstrap - HTTPBootstrap bool - APIServerReady <-chan struct{} - ETCDReady <-chan struct{} - ClusterControllerStart func(ctx context.Context) error + HTTPBootstrap bool + APIServerReady <-chan struct{} + ETCDReady <-chan struct{} + ClusterControllerStart func(ctx context.Context) error + LeaderElectedClusterControllerStart func(ctx context.Context) error ClientKubeAPICert string ClientKubeAPIKey string diff --git a/pkg/etcd/controller.go b/pkg/etcd/controller.go deleted file mode 100644 index 81f46a69c8..0000000000 --- a/pkg/etcd/controller.go +++ /dev/null @@ -1,91 +0,0 @@ -package etcd - -import ( - "context" - "os" - "time" - - controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" - "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" -) - -const ( - NodeID = "etcd.k3s.cattle.io/node-name" - NodeAddress = "etcd.k3s.cattle.io/node-address" - master = "node-role.kubernetes.io/master" - controlPlane = "node-role.kubernetes.io/control-plane" - etcdRole = "node-role.kubernetes.io/etcd" -) - -type NodeControllerGetter func() controllerv1.NodeController - -func Register(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) { - h := &handler{ - etcd: etcd, - nodeController: nodes, - ctx: ctx, - } - nodes.OnChange(ctx, "managed-etcd-controller", h.sync) - nodes.OnRemove(ctx, "managed-etcd-controller", h.onRemove) -} - -type handler struct { - etcd *ETCD - nodeController controllerv1.NodeController - ctx context.Context -} - -func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) { - if node == nil { - return nil, nil - } - - nodeName := os.Getenv("NODE_NAME") - if nodeName == "" { - logrus.Debug("waiting for node to be assigned for etcd controller") - h.nodeController.EnqueueAfter(key, 5*time.Second) - return node, nil - } - - if key == nodeName { - return h.handleSelf(node) - } - - return node, nil -} - -func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) { - if node.Annotations[NodeID] == h.etcd.name && - node.Annotations[NodeAddress] == h.etcd.address && - node.Labels[etcdRole] == "true" && - node.Labels[controlPlane] == "true" || - h.etcd.config.DisableETCD { - return node, nil - } - - node = node.DeepCopy() - if node.Annotations == nil { - node.Annotations = map[string]string{} - } - node.Annotations[NodeID] = h.etcd.name - node.Annotations[NodeAddress] = h.etcd.address - node.Labels[etcdRole] = "true" - node.Labels[master] = "true" - node.Labels[controlPlane] = "true" - - return h.nodeController.Update(node) -} - -func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) { - if _, ok := node.Labels[etcdRole]; !ok { - return node, nil - } - - id := node.Annotations[NodeID] - address, ok := node.Annotations[NodeAddress] - if !ok { - return node, nil - } - return node, h.etcd.removePeer(h.ctx, id, address, false) -} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 21a208c826..7c06fa1b4d 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -27,6 +27,7 @@ import ( "github.com/rancher/k3s/pkg/daemons/control/deps" "github.com/rancher/k3s/pkg/daemons/executor" "github.com/rancher/k3s/pkg/version" + controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" etcd "go.etcd.io/etcd/clientv3" @@ -54,6 +55,10 @@ const ( defaultKeepAliveTimeout = 10 * time.Second maxBackupRetention = 5 + + MasterLabel = "node-role.kubernetes.io/master" + ControlPlaneLabel = "node-role.kubernetes.io/control-plane" + EtcdRoleLabel = "node-role.kubernetes.io/etcd" ) var ( @@ -62,8 +67,13 @@ var ( AddressKey = version.Program + "/apiaddresses" snapshotConfigMapName = version.Program + "-etcd-snapshots" + + NodeNameAnnotation = "etcd." + version.Program + ".cattle.io/node-name" + NodeAddressAnnotation = "etcd." + version.Program + ".cattle.io/node-address" ) +type NodeControllerGetter func() controllerv1.NodeController + type ETCD struct { client *etcd.Client config *config.Control @@ -367,14 +377,6 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt e.config.Datastore.Config.CertFile = e.runtime.ClientETCDCert e.config.Datastore.Config.KeyFile = e.runtime.ClientETCDKey - if err := e.setName(false); err != nil { - return nil, err - } - e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error { - Register(ctx, e, e.config.Runtime.Core.Core().V1().Node()) - return nil - } - tombstoneFile := filepath.Join(etcdDBDir(e.config), "tombstone") if _, err := os.Stat(tombstoneFile); err == nil { logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster") @@ -382,6 +384,20 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt return nil, err } } + + if err := e.setName(false); err != nil { + return nil, err + } + e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error { + RegisterMetadataHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node()) + return nil + } + + e.config.Runtime.LeaderElectedClusterControllerStart = func(ctx context.Context) error { + RegisterMemberHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node()) + return nil + } + return e.handler(handler), err } @@ -560,8 +576,8 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init }) } -// removePeer removes a peer from the cluster. The peer ID and IP address must both match. -func (e *ETCD) removePeer(ctx context.Context, id, address string, removeSelf bool) error { +// RemovePeer removes a peer from the cluster. The peer name and IP address must both match. +func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRemoval bool) error { ctx, cancel := context.WithTimeout(ctx, memberRemovalTimeout) defer cancel() members, err := e.client.MemberList(ctx) @@ -570,7 +586,7 @@ func (e *ETCD) removePeer(ctx context.Context, id, address string, removeSelf bo } for _, member := range members.Members { - if member.Name != id { + if member.Name != name { continue } for _, peerURL := range member.PeerURLs { @@ -579,8 +595,8 @@ func (e *ETCD) removePeer(ctx context.Context, id, address string, removeSelf bo return err } if u.Hostname() == address { - if e.address == address && !removeSelf { - return errors.New("node has been deleted from the cluster") + if e.address == address && !allowSelfRemoval { + return errors.New("not removing self from etcd cluster") } logrus.Infof("Removing name=%s id=%d address=%s from etcd", member.Name, member.ID, address) _, err := e.client.MemberRemove(ctx, member.ID) @@ -1337,9 +1353,27 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) { return memberUrls, nil } +// GetMembersNames will list through the member lists in etcd and return +// back a combined list of member names +func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) { + ctx, cancel := context.WithTimeout(ctx, testTimeout) + defer cancel() + + members, err := e.client.MemberList(ctx) + if err != nil { + return nil, err + } + + var memberNames []string + for _, member := range members.Members { + memberNames = append(memberNames, member.Name) + } + return memberNames, nil +} + // RemoveSelf will remove the member if it exists in the cluster func (e *ETCD) RemoveSelf(ctx context.Context) error { - if err := e.removePeer(ctx, e.name, e.address, true); err != nil { + if err := e.RemovePeer(ctx, e.name, e.address, true); err != nil { return err } diff --git a/pkg/etcd/member_controller.go b/pkg/etcd/member_controller.go new file mode 100644 index 0000000000..d2a004e497 --- /dev/null +++ b/pkg/etcd/member_controller.go @@ -0,0 +1,113 @@ +package etcd + +import ( + "context" + "fmt" + "strings" + + "github.com/rancher/k3s/pkg/version" + controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" +) + +func RegisterMemberHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) { + e := &etcdMemberHandler{ + etcd: etcd, + nodeController: nodes, + ctx: ctx, + } + nodes.OnChange(ctx, "managed-etcd-member-controller", e.sync) + nodes.OnRemove(ctx, "managed-etcd-member-controller", e.onRemove) +} + +var ( + removalAnnotation = "etcd." + version.Program + ".cattle.io/remove" + removedNodeNameAnnotation = "etcd." + version.Program + ".cattle.io/removed-node-name" +) + +type etcdMemberHandler struct { + etcd *ETCD + nodeController controllerv1.NodeController + ctx context.Context +} + +func (e *etcdMemberHandler) sync(key string, node *v1.Node) (*v1.Node, error) { + if node == nil { + return nil, nil + } + + if _, ok := node.Labels[EtcdRoleLabel]; !ok { + logrus.Debugf("Node %s was not labeled etcd node, skipping sync", key) + return node, nil + } + + node = node.DeepCopy() + + if removalRequested, ok := node.Annotations[removalAnnotation]; ok { + if removed, ok := node.Annotations[removedNodeNameAnnotation]; ok { + // check to see if removed is true. if it is, nothing to do. + if currentNodeName, ok := node.Annotations[NodeNameAnnotation]; ok { + if currentNodeName != removed { + // If the current node name is not the same as the removed node name, reset the tainted annotation and removed node name + logrus.Infof("Resetting removed node flag as removed node name ( did not match current node name") + delete(node.Annotations, removedNodeNameAnnotation) + node.Annotations[removalAnnotation] = "false" + return e.nodeController.Update(node) + } + // this is the case where the current node name matches the removed node name. We have already removed the + // node, so no need to perform any action. Fallthrough to the non-op below. + } + // This is the edge case where the removed annotation exists, but there is not a current node name annotation. + // This should be a non-op, as we can't remove the node anyway. + logrus.Debugf("etcd member %s was already marked via annotations as removed", key) + return node, nil + } + if strings.ToLower(removalRequested) == "true" { + // remove the member. + name, ok := node.Annotations[NodeNameAnnotation] + if !ok { + return node, fmt.Errorf("node name annotation for node %s not found", key) + } + address, ok := node.Annotations[NodeAddressAnnotation] + if !ok { + return node, fmt.Errorf("node address annotation for node %s not found", key) + } + + logrus.Debugf("removing etcd member from cluster name: %s address: %s", name, address) + if err := e.etcd.RemovePeer(e.ctx, name, address, true); err != nil { + return node, err + } + logrus.Debugf("etcd member removal successful for name: %s address: %s", name, address) + // Set the removed node name annotation and clean up the other etcd node annotations. + // These will be set if the tombstone file is then created and the etcd member is re-added, to their new + // respective values. + node.Annotations[removedNodeNameAnnotation] = name + delete(node.Annotations, NodeNameAnnotation) + delete(node.Annotations, NodeAddressAnnotation) + return e.nodeController.Update(node) + } + // In the event that we had an unexpected removal value, simply return. + // Fallthrough to the non-op below. + } + // This is a non-op, as we don't have a tainted annotation to worry about. + return node, nil +} + +func (e *etcdMemberHandler) onRemove(key string, node *v1.Node) (*v1.Node, error) { + if _, ok := node.Labels[EtcdRoleLabel]; !ok { + logrus.Debugf("Node %s was not labeled etcd node, skipping etcd member removal", key) + return node, nil + } + + name, ok := node.Annotations[NodeNameAnnotation] + if !ok { + return node, fmt.Errorf("node name annotation for node %s not found", key) + } + address, ok := node.Annotations[NodeAddressAnnotation] + if !ok { + return node, fmt.Errorf("node address annotation for node %s not found", key) + } + + return node, e.etcd.RemovePeer(e.ctx, name, address, true) +} diff --git a/pkg/etcd/metadata_controller.go b/pkg/etcd/metadata_controller.go new file mode 100644 index 0000000000..6d56e32372 --- /dev/null +++ b/pkg/etcd/metadata_controller.go @@ -0,0 +1,68 @@ +package etcd + +import ( + "context" + "os" + "time" + + controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" +) + +func RegisterMetadataHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) { + h := &metadataHandler{ + etcd: etcd, + nodeController: nodes, + ctx: ctx, + } + nodes.OnChange(ctx, "managed-etcd-metadata-controller", h.sync) +} + +type metadataHandler struct { + etcd *ETCD + nodeController controllerv1.NodeController + ctx context.Context +} + +func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) { + if node == nil { + return nil, nil + } + + nodeName := os.Getenv("NODE_NAME") + if nodeName == "" { + logrus.Debug("waiting for node to be assigned for etcd controller") + m.nodeController.EnqueueAfter(key, 5*time.Second) + return node, nil + } + + if key == nodeName { + return m.handleSelf(node) + } + + return node, nil +} + +func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) { + if node.Annotations[NodeNameAnnotation] == m.etcd.name && + node.Annotations[NodeAddressAnnotation] == m.etcd.address && + node.Labels[EtcdRoleLabel] == "true" && + node.Labels[ControlPlaneLabel] == "true" || + m.etcd.config.DisableETCD { + return node, nil + } + + node = node.DeepCopy() + if node.Annotations == nil { + node.Annotations = map[string]string{} + } + + node.Annotations[NodeNameAnnotation] = m.etcd.name + node.Annotations[NodeAddressAnnotation] = m.etcd.address + node.Labels[EtcdRoleLabel] = "true" + node.Labels[MasterLabel] = "true" + node.Labels[ControlPlaneLabel] = "true" + + return m.nodeController.Update(node) +} diff --git a/pkg/server/etcd.go b/pkg/server/etcd.go index c1c15ac7e1..9b1821a452 100644 --- a/pkg/server/etcd.go +++ b/pkg/server/etcd.go @@ -80,14 +80,14 @@ func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error { continue } etcdNodeName := string(data) - node.Annotations[etcd.NodeID] = etcdNodeName + node.Annotations[etcd.NodeNameAnnotation] = etcdNodeName address, err := etcd.GetAdvertiseAddress(controlConfig.PrivateIP) if err != nil { logrus.Infof("Waiting for etcd node address to be available: %v", err) continue } - node.Annotations[etcd.NodeAddress] = address + node.Annotations[etcd.NodeAddressAnnotation] = address _, err = nodes.Update(node) if err == nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index 7cdee43586..439079ba79 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -154,6 +154,11 @@ func runControllers(ctx context.Context, wg *sync.WaitGroup, config *Config) err if err := coreControllers(ctx, sc, config); err != nil { panic(err) } + if controlConfig.Runtime.LeaderElectedClusterControllerStart != nil { + if err := controlConfig.Runtime.LeaderElectedClusterControllerStart(ctx); err != nil { + panic(errors.Wrap(err, "failed to start leader elected cluster controllers")) + } + } for _, controller := range config.LeaderControllers { if err := controller(ctx, sc); err != nil { panic(errors.Wrap(err, "leader controller"))