diff --git a/pkg/volume/csi/nodeinfomanager/BUILD b/pkg/volume/csi/nodeinfomanager/BUILD index 2b95c9d474..e949893feb 100644 --- a/pkg/volume/csi/nodeinfomanager/BUILD +++ b/pkg/volume/csi/nodeinfomanager/BUILD @@ -15,10 +15,12 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library", + "//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go index 14b2077815..cd832e169f 100644 --- a/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go +++ b/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go @@ -29,15 +29,18 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/util/retry" csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1" + csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned" "k8s.io/klog" "k8s.io/kubernetes/pkg/features" nodeutil "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/util" + "time" ) const ( @@ -45,7 +48,15 @@ const ( annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid" ) -var nodeKind = v1.SchemeGroupVersion.WithKind("Node") +var ( + nodeKind = v1.SchemeGroupVersion.WithKind("Node") + updateBackoff = wait.Backoff{ + Steps: 4, + Duration: 10 * time.Millisecond, + Factor: 5.0, + Jitter: 0.1, + } +) // nodeInfoManager contains necessary common dependencies to update node info on both // the Node and CSINodeInfo objects. @@ -137,51 +148,59 @@ func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error { return nil } +func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error { + var updateErrs []error + err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) { + if err := nim.tryUpdateNode(updateFuncs...); err != nil { + updateErrs = append(updateErrs, err) + return false, nil + } + return true, nil + }) + if err != nil { + return fmt.Errorf("error updating node: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) + } + return nil +} + // updateNode repeatedly attempts to update the corresponding node object // which is modified by applying the given update functions sequentially. // Because updateFuncs are applied sequentially, later updateFuncs should take into account // the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple // functions update the same field, updates in the last function are persisted. -func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - // Retrieve the latest version of Node before attempting update, so that - // existing changes are not overwritten. RetryOnConflict uses - // exponential backoff to avoid exhausting the apiserver. +func (nim *nodeInfoManager) tryUpdateNode(updateFuncs ...nodeUpdateFunc) error { + // Retrieve the latest version of Node before attempting update, so that + // existing changes are not overwritten. - kubeClient := nim.volumeHost.GetKubeClient() - if kubeClient == nil { - return fmt.Errorf("error getting kube client") - } - - nodeClient := kubeClient.CoreV1().Nodes() - originalNode, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{}) - node := originalNode.DeepCopy() - if err != nil { - return err // do not wrap error - } - - needUpdate := false - for _, update := range updateFuncs { - newNode, updated, err := update(node) - if err != nil { - return err - } - node = newNode - needUpdate = needUpdate || updated - } - - if needUpdate { - // PatchNodeStatus can update both node's status and labels or annotations - // Updating status by directly updating node does not work - _, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node) - return updateErr // do not wrap error - } - - return nil - }) - if retryErr != nil { - return fmt.Errorf("node update failed: %v", retryErr) + kubeClient := nim.volumeHost.GetKubeClient() + if kubeClient == nil { + return fmt.Errorf("error getting kube client") } + + nodeClient := kubeClient.CoreV1().Nodes() + originalNode, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{}) + if err != nil { + return err + } + node := originalNode.DeepCopy() + + needUpdate := false + for _, update := range updateFuncs { + newNode, updated, err := update(node) + if err != nil { + return err + } + node = newNode + needUpdate = needUpdate || updated + } + + if needUpdate { + // PatchNodeStatus can update both node's status and labels or annotations + // Updating status by directly updating node does not work + _, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node) + return updateErr + } + return nil } @@ -333,23 +352,37 @@ func (nim *nodeInfoManager) updateCSINodeInfo( return fmt.Errorf("error getting CSI client") } - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{}) - if nodeInfo == nil || errors.IsNotFound(err) { - nodeInfo, err = nim.CreateCSINodeInfo() + var updateErrs []error + err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) { + if err := nim.tryUpdateCSINodeInfo(csiKubeClient, driverName, driverNodeID, topology); err != nil { + updateErrs = append(updateErrs, err) + return false, nil } - if err != nil { - return err // do not wrap error - } - - return nim.installDriverToCSINodeInfo(nodeInfo, driverName, driverNodeID, topology) + return true, nil }) - if retryErr != nil { - return fmt.Errorf("CSINodeInfo update failed: %v", retryErr) + if err != nil { + return fmt.Errorf("error updating CSINodeInfo: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) } return nil } +func (nim *nodeInfoManager) tryUpdateCSINodeInfo( + csiKubeClient csiclientset.Interface, + driverName string, + driverNodeID string, + topology *csipb.Topology) error { + + nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{}) + if nodeInfo == nil || errors.IsNotFound(err) { + nodeInfo, err = nim.CreateCSINodeInfo() + } + if err != nil { + return err + } + + return nim.installDriverToCSINodeInfo(nodeInfo, driverName, driverNodeID, topology) +} + func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error) { kubeClient := nim.volumeHost.GetKubeClient() @@ -364,7 +397,7 @@ func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{}) if err != nil { - return nil, err // do not wrap error + return nil, err } nodeInfo := &csiv1alpha1.CSINodeInfo{ @@ -464,55 +497,68 @@ func (nim *nodeInfoManager) installDriverToCSINodeInfo( return err } _, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo) - return err // do not wrap error + return err } -func (nim *nodeInfoManager) uninstallDriverFromCSINodeInfo(csiDriverName string) error { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { +func (nim *nodeInfoManager) uninstallDriverFromCSINodeInfo( + csiDriverName string) error { - csiKubeClient := nim.volumeHost.GetCSIClient() - if csiKubeClient == nil { - return fmt.Errorf("error getting CSI client") + csiKubeClient := nim.volumeHost.GetCSIClient() + if csiKubeClient == nil { + return fmt.Errorf("error getting CSI client") + } + + var updateErrs []error + err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) { + if err := nim.tryUninstallDriverFromCSINodeInfo(csiKubeClient, csiDriverName); err != nil { + updateErrs = append(updateErrs, err) + return false, nil } - - nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos() - nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{}) - if err != nil { - return err // do not wrap error - } - - hasModified := false - newDriverStatuses := []csiv1alpha1.CSIDriverInfoStatus{} - for _, driverStatus := range nodeInfo.Status.Drivers { - if driverStatus.Name == csiDriverName { - // Uninstall the driver if we find it - hasModified = driverStatus.Available - driverStatus.Available = false - } - newDriverStatuses = append(newDriverStatuses, driverStatus) - } - - nodeInfo.Status.Drivers = newDriverStatuses - - if !hasModified { - // No changes, don't update - return nil - } - - err = validateCSINodeInfo(nodeInfo) - if err != nil { - return err - } - _, updateErr := nodeInfoClient.Update(nodeInfo) - return updateErr // do not wrap error - + return true, nil }) - if retryErr != nil { - return fmt.Errorf("CSINodeInfo update failed: %v", retryErr) + if err != nil { + return fmt.Errorf("error updating CSINodeInfo: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs)) } return nil } +func (nim *nodeInfoManager) tryUninstallDriverFromCSINodeInfo( + csiKubeClient csiclientset.Interface, + csiDriverName string) error { + + nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos() + nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{}) + if err != nil { + return err // do not wrap error + } + + hasModified := false + newDriverStatuses := []csiv1alpha1.CSIDriverInfoStatus{} + for _, driverStatus := range nodeInfo.Status.Drivers { + if driverStatus.Name == csiDriverName { + // Uninstall the driver if we find it + hasModified = driverStatus.Available + driverStatus.Available = false + } + newDriverStatuses = append(newDriverStatuses, driverStatus) + } + + nodeInfo.Status.Drivers = newDriverStatuses + + if !hasModified { + // No changes, don't update + return nil + } + + err = validateCSINodeInfo(nodeInfo) + if err != nil { + return err + } + _, updateErr := nodeInfoClient.Update(nodeInfo) + return updateErr // do not wrap error + +} + func updateMaxAttachLimit(driverName string, maxLimit int64) nodeUpdateFunc { return func(node *v1.Node) (*v1.Node, bool, error) { if maxLimit <= 0 {