Merge pull request #64768 from krzysied/scale_retries

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Adding scale error retries

**What this PR does / why we need it**:
ScaleWithRetries now retries every retryable API error, not just Conflict errors.
ref #63030
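
For context, "retryable" here means a transient API failure that is safe to re-issue. Below is a minimal sketch of such a classification, assuming only the standard apimachinery error predicates; the real `IsRetryableAPIError` helper referenced in the diffs lives in `test/utils` and may check a different set of predicates:

```go
package main

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// isRetryableAPIError is an illustrative stand-in for the IsRetryableAPIError
// helper referenced in the diffs below: throttling, timeouts, and server-side
// internal errors are transient, so the request can safely be re-issued.
func isRetryableAPIError(err error) bool {
	return apierrors.IsTimeout(err) ||
		apierrors.IsServerTimeout(err) ||
		apierrors.IsTooManyRequests(err) ||
		apierrors.IsInternalError(err)
}
```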

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
Kubernetes Submit Queue 2018-06-12 09:31:46 -07:00 committed by GitHub
commit 7e41ab4ed3
4 changed files with 44 additions and 17 deletions

**pkg/kubectl/scale.go**

```diff
@@ -149,14 +149,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec
 		return err
 	}
 	if waitForReplicas != nil {
-		err := wait.PollImmediate(
-			waitForReplicas.Interval,
-			waitForReplicas.Timeout,
-			scaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, int32(newSize)))
-		if err == wait.ErrWaitTimeout {
-			return fmt.Errorf("timed out waiting for %q to be synced", resourceName)
-		}
-		return err
+		return WaitForScaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, newSize, waitForReplicas)
 	}
 	return nil
 }
```
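
With this change `Scale` still issues the update itself but delegates the post-update wait to the new exported helper. A hypothetical call site might look like the following sketch; `NewScaler` and `NewRetryParams` are the existing kubectl helpers, while the `scalesGetter` wiring is assumed to exist in the caller:

```go
package main

import (
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/kubernetes/pkg/kubectl"
)

// scaleDeployment is a hypothetical caller: it scales a Deployment to the
// given size and blocks until the observed replica count converges.
func scaleDeployment(scalesGetter scaleclient.ScalesGetter, ns, name string, size uint) error {
	scaler := kubectl.NewScaler(scalesGetter)
	retry := kubectl.NewRetryParams(1*time.Second, 30*time.Second)
	waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
	gr := schema.GroupResource{Group: "apps", Resource: "deployments"}
	// nil preconditions: apply the scale regardless of current size/version.
	return scaler.Scale(ns, name, size, nil, retry, waitForReplicas, gr)
}
```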
```diff
@@ -177,3 +170,19 @@ func scaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupRe
 			desiredReplicas == actualScale.Status.Replicas, nil
 	}
 }
+
+// WaitForScaleHasDesiredReplicas waits until condition scaleHasDesiredReplicas is satisfied
+// or returns error when timeout happens
+func WaitForScaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, newSize uint, waitForReplicas *RetryParams) error {
+	if waitForReplicas == nil {
+		return fmt.Errorf("waitForReplicas parameter cannot be nil!")
+	}
+	err := wait.PollImmediate(
+		waitForReplicas.Interval,
+		waitForReplicas.Timeout,
+		scaleHasDesiredReplicas(sClient, gr, resourceName, namespace, int32(newSize)))
+	if err == wait.ErrWaitTimeout {
+		return fmt.Errorf("timed out waiting for %q to be synced", resourceName)
+	}
+	return err
+}
```
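
Because the wait is now an exported helper, a caller that issued the scale update elsewhere can block on convergence alone. A sketch under the same assumptions as above:

```go
package main

import (
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/kubernetes/pkg/kubectl"
)

// waitForDeploymentScale is hypothetical: it sends no update, it only polls
// the scale subresource until size replicas are reported or the timeout hits.
func waitForDeploymentScale(scalesGetter scaleclient.ScalesGetter, ns, name string, size uint) error {
	params := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
	gr := schema.GroupResource{Group: "apps", Resource: "deployments"}
	return kubectl.WaitForScaleHasDesiredReplicas(scalesGetter, gr, name, ns, size, params)
}
```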

**test/e2e/framework/BUILD**

```diff
@@ -60,7 +60,6 @@ go_library(
     "//pkg/controller/deployment/util:go_default_library",
     "//pkg/controller/nodelifecycle:go_default_library",
     "//pkg/features:go_default_library",
-    "//pkg/kubectl:go_default_library",
     "//pkg/kubelet/apis:go_default_library",
     "//pkg/kubelet/apis/kubeletconfig:go_default_library",
     "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
```

**test/e2e/framework/util.go**

```diff
@@ -88,7 +88,6 @@ import (
 	"k8s.io/kubernetes/pkg/controller"
 	nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle"
 	"k8s.io/kubernetes/pkg/features"
-	"k8s.io/kubernetes/pkg/kubectl"
 	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
 	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/master/ports"
@@ -2847,8 +2846,7 @@ func ScaleResource(
 	gr schema.GroupResource,
 ) error {
 	By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
-	scaler := kubectl.NewScaler(scalesGetter)
-	if err := testutils.ScaleResourceWithRetries(scaler, ns, name, size, gr); err != nil {
+	if err := testutils.ScaleResourceWithRetries(scalesGetter, ns, name, size, gr); err != nil {
		return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err)
	}
	if !wait {
```
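
The framework no longer constructs a `Scaler` itself; it hands the `ScalesGetter` straight to the test utils. A hypothetical e2e call site after this change might look like the sketch below; only the trailing parameters of `ScaleResource` are visible in the hunk above, so the leading ones are assumptions:

```go
package main

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	clientset "k8s.io/client-go/kubernetes"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/kubernetes/test/e2e/framework"
)

// scaleRC is a hypothetical e2e call site; the parameter order is inferred
// from the hunk above and may not be exact.
func scaleRC(c clientset.Interface, sg scaleclient.ScalesGetter, ns, name string) error {
	return framework.ScaleResource(c, sg, ns, name, 5, true,
		schema.GroupKind{Kind: "ReplicationController"},
		schema.GroupResource{Resource: "replicationcontrollers"})
}
```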

**test/utils/update_resources.go**

```diff
@@ -21,6 +21,8 @@ import (
 	"time"
 
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/scale"
 	"k8s.io/kubernetes/pkg/kubectl"
 )
```
```diff
@@ -30,13 +32,32 @@ const (
 	updateRetryInterval = 5 * time.Second
 	updateRetryTimeout  = 1 * time.Minute
 	waitRetryInterval   = 5 * time.Second
-	waitRetryTImeout    = 5 * time.Minute
+	waitRetryTimeout    = 5 * time.Minute
 )
 
-func ScaleResourceWithRetries(scaler kubectl.Scaler, namespace, name string, size uint, gr schema.GroupResource) error {
-	waitForScale := kubectl.NewRetryParams(updateRetryInterval, updateRetryTimeout)
-	waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTImeout)
-	if err := scaler.Scale(namespace, name, size, nil, waitForScale, waitForReplicas, gr); err != nil {
+func RetryErrorCondition(condition wait.ConditionFunc) wait.ConditionFunc {
+	return func() (bool, error) {
+		done, err := condition()
+		if err != nil && IsRetryableAPIError(err) {
+			return false, nil
+		}
+		return done, err
+	}
+}
+
+func ScaleResourceWithRetries(scalesGetter scale.ScalesGetter, namespace, name string, size uint, gr schema.GroupResource) error {
+	scaler := kubectl.NewScaler(scalesGetter)
+	preconditions := &kubectl.ScalePrecondition{
+		Size:            -1,
+		ResourceVersion: "",
+	}
+	waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTimeout)
+	cond := RetryErrorCondition(kubectl.ScaleCondition(scaler, preconditions, namespace, name, size, nil, gr))
+	err := wait.PollImmediate(updateRetryInterval, updateRetryTimeout, cond)
+	if err == nil {
+		err = kubectl.WaitForScaleHasDesiredReplicas(scalesGetter, gr, name, namespace, size, waitForReplicas)
+	}
+	if err != nil {
 		return fmt.Errorf("Error while scaling %s to %d replicas: %v", name, size, err)
 	}
 	return nil
```
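
`RetryErrorCondition` is the heart of the change: when the wrapped condition fails with a retryable error it returns `(false, nil)`, which `wait.PollImmediate` treats as "not done yet" and polls again; any non-retryable error still aborts the poll immediately, as before. A hypothetical test-side usage, with the `scalesGetter` construction from the test cluster's rest config assumed and omitted:

```go
package main

import (
	"testing"

	"k8s.io/apimachinery/pkg/runtime/schema"
	scaleclient "k8s.io/client-go/scale"
	testutils "k8s.io/kubernetes/test/utils"
)

// testScaleRC is a hypothetical test helper: scale an RC to 5 replicas,
// retrying transient API errors and waiting for the status to converge.
func testScaleRC(t *testing.T, scalesGetter scaleclient.ScalesGetter) {
	gr := schema.GroupResource{Resource: "replicationcontrollers"}
	if err := testutils.ScaleResourceWithRetries(scalesGetter, "default", "my-rc", 5, gr); err != nil {
		t.Fatalf("scaling failed: %v", err)
	}
}
```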