Augmenting API call retry in nodeinfomanager

pull/58/head
Cheng Xing 2018-11-14 18:07:40 -08:00
parent 7ff49c99d3
commit ca18690ceb
2 changed files with 141 additions and 93 deletions

View File

@ -15,10 +15,12 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library", "//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],

View File

@ -29,15 +29,18 @@ import (
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/retry"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1" csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
nodeutil "k8s.io/kubernetes/pkg/util/node" nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util"
"time"
) )
const ( const (
@ -45,7 +48,15 @@ const (
annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid" annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid"
) )
var nodeKind = v1.SchemeGroupVersion.WithKind("Node") var (
nodeKind = v1.SchemeGroupVersion.WithKind("Node")
updateBackoff = wait.Backoff{
Steps: 4,
Duration: 10 * time.Millisecond,
Factor: 5.0,
Jitter: 0.1,
}
)
// nodeInfoManager contains necessary common dependencies to update node info on both // nodeInfoManager contains necessary common dependencies to update node info on both
// the Node and CSINodeInfo objects. // the Node and CSINodeInfo objects.
@ -137,51 +148,59 @@ func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
return nil return nil
} }
// updateNode applies the given update functions to this node's Node object,
// retrying failed attempts on the updateBackoff schedule. Each attempt is
// delegated to tryUpdateNode; any error from an attempt is recorded and the
// attempt is retried rather than aborting the backoff loop. If the backoff
// itself returns an error, that error is reported together with an aggregate
// of every per-attempt error that was collected.
func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
	var attemptErrs []error

	// attempt never returns a non-nil error to the backoff helper, so a
	// failed try is always treated as "not done yet" instead of fatal.
	attempt := func() (bool, error) {
		attemptErr := nim.tryUpdateNode(updateFuncs...)
		if attemptErr == nil {
			return true, nil
		}
		attemptErrs = append(attemptErrs, attemptErr)
		return false, nil
	}

	if backoffErr := wait.ExponentialBackoff(updateBackoff, attempt); backoffErr != nil {
		return fmt.Errorf("error updating node: %v; caused by: %v", backoffErr, utilerrors.NewAggregate(attemptErrs))
	}
	return nil
}
// updateNode repeatedly attempts to update the corresponding node object // updateNode repeatedly attempts to update the corresponding node object
// which is modified by applying the given update functions sequentially. // which is modified by applying the given update functions sequentially.
// Because updateFuncs are applied sequentially, later updateFuncs should take into account // Because updateFuncs are applied sequentially, later updateFuncs should take into account
// the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple // the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple
// functions update the same field, updates in the last function are persisted. // functions update the same field, updates in the last function are persisted.
func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error { func (nim *nodeInfoManager) tryUpdateNode(updateFuncs ...nodeUpdateFunc) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { // Retrieve the latest version of Node before attempting update, so that
// Retrieve the latest version of Node before attempting update, so that // existing changes are not overwritten.
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
kubeClient := nim.volumeHost.GetKubeClient() kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil { if kubeClient == nil {
return fmt.Errorf("error getting kube client") return fmt.Errorf("error getting kube client")
}
nodeClient := kubeClient.CoreV1().Nodes()
originalNode, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{})
node := originalNode.DeepCopy()
if err != nil {
return err // do not wrap error
}
needUpdate := false
for _, update := range updateFuncs {
newNode, updated, err := update(node)
if err != nil {
return err
}
node = newNode
needUpdate = needUpdate || updated
}
if needUpdate {
// PatchNodeStatus can update both node's status and labels or annotations
// Updating status by directly updating node does not work
_, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node)
return updateErr // do not wrap error
}
return nil
})
if retryErr != nil {
return fmt.Errorf("node update failed: %v", retryErr)
} }
nodeClient := kubeClient.CoreV1().Nodes()
originalNode, err := nodeClient.Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err
}
node := originalNode.DeepCopy()
needUpdate := false
for _, update := range updateFuncs {
newNode, updated, err := update(node)
if err != nil {
return err
}
node = newNode
needUpdate = needUpdate || updated
}
if needUpdate {
// PatchNodeStatus can update both node's status and labels or annotations
// Updating status by directly updating node does not work
_, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node)
return updateErr
}
return nil return nil
} }
@ -333,23 +352,37 @@ func (nim *nodeInfoManager) updateCSINodeInfo(
return fmt.Errorf("error getting CSI client") return fmt.Errorf("error getting CSI client")
} }
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { var updateErrs []error
nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{}) err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if nodeInfo == nil || errors.IsNotFound(err) { if err := nim.tryUpdateCSINodeInfo(csiKubeClient, driverName, driverNodeID, topology); err != nil {
nodeInfo, err = nim.CreateCSINodeInfo() updateErrs = append(updateErrs, err)
return false, nil
} }
if err != nil { return true, nil
return err // do not wrap error
}
return nim.installDriverToCSINodeInfo(nodeInfo, driverName, driverNodeID, topology)
}) })
if retryErr != nil { if err != nil {
return fmt.Errorf("CSINodeInfo update failed: %v", retryErr) return fmt.Errorf("error updating CSINodeInfo: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
} }
return nil return nil
} }
// tryUpdateCSINodeInfo makes a single attempt to record the given driver in
// this node's CSINodeInfo object. It fetches the object by node name; when the
// fetch yields no object or a NotFound error, it creates the object through
// CreateCSINodeInfo instead. Any remaining error is returned unchanged.
// Otherwise the driver name, node ID and topology are handed to
// installDriverToCSINodeInfo, whose result is returned.
func (nim *nodeInfoManager) tryUpdateCSINodeInfo(
	csiKubeClient csiclientset.Interface,
	driverName string,
	driverNodeID string,
	topology *csipb.Topology) error {

	nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos()
	info, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})

	// A nil object or a NotFound error both mean there is nothing to update yet.
	missing := info == nil || errors.IsNotFound(err)
	if missing {
		info, err = nim.CreateCSINodeInfo()
	}
	if err != nil {
		return err
	}

	return nim.installDriverToCSINodeInfo(info, driverName, driverNodeID, topology)
}
func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error) { func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error) {
kubeClient := nim.volumeHost.GetKubeClient() kubeClient := nim.volumeHost.GetKubeClient()
@ -364,7 +397,7 @@ func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error
node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{}) node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil { if err != nil {
return nil, err // do not wrap error return nil, err
} }
nodeInfo := &csiv1alpha1.CSINodeInfo{ nodeInfo := &csiv1alpha1.CSINodeInfo{
@ -464,55 +497,68 @@ func (nim *nodeInfoManager) installDriverToCSINodeInfo(
return err return err
} }
_, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo) _, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
return err // do not wrap error return err
} }
func (nim *nodeInfoManager) uninstallDriverFromCSINodeInfo(csiDriverName string) error { func (nim *nodeInfoManager) uninstallDriverFromCSINodeInfo(
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { csiDriverName string) error {
csiKubeClient := nim.volumeHost.GetCSIClient() csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil { if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client") return fmt.Errorf("error getting CSI client")
}
var updateErrs []error
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if err := nim.tryUninstallDriverFromCSINodeInfo(csiKubeClient, csiDriverName); err != nil {
updateErrs = append(updateErrs, err)
return false, nil
} }
return true, nil
nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos()
nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
}
hasModified := false
newDriverStatuses := []csiv1alpha1.CSIDriverInfoStatus{}
for _, driverStatus := range nodeInfo.Status.Drivers {
if driverStatus.Name == csiDriverName {
// Uninstall the driver if we find it
hasModified = driverStatus.Available
driverStatus.Available = false
}
newDriverStatuses = append(newDriverStatuses, driverStatus)
}
nodeInfo.Status.Drivers = newDriverStatuses
if !hasModified {
// No changes, don't update
return nil
}
err = validateCSINodeInfo(nodeInfo)
if err != nil {
return err
}
_, updateErr := nodeInfoClient.Update(nodeInfo)
return updateErr // do not wrap error
}) })
if retryErr != nil { if err != nil {
return fmt.Errorf("CSINodeInfo update failed: %v", retryErr) return fmt.Errorf("error updating CSINodeInfo: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
} }
return nil return nil
} }
// tryUninstallDriverFromCSINodeInfo makes a single attempt to mark the named
// driver as unavailable in this node's CSINodeInfo status. It fetches the
// object, rebuilds the driver status list with Available cleared on every
// entry whose name matches csiDriverName, and writes the object back only if
// the matching entry was previously available. The object is validated with
// validateCSINodeInfo before the update; errors from Get, validation and
// Update are returned unchanged.
func (nim *nodeInfoManager) tryUninstallDriverFromCSINodeInfo(
	csiKubeClient csiclientset.Interface,
	csiDriverName string) error {

	client := csiKubeClient.CsiV1alpha1().CSINodeInfos()
	nodeInfo, err := client.Get(string(nim.nodeName), metav1.GetOptions{})
	if err != nil {
		return err // do not wrap error
	}

	drivers := nodeInfo.Status.Drivers
	changed := false
	rebuilt := make([]csiv1alpha1.CSIDriverInfoStatus, 0, len(drivers))
	for i := range drivers {
		status := drivers[i] // work on a copy, as the range-value form did
		if status.Name == csiDriverName {
			// An update is only needed if the driver was still marked available.
			changed = status.Available
			status.Available = false
		}
		rebuilt = append(rebuilt, status)
	}
	nodeInfo.Status.Drivers = rebuilt

	if !changed {
		// Nothing was flipped, so skip the API write.
		return nil
	}

	if validateErr := validateCSINodeInfo(nodeInfo); validateErr != nil {
		return validateErr
	}

	_, err = client.Update(nodeInfo)
	return err // do not wrap error
}
func updateMaxAttachLimit(driverName string, maxLimit int64) nodeUpdateFunc { func updateMaxAttachLimit(driverName string, maxLimit int64) nodeUpdateFunc {
return func(node *v1.Node) (*v1.Node, bool, error) { return func(node *v1.Node) (*v1.Node, bool, error) {
if maxLimit <= 0 { if maxLimit <= 0 {