mirror of https://github.com/k3s-io/k3s
Merge pull request #72873 from taragu/exponential-backoff-plugin-registration
Add exponential backoff for plugin registrationpull/564/head
commit
9487d37666
|
@ -22,12 +22,14 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
api "k8s.io/api/core/v1"
|
api "k8s.io/api/core/v1"
|
||||||
utilversion "k8s.io/apimachinery/pkg/util/version"
|
utilversion "k8s.io/apimachinery/pkg/util/version"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
|
@ -99,6 +101,12 @@ type nodeV0ClientCreator func(addr csiAddr) (
|
||||||
err error,
|
err error,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
initialDuration = 1 * time.Second
|
||||||
|
factor = 2.0
|
||||||
|
steps = 5
|
||||||
|
)
|
||||||
|
|
||||||
// newV1NodeClient creates a new NodeClient with the internally used gRPC
|
// newV1NodeClient creates a new NodeClient with the internally used gRPC
|
||||||
// connection set up. It also returns a closer which must to be called to close
|
// connection set up. It also returns a closer which must to be called to close
|
||||||
// the gRPC connection when the NodeClient is not used anymore.
|
// the gRPC connection when the NodeClient is not used anymore.
|
||||||
|
@ -177,13 +185,26 @@ func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
|
||||||
accessibleTopology map[string]string,
|
accessibleTopology map[string]string,
|
||||||
err error) {
|
err error) {
|
||||||
klog.V(4).Info(log("calling NodeGetInfo rpc"))
|
klog.V(4).Info(log("calling NodeGetInfo rpc"))
|
||||||
if c.nodeV1ClientCreator != nil {
|
|
||||||
return c.nodeGetInfoV1(ctx)
|
|
||||||
} else if c.nodeV0ClientCreator != nil {
|
|
||||||
return c.nodeGetInfoV0(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = fmt.Errorf("failed to call NodeGetInfo. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")
|
// TODO retries should happen at a lower layer (issue #73371)
|
||||||
|
backoff := wait.Backoff{Duration: initialDuration, Factor: factor, Steps: steps}
|
||||||
|
err = wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||||
|
var getNodeInfoError error
|
||||||
|
if c.nodeV1ClientCreator != nil {
|
||||||
|
nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
|
||||||
|
} else if c.nodeV0ClientCreator != nil {
|
||||||
|
nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV0(ctx)
|
||||||
|
}
|
||||||
|
if nodeID != "" {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
// kubelet plugin registration service not implemented is a terminal error, no need to retry
|
||||||
|
if strings.Contains(getNodeInfoError.Error(), "no handler registered for plugin type") {
|
||||||
|
return false, getNodeInfoError
|
||||||
|
}
|
||||||
|
// Continue with exponential backoff
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
|
||||||
return nodeID, maxVolumePerNode, accessibleTopology, err
|
return nodeID, maxVolumePerNode, accessibleTopology, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,7 +159,6 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO (verult) retry with exponential backoff, possibly added in csi client library.
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -167,7 +166,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Error(log("registrationHandler.RegisterPlugin failed at CSI.NodeGetInfo: %v", err))
|
klog.Error(log("registrationHandler.RegisterPlugin failed at CSI.NodeGetInfo: %v", err))
|
||||||
if unregErr := unregisterDriver(pluginName); unregErr != nil {
|
if unregErr := unregisterDriver(pluginName); unregErr != nil {
|
||||||
klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous: %v", unregErr))
|
klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
|
||||||
return unregErr
|
return unregErr
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue