diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go
index c92e9e3ff1..b36fafdb59 100644
--- a/pkg/etcd/etcd.go
+++ b/pkg/etcd/etcd.go
@@ -48,7 +48,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/util/wait"
-	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/kubernetes"
 	nodeHelper "k8s.io/component-helpers/node/util"
 	nodeUtil "k8s.io/kubernetes/pkg/controller/util/node"
 )
@@ -70,6 +70,15 @@ const (
 	defaultKeepAliveTimeout = 10 * time.Second
 
 	maxBackupRetention = 5
+
+	// etcdStatusType is the node condition type used to report etcd membership status
+	etcdStatusType = v1.NodeConditionType("EtcdIsVoter")
+
+	// member statuses that setEtcdStatusCondition translates into the node condition
+	StatusUnjoined  MemberStatus = "unjoined"
+	StatusUnhealthy MemberStatus = "unhealthy"
+	StatusLearner   MemberStatus = "learner"
+	StatusVoter     MemberStatus = "voter"
 )
 
 var (
@@ -91,6 +100,9 @@ type NodeControllerGetter func() controllerv1.NodeController
 // explicit interface check
 var _ managed.Driver = &ETCD{}
 
+// MemberStatus describes the state of an etcd cluster member as reported on its node
+type MemberStatus string
+
 type ETCD struct {
 	client *clientv3.Client
 	config *config.Control
@@ -1036,6 +1048,7 @@ func (e *ETCD) manageLearners(ctx context.Context) {
 			logrus.Debug("Etcd client was nil")
 			continue
 		}
+
 		endpoints := getEndpoints(e.config)
 		if status, err := e.client.Status(ctx, endpoints[0]); err != nil {
 			logrus.Errorf("Failed to check local etcd status for learner management: %v", err)
@@ -1067,28 +1080,66 @@ func (e *ETCD) manageLearners(ctx context.Context) {
 			logrus.Warnf("Failed to list nodes with etcd role: %v", err)
 		}
 
+		// a map to track if a node is a member of the etcd cluster or not
+		nodeIsMember := make(map[string]bool)
+		nodesMap := make(map[string]*v1.Node)
+		for _, node := range nodes {
+			nodeIsMember[node.Name] = false
+			nodesMap[node.Name] = node
+		}
+
 		for _, member := range members.Members {
+			// collect every node whose name prefixes the member name, so that conditions are
+			// only ever set on nodes matching this member (and never on a nil node)
+			var memberNodes []*v1.Node
+			for _, node := range nodes {
+				if strings.HasPrefix(member.Name, node.Name+"-") {
+					nodeIsMember[node.Name] = true
+					memberNodes = append(memberNodes, node)
+				}
+			}
+
 			if member.IsLearner {
 				if err := e.trackLearnerProgress(ctx, progress, member); err != nil {
 					logrus.Errorf("Failed to track learner progress towards promotion: %v", err)
 				}
-				for _, node := range nodes {
-					if strings.HasPrefix(member.Name, node.Name+"-") {
-						_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, false)
-						if err != nil {
-							logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
-						}
-					}
+
+				for _, node := range memberNodes {
+					if err := e.setEtcdStatusCondition(node, client, member.Name, StatusLearner, ""); err != nil {
+						logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
+					}
 				}
 				break
 			}
 
-			for _, node := range nodes {
-				if strings.HasPrefix(member.Name, node.Name+"-") {
-					_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, true)
-					if err != nil {
-						logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
-					}
+			// skip members that match no node or list no client URLs; there is nothing to
+			// probe or to set a condition on, and ClientURLs[0] must not be indexed when empty
+			if len(memberNodes) == 0 || len(member.ClientURLs) == 0 {
+				continue
+			}
+
+			// verify if the member is healthy and set the etcd status condition
+			if _, err := e.getETCDStatus(ctx, member.ClientURLs[0]); err != nil {
+				for _, node := range memberNodes {
+					if err := e.setEtcdStatusCondition(node, client, member.Name, StatusUnhealthy, err.Error()); err != nil {
+						logrus.Errorf("Unable to set etcd status condition for unhealthy node %s: %v", member.Name, err)
+					}
+				}
+				continue
+			}
+
+			for _, node := range memberNodes {
+				if err := e.setEtcdStatusCondition(node, client, member.Name, StatusVoter, ""); err != nil {
+					logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
+				}
+			}
+		}
+
+		for nodeName, isMember := range nodeIsMember {
+			if !isMember {
+				node := nodesMap[nodeName]
+				if err := e.setEtcdStatusCondition(node, client, nodeName, StatusUnjoined, ""); err != nil {
+					logrus.Errorf("Unable to set etcd status condition for a node that is not a cluster member %s: %v", nodeName, err)
 				}
 			}
 		}
@@ -1129,9 +1180,7 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre
 
 	// Update progress by retrieving status from the member's first reachable client URL
 	for _, ep := range member.ClientURLs {
-		ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
-		defer cancel()
-		status, err := e.client.Status(ctx, ep)
+		status, err := e.getETCDStatus(ctx, ep)
 		if err != nil {
 			logrus.Debugf("Failed to get etcd status from learner %s at %s: %v", member.Name, ep, err)
 			continue
@@ -1162,46 +1211,84 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre
 	return e.setLearnerProgress(ctx, progress)
 }
 
-func (e *ETCD) setEtcdStatusCondition(node *v1.Node, client corev1.CoreV1Interface, memberName string, promoted bool) (*v1.Node, []byte, error) {
-	etcdStatusType := v1.NodeConditionType("EtcdIsVoter")
+// getETCDStatus queries the status of the etcd member at url, bounded by defaultDialTimeout.
+// It returns an error if the request fails or if the member reports any status errors.
+func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) {
+	ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
+	defer cancel()
+	resp, err := e.client.Status(ctx, url)
+	if err != nil {
+		return resp, errors.Wrap(err, "failed to check etcd member status")
+	}
+	if len(resp.Errors) != 0 {
+		return resp, errors.New("etcd member has status errors: " + strings.Join(resp.Errors, ","))
+	}
+	return resp, nil
+}
 
+// setEtcdStatusCondition sets the EtcdIsVoter condition on node to reflect memberStatus.
+// A non-empty message overrides the default condition message. If the stored condition is
+// unchanged only its heartbeat time is refreshed; unknown statuses are logged and ignored.
+func (e *ETCD) setEtcdStatusCondition(node *v1.Node, client kubernetes.Interface, memberName string, memberStatus MemberStatus, message string) error {
 	var newCondition v1.NodeCondition
-	if promoted {
+	switch memberStatus {
+	case StatusLearner:
+		newCondition = v1.NodeCondition{
+			Type:    etcdStatusType,
+			Status:  "False",
+			Reason:  "MemberIsLearner",
+			Message: "Node has not been promoted to voting member of the etcd cluster",
+		}
+	case StatusVoter:
 		newCondition = v1.NodeCondition{
 			Type:    etcdStatusType,
 			Status:  "True",
 			Reason:  "MemberNotLearner",
 			Message: "Node is a voting member of the etcd cluster",
 		}
-	} else {
+	case StatusUnhealthy:
 		newCondition = v1.NodeCondition{
 			Type:    etcdStatusType,
 			Status:  "False",
-			Reason:  "MemberIsLearner",
-			Message: "Node has not been promoted to voting member of the etcd cluster",
+			Reason:  "Unhealthy",
+			Message: "Node is unhealthy",
 		}
+	case StatusUnjoined:
+		newCondition = v1.NodeCondition{
+			Type:    etcdStatusType,
+			Status:  "False",
+			Reason:  "NotAMember",
+			Message: "Node is not a member of the etcd cluster",
+		}
+	default:
+		logrus.Warnf("Unknown etcd member status %s", memberStatus)
+		return nil
+	}
+
+	if message != "" {
+		newCondition.Message = message
 	}
 
-	updatedNode := *node
-	if find, condition := nodeUtil.GetNodeCondition(&updatedNode.Status, etcdStatusType); find >= 0 {
-		if condition.Status == newCondition.Status {
-			logrus.Debugf("Member %s is not changing etcd status condition", memberName)
+	if find, condition := nodeUtil.GetNodeCondition(&node.Status, etcdStatusType); find >= 0 {
+		// several statuses share Status "False", so the reason and message must also match
+		// before the condition is treated as unchanged
+		if condition.Status == newCondition.Status && condition.Reason == newCondition.Reason && condition.Message == newCondition.Message {
+			logrus.Debugf("Node %s is not changing etcd status condition", memberName)
 			condition.LastHeartbeatTime = metav1.Now()
-			return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
+			return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), *condition)
 		}
 
-		logrus.Debugf("Member %s is changing etcd condition", memberName)
+		logrus.Debugf("Node %s is changing etcd status condition", memberName)
 		condition = &newCondition
 		condition.LastHeartbeatTime = metav1.Now()
 		condition.LastTransitionTime = metav1.Now()
-		return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
+		return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), *condition)
 	}
 
-	logrus.Infof("Adding etcd member %s status condition", memberName)
+	logrus.Infof("Adding node %s etcd status condition", memberName)
 	newCondition.LastHeartbeatTime = metav1.Now()
 	newCondition.LastTransitionTime = metav1.Now()
-	updatedNode.Status.Conditions = append(updatedNode.Status.Conditions, newCondition)
-	return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
+	return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), newCondition)
 }
 
 // getLearnerProgress returns the stored learnerProgress struct as retrieved from etcd
diff --git a/pkg/etcd/metadata_controller.go b/pkg/etcd/metadata_controller.go
index da86e81263..50b2f6c74d 100644
--- a/pkg/etcd/metadata_controller.go
+++ b/pkg/etcd/metadata_controller.go
@@ -13,6 +13,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/util/retry"
+	nodeUtil "k8s.io/kubernetes/pkg/controller/util/node"
 )
 
 func registerMetadataHandlers(ctx context.Context, etcd *ETCD) {
@@ -108,6 +109,10 @@ func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) {
 			node.Labels = map[string]string{}
 		}
 
+		if find, _ := nodeUtil.GetNodeCondition(&node.Status, etcdStatusType); find >= 0 {
+			node.Status.Conditions = append(node.Status.Conditions[:find], node.Status.Conditions[find+1:]...)
+		}
+
 		delete(node.Annotations, NodeNameAnnotation)
 		delete(node.Annotations, NodeAddressAnnotation)
 		delete(node.Labels, util.ETCDRoleLabelKey)