|
|
@ -48,7 +48,7 @@ import (
|
|
|
|
"k8s.io/apimachinery/pkg/types"
|
|
|
|
"k8s.io/apimachinery/pkg/types"
|
|
|
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
|
|
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
nodeHelper "k8s.io/component-helpers/node/util"
|
|
|
|
nodeHelper "k8s.io/component-helpers/node/util"
|
|
|
|
nodeUtil "k8s.io/kubernetes/pkg/controller/util/node"
|
|
|
|
nodeUtil "k8s.io/kubernetes/pkg/controller/util/node"
|
|
|
|
)
|
|
|
|
)
|
|
|
@ -70,6 +70,13 @@ const (
|
|
|
|
defaultKeepAliveTimeout = 10 * time.Second
|
|
|
|
defaultKeepAliveTimeout = 10 * time.Second
|
|
|
|
|
|
|
|
|
|
|
|
maxBackupRetention = 5
|
|
|
|
maxBackupRetention = 5
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
etcdStatusType = v1.NodeConditionType("EtcdIsVoter")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StatusUnjoined MemberStatus = "unjoined"
|
|
|
|
|
|
|
|
StatusUnhealthy MemberStatus = "unhealthy"
|
|
|
|
|
|
|
|
StatusLearner MemberStatus = "learner"
|
|
|
|
|
|
|
|
StatusVoter MemberStatus = "voter"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
var (
|
|
|
@ -91,6 +98,8 @@ type NodeControllerGetter func() controllerv1.NodeController
|
|
|
|
// explicit interface check
|
|
|
|
// explicit interface check
|
|
|
|
var _ managed.Driver = &ETCD{}
|
|
|
|
var _ managed.Driver = &ETCD{}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type MemberStatus string
|
|
|
|
|
|
|
|
|
|
|
|
type ETCD struct {
|
|
|
|
type ETCD struct {
|
|
|
|
client *clientv3.Client
|
|
|
|
client *clientv3.Client
|
|
|
|
config *config.Control
|
|
|
|
config *config.Control
|
|
|
@ -1036,6 +1045,7 @@ func (e *ETCD) manageLearners(ctx context.Context) {
|
|
|
|
logrus.Debug("Etcd client was nil")
|
|
|
|
logrus.Debug("Etcd client was nil")
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
endpoints := getEndpoints(e.config)
|
|
|
|
endpoints := getEndpoints(e.config)
|
|
|
|
if status, err := e.client.Status(ctx, endpoints[0]); err != nil {
|
|
|
|
if status, err := e.client.Status(ctx, endpoints[0]); err != nil {
|
|
|
|
logrus.Errorf("Failed to check local etcd status for learner management: %v", err)
|
|
|
|
logrus.Errorf("Failed to check local etcd status for learner management: %v", err)
|
|
|
@ -1067,29 +1077,52 @@ func (e *ETCD) manageLearners(ctx context.Context) {
|
|
|
|
logrus.Warnf("Failed to list nodes with etcd role: %v", err)
|
|
|
|
logrus.Warnf("Failed to list nodes with etcd role: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// a map to track if a node is a member of the etcd cluster or not
|
|
|
|
|
|
|
|
nodeIsMember := make(map[string]bool)
|
|
|
|
|
|
|
|
nodesMap := make(map[string]*v1.Node)
|
|
|
|
|
|
|
|
for _, node := range nodes {
|
|
|
|
|
|
|
|
nodeIsMember[node.Name] = false
|
|
|
|
|
|
|
|
nodesMap[node.Name] = node
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, member := range members.Members {
|
|
|
|
for _, member := range members.Members {
|
|
|
|
|
|
|
|
var node *v1.Node
|
|
|
|
|
|
|
|
for _, node = range nodes {
|
|
|
|
|
|
|
|
if strings.HasPrefix(member.Name, node.Name+"-") {
|
|
|
|
|
|
|
|
nodeIsMember[node.Name] = true
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if member.IsLearner {
|
|
|
|
if member.IsLearner {
|
|
|
|
if err := e.trackLearnerProgress(ctx, progress, member); err != nil {
|
|
|
|
if err := e.trackLearnerProgress(ctx, progress, member); err != nil {
|
|
|
|
logrus.Errorf("Failed to track learner progress towards promotion: %v", err)
|
|
|
|
logrus.Errorf("Failed to track learner progress towards promotion: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
|
|
|
|
|
|
if strings.HasPrefix(member.Name, node.Name+"-") {
|
|
|
|
if err := e.setEtcdStatusCondition(node, client, member.Name, StatusLearner, ""); err != nil {
|
|
|
|
_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, false)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
|
|
|
|
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// verify if the member is healthy and set the etcd status condition
|
|
|
|
|
|
|
|
if _, err := e.getETCDStatus(ctx, member.ClientURLs[0]); err != nil {
|
|
|
|
|
|
|
|
if err := e.setEtcdStatusCondition(node, client, member.Name, StatusUnhealthy, err.Error()); err != nil {
|
|
|
|
|
|
|
|
logrus.Errorf("Unable to set etcd status condition for unhealthy node %s: %v", member.Name, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
break
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, node := range nodes {
|
|
|
|
if err := e.setEtcdStatusCondition(node, client, member.Name, StatusVoter, ""); err != nil {
|
|
|
|
if strings.HasPrefix(member.Name, node.Name+"-") {
|
|
|
|
|
|
|
|
_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, true)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
|
|
|
|
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for nodeName, isMember := range nodeIsMember {
|
|
|
|
|
|
|
|
if !isMember {
|
|
|
|
|
|
|
|
node := nodesMap[nodeName]
|
|
|
|
|
|
|
|
if err := e.setEtcdStatusCondition(node, client, nodeName, StatusUnjoined, ""); err != nil {
|
|
|
|
|
|
|
|
logrus.Errorf("Unable to set etcd status condition for a node that is not a cluster member %s: %v", nodeName, err)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -1129,9 +1162,7 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre
|
|
|
|
|
|
|
|
|
|
|
|
// Update progress by retrieving status from the member's first reachable client URL
|
|
|
|
// Update progress by retrieving status from the member's first reachable client URL
|
|
|
|
for _, ep := range member.ClientURLs {
|
|
|
|
for _, ep := range member.ClientURLs {
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
|
|
|
|
status, err := e.getETCDStatus(ctx, ep)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
status, err := e.client.Status(ctx, ep)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
logrus.Debugf("Failed to get etcd status from learner %s at %s: %v", member.Name, ep, err)
|
|
|
|
logrus.Debugf("Failed to get etcd status from learner %s at %s: %v", member.Name, ep, err)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
@ -1162,46 +1193,77 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre
|
|
|
|
return e.setLearnerProgress(ctx, progress)
|
|
|
|
return e.setLearnerProgress(ctx, progress)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (e *ETCD) setEtcdStatusCondition(node *v1.Node, client corev1.CoreV1Interface, memberName string, promoted bool) (*v1.Node, []byte, error) {
|
|
|
|
func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) {
|
|
|
|
etcdStatusType := v1.NodeConditionType("EtcdIsVoter")
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
|
|
|
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
resp, err := e.client.Status(ctx, url)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return resp, errors.Wrap(err, "failed to check etcd member status")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(resp.Errors) != 0 {
|
|
|
|
|
|
|
|
return resp, errors.New("etcd member has status errors: " + strings.Join(resp.Errors, ","))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return resp, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (e *ETCD) setEtcdStatusCondition(node *v1.Node, client kubernetes.Interface, memberName string, memberStatus MemberStatus, message string) error {
|
|
|
|
var newCondition v1.NodeCondition
|
|
|
|
var newCondition v1.NodeCondition
|
|
|
|
if promoted {
|
|
|
|
switch memberStatus {
|
|
|
|
|
|
|
|
case StatusLearner:
|
|
|
|
|
|
|
|
newCondition = v1.NodeCondition{
|
|
|
|
|
|
|
|
Type: etcdStatusType,
|
|
|
|
|
|
|
|
Status: "False",
|
|
|
|
|
|
|
|
Reason: "MemberIsLearner",
|
|
|
|
|
|
|
|
Message: "Node has not been promoted to voting member of the etcd cluster",
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
case StatusVoter:
|
|
|
|
newCondition = v1.NodeCondition{
|
|
|
|
newCondition = v1.NodeCondition{
|
|
|
|
Type: etcdStatusType,
|
|
|
|
Type: etcdStatusType,
|
|
|
|
Status: "True",
|
|
|
|
Status: "True",
|
|
|
|
Reason: "MemberNotLearner",
|
|
|
|
Reason: "MemberNotLearner",
|
|
|
|
Message: "Node is a voting member of the etcd cluster",
|
|
|
|
Message: "Node is a voting member of the etcd cluster",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
case StatusUnhealthy:
|
|
|
|
newCondition = v1.NodeCondition{
|
|
|
|
newCondition = v1.NodeCondition{
|
|
|
|
Type: etcdStatusType,
|
|
|
|
Type: etcdStatusType,
|
|
|
|
Status: "False",
|
|
|
|
Status: "False",
|
|
|
|
Reason: "MemberIsLearner",
|
|
|
|
Reason: "Unhealthy",
|
|
|
|
Message: "Node has not been promoted to voting member of the etcd cluster",
|
|
|
|
Message: "Node is unhealthy",
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
case StatusUnjoined:
|
|
|
|
|
|
|
|
newCondition = v1.NodeCondition{
|
|
|
|
|
|
|
|
Type: etcdStatusType,
|
|
|
|
|
|
|
|
Status: "False",
|
|
|
|
|
|
|
|
Reason: "NotAMember",
|
|
|
|
|
|
|
|
Message: "Node is not a member of the etcd cluster",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
|
|
|
logrus.Warnf("Unknown etcd member status %s", memberStatus)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if message != "" {
|
|
|
|
|
|
|
|
newCondition.Message = message
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
updatedNode := *node
|
|
|
|
if find, condition := nodeUtil.GetNodeCondition(&node.Status, etcdStatusType); find >= 0 {
|
|
|
|
if find, condition := nodeUtil.GetNodeCondition(&updatedNode.Status, etcdStatusType); find >= 0 {
|
|
|
|
if condition.Status == newCondition.Status && memberStatus != StatusUnjoined {
|
|
|
|
if condition.Status == newCondition.Status {
|
|
|
|
logrus.Debugf("Node %s is not changing etcd status condition", memberName)
|
|
|
|
logrus.Debugf("Member %s is not changing etcd status condition", memberName)
|
|
|
|
|
|
|
|
condition.LastHeartbeatTime = metav1.Now()
|
|
|
|
condition.LastHeartbeatTime = metav1.Now()
|
|
|
|
return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
|
|
|
|
return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), *condition)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
logrus.Debugf("Member %s is changing etcd condition", memberName)
|
|
|
|
logrus.Debugf("Node %s is changing etcd status condition", memberName)
|
|
|
|
condition = &newCondition
|
|
|
|
condition = &newCondition
|
|
|
|
condition.LastHeartbeatTime = metav1.Now()
|
|
|
|
condition.LastHeartbeatTime = metav1.Now()
|
|
|
|
condition.LastTransitionTime = metav1.Now()
|
|
|
|
condition.LastTransitionTime = metav1.Now()
|
|
|
|
return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
|
|
|
|
return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), *condition)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
logrus.Infof("Adding etcd member %s status condition", memberName)
|
|
|
|
logrus.Infof("Adding node %s etcd status condition", memberName)
|
|
|
|
newCondition.LastHeartbeatTime = metav1.Now()
|
|
|
|
newCondition.LastHeartbeatTime = metav1.Now()
|
|
|
|
newCondition.LastTransitionTime = metav1.Now()
|
|
|
|
newCondition.LastTransitionTime = metav1.Now()
|
|
|
|
updatedNode.Status.Conditions = append(updatedNode.Status.Conditions, newCondition)
|
|
|
|
return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), newCondition)
|
|
|
|
return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// getLearnerProgress returns the stored learnerProgress struct as retrieved from etcd
|
|
|
|
// getLearnerProgress returns the stored learnerProgress struct as retrieved from etcd
|
|
|
|