From beba1ebbf8fac67635339daa9b9bb27e6fc16743 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Thu, 1 Dec 2016 14:46:20 -0800 Subject: [PATCH] Use PatchStatus to update node status in kubelet. --- .../statusupdater/node_status_updater.go | 1 + pkg/kubelet/BUILD | 2 + pkg/kubelet/kubelet_node_status.go | 42 +++- pkg/kubelet/kubelet_node_status_test.go | 238 ++++++++++-------- pkg/util/node/BUILD | 1 + pkg/util/node/node.go | 30 +++ 6 files changed, 196 insertions(+), 118 deletions(-) diff --git a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go index 9a7ffcd9a4..3a6560faf4 100644 --- a/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go +++ b/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go @@ -91,6 +91,7 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error { clonedNode) } + // TODO: Change to pkg/util/node.UpdateNodeStatus. oldData, err := json.Marshal(node) if err != nil { return fmt.Errorf( diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index ec875fd40e..d3785fc3ac 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -51,6 +51,7 @@ go_library( "//pkg/client/clientset_generated/release_1_5:go_default_library", "//pkg/client/record:go_default_library", "//pkg/cloudprovider:go_default_library", + "//pkg/conversion:go_default_library", "//pkg/fieldpath:go_default_library", "//pkg/fields:go_default_library", "//pkg/kubelet/api:go_default_library", @@ -192,6 +193,7 @@ go_test( "//pkg/util/rand:go_default_library", "//pkg/util/runtime:go_default_library", "//pkg/util/sets:go_default_library", + "//pkg/util/strategicpatch:go_default_library", "//pkg/util/testing:go_default_library", "//pkg/util/uuid:go_default_library", "//pkg/util/wait:go_default_library", diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 4c938fb4a8..9f4d592d65 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -31,11 +31,14 @@ import ( "k8s.io/kubernetes/pkg/api/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" + "k8s.io/kubernetes/pkg/types" utilnet "k8s.io/kubernetes/pkg/util/net" + nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume/util/volumehelper" ) @@ -109,6 +112,18 @@ func (kl *Kubelet) tryRegisterWithApiServer(node *v1.Node) bool { return false } + clonedNode, err := conversion.NewCloner().DeepCopy(existingNode) + if err != nil { + glog.Errorf("Unable to clone %q node object %#v: %v", kl.nodeName, existingNode, err) + return false + } + + originalNode, ok := clonedNode.(*v1.Node) + if !ok || originalNode == nil { + glog.Errorf("Unable to cast %q node object %#v to v1.Node", kl.nodeName, clonedNode) + return false + } + if existingNode.Spec.ExternalID == node.Spec.ExternalID { glog.Infof("Node %s was previously registered", kl.nodeName) @@ -117,7 +132,8 @@ func (kl *Kubelet) tryRegisterWithApiServer(node *v1.Node) bool { // annotation. requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) if requiresUpdate { - if _, err := kl.kubeClient.Core().Nodes().UpdateStatus(existingNode); err != nil { + if _, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), + originalNode, existingNode); err != nil { glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) return false } @@ -336,20 +352,30 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { } node := &nodes.Items[0] + clonedNode, err := conversion.NewCloner().DeepCopy(node) + if err != nil { + return fmt.Errorf("error clone node %q: %v", kl.nodeName, err) + } + + originalNode, ok := clonedNode.(*v1.Node) + if !ok || originalNode == nil { + return fmt.Errorf("failed to cast %q node object %#v to v1.Node", kl.nodeName, clonedNode) + } + kl.updatePodCIDR(node.Spec.PodCIDR) if err := kl.setNodeStatus(node); err != nil { return err } - // Update the current status on the API server - updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node) + // Patch the current status on the API server + updatedNode, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), originalNode, node) + if err != nil { + return err + } // If update finishes sucessfully, mark the volumeInUse as reportedInUse to indicate // those volumes are already updated in the node's status - if err == nil { - kl.volumeManager.MarkVolumesAsReportedInUse( - updatedNode.Status.VolumesInUse) - } - return err + kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse) + return nil } // recordNodeStatusEvent records an event of the given type with the given diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 4a870380a2..2e5ae3f0ed 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -17,6 +17,7 @@ limitations under the License. package kubelet import ( + "encoding/json" "fmt" "reflect" goruntime "runtime" @@ -39,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/rand" + "k8s.io/kubernetes/pkg/util/strategicpatch" "k8s.io/kubernetes/pkg/util/uuid" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/version" @@ -90,6 +92,23 @@ func generateImageTags() []string { return tagList } +func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error) { + original, err := json.Marshal(originalNode) + if err != nil { + return nil, fmt.Errorf("failed to marshal original node %#v: %v", originalNode, err) + } + updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Node{}) + if err != nil { + return nil, fmt.Errorf("failed to apply strategic merge patch %q on node %#v: %v", + patch, originalNode, err) + } + updatedNode := &v1.Node{} + if err := json.Unmarshal(updated, updatedNode); err != nil { + return nil, fmt.Errorf("failed to unmarshal updated node %q: %v", updated, err) + } + return updatedNode, nil +} + func TestUpdateNewNodeStatus(t *testing.T) { // generate one more than maxImagesInNodeStatus in inputImageList inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1) @@ -97,9 +116,8 @@ func TestUpdateNewNodeStatus(t *testing.T) { t, inputImageList, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{ - {ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain + existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}} + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", @@ -200,12 +218,12 @@ func TestUpdateNewNodeStatus(t *testing.T) { if len(actions) != 2 { t.Fatalf("unexpected actions: %v", actions) } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" { t.Fatalf("unexpected actions: %v", actions) } - updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node) - if !ok { - t.Errorf("unexpected object type") + updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) + if err != nil { + t.Fatalf("can't apply node status patch: %v", err) } for i, cond := range updatedNode.Status.Conditions { if cond.LastHeartbeatTime.IsZero() { @@ -237,9 +255,8 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{ - {ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain + existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}} + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", @@ -280,12 +297,13 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) { if len(actions) != 2 { t.Fatalf("unexpected actions: %v", actions) } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + // StrategicMergePatch(original, patch []byte, dataStruct interface{}) ([]byte, error) + if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" { t.Fatalf("unexpected actions: %v", actions) } - updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node) - if !ok { - t.Errorf("unexpected object type") + updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) + if err != nil { + t.Fatalf("can't apply node status patch: %v", err) } var oodCondition v1.NodeCondition @@ -312,58 +330,57 @@ func TestUpdateExistingNodeStatus(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{ - { - ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}, - Spec: v1.NodeSpec{}, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeOutOfDisk, - Status: v1.ConditionTrue, - Reason: "KubeletOutOfDisk", - Message: "out of disk space", - LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - { - Type: v1.NodeMemoryPressure, - Status: v1.ConditionFalse, - Reason: "KubeletHasSufficientMemory", - Message: fmt.Sprintf("kubelet has sufficient memory available"), - LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - { - Type: v1.NodeDiskPressure, - Status: v1.ConditionFalse, - Reason: "KubeletHasSufficientDisk", - Message: fmt.Sprintf("kubelet has sufficient disk space available"), - LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - { - Type: v1.NodeReady, - Status: v1.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, + existingNode := v1.Node{ + ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeOutOfDisk, + Status: v1.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, - Capacity: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + { + Type: v1.NodeMemoryPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, - Allocatable: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + { + Type: v1.NodeDiskPressure, + Status: v1.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, }, + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), + }, }, - }}).ReactionChain + } + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain mockCadvisor := testKubelet.fakeCadvisor mockCadvisor.On("Start").Return(nil) machineInfo := &cadvisorapi.MachineInfo{ @@ -474,13 +491,13 @@ func TestUpdateExistingNodeStatus(t *testing.T) { if len(actions) != 2 { t.Errorf("unexpected actions: %v", actions) } - updateAction, ok := actions[1].(core.UpdateAction) + patchAction, ok := actions[1].(core.PatchActionImpl) if !ok { - t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + t.Errorf("unexpected action type. expected PatchActionImpl, got %#v", actions[1]) } - updatedNode, ok := updateAction.GetObject().(*v1.Node) + updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch()) if !ok { - t.Errorf("unexpected object type") + t.Fatalf("can't apply node status patch: %v", err) } for i, cond := range updatedNode.Status.Conditions { // Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same. @@ -508,33 +525,35 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) kubelet := testKubelet.kubelet clock := testKubelet.fakeClock + // Do not set nano second, because apiserver function doesn't support nano second. (Only support + // RFC3339). + clock.SetTime(time.Unix(123456, 0)) kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{ - { - ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}, - Spec: v1.NodeSpec{}, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionTrue, - Reason: "KubeletReady", - Message: fmt.Sprintf("kubelet is posting ready status"), - LastHeartbeatTime: metav1.NewTime(clock.Now()), - LastTransitionTime: metav1.NewTime(clock.Now()), - }, - { - Type: v1.NodeOutOfDisk, - Status: v1.ConditionTrue, - Reason: "KubeletOutOfDisk", - Message: "out of disk space", - LastHeartbeatTime: metav1.NewTime(clock.Now()), - LastTransitionTime: metav1.NewTime(clock.Now()), - }, + existingNode := v1.Node{ + ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + Reason: "KubeletReady", + Message: fmt.Sprintf("kubelet is posting ready status"), + LastHeartbeatTime: metav1.NewTime(clock.Now()), + LastTransitionTime: metav1.NewTime(clock.Now()), + }, + { + Type: v1.NodeOutOfDisk, + Status: v1.ConditionTrue, + Reason: "KubeletOutOfDisk", + Message: "out of disk space", + LastHeartbeatTime: metav1.NewTime(clock.Now()), + LastTransitionTime: metav1.NewTime(clock.Now()), }, }, }, - }}).ReactionChain + } + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain mockCadvisor := testKubelet.fakeCadvisor machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", @@ -637,13 +656,13 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) if len(actions) != 2 { t.Errorf("%d. unexpected actions: %v", tcIdx, actions) } - updateAction, ok := actions[1].(core.UpdateAction) + patchAction, ok := actions[1].(core.PatchActionImpl) if !ok { - t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1]) + t.Errorf("%d. unexpected action type. expected PatchActionImpl, got %#v", tcIdx, actions[1]) } - updatedNode, ok := updateAction.GetObject().(*v1.Node) - if !ok { - t.Errorf("%d. unexpected object type", tcIdx) + updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch()) + if err != nil { + t.Fatalf("can't apply node status patch: %v", err) } kubeClient.ClearActions() @@ -656,7 +675,6 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) if !reflect.DeepEqual(tc.expected, oodCondition) { t.Errorf("%d.\nunexpected objects: %s", tcIdx, diff.ObjectDiff(tc.expected, oodCondition)) - } } } @@ -666,9 +684,8 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { kubelet := testKubelet.kubelet clock := testKubelet.fakeClock kubeClient := testKubelet.fakeKubeClient - kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{ - {ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}, - }}).ReactionChain + existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}} + kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain mockCadvisor := testKubelet.fakeCadvisor mockCadvisor.On("Start").Return(nil) machineInfo := &cadvisorapi.MachineInfo{ @@ -772,12 +789,12 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { if len(actions) != 2 { t.Fatalf("unexpected actions: %v", actions) } - if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" { + if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" { t.Fatalf("unexpected actions: %v", actions) } - updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node) - if !ok { - t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1]) + updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) + if err != nil { + t.Fatalf("can't apply node status patch: %v", err) } for i, cond := range updatedNode.Status.Conditions { @@ -984,7 +1001,7 @@ func TestTryRegisterWithApiServer(t *testing.T) { existingNode *v1.Node createError error getError error - updateError error + patchError error deleteError error expectedResult bool expectedActions int @@ -1056,7 +1073,7 @@ func TestTryRegisterWithApiServer(t *testing.T) { newNode: newNode(false, "a"), createError: alreadyExists, existingNode: newNode(true, "a"), - updateError: conflict, + patchError: conflict, expectedResult: false, expectedActions: 3, }, @@ -1087,9 +1104,9 @@ func TestTryRegisterWithApiServer(t *testing.T) { // Return an existing (matching) node on get. return true, tc.existingNode, tc.getError }) - kubeClient.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) { if action.GetSubresource() == "status" { - return true, nil, tc.updateError + return true, nil, tc.patchError } return notImplemented(action) }) @@ -1124,11 +1141,12 @@ func TestTryRegisterWithApiServer(t *testing.T) { t.Errorf("%v: unexpected type; couldn't convert to *v1.Node: %+v", tc.name, createAction.GetObject()) continue } - } else if action.GetVerb() == "update" { - updateAction := action.(core.UpdateAction) - savedNode, ok = updateAction.GetObject().(*v1.Node) - if !ok { - t.Errorf("%v: unexpected type; couldn't convert to *v1.Node: %+v", tc.name, updateAction.GetObject()) + } else if action.GetVerb() == "patch" { + patchAction := action.(core.PatchActionImpl) + var err error + savedNode, err = applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch()) + if err != nil { + t.Errorf("can't apply node status patch: %v", err) continue } } diff --git a/pkg/util/node/BUILD b/pkg/util/node/BUILD index 97e8b3817b..62b87c9a7f 100644 --- a/pkg/util/node/BUILD +++ b/pkg/util/node/BUILD @@ -20,6 +20,7 @@ go_library( "//pkg/apis/meta/v1:go_default_library", "//pkg/client/clientset_generated/release_1_5:go_default_library", "//pkg/types:go_default_library", + "//pkg/util/strategicpatch:go_default_library", "//vendor:github.com/golang/glog", ], ) diff --git a/pkg/util/node/node.go b/pkg/util/node/node.go index 7a5eb57775..4334624b09 100644 --- a/pkg/util/node/node.go +++ b/pkg/util/node/node.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/strategicpatch" ) const ( @@ -153,3 +154,32 @@ func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.N _, err = c.Core().Nodes().PatchStatus(string(node), patch) return err } + +// PatchNodeStatus patches node status. +func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { + oldData, err := json.Marshal(oldNode) + if err != nil { + return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) + } + + // Reset spec to make sure only patch for Status or ObjectMeta is generated. + // Note that we don't reset ObjectMeta here, because: + // 1. This aligns with Nodes().UpdateStatus(). + // 2. Some component does use this to update node annotations. + newNode.Spec = oldNode.Spec + newData, err := json.Marshal(newNode) + if err != nil { + return nil, fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNode, nodeName, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + if err != nil { + return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) + } + + updatedNode, err := c.Core().Nodes().Patch(string(nodeName), api.StrategicMergePatchType, patchBytes, "status") + if err != nil { + return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err) + } + return updatedNode, nil +}