From 8a3c2dcc6d11dfd7c4124be0fd7d8b96bd4939bb Mon Sep 17 00:00:00 2001 From: Krzysztof Siedlecki Date: Thu, 7 Jun 2018 10:57:43 +0200 Subject: [PATCH] Adding scale error retries --- pkg/kubectl/scale.go | 25 +++++++++++++++++-------- test/e2e/framework/BUILD | 1 - test/e2e/framework/util.go | 4 +--- test/utils/update_resources.go | 31 ++++++++++++++++++++++++++----- 4 files changed, 44 insertions(+), 17 deletions(-) diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 590b600fd4..f1d46b7a11 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -149,14 +149,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec return err } if waitForReplicas != nil { - err := wait.PollImmediate( - waitForReplicas.Interval, - waitForReplicas.Timeout, - scaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, int32(newSize))) - if err == wait.ErrWaitTimeout { - return fmt.Errorf("timed out waiting for %q to be synced", resourceName) - } - return err + return WaitForScaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, newSize, waitForReplicas) } return nil } @@ -177,3 +170,19 @@ func scaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupRe desiredReplicas == actualScale.Status.Replicas, nil } } + +// WaitForScaleHasDesiredReplicas waits until condition scaleHasDesiredReplicas is satisfied +// or returns error when timeout happens +func WaitForScaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, newSize uint, waitForReplicas *RetryParams) error { + if waitForReplicas == nil { + return fmt.Errorf("waitForReplicas parameter cannot be nil!") + } + err := wait.PollImmediate( + waitForReplicas.Interval, + waitForReplicas.Timeout, + scaleHasDesiredReplicas(sClient, gr, resourceName, namespace, int32(newSize))) + if err == wait.ErrWaitTimeout { + return fmt.Errorf("timed out waiting for %q to be synced", resourceName) + } + return err +} diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 9f6cba3800..fc650ed9fc 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -60,7 +60,6 @@ go_library( "//pkg/controller/deployment/util:go_default_library", "//pkg/controller/nodelifecycle:go_default_library", "//pkg/features:go_default_library", - "//pkg/kubectl:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/stats/v1alpha1:go_default_library", diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index d5d3c8668c..889c403afe 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -88,7 +88,6 @@ import ( "k8s.io/kubernetes/pkg/controller" nodectlr "k8s.io/kubernetes/pkg/controller/nodelifecycle" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/kubectl" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/master/ports" @@ -2847,8 +2846,7 @@ func ScaleResource( gr schema.GroupResource, ) error { By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size)) - scaler := kubectl.NewScaler(scalesGetter) - if err := testutils.ScaleResourceWithRetries(scaler, ns, name, size, gr); err != nil { + if err := testutils.ScaleResourceWithRetries(scalesGetter, ns, name, size, gr); err != nil { return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err) } if !wait { diff --git a/test/utils/update_resources.go b/test/utils/update_resources.go index b666454e54..210847cb37 100644 --- a/test/utils/update_resources.go +++ b/test/utils/update_resources.go @@ -21,6 +21,8 @@ import ( "time" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/kubectl" ) @@ -30,13 +32,32 @@ const ( updateRetryInterval = 5 * time.Second updateRetryTimeout = 1 * time.Minute waitRetryInterval = 5 * time.Second - waitRetryTImeout = 5 * time.Minute + waitRetryTimeout = 5 * time.Minute ) -func ScaleResourceWithRetries(scaler kubectl.Scaler, namespace, name string, size uint, gr schema.GroupResource) error { - waitForScale := kubectl.NewRetryParams(updateRetryInterval, updateRetryTimeout) - waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTImeout) - if err := scaler.Scale(namespace, name, size, nil, waitForScale, waitForReplicas, gr); err != nil { +func RetryErrorCondition(condition wait.ConditionFunc) wait.ConditionFunc { + return func() (bool, error) { + done, err := condition() + if err != nil && IsRetryableAPIError(err) { + return false, nil + } + return done, err + } +} + +func ScaleResourceWithRetries(scalesGetter scale.ScalesGetter, namespace, name string, size uint, gr schema.GroupResource) error { + scaler := kubectl.NewScaler(scalesGetter) + preconditions := &kubectl.ScalePrecondition{ + Size: -1, + ResourceVersion: "", + } + waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTimeout) + cond := RetryErrorCondition(kubectl.ScaleCondition(scaler, preconditions, namespace, name, size, nil, gr)) + err := wait.PollImmediate(updateRetryInterval, updateRetryTimeout, cond) + if err == nil { + err = kubectl.WaitForScaleHasDesiredReplicas(scalesGetter, gr, name, namespace, size, waitForReplicas) + } + if err != nil { return fmt.Errorf("Error while scaling %s to %d replicas: %v", name, size, err) } return nil