From bd2301a62890cbf894f4231e616af3858d205ed3 Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Fri, 22 Feb 2019 16:09:07 -0800
Subject: [PATCH] nodelifecycle controller: reconcile node OS/arch labels

---
 pkg/controller/controller_utils.go            |  51 ++++++
 pkg/controller/nodelifecycle/BUILD            |   2 +
 .../node_lifecycle_controller.go              | 138 ++++++++++++----
 .../node_lifecycle_controller_test.go         | 147 ++++++++++++++++++
 pkg/controller/util/node/controller_utils.go  |  17 ++
 5 files changed, 329 insertions(+), 26 deletions(-)

diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 4b2373af6b..ecae8d7e87 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -88,6 +88,12 @@ var UpdateTaintBackoff = wait.Backoff{
 	Jitter:   1.0,
 }
 
+var UpdateLabelBackoff = wait.Backoff{
+	Steps:    5,
+	Duration: 100 * time.Millisecond,
+	Jitter:   1.0,
+}
+
 var (
 	KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
 )
@@ -1045,3 +1051,48 @@ func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string {
 
 	return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32()))
 }
+
+func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
+	firstTry := true
+	return clientretry.RetryOnConflict(UpdateLabelBackoff, func() error {
+		var err error
+		var node *v1.Node
+		// First we try getting node from the API server cache, as it's cheaper. If it fails
+		// we get it from etcd to be sure to have fresh data.
+		if firstTry {
+			node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
+			firstTry = false
+		} else {
+			node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+		}
+		if err != nil {
+			return err
+		}
+
+		// Make a copy of the node and update the labels.
+		newNode := node.DeepCopy()
+		if newNode.Labels == nil {
+			newNode.Labels = make(map[string]string)
+		}
+		for key, value := range labelsToUpdate {
+			newNode.Labels[key] = value
+		}
+
+		oldData, err := json.Marshal(node)
+		if err != nil {
+			return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
+		}
+		newData, err := json.Marshal(newNode)
+		if err != nil {
+			return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
+		}
+		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
+		if err != nil {
+			return fmt.Errorf("failed to create a two-way merge patch: %v", err)
+		}
+		if _, err := kubeClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes); err != nil {
+			return fmt.Errorf("failed to patch the node: %v", err)
+		}
+		return nil
+	})
+}
diff --git a/pkg/controller/nodelifecycle/BUILD b/pkg/controller/nodelifecycle/BUILD
index 0dfe54eede..61622ba196 100644
--- a/pkg/controller/nodelifecycle/BUILD
+++ b/pkg/controller/nodelifecycle/BUILD
@@ -13,6 +13,7 @@ go_library(
         "//pkg/controller/nodelifecycle/scheduler:go_default_library",
         "//pkg/controller/util/node:go_default_library",
         "//pkg/features:go_default_library",
+        "//pkg/kubelet/apis:go_default_library",
         "//pkg/scheduler/api:go_default_library",
         "//pkg/util/metrics:go_default_library",
         "//pkg/util/node:go_default_library",
@@ -72,6 +73,7 @@ go_test(
         "//pkg/controller/testutil:go_default_library",
         "//pkg/controller/util/node:go_default_library",
         "//pkg/features:go_default_library",
+        "//pkg/kubelet/apis:go_default_library",
         "//pkg/scheduler/api:go_default_library",
         "//pkg/util/node:go_default_library",
         "//pkg/util/taints:go_default_library",
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
index 9359a611b6..510210881e 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -56,6 +56,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
 	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
 	"k8s.io/kubernetes/pkg/features"
+	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -130,6 +131,37 @@ const (
 	retrySleepTime = 20 * time.Millisecond
 )
 
+// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
+// primaryKey and secondaryKey are keys of labels to reconcile.
+//   - If both keys exist, but their values don't match, use the value from the
+//     primaryKey as the source of truth to reconcile.
+//   - If ensureSecondaryExists is true, and the secondaryKey does not
+//     exist, secondaryKey will be added with the value of the primaryKey.
+var labelReconcileInfo = []struct {
+	primaryKey            string
+	secondaryKey          string
+	ensureSecondaryExists bool
+}{
+	{
+		// Reconcile the beta and the stable OS label using the beta label as
+		// the source of truth.
+		// TODO(#73084): switch to using the stable label as the source of
+		// truth in v1.18.
+		primaryKey:            kubeletapis.LabelOS,
+		secondaryKey:          v1.LabelOSStable,
+		ensureSecondaryExists: true,
+	},
+	{
+		// Reconcile the beta and the stable arch label using the beta label as
+		// the source of truth.
+		// TODO(#73084): switch to using the stable label as the source of
+		// truth in v1.18.
+		primaryKey:            kubeletapis.LabelArch,
+		secondaryKey:          v1.LabelArchStable,
+		ensureSecondaryExists: true,
+	},
+}
+
 type nodeHealthData struct {
 	probeTimestamp           metav1.Time
 	readyTransitionTimestamp metav1.Time
@@ -355,18 +387,20 @@ func NewNodeLifecycleController(
 		})
 	}
 
+	klog.Infof("Controller will reconcile labels.")
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
+			nc.nodeUpdateQueue.Add(node.Name)
+			return nil
+		}),
+		UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
+			nc.nodeUpdateQueue.Add(newNode.Name)
+			return nil
+		}),
+	})
+
 	if nc.taintNodeByCondition {
 		klog.Infof("Controller will taint node by condition.")
-		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
-				nc.nodeUpdateQueue.Add(node.Name)
-				return nil
-			}),
-			UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-				nc.nodeUpdateQueue.Add(newNode.Name)
-				return nil
-			}),
-		})
 	}
 
 	nc.leaseLister = leaseInformer.Lister()
@@ -401,18 +435,16 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
 		go nc.taintManager.Run(stopCh)
 	}
 
-	if nc.taintNodeByCondition {
-		// Close node update queue to cleanup go routine.
-		defer nc.nodeUpdateQueue.ShutDown()
+	// Close node update queue to cleanup go routine.
+	defer nc.nodeUpdateQueue.ShutDown()
 
-		// Start workers to update NoSchedule taint for nodes.
-		for i := 0; i < scheduler.UpdateWorkerSize; i++ {
-			// Thanks to "workqueue", each worker just need to get item from queue, because
-			// the item is flagged when got from queue: if new event come, the new item will
-			// be re-queued until "Done", so no more than one worker handle the same item and
-			// no event missed.
-			go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
-		}
+	// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
+	for i := 0; i < scheduler.UpdateWorkerSize; i++ {
+		// Thanks to "workqueue", each worker just need to get item from queue, because
+		// the item is flagged when got from queue: if new event come, the new item will
+		// be re-queued until "Done", so no more than one worker handle the same item and
+		// no event missed.
+		go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
 	}
 
 	if nc.useTaintBasedEvictions {
@@ -436,7 +468,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
 	<-stopCh
 }
 
-func (nc *Controller) doNoScheduleTaintingPassWorker() {
+func (nc *Controller) doNodeProcessingPassWorker() {
 	for {
 		obj, shutdown := nc.nodeUpdateQueue.Get()
 		// "nodeUpdateQueue" will be shutdown when "stopCh" closed;
@@ -445,10 +477,17 @@ func (nc *Controller) doNoScheduleTaintingPassWorker() {
 			return
 		}
 		nodeName := obj.(string)
-
-		if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
-			// TODO (k82cn): Add nodeName back to the queue.
-			klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
+		if nc.taintNodeByCondition {
+			if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
+				klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
+				// TODO(k82cn): Add nodeName back to the queue
+			}
+		}
+		// TODO: re-evaluate whether there are any labels that need to be
+		// reconciled in 1.19. Remove this function if it's no longer necessary.
+		if err := nc.reconcileNodeLabels(nodeName); err != nil {
+			klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
+			// TODO(yujuhong): Add nodeName back to the queue
 		}
 		nc.nodeUpdateQueue.Done(nodeName)
 	}
 }
@@ -1191,6 +1230,53 @@ func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition)
 	}
 }
 
+// reconcileNodeLabels reconciles node labels.
+func (nc *Controller) reconcileNodeLabels(nodeName string) error {
+	node, err := nc.nodeLister.Get(nodeName)
+	if err != nil {
+		// If node not found, just ignore it.
+		if apierrors.IsNotFound(err) {
+			return nil
+		}
+		return err
+	}
+
+	if node.Labels == nil {
+		// Nothing to reconcile.
+		return nil
+	}
+
+	labelsToUpdate := map[string]string{}
+	for _, r := range labelReconcileInfo {
+		primaryValue, primaryExists := node.Labels[r.primaryKey]
+		secondaryValue, secondaryExists := node.Labels[r.secondaryKey]
+
+		if !primaryExists {
+			// The primary label key does not exist. This should not happen
+			// within our supported version skew range, when no external
+			// components/factors are modifying the node object. Ignore this case.
+			continue
+		}
+		if secondaryExists && primaryValue != secondaryValue {
+			// Secondary label exists, but not consistent with the primary
+			// label. Need to reconcile.
+			labelsToUpdate[r.secondaryKey] = primaryValue
+
+		} else if !secondaryExists && r.ensureSecondaryExists {
+			// Apply secondary label based on primary label.
+			labelsToUpdate[r.secondaryKey] = primaryValue
+		}
+	}
+
+	if len(labelsToUpdate) == 0 {
+		return nil
+	}
+	if !nodeutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
+		return fmt.Errorf("failed to update labels for node %+v", node)
+	}
+	return nil
+}
+
 func hash(val string, max int) int {
 	hasher := fnv.New32a()
 	io.WriteString(hasher, val)
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
index a935a00d88..bc53b1eeb4 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
@@ -42,6 +42,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/testutil"
 	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
 	"k8s.io/kubernetes/pkg/features"
+	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
 	"k8s.io/kubernetes/pkg/util/node"
 	taintutils "k8s.io/kubernetes/pkg/util/taints"
@@ -2910,3 +2911,149 @@ func TestNodeEventGeneration(t *testing.T) {
 		}
 	}
 }
+
+func TestReconcileNodeLabels(t *testing.T) {
+	fakeNow := metav1.Date(2017, 1, 1, 12, 0, 0, 0, time.UTC)
+	evictionTimeout := 10 * time.Minute
+
+	fakeNodeHandler := &testutil.FakeNodeHandler{
+		Existing: []*v1.Node{
+			{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Labels: map[string]string{
+						v1.LabelZoneRegion:        "region1",
+						v1.LabelZoneFailureDomain: "zone1",
+					},
+				},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:               v1.NodeReady,
+							Status:             v1.ConditionTrue,
+							LastHeartbeatTime:  metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						},
+					},
+				},
+			},
+		},
+		Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
+	}
+
+	nodeController, _ := newNodeLifecycleControllerFromClient(
+		fakeNodeHandler,
+		evictionTimeout,
+		testRateLimiterQPS,
+		testRateLimiterQPS,
+		testLargeClusterThreshold,
+		testUnhealthyThreshold,
+		testNodeMonitorGracePeriod,
+		testNodeStartupGracePeriod,
+		testNodeMonitorPeriod,
+		true)
+	nodeController.now = func() metav1.Time { return fakeNow }
+	nodeController.recorder = testutil.NewFakeRecorder()
+
+	tests := []struct {
+		Name           string
+		Node           *v1.Node
+		ExpectedLabels map[string]string
+	}{
+		{
+			Name: "No-op if node has no labels",
+			Node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+			},
+			ExpectedLabels: nil,
+		},
+		{
+			Name: "No-op if no target labels present",
+			Node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Labels: map[string]string{
+						v1.LabelZoneRegion: "region1",
+					},
+				},
+			},
+			ExpectedLabels: map[string]string{
+				v1.LabelZoneRegion: "region1",
+			},
+		},
+		{
+			Name: "Create OS/arch stable labels when they don't exist",
+			Node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Labels: map[string]string{
+						kubeletapis.LabelOS:   "linux",
+						kubeletapis.LabelArch: "amd64",
+					},
+				},
+			},
+			ExpectedLabels: map[string]string{
+				kubeletapis.LabelOS:   "linux",
+				kubeletapis.LabelArch: "amd64",
+				v1.LabelOSStable:      "linux",
+				v1.LabelArchStable:    "amd64",
+			},
+		},
+		{
+			Name: "Reconcile OS/arch stable labels to match beta labels",
+			Node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Labels: map[string]string{
+						kubeletapis.LabelOS:   "linux",
+						kubeletapis.LabelArch: "amd64",
+						v1.LabelOSStable:      "windows",
+						v1.LabelArchStable:    "arm",
+					},
+				},
+			},
+			ExpectedLabels: map[string]string{
+				kubeletapis.LabelOS:   "linux",
+				kubeletapis.LabelArch: "amd64",
+				v1.LabelOSStable:      "linux",
+				v1.LabelArchStable:    "amd64",
+			},
+		},
+	}
+
+	for _, test := range tests {
+		fakeNodeHandler.Update(test.Node)
+		if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		nodeController.reconcileNodeLabels(test.Node.Name)
+		if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		node0, err := nodeController.nodeLister.Get("node0")
+		if err != nil {
+			t.Fatalf("Can't get current node0...")
+		}
+		if len(node0.Labels) != len(test.ExpectedLabels) {
+			t.Errorf("%s: Unexpected number of labels: expected %d, got %d",
+				test.Name, len(test.ExpectedLabels), len(node0.Labels))
+		}
+		for key, expectedValue := range test.ExpectedLabels {
+			actualValue, ok := node0.Labels[key]
+			if !ok {
+				t.Errorf("%s: Can't find label %v in %v", test.Name, key, node0.Labels)
+			}
+			if actualValue != expectedValue {
+				t.Errorf("%s: label %q: expected value %q, got value %q", test.Name, key, expectedValue, actualValue)
+			}
+
+		}
+	}
+}
diff --git a/pkg/controller/util/node/controller_utils.go b/pkg/controller/util/node/controller_utils.go
index c4e8b31604..d919e7177a 100644
--- a/pkg/controller/util/node/controller_utils.go
+++ b/pkg/controller/util/node/controller_utils.go
@@ -214,6 +214,23 @@ func SwapNodeControllerTaint(kubeClient clientset.Interface, taintsToAdd, taints
 	return true
 }
 
+// AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
+// success and false on failure.
+func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
+	err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate)
+	if err != nil {
+		utilruntime.HandleError(
+			fmt.Errorf(
+				"unable to update labels %+v for Node %q: %v",
+				labelsToUpdate,
+				node.Name,
+				err))
+		return false
+	}
+	klog.V(4).Infof("Updated labels %+v to Node %v", labelsToUpdate, node.Name)
+	return true
+}
+
 // CreateAddNodeHandler creates an add node handler.
 func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
 	return func(originalObj interface{}) {
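
The following standalone sketch is not part of the patch; it only illustrates the reconciliation rule that reconcileNodeLabels applies: the beta OS/arch labels are treated as the source of truth, and the stable labels are created or corrected to match them. Literal label key strings stand in for the kubeletapis.LabelOS, kubeletapis.LabelArch, v1.LabelOSStable, and v1.LabelArchStable constants so the example runs on its own.

// Standalone sketch (not part of the patch) of the label-reconciliation rule.
package main

import "fmt"

type reconcileRule struct {
	primaryKey            string
	secondaryKey          string
	ensureSecondaryExists bool
}

// The beta labels are the source of truth; the stable labels follow them.
var rules = []reconcileRule{
	{primaryKey: "beta.kubernetes.io/os", secondaryKey: "kubernetes.io/os", ensureSecondaryExists: true},
	{primaryKey: "beta.kubernetes.io/arch", secondaryKey: "kubernetes.io/arch", ensureSecondaryExists: true},
}

// labelsToReconcile mirrors the loop in reconcileNodeLabels: it returns the
// labels that would be patched onto a node carrying the given label map.
func labelsToReconcile(nodeLabels map[string]string) map[string]string {
	update := map[string]string{}
	for _, r := range rules {
		primary, primaryExists := nodeLabels[r.primaryKey]
		if !primaryExists {
			// Without the primary (beta) label there is nothing to copy from.
			continue
		}
		secondary, secondaryExists := nodeLabels[r.secondaryKey]
		if secondaryExists && secondary != primary {
			// Stable label disagrees with the beta label: overwrite it.
			update[r.secondaryKey] = primary
		} else if !secondaryExists && r.ensureSecondaryExists {
			// Stable label is missing: create it from the beta label.
			update[r.secondaryKey] = primary
		}
	}
	return update
}

func main() {
	node := map[string]string{
		"beta.kubernetes.io/os":   "linux",
		"beta.kubernetes.io/arch": "amd64",
		"kubernetes.io/arch":      "arm", // stale stable label
	}
	// Prints map[kubernetes.io/arch:amd64 kubernetes.io/os:linux]: the missing
	// stable OS label is added and the stale arch label is corrected.
	fmt.Println(labelsToReconcile(node))
}

As the TODO(#73084) comments in the patch note, the beta labels remain authoritative only until a later release flips the direction, at which point the stable labels become the primary keys.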