Merge pull request #49856 from dixudx/polish_UpdateNodeStatus

Automatic merge from submit-queue (batch tested with PRs 49856, 56257, 57027, 57695, 57432). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Change to pkg/util/node.UpdateNodeStatus

**What this PR does / why we need it**:

> // TODO: Change to pkg/util/node.UpdateNodeStatus.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #

**Special notes for your reviewer**:
/cc @brendandburns @dchen1107 @lavalamp 

**Release note**:

```release-note
None
```
pull/6/head
Kubernetes Submit Queue 2018-01-02 13:15:42 -08:00 committed by GitHub
commit 27d2ffb32f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 27 additions and 59 deletions

View File

@ -192,7 +192,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) { if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) {
return return
} }
_, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode) _, _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
if err != nil { if err != nil {
glog.Errorf("Error patching node with cloud ip addresses = [%v]", err) glog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
} }

View File

@ -14,11 +14,11 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater", importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater",
deps = [ deps = [
"//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
], ],

View File

@ -19,18 +19,15 @@ limitations under the License.
package statusupdater package statusupdater
import ( import (
"encoding/json"
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1" corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
nodeutil "k8s.io/kubernetes/pkg/util/node"
) )
// NodeStatusUpdater defines a set of operations for updating the // NodeStatusUpdater defines a set of operations for updating the
@ -100,47 +97,12 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error { func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error {
node := nodeObj.DeepCopy() node := nodeObj.DeepCopy()
// TODO: Change to pkg/util/node.UpdateNodeStatus.
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf(
"failed to Marshal oldData for node %q. %v",
nodeName,
err)
}
node.Status.VolumesAttached = attachedVolumes node.Status.VolumesAttached = attachedVolumes
_, patchBytes, err := nodeutil.PatchNodeStatus(nsu.kubeClient.CoreV1(), nodeName, nodeObj, node)
newData, err := json.Marshal(node)
if err != nil { if err != nil {
return fmt.Errorf( return err
"failed to Marshal newData for node %q. %v",
nodeName,
err)
} }
patchBytes, err := glog.V(4).Infof("Updating status %q for node %q succeeded. VolumesAttached: %v", patchBytes, nodeName, attachedVolumes)
strategicpatch.CreateTwoWayMergePatch(oldData, newData, node)
if err != nil {
return fmt.Errorf(
"failed to CreateTwoWayMergePatch for node %q. %v",
nodeName,
err)
}
_, err = nsu.kubeClient.CoreV1().Nodes().PatchStatus(string(nodeName), patchBytes)
if err != nil {
return fmt.Errorf(
"failed to kubeClient.CoreV1().Nodes().Patch for node %q. %v",
nodeName,
err)
}
glog.V(4).Infof(
"Updating status for node %q succeeded. patchBytes: %q VolumesAttached: %v",
nodeName,
string(patchBytes),
node.Status.VolumesAttached)
return nil return nil
} }

View File

@ -132,8 +132,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
if requiresUpdate { if requiresUpdate {
if _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false return false
} }
@ -142,8 +141,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
return true return true
} }
glog.Errorf( glog.Errorf("Previously node %q had externalID %q; now it is %q; will delete and recreate.",
"Previously node %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID, kl.nodeName, node.Spec.ExternalID, existingNode.Spec.ExternalID,
) )
if err := kl.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil { if err := kl.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
@ -408,7 +406,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
kl.setNodeStatus(node) kl.setNodeStatus(node)
// Patch the current status on the API server // Patch the current status on the API server
updatedNode, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node) updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node)
if err != nil { if err != nil {
return err return err
} }

View File

@ -166,10 +166,23 @@ func PatchNodeCIDR(c clientset.Interface, node types.NodeName, cidr string) erro
} }
// PatchNodeStatus patches node status. // PatchNodeStatus patches node status.
func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
if err != nil {
return nil, nil, err
}
updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, patchBytes, nil
}
func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) ([]byte, error) {
oldData, err := json.Marshal(oldNode) oldData, err := json.Marshal(oldNode)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err)
} }
// Reset spec to make sure only patch for Status or ObjectMeta is generated. // Reset spec to make sure only patch for Status or ObjectMeta is generated.
@ -179,17 +192,12 @@ func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode
newNode.Spec = oldNode.Spec newNode.Spec = oldNode.Spec
newData, err := json.Marshal(newNode) newData, err := json.Marshal(newNode)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNode, nodeName, err) return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err)
} }
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err)
} }
return patchBytes, nil
updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, nil
} }