Merge pull request #37871 from Random-Liu/use-patch-in-kubelet

Automatic merge from submit-queue (batch tested with PRs 36692, 37871)

Use PatchStatus to update node status in kubelet.

Fixes https://github.com/kubernetes/kubernetes/issues/37771.

This PR changes kubelet to update node status with `PatchStatus`.

@caesarxuchao @ymqytw told me that there is a limitation of current `CreateTwoWayMergePatch`, it doesn't support primitive type slice which uses strategic merge.
* I checked the node status, the only primitive type slices in NodeStatus are as follows, they are not using strategic merge:
  * [`ContainerImage.Names`](https://github.com/kubernetes/kubernetes/blob/master/pkg/api/v1/types.go#L2963)
  * [`VolumesInUse`](https://github.com/kubernetes/kubernetes/blob/master/pkg/api/v1/types.go#L2909)
* Volume package is already [using `CreateStrategicMergePath` to generate node status update patch](https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go#L111), and till now everything is fine. 

@yujuhong @dchen1107 
/cc @kubernetes/sig-node
pull/6/head
Kubernetes Submit Queue 2016-12-09 11:29:12 -08:00 committed by GitHub
commit 43233caaf0
6 changed files with 196 additions and 118 deletions

View File

@ -91,6 +91,7 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
clonedNode)
}
// TODO: Change to pkg/util/node.UpdateNodeStatus.
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(

View File

@ -51,6 +51,7 @@ go_library(
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/conversion:go_default_library",
"//pkg/fieldpath:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/kubelet/api:go_default_library",
@ -192,6 +193,7 @@ go_test(
"//pkg/util/rand:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//pkg/util/testing:go_default_library",
"//pkg/util/uuid:go_default_library",
"//pkg/util/wait:go_default_library",

View File

@ -32,11 +32,14 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/types"
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
@ -110,6 +113,18 @@ func (kl *Kubelet) tryRegisterWithApiServer(node *v1.Node) bool {
return false
}
clonedNode, err := conversion.NewCloner().DeepCopy(existingNode)
if err != nil {
glog.Errorf("Unable to clone %q node object %#v: %v", kl.nodeName, existingNode, err)
return false
}
originalNode, ok := clonedNode.(*v1.Node)
if !ok || originalNode == nil {
glog.Errorf("Unable to cast %q node object %#v to v1.Node", kl.nodeName, clonedNode)
return false
}
if existingNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", kl.nodeName)
@ -118,7 +133,8 @@ func (kl *Kubelet) tryRegisterWithApiServer(node *v1.Node) bool {
// annotation.
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
if requiresUpdate {
if _, err := kl.kubeClient.Core().Nodes().UpdateStatus(existingNode); err != nil {
if _, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName),
originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false
}
@ -347,20 +363,30 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
}
node := &nodes.Items[0]
clonedNode, err := conversion.NewCloner().DeepCopy(node)
if err != nil {
return fmt.Errorf("error clone node %q: %v", kl.nodeName, err)
}
originalNode, ok := clonedNode.(*v1.Node)
if !ok || originalNode == nil {
return fmt.Errorf("failed to cast %q node object %#v to v1.Node", kl.nodeName, clonedNode)
}
kl.updatePodCIDR(node.Spec.PodCIDR)
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node)
// Patch the current status on the API server
updatedNode, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), originalNode, node)
if err != nil {
return err
}
// If update finishes sucessfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
if err == nil {
kl.volumeManager.MarkVolumesAsReportedInUse(
updatedNode.Status.VolumesInUse)
}
return err
kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
return nil
}
// recordNodeStatusEvent records an event of the given type with the given

View File

@ -17,6 +17,7 @@ limitations under the License.
package kubelet
import (
"encoding/json"
"fmt"
"reflect"
goruntime "runtime"
@ -39,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
@ -90,6 +92,23 @@ func generateImageTags() []string {
return tagList
}
func applyNodeStatusPatch(originalNode *v1.Node, patch []byte) (*v1.Node, error) {
original, err := json.Marshal(originalNode)
if err != nil {
return nil, fmt.Errorf("failed to marshal original node %#v: %v", originalNode, err)
}
updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Node{})
if err != nil {
return nil, fmt.Errorf("failed to apply strategic merge patch %q on node %#v: %v",
patch, originalNode, err)
}
updatedNode := &v1.Node{}
if err := json.Unmarshal(updated, updatedNode); err != nil {
return nil, fmt.Errorf("failed to unmarshal updated node %q: %v", updated, err)
}
return updatedNode, nil
}
func TestUpdateNewNodeStatus(t *testing.T) {
// generate one more than maxImagesInNodeStatus in inputImageList
inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1)
@ -97,9 +116,8 @@ func TestUpdateNewNodeStatus(t *testing.T) {
t, inputImageList, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -200,12 +218,12 @@ func TestUpdateNewNodeStatus(t *testing.T) {
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node)
if !ok {
t.Errorf("unexpected object type")
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
@ -237,9 +255,8 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@ -280,12 +297,13 @@ func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
// StrategicMergePatch(original, patch []byte, dataStruct interface{}) ([]byte, error)
if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node)
if !ok {
t.Errorf("unexpected object type")
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
var oodCondition v1.NodeCondition
@ -312,8 +330,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{
existingNode := v1.Node{
ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
@ -362,8 +379,8 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
},
}}).ReactionChain
}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
@ -474,13 +491,13 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
if len(actions) != 2 {
t.Errorf("unexpected actions: %v", actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
patchAction, ok := actions[1].(core.PatchActionImpl)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
t.Errorf("unexpected action type. expected PatchActionImpl, got %#v", actions[1])
}
updatedNode, ok := updateAction.GetObject().(*v1.Node)
updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch())
if !ok {
t.Errorf("unexpected object type")
t.Fatalf("can't apply node status patch: %v", err)
}
for i, cond := range updatedNode.Status.Conditions {
// Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
@ -508,9 +525,11 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
// Do not set nano second, because apiserver function doesn't support nano second. (Only support
// RFC3339).
clock.SetTime(time.Unix(123456, 0))
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{
existingNode := v1.Node{
ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
@ -533,8 +552,8 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
},
},
},
},
}}).ReactionChain
}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
@ -637,13 +656,13 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
if len(actions) != 2 {
t.Errorf("%d. unexpected actions: %v", tcIdx, actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
patchAction, ok := actions[1].(core.PatchActionImpl)
if !ok {
t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1])
t.Errorf("%d. unexpected action type. expected PatchActionImpl, got %#v", tcIdx, actions[1])
}
updatedNode, ok := updateAction.GetObject().(*v1.Node)
if !ok {
t.Errorf("%d. unexpected object type", tcIdx)
updatedNode, err := applyNodeStatusPatch(&existingNode, patchAction.GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
kubeClient.ClearActions()
@ -656,7 +675,6 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T)
if !reflect.DeepEqual(tc.expected, oodCondition) {
t.Errorf("%d.\nunexpected objects: %s", tcIdx, diff.ObjectDiff(tc.expected, oodCondition))
}
}
}
@ -666,9 +684,8 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{
{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
existingNode := v1.Node{ObjectMeta: v1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
@ -772,12 +789,12 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
if !actions[1].Matches("patch", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*v1.Node)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
if err != nil {
t.Fatalf("can't apply node status patch: %v", err)
}
for i, cond := range updatedNode.Status.Conditions {
@ -984,7 +1001,7 @@ func TestTryRegisterWithApiServer(t *testing.T) {
existingNode *v1.Node
createError error
getError error
updateError error
patchError error
deleteError error
expectedResult bool
expectedActions int
@ -1056,7 +1073,7 @@ func TestTryRegisterWithApiServer(t *testing.T) {
newNode: newNode(false, "a"),
createError: alreadyExists,
existingNode: newNode(true, "a"),
updateError: conflict,
patchError: conflict,
expectedResult: false,
expectedActions: 3,
},
@ -1087,9 +1104,9 @@ func TestTryRegisterWithApiServer(t *testing.T) {
// Return an existing (matching) node on get.
return true, tc.existingNode, tc.getError
})
kubeClient.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
kubeClient.AddReactor("patch", "nodes", func(action core.Action) (bool, runtime.Object, error) {
if action.GetSubresource() == "status" {
return true, nil, tc.updateError
return true, nil, tc.patchError
}
return notImplemented(action)
})
@ -1124,11 +1141,12 @@ func TestTryRegisterWithApiServer(t *testing.T) {
t.Errorf("%v: unexpected type; couldn't convert to *v1.Node: %+v", tc.name, createAction.GetObject())
continue
}
} else if action.GetVerb() == "update" {
updateAction := action.(core.UpdateAction)
savedNode, ok = updateAction.GetObject().(*v1.Node)
if !ok {
t.Errorf("%v: unexpected type; couldn't convert to *v1.Node: %+v", tc.name, updateAction.GetObject())
} else if action.GetVerb() == "patch" {
patchAction := action.(core.PatchActionImpl)
var err error
savedNode, err = applyNodeStatusPatch(tc.existingNode, patchAction.GetPatch())
if err != nil {
t.Errorf("can't apply node status patch: %v", err)
continue
}
}

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/apis/meta/v1:go_default_library",
"//pkg/client/clientset_generated/release_1_5:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//vendor:github.com/golang/glog",
],
)

View File

@ -30,6 +30,7 @@ import (
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/strategicpatch"
)
const (
@ -153,3 +154,32 @@ func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.N
_, err = c.Core().Nodes().PatchStatus(string(node), patch)
return err
}
// PatchNodeStatus patches node status.
func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) {
oldData, err := json.Marshal(oldNode)
if err != nil {
return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
}
// Reset spec to make sure only patch for Status or ObjectMeta is generated.
// Note that we don't reset ObjectMeta here, because:
// 1. This aligns with Nodes().UpdateStatus().
// 2. Some component does use this to update node annotations.
newNode.Spec = oldNode.Spec
newData, err := json.Marshal(newNode)
if err != nil {
return nil, fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNode, nodeName, err)
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
if err != nil {
return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
}
updatedNode, err := c.Core().Nodes().Patch(string(nodeName), api.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, nil
}