nodelifecycle controller: reconcile node OS/arch labels

pull/564/head
Yu-Ju Hong 2019-02-22 16:09:07 -08:00
parent 9eebfe7a04
commit bd2301a628
5 changed files with 329 additions and 26 deletions


@@ -88,6 +88,12 @@ var UpdateTaintBackoff = wait.Backoff{
Jitter: 1.0,
}
var UpdateLabelBackoff = wait.Backoff{
Steps: 5,
Duration: 100 * time.Millisecond,
Jitter: 1.0,
}
var (
KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)
@@ -1045,3 +1051,48 @@ func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string {
return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32()))
}
func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
firstTry := true
return clientretry.RetryOnConflict(UpdateLabelBackoff, func() error {
var err error
var node *v1.Node
// First we try getting the node from the API server cache, as it's cheaper. If that fails,
// we get it from etcd to be sure we have fresh data.
if firstTry {
node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{ResourceVersion: "0"})
firstTry = false
} else {
node, err = kubeClient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
}
if err != nil {
return err
}
// Make a copy of the node and update the labels.
newNode := node.DeepCopy()
if newNode.Labels == nil {
newNode.Labels = make(map[string]string)
}
for key, value := range labelsToUpdate {
newNode.Labels[key] = value
}
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
}
newData, err := json.Marshal(newNode)
if err != nil {
return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
if err != nil {
return fmt.Errorf("failed to create a two-way merge patch: %v", err)
}
if _, err := kubeClient.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes); err != nil {
return fmt.Errorf("failed to patch the node: %v", err)
}
return nil
})
}
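For reference, the helper above sends a two-way strategic merge patch rather than a full Update, so only the label diff goes over the wire. A minimal standalone sketch of that patch construction, not part of this commit and assuming only the k8s.io/api and k8s.io/apimachinery packages already imported above:

package main

import (
    "encoding/json"
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
    // The node as currently stored by the API server.
    oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{
        Name:   "node0",
        Labels: map[string]string{"beta.kubernetes.io/os": "linux"},
    }}

    // Copy it and add the label to reconcile, mirroring AddOrUpdateLabelsOnNode.
    newNode := oldNode.DeepCopy()
    newNode.Labels["kubernetes.io/os"] = "linux"

    oldData, _ := json.Marshal(oldNode)
    newData, _ := json.Marshal(newNode)
    patchBytes, _ := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})

    // Should print roughly: {"metadata":{"labels":{"kubernetes.io/os":"linux"}}}
    fmt.Println(string(patchBytes))
}

Patching only the changed labels keeps the conflict-retry loop cheap and avoids clobbering unrelated node fields that may have changed concurrently.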


@@ -13,6 +13,7 @@ go_library(
"//pkg/controller/nodelifecycle/scheduler:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/node:go_default_library",
@@ -72,6 +73,7 @@ go_test(
"//pkg/controller/testutil:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/util/taints:go_default_library",


@@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -130,6 +131,37 @@ const (
retrySleepTime = 20 * time.Millisecond
)
// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
// primaryKey and secondaryKey are keys of labels to reconcile.
// - If both keys exist but their values don't match, use the value from the
// primaryKey as the source of truth to reconcile.
// - If ensureSecondaryExists is true and the secondaryKey does not
// exist, secondaryKey will be added with the value of the primaryKey.
var labelReconcileInfo = []struct {
primaryKey string
secondaryKey string
ensureSecondaryExists bool
}{
{
// Reconcile the beta and the stable OS label using the beta label as
// the source of truth.
// TODO(#73084): switch to using the stable label as the source of
// truth in v1.18.
primaryKey: kubeletapis.LabelOS,
secondaryKey: v1.LabelOSStable,
ensureSecondaryExists: true,
},
{
// Reconcile the beta and the stable arch label using the beta label as
// the source of truth.
// TODO(#73084): switch to using the stable label as the source of
// truth in v1.18.
primaryKey: kubeletapis.LabelArch,
secondaryKey: v1.LabelArchStable,
ensureSecondaryExists: true,
},
}
type nodeHealthData struct {
probeTimestamp metav1.Time
readyTransitionTimestamp metav1.Time
@@ -355,18 +387,20 @@ func NewNodeLifecycleController(
})
}
klog.Infof("Controller will reconcile labels.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
nc.nodeUpdateQueue.Add(newNode.Name)
return nil
}),
})
if nc.taintNodeByCondition {
klog.Infof("Controller will taint node by condition.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
nc.nodeUpdateQueue.Add(newNode.Name)
return nil
}),
})
}
nc.leaseLister = leaseInformer.Lister()
@@ -401,18 +435,16 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
go nc.taintManager.Run(stopCh)
}
if nc.taintNodeByCondition {
// Close the node update queue to clean up the goroutine.
defer nc.nodeUpdateQueue.ShutDown()
// Close the node update queue to clean up the goroutine.
defer nc.nodeUpdateQueue.ShutDown()
// Start workers to update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
}
// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
}
if nc.useTaintBasedEvictions {
@@ -436,7 +468,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
<-stopCh
}
func (nc *Controller) doNoScheduleTaintingPassWorker() {
func (nc *Controller) doNodeProcessingPassWorker() {
for {
obj, shutdown := nc.nodeUpdateQueue.Get()
// "nodeUpdateQueue" will be shutdown when "stopCh" closed;
@@ -445,10 +477,17 @@ func (nc *Controller) doNoScheduleTaintingPassWorker() {
return
}
nodeName := obj.(string)
if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
// TODO (k82cn): Add nodeName back to the queue.
klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
if nc.taintNodeByCondition {
if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
// TODO(k82cn): Add nodeName back to the queue
}
}
// TODO: re-evaluate whether there are any labels that need to be
// reconciled in 1.19. Remove this function if it's no longer necessary.
if err := nc.reconcileNodeLabels(nodeName); err != nil {
klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
// TODO(yujuhong): Add nodeName back to the queue
}
nc.nodeUpdateQueue.Done(nodeName)
}
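The comments in the worker loop above lean on the deduplication behavior of client-go's workqueue. A small illustrative sketch of that behavior, not part of this commit and assuming k8s.io/client-go/util/workqueue:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.New()

    // Adding the same key twice while it is still waiting collapses into one
    // queued item, so at most one worker ever processes "node0" at a time.
    q.Add("node0")
    q.Add("node0")

    item, shutdown := q.Get()
    if shutdown {
        return
    }
    fmt.Println("processing", item)

    // A key re-added while it is being processed is held back and re-queued
    // only after Done() is called, so no update is lost.
    q.Done(item)

    q.ShutDown()
}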
@@ -1191,6 +1230,53 @@ func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition)
}
}
// reconcileNodeLabels reconciles node labels.
func (nc *Controller) reconcileNodeLabels(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
if node.Labels == nil {
// Nothing to reconcile.
return nil
}
labelsToUpdate := map[string]string{}
for _, r := range labelReconcileInfo {
primaryValue, primaryExists := node.Labels[r.primaryKey]
secondaryValue, secondaryExists := node.Labels[r.secondaryKey]
if !primaryExists {
// The primary label key does not exist. This should not happen
// within our supported version skew range when no external
// components/factors are modifying the node object. Ignore this case.
continue
}
if secondaryExists && primaryValue != secondaryValue {
// The secondary label exists but is not consistent with the primary
// label. Reconcile it.
labelsToUpdate[r.secondaryKey] = primaryValue
} else if !secondaryExists && r.ensureSecondaryExists {
// Apply secondary label based on primary label.
labelsToUpdate[r.secondaryKey] = primaryValue
}
}
if len(labelsToUpdate) == 0 {
return nil
}
if !nodeutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
return fmt.Errorf("failed update labels for node %+v", node)
}
return nil
}
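To make the reconciliation rule concrete, here is a self-contained sketch, not part of this commit, that applies the same primary/secondary logic to a plain label map; the literal keys are stand-ins for kubeletapis.LabelOS/LabelArch and v1.LabelOSStable/LabelArchStable:

package main

import "fmt"

type reconcileRule struct {
    primaryKey            string
    secondaryKey          string
    ensureSecondaryExists bool
}

// labelsToUpdate returns the secondary labels that must be written so that
// they agree with their primary counterparts, following labelReconcileInfo.
func labelsToUpdate(labels map[string]string, rules []reconcileRule) map[string]string {
    out := map[string]string{}
    for _, r := range rules {
        primary, primaryExists := labels[r.primaryKey]
        if !primaryExists {
            continue // nothing to copy from; the controller ignores this case
        }
        secondary, secondaryExists := labels[r.secondaryKey]
        if secondaryExists && secondary != primary {
            out[r.secondaryKey] = primary // the primary label wins on conflict
        } else if !secondaryExists && r.ensureSecondaryExists {
            out[r.secondaryKey] = primary // backfill the missing stable label
        }
    }
    return out
}

func main() {
    rules := []reconcileRule{
        {primaryKey: "beta.kubernetes.io/os", secondaryKey: "kubernetes.io/os", ensureSecondaryExists: true},
        {primaryKey: "beta.kubernetes.io/arch", secondaryKey: "kubernetes.io/arch", ensureSecondaryExists: true},
    }
    labels := map[string]string{
        "beta.kubernetes.io/os":   "linux",
        "beta.kubernetes.io/arch": "amd64",
        "kubernetes.io/arch":      "arm", // inconsistent; should be reset to amd64
    }
    // Expected: map[kubernetes.io/arch:amd64 kubernetes.io/os:linux]
    fmt.Println(labelsToUpdate(labels, rules))
}

Given a node carrying only the beta labels, the controller adds the stable ones; given an inconsistent stable label, the beta value wins until the source of truth flips in a later release, per the TODO above.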
func hash(val string, max int) int {
hasher := fnv.New32a()
io.WriteString(hasher, val)


@@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/controller/testutil"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/util/node"
taintutils "k8s.io/kubernetes/pkg/util/taints"
@@ -2910,3 +2911,149 @@ func TestNodeEventGeneration(t *testing.T) {
}
}
}
func TestReconcileNodeLabels(t *testing.T) {
fakeNow := metav1.Date(2017, 1, 1, 12, 0, 0, 0, time.UTC)
evictionTimeout := 10 * time.Minute
fakeNodeHandler := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
v1.LabelZoneRegion: "region1",
v1.LabelZoneFailureDomain: "zone1",
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
true)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
tests := []struct {
Name string
Node *v1.Node
ExpectedLabels map[string]string
}{
{
Name: "No-op if node has no labels",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
ExpectedLabels: nil,
},
{
Name: "No-op if no target labels present",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
v1.LabelZoneRegion: "region1",
},
},
},
ExpectedLabels: map[string]string{
v1.LabelZoneRegion: "region1",
},
},
{
Name: "Create OS/arch stable labels when they don't exist",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
kubeletapis.LabelOS: "linux",
kubeletapis.LabelArch: "amd64",
},
},
},
ExpectedLabels: map[string]string{
kubeletapis.LabelOS: "linux",
kubeletapis.LabelArch: "amd64",
v1.LabelOSStable: "linux",
v1.LabelArchStable: "amd64",
},
},
{
Name: "Reconcile OS/arch stable labels to match beta labels",
Node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
kubeletapis.LabelOS: "linux",
kubeletapis.LabelArch: "amd64",
v1.LabelOSStable: "windows",
v1.LabelArchStable: "arm",
},
},
},
ExpectedLabels: map[string]string{
kubeletapis.LabelOS: "linux",
kubeletapis.LabelArch: "amd64",
v1.LabelOSStable: "linux",
v1.LabelArchStable: "amd64",
},
},
}
for _, test := range tests {
fakeNodeHandler.Update(test.Node)
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Fatalf("unexpected error: %v", err)
}
nodeController.reconcileNodeLabels(test.Node.Name)
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Fatalf("unexpected error: %v", err)
}
node0, err := nodeController.nodeLister.Get("node0")
if err != nil {
t.Fatalf("Can't get current node0...")
}
if len(node0.Labels) != len(test.ExpectedLabels) {
t.Errorf("%s: Unexpected number of taints: expected %d, got %d",
test.Name, len(test.ExpectedLabels), len(node0.Labels))
}
for key, expectedValue := range test.ExpectedLabels {
actualValue, ok := node0.Labels[key]
if !ok {
t.Errorf("%s: Can't find label %v in %v", test.Name, key, node0.Labels)
}
if actualValue != expectedValue {
t.Errorf("%s: label %q: expected value %q, got value %q", test.Name, key, expectedValue, actualValue)
}
}
}
}


@@ -214,6 +214,23 @@ func SwapNodeControllerTaint(kubeClient clientset.Interface, taintsToAdd, taints
return true
}
// AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
// success and false on failure.
func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate)
if err != nil {
utilruntime.HandleError(
fmt.Errorf(
"unable to update labels %+v for Node %q: %v",
labelsToUpdate,
node.Name,
err))
return false
}
klog.V(4).Infof("Updated labels %+v to Node %v", labelsToUpdate, node.Name)
return true
}
// CreateAddNodeHandler creates an add node handler.
func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
return func(originalObj interface{}) {