mirror of https://github.com/k3s-io/k3s
Initial leader elected etcd member management controller (#4011)
Signed-off-by: Chris Kim <oats87g@gmail.com>
pull/4016/head
parent
f2c7882750
commit
34de120875
|
@ -193,10 +193,11 @@ type ControlRuntimeBootstrap struct {
|
|||
type ControlRuntime struct {
|
||||
ControlRuntimeBootstrap
|
||||
|
||||
HTTPBootstrap bool
|
||||
APIServerReady <-chan struct{}
|
||||
ETCDReady <-chan struct{}
|
||||
ClusterControllerStart func(ctx context.Context) error
|
||||
HTTPBootstrap bool
|
||||
APIServerReady <-chan struct{}
|
||||
ETCDReady <-chan struct{}
|
||||
ClusterControllerStart func(ctx context.Context) error
|
||||
LeaderElectedClusterControllerStart func(ctx context.Context) error
|
||||
|
||||
ClientKubeAPICert string
|
||||
ClientKubeAPIKey string
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
// Annotation keys written onto a node to record its etcd member identity.
const (
	// NodeID is the annotation holding the etcd member name of the node.
	NodeID = "etcd.k3s.cattle.io/node-name"
	// NodeAddress is the annotation holding the etcd member address of the node.
	NodeAddress = "etcd.k3s.cattle.io/node-address"
)

// Role label keys applied to a node that runs etcd.
const (
	master       = "node-role.kubernetes.io/master"
	controlPlane = "node-role.kubernetes.io/control-plane"
	etcdRole     = "node-role.kubernetes.io/etcd"
)
|
||||
|
||||
// NodeControllerGetter is a function that returns a NodeController when called.
type NodeControllerGetter func() controllerv1.NodeController
|
||||
|
||||
func Register(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) {
|
||||
h := &handler{
|
||||
etcd: etcd,
|
||||
nodeController: nodes,
|
||||
ctx: ctx,
|
||||
}
|
||||
nodes.OnChange(ctx, "managed-etcd-controller", h.sync)
|
||||
nodes.OnRemove(ctx, "managed-etcd-controller", h.onRemove)
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
etcd *ETCD
|
||||
nodeController controllerv1.NodeController
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
|
||||
if node == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
nodeName := os.Getenv("NODE_NAME")
|
||||
if nodeName == "" {
|
||||
logrus.Debug("waiting for node to be assigned for etcd controller")
|
||||
h.nodeController.EnqueueAfter(key, 5*time.Second)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
if key == nodeName {
|
||||
return h.handleSelf(node)
|
||||
}
|
||||
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
|
||||
if node.Annotations[NodeID] == h.etcd.name &&
|
||||
node.Annotations[NodeAddress] == h.etcd.address &&
|
||||
node.Labels[etcdRole] == "true" &&
|
||||
node.Labels[controlPlane] == "true" ||
|
||||
h.etcd.config.DisableETCD {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
node = node.DeepCopy()
|
||||
if node.Annotations == nil {
|
||||
node.Annotations = map[string]string{}
|
||||
}
|
||||
node.Annotations[NodeID] = h.etcd.name
|
||||
node.Annotations[NodeAddress] = h.etcd.address
|
||||
node.Labels[etcdRole] = "true"
|
||||
node.Labels[master] = "true"
|
||||
node.Labels[controlPlane] = "true"
|
||||
|
||||
return h.nodeController.Update(node)
|
||||
}
|
||||
|
||||
func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
|
||||
if _, ok := node.Labels[etcdRole]; !ok {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
id := node.Annotations[NodeID]
|
||||
address, ok := node.Annotations[NodeAddress]
|
||||
if !ok {
|
||||
return node, nil
|
||||
}
|
||||
return node, h.etcd.removePeer(h.ctx, id, address, false)
|
||||
}
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/rancher/k3s/pkg/daemons/control/deps"
|
||||
"github.com/rancher/k3s/pkg/daemons/executor"
|
||||
"github.com/rancher/k3s/pkg/version"
|
||||
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/sirupsen/logrus"
|
||||
etcd "go.etcd.io/etcd/clientv3"
|
||||
|
@ -54,6 +55,10 @@ const (
|
|||
defaultKeepAliveTimeout = 10 * time.Second
|
||||
|
||||
maxBackupRetention = 5
|
||||
|
||||
MasterLabel = "node-role.kubernetes.io/master"
|
||||
ControlPlaneLabel = "node-role.kubernetes.io/control-plane"
|
||||
EtcdRoleLabel = "node-role.kubernetes.io/etcd"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -62,8 +67,13 @@ var (
|
|||
AddressKey = version.Program + "/apiaddresses"
|
||||
|
||||
snapshotConfigMapName = version.Program + "-etcd-snapshots"
|
||||
|
||||
NodeNameAnnotation = "etcd." + version.Program + ".cattle.io/node-name"
|
||||
NodeAddressAnnotation = "etcd." + version.Program + ".cattle.io/node-address"
|
||||
)
|
||||
|
||||
type NodeControllerGetter func() controllerv1.NodeController
|
||||
|
||||
type ETCD struct {
|
||||
client *etcd.Client
|
||||
config *config.Control
|
||||
|
@ -367,14 +377,6 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
|
|||
e.config.Datastore.Config.CertFile = e.runtime.ClientETCDCert
|
||||
e.config.Datastore.Config.KeyFile = e.runtime.ClientETCDKey
|
||||
|
||||
if err := e.setName(false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
|
||||
Register(ctx, e, e.config.Runtime.Core.Core().V1().Node())
|
||||
return nil
|
||||
}
|
||||
|
||||
tombstoneFile := filepath.Join(etcdDBDir(e.config), "tombstone")
|
||||
if _, err := os.Stat(tombstoneFile); err == nil {
|
||||
logrus.Infof("tombstone file has been detected, removing data dir to rejoin the cluster")
|
||||
|
@ -382,6 +384,20 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.setName(false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
|
||||
RegisterMetadataHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node())
|
||||
return nil
|
||||
}
|
||||
|
||||
e.config.Runtime.LeaderElectedClusterControllerStart = func(ctx context.Context) error {
|
||||
RegisterMemberHandlers(ctx, e, e.config.Runtime.Core.Core().V1().Node())
|
||||
return nil
|
||||
}
|
||||
|
||||
return e.handler(handler), err
|
||||
}
|
||||
|
||||
|
@ -560,8 +576,8 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
|
|||
})
|
||||
}
|
||||
|
||||
// removePeer removes a peer from the cluster. The peer ID and IP address must both match.
|
||||
func (e *ETCD) removePeer(ctx context.Context, id, address string, removeSelf bool) error {
|
||||
// RemovePeer removes a peer from the cluster. The peer name and IP address must both match.
|
||||
func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRemoval bool) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, memberRemovalTimeout)
|
||||
defer cancel()
|
||||
members, err := e.client.MemberList(ctx)
|
||||
|
@ -570,7 +586,7 @@ func (e *ETCD) removePeer(ctx context.Context, id, address string, removeSelf bo
|
|||
}
|
||||
|
||||
for _, member := range members.Members {
|
||||
if member.Name != id {
|
||||
if member.Name != name {
|
||||
continue
|
||||
}
|
||||
for _, peerURL := range member.PeerURLs {
|
||||
|
@ -579,8 +595,8 @@ func (e *ETCD) removePeer(ctx context.Context, id, address string, removeSelf bo
|
|||
return err
|
||||
}
|
||||
if u.Hostname() == address {
|
||||
if e.address == address && !removeSelf {
|
||||
return errors.New("node has been deleted from the cluster")
|
||||
if e.address == address && !allowSelfRemoval {
|
||||
return errors.New("not removing self from etcd cluster")
|
||||
}
|
||||
logrus.Infof("Removing name=%s id=%d address=%s from etcd", member.Name, member.ID, address)
|
||||
_, err := e.client.MemberRemove(ctx, member.ID)
|
||||
|
@ -1337,9 +1353,27 @@ func (e *ETCD) GetMembersClientURLs(ctx context.Context) ([]string, error) {
|
|||
return memberUrls, nil
|
||||
}
|
||||
|
||||
// GetMembersNames will list through the member lists in etcd and return
|
||||
// back a combined list of member names
|
||||
func (e *ETCD) GetMembersNames(ctx context.Context) ([]string, error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, testTimeout)
|
||||
defer cancel()
|
||||
|
||||
members, err := e.client.MemberList(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var memberNames []string
|
||||
for _, member := range members.Members {
|
||||
memberNames = append(memberNames, member.Name)
|
||||
}
|
||||
return memberNames, nil
|
||||
}
|
||||
|
||||
// RemoveSelf will remove the member if it exists in the cluster
|
||||
func (e *ETCD) RemoveSelf(ctx context.Context) error {
|
||||
if err := e.removePeer(ctx, e.name, e.address, true); err != nil {
|
||||
if err := e.RemovePeer(ctx, e.name, e.address, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/rancher/k3s/pkg/version"
|
||||
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
func RegisterMemberHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) {
|
||||
e := &etcdMemberHandler{
|
||||
etcd: etcd,
|
||||
nodeController: nodes,
|
||||
ctx: ctx,
|
||||
}
|
||||
nodes.OnChange(ctx, "managed-etcd-member-controller", e.sync)
|
||||
nodes.OnRemove(ctx, "managed-etcd-member-controller", e.onRemove)
|
||||
}
|
||||
|
||||
var (
|
||||
removalAnnotation = "etcd." + version.Program + ".cattle.io/remove"
|
||||
removedNodeNameAnnotation = "etcd." + version.Program + ".cattle.io/removed-node-name"
|
||||
)
|
||||
|
||||
type etcdMemberHandler struct {
|
||||
etcd *ETCD
|
||||
nodeController controllerv1.NodeController
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (e *etcdMemberHandler) sync(key string, node *v1.Node) (*v1.Node, error) {
|
||||
if node == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if _, ok := node.Labels[EtcdRoleLabel]; !ok {
|
||||
logrus.Debugf("Node %s was not labeled etcd node, skipping sync", key)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
node = node.DeepCopy()
|
||||
|
||||
if removalRequested, ok := node.Annotations[removalAnnotation]; ok {
|
||||
if removed, ok := node.Annotations[removedNodeNameAnnotation]; ok {
|
||||
// check to see if removed is true. if it is, nothing to do.
|
||||
if currentNodeName, ok := node.Annotations[NodeNameAnnotation]; ok {
|
||||
if currentNodeName != removed {
|
||||
// If the current node name is not the same as the removed node name, reset the tainted annotation and removed node name
|
||||
logrus.Infof("Resetting removed node flag as removed node name ( did not match current node name")
|
||||
delete(node.Annotations, removedNodeNameAnnotation)
|
||||
node.Annotations[removalAnnotation] = "false"
|
||||
return e.nodeController.Update(node)
|
||||
}
|
||||
// this is the case where the current node name matches the removed node name. We have already removed the
|
||||
// node, so no need to perform any action. Fallthrough to the non-op below.
|
||||
}
|
||||
// This is the edge case where the removed annotation exists, but there is not a current node name annotation.
|
||||
// This should be a non-op, as we can't remove the node anyway.
|
||||
logrus.Debugf("etcd member %s was already marked via annotations as removed", key)
|
||||
return node, nil
|
||||
}
|
||||
if strings.ToLower(removalRequested) == "true" {
|
||||
// remove the member.
|
||||
name, ok := node.Annotations[NodeNameAnnotation]
|
||||
if !ok {
|
||||
return node, fmt.Errorf("node name annotation for node %s not found", key)
|
||||
}
|
||||
address, ok := node.Annotations[NodeAddressAnnotation]
|
||||
if !ok {
|
||||
return node, fmt.Errorf("node address annotation for node %s not found", key)
|
||||
}
|
||||
|
||||
logrus.Debugf("removing etcd member from cluster name: %s address: %s", name, address)
|
||||
if err := e.etcd.RemovePeer(e.ctx, name, address, true); err != nil {
|
||||
return node, err
|
||||
}
|
||||
logrus.Debugf("etcd member removal successful for name: %s address: %s", name, address)
|
||||
// Set the removed node name annotation and clean up the other etcd node annotations.
|
||||
// These will be set if the tombstone file is then created and the etcd member is re-added, to their new
|
||||
// respective values.
|
||||
node.Annotations[removedNodeNameAnnotation] = name
|
||||
delete(node.Annotations, NodeNameAnnotation)
|
||||
delete(node.Annotations, NodeAddressAnnotation)
|
||||
return e.nodeController.Update(node)
|
||||
}
|
||||
// In the event that we had an unexpected removal value, simply return.
|
||||
// Fallthrough to the non-op below.
|
||||
}
|
||||
// This is a non-op, as we don't have a tainted annotation to worry about.
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (e *etcdMemberHandler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
|
||||
if _, ok := node.Labels[EtcdRoleLabel]; !ok {
|
||||
logrus.Debugf("Node %s was not labeled etcd node, skipping etcd member removal", key)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
name, ok := node.Annotations[NodeNameAnnotation]
|
||||
if !ok {
|
||||
return node, fmt.Errorf("node name annotation for node %s not found", key)
|
||||
}
|
||||
address, ok := node.Annotations[NodeAddressAnnotation]
|
||||
if !ok {
|
||||
return node, fmt.Errorf("node address annotation for node %s not found", key)
|
||||
}
|
||||
|
||||
return node, e.etcd.RemovePeer(e.ctx, name, address, true)
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
|
||||
"github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
func RegisterMetadataHandlers(ctx context.Context, etcd *ETCD, nodes controllerv1.NodeController) {
|
||||
h := &metadataHandler{
|
||||
etcd: etcd,
|
||||
nodeController: nodes,
|
||||
ctx: ctx,
|
||||
}
|
||||
nodes.OnChange(ctx, "managed-etcd-metadata-controller", h.sync)
|
||||
}
|
||||
|
||||
type metadataHandler struct {
|
||||
etcd *ETCD
|
||||
nodeController controllerv1.NodeController
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (m *metadataHandler) sync(key string, node *v1.Node) (*v1.Node, error) {
|
||||
if node == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
nodeName := os.Getenv("NODE_NAME")
|
||||
if nodeName == "" {
|
||||
logrus.Debug("waiting for node to be assigned for etcd controller")
|
||||
m.nodeController.EnqueueAfter(key, 5*time.Second)
|
||||
return node, nil
|
||||
}
|
||||
|
||||
if key == nodeName {
|
||||
return m.handleSelf(node)
|
||||
}
|
||||
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) {
|
||||
if node.Annotations[NodeNameAnnotation] == m.etcd.name &&
|
||||
node.Annotations[NodeAddressAnnotation] == m.etcd.address &&
|
||||
node.Labels[EtcdRoleLabel] == "true" &&
|
||||
node.Labels[ControlPlaneLabel] == "true" ||
|
||||
m.etcd.config.DisableETCD {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
node = node.DeepCopy()
|
||||
if node.Annotations == nil {
|
||||
node.Annotations = map[string]string{}
|
||||
}
|
||||
|
||||
node.Annotations[NodeNameAnnotation] = m.etcd.name
|
||||
node.Annotations[NodeAddressAnnotation] = m.etcd.address
|
||||
node.Labels[EtcdRoleLabel] = "true"
|
||||
node.Labels[MasterLabel] = "true"
|
||||
node.Labels[ControlPlaneLabel] = "true"
|
||||
|
||||
return m.nodeController.Update(node)
|
||||
}
|
|
@ -80,14 +80,14 @@ func setETCDLabelsAndAnnotations(ctx context.Context, config *Config) error {
|
|||
continue
|
||||
}
|
||||
etcdNodeName := string(data)
|
||||
node.Annotations[etcd.NodeID] = etcdNodeName
|
||||
node.Annotations[etcd.NodeNameAnnotation] = etcdNodeName
|
||||
|
||||
address, err := etcd.GetAdvertiseAddress(controlConfig.PrivateIP)
|
||||
if err != nil {
|
||||
logrus.Infof("Waiting for etcd node address to be available: %v", err)
|
||||
continue
|
||||
}
|
||||
node.Annotations[etcd.NodeAddress] = address
|
||||
node.Annotations[etcd.NodeAddressAnnotation] = address
|
||||
|
||||
_, err = nodes.Update(node)
|
||||
if err == nil {
|
||||
|
|
|
@ -154,6 +154,11 @@ func runControllers(ctx context.Context, wg *sync.WaitGroup, config *Config) err
|
|||
if err := coreControllers(ctx, sc, config); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if controlConfig.Runtime.LeaderElectedClusterControllerStart != nil {
|
||||
if err := controlConfig.Runtime.LeaderElectedClusterControllerStart(ctx); err != nil {
|
||||
panic(errors.Wrap(err, "failed to start leader elected cluster controllers"))
|
||||
}
|
||||
}
|
||||
for _, controller := range config.LeaderControllers {
|
||||
if err := controller(ctx, sc); err != nil {
|
||||
panic(errors.Wrap(err, "leader controller"))
|
||||
|
|
Loading…
Reference in New Issue