From 1272fda2990b4bafdb72bdbf4acdc400fe1addd1 Mon Sep 17 00:00:00 2001
From: David Eads
Date: Tue, 27 Mar 2018 14:44:17 -0400
Subject: [PATCH] make reapers tolerate 404s on scaling down

---
 pkg/kubectl/delete.go                        | 11 ++-
 pkg/kubectl/delete_test.go                   | 84 ++++++++++----------
 pkg/kubectl/scale.go                         | 54 +++----------
 pkg/kubectl/scale_test.go                    | 21 ++---
 staging/src/k8s.io/client-go/scale/client.go |  2 +-
 5 files changed, 67 insertions(+), 105 deletions(-)

diff --git a/pkg/kubectl/delete.go b/pkg/kubectl/delete.go
index 974798a247..310ed8bb02 100644
--- a/pkg/kubectl/delete.go
+++ b/pkg/kubectl/delete.go
@@ -206,7 +206,7 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
 		// No overlapping controllers.
 		retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
 		waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
-		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
+		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) {
 			return err
 		}
 	}
@@ -276,7 +276,7 @@ func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Durati
 		// No overlapping ReplicaSets.
 		retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
 		waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
-		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
+		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) {
 			return err
 		}
 	}
@@ -340,7 +340,7 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat
 
 	retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
 	waitForStatefulSet := NewRetryParams(reaper.pollInterval, timeout)
-	if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet); err != nil {
+	if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet); err != nil && !errors.IsNotFound(err) {
 		return err
 	}
 
@@ -368,7 +368,7 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
 	// TODO: handle overlapping jobs
 	retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
 	waitForJobs := NewRetryParams(reaper.pollInterval, timeout)
-	if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil {
+	if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil && !errors.IsNotFound(err) {
 		return err
 	}
 	// at this point only dead pods are left, that should be removed
@@ -444,8 +444,7 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
 	errList := []error{}
 	for _, rs := range rss {
 		if err := rsReaper.Stop(rs.Namespace, rs.Name, timeout, gracePeriod); err != nil {
-			scaleGetErr, ok := err.(ScaleError)
-			if errors.IsNotFound(err) || (ok && errors.IsNotFound(scaleGetErr.ActualError)) {
+			if errors.IsNotFound(err) {
 				continue
 			}
 			errList = append(errList, err)
diff --git a/pkg/kubectl/delete_test.go b/pkg/kubectl/delete_test.go
index e45f550d4b..6bc06d1404 100644
--- a/pkg/kubectl/delete_test.go
+++ b/pkg/kubectl/delete_test.go
@@ -210,54 +210,54 @@ func TestReplicationControllerStop(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		copiedForWatch := test.Objs[0].DeepCopyObject()
-		scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 3, nil)
-		fake := fake.NewSimpleClientset(test.Objs...)
-		fakeWatch := watch.NewFake()
-		fake.PrependWatchReactor("replicationcontrollers", testcore.DefaultWatchReactor(fakeWatch, nil))
+		t.Run(test.Name, func(t *testing.T) {
+			copiedForWatch := test.Objs[0].DeepCopyObject()
+			scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 3, nil)
+			fake := fake.NewSimpleClientset(test.Objs...)
+			fakeWatch := watch.NewFake()
+			fake.PrependWatchReactor("replicationcontrollers", testcore.DefaultWatchReactor(fakeWatch, nil))
 
-		go func() {
-			fakeWatch.Add(copiedForWatch)
-		}()
+			go func() {
+				fakeWatch.Add(copiedForWatch)
+			}()
 
-		reaper := ReplicationControllerReaper{fake.Core(), time.Millisecond, time.Millisecond, scaleClient}
-		err := reaper.Stop(ns, name, 0, nil)
-		if !reflect.DeepEqual(err, test.StopError) {
-			t.Errorf("%s unexpected error: %v", test.Name, err)
-			continue
-		}
+			reaper := ReplicationControllerReaper{fake.Core(), time.Millisecond, time.Millisecond, scaleClient}
+			err := reaper.Stop(ns, name, 0, nil)
+			if !reflect.DeepEqual(err, test.StopError) {
+				t.Fatalf("unexpected error: %v", err)
+			}
 
-		actions := fake.Actions()
-		if len(actions) != len(test.ExpectedActions) {
-			t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
-			continue
-		}
-		for i, verb := range test.ExpectedActions {
-			if actions[i].GetResource().GroupResource() != api.Resource("replicationcontrollers") {
-				t.Errorf("%s unexpected action: %+v, expected %s-replicationController", test.Name, actions[i], verb)
+			actions := fake.Actions()
+			if len(actions) != len(test.ExpectedActions) {
+				t.Fatalf("unexpected actions: %v, expected %d actions got %d", actions, len(test.ExpectedActions), len(actions))
 			}
-			if actions[i].GetVerb() != verb {
-				t.Errorf("%s unexpected action: %+v, expected %s-replicationController", test.Name, actions[i], verb)
-			}
-		}
-		if test.ScaledDown {
-			scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, name)
-			if err != nil {
-				t.Error(err)
-			}
-			if scale.Spec.Replicas != 0 {
-				t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
-			}
-			actions := scaleClient.Actions()
-			if len(actions) != len(test.ScaleClientExpectedAction) {
-				t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ScaleClientExpectedAction), len(actions))
-			}
-			for i, verb := range test.ScaleClientExpectedAction {
+			for i, verb := range test.ExpectedActions {
+				if actions[i].GetResource().GroupResource() != api.Resource("replicationcontrollers") {
+					t.Errorf("unexpected action: %+v, expected %s-replicationController", actions[i], verb)
+				}
 				if actions[i].GetVerb() != verb {
-					t.Errorf("%s unexpected action: %+v, expected %s", test.Name, actions[i].GetVerb(), verb)
+					t.Errorf("unexpected action: %+v, expected %s-replicationController", actions[i], verb)
 				}
 			}
-		}
+			if test.ScaledDown {
+				scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, name)
+				if err != nil {
+					t.Error(err)
+				}
+				if scale.Spec.Replicas != 0 {
+					t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
+				}
+				actions := scaleClient.Actions()
+				if len(actions) != len(test.ScaleClientExpectedAction) {
+					t.Errorf("unexpected actions: %v, expected %d actions got %d", actions, len(test.ScaleClientExpectedAction), len(actions))
+				}
+				for i, verb := range test.ScaleClientExpectedAction {
+					if actions[i].GetVerb() != verb {
+						t.Errorf("unexpected action: %+v, expected %s", actions[i].GetVerb(), verb)
+					}
+				}
+			}
+		})
 	}
 }
@@ -776,7 +776,7 @@ func TestDeploymentNotFoundError(t *testing.T) {
 		},
 	)
 	fake.AddReactor("get", "replicasets", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
-		return true, nil, ScaleError{ActualError: errors.NewNotFound(api.Resource("replicaset"), "doesn't-matter")}
+		return true, nil, errors.NewNotFound(api.Resource("replicaset"), "doesn't-matter")
 	})
 
 	reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond, nil, schema.GroupResource{}}
diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go
index a60c991f59..bc2eb178aa 100644
--- a/pkg/kubectl/scale.go
+++ b/pkg/kubectl/scale.go
@@ -87,30 +87,6 @@ func (pe PreconditionError) Error() string {
 	return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
 }
 
-type ScaleErrorType int
-
-const (
-	ScaleGetFailure ScaleErrorType = iota
-	ScaleUpdateFailure
-	ScaleUpdateConflictFailure
-)
-
-// A ScaleError is returned when a scale request passes
-// preconditions but fails to actually scale the controller.
-type ScaleError struct {
-	FailureType     ScaleErrorType
-	ResourceVersion string
-	ActualError     error
-}
-
-func (c ScaleError) Error() string {
-	msg := fmt.Sprintf("Scaling the resource failed with: %v", c.ActualError)
-	if len(c.ResourceVersion) > 0 {
-		msg += fmt.Sprintf("; Current resource version %s", c.ResourceVersion)
-	}
-	return msg
-}
-
 // RetryParams encapsulates the retry parameters used by kubectl's scaler.
 type RetryParams struct {
 	Interval, Timeout time.Duration
@@ -127,16 +103,14 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
 		if updatedResourceVersion != nil {
 			*updatedResourceVersion = rv
 		}
-		switch e, _ := err.(ScaleError); err.(type) {
-		case nil:
-			return true, nil
-		case ScaleError:
-			// Retry only on update conflicts.
-			if e.FailureType == ScaleUpdateConflictFailure {
-				return false, nil
-			}
+		// Retry only on update conflicts.
+		if errors.IsConflict(err) {
+			return false, nil
 		}
-		return false, err
+		if err != nil {
+			return false, err
+		}
+		return true, nil
 	}
 }
 
@@ -163,7 +137,7 @@ type jobScaler struct {
 func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
 	job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{})
 	if err != nil {
-		return "", ScaleError{ScaleGetFailure, "", err}
+		return "", err
 	}
 	if preconditions != nil {
 		if err := preconditions.ValidateJob(job); err != nil {
@@ -174,10 +148,7 @@ func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *Scal
 	job.Spec.Parallelism = &parallelism
 	updatedJob, err := scaler.c.Jobs(namespace).Update(job)
 	if err != nil {
-		if errors.IsConflict(err) {
-			return "", ScaleError{ScaleUpdateConflictFailure, job.ResourceVersion, err}
-		}
-		return "", ScaleError{ScaleUpdateFailure, job.ResourceVersion, err}
+		return "", err
 	}
 	return updatedJob.ObjectMeta.ResourceVersion, nil
 }
@@ -234,7 +205,7 @@ var _ Scaler = &genericScaler{}
 func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) {
 	scale, err := s.scaleNamespacer.Scales(namespace).Get(s.targetGR, name)
 	if err != nil {
-		return "", ScaleError{ScaleGetFailure, "", err}
+		return "", err
 	}
 	if preconditions != nil {
 		if err := preconditions.validate(scale); err != nil {
@@ -245,10 +216,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale
 	scale.Spec.Replicas = int32(newSize)
 	updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(s.targetGR, scale)
 	if err != nil {
-		if errors.IsConflict(err) {
-			return "", ScaleError{ScaleUpdateConflictFailure, scale.ResourceVersion, err}
-		}
-		return "", ScaleError{ScaleUpdateFailure, scale.ResourceVersion, err}
+		return "", err
 	}
 	return updatedScale.ResourceVersion, nil
 }
diff --git a/pkg/kubectl/scale_test.go b/pkg/kubectl/scale_test.go
index e4b8973764..d75b9ac15a 100644
--- a/pkg/kubectl/scale_test.go
+++ b/pkg/kubectl/scale_test.go
@@ -87,8 +87,7 @@ func TestReplicationControllerScaleInvalid(t *testing.T) {
 	if pass {
 		t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
 	}
-	e, ok := err.(ScaleError)
-	if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
+	if err == nil {
 		t.Errorf("Expected error on invalid update failure, got %v", err)
 	}
 	actions := scaleClient.Actions()
@@ -252,8 +251,7 @@ func TestJobScaleInvalid(t *testing.T) {
 	if pass {
 		t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
 	}
-	e, ok := err.(ScaleError)
-	if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
+	if err == nil {
 		t.Errorf("Expected error on invalid update failure, got %v", err)
 	}
 }
@@ -486,8 +484,7 @@ func TestDeploymentScaleInvalid(t *testing.T) {
 	if pass {
 		t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
 	}
-	e, ok := err.(ScaleError)
-	if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
+	if err == nil {
 		t.Errorf("Expected error on invalid update failure, got %v", err)
 	}
 	actions := scaleClient.Actions()
@@ -599,8 +596,7 @@ func TestStatefulSetScaleInvalid(t *testing.T) {
 	if pass {
 		t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
 	}
-	e, ok := err.(ScaleError)
-	if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
+	if err == nil {
 		t.Errorf("Expected error on invalid update failure, got %v", err)
 	}
 	actions := scaleClient.Actions()
@@ -712,8 +708,7 @@ func TestReplicaSetScaleInvalid(t *testing.T) {
 	if pass {
 		t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
 	}
-	e, ok := err.(ScaleError)
-	if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
+	if err == nil {
 		t.Errorf("Expected error on invalid update failure, got %v", err)
 	}
 	actions := scaleClient.Actions()
@@ -859,7 +854,7 @@ func TestGenericScale(t *testing.T) {
 			resName:     "abc",
 			scaleGetter: scaleClient,
 		},
-		// scenario 2: a resource name cannot be empty
+		//scenario 2: a resource name cannot be empty
 		{
 			name:         "a resource name cannot be empty",
 			precondition: ScalePrecondition{10, ""},
@@ -883,8 +878,8 @@ func TestGenericScale(t *testing.T) {
 	}
 
 	// act
-	for index, scenario := range scenarios {
-		t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) {
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
 			target := NewScaler(scenario.scaleGetter, scenario.targetGR)
 			err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas)
 
diff --git a/staging/src/k8s.io/client-go/scale/client.go b/staging/src/k8s.io/client-go/scale/client.go
index e9d75173c8..545b9a1668 100644
--- a/staging/src/k8s.io/client-go/scale/client.go
+++ b/staging/src/k8s.io/client-go/scale/client.go
@@ -138,7 +138,7 @@ func (c *namespacedScaleClient) Get(resource schema.GroupResource, name string)
 		SubResource("scale").
 		Do()
 	if err := result.Error(); err != nil {
-		return nil, fmt.Errorf("could not fetch the scale for %s %s: %v", resource.String(), name, err)
+		return nil, err
 	}
 
 	scaleBytes, err := result.Raw()
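
Below the patch, for orientation only and not part of the commit: a minimal, self-contained Go sketch of the behaviour the change depends on. The names scaleOnce and scaleCondition are illustrative stand-ins rather than kubectl's real types, and only k8s.io/apimachinery is assumed. The retry loop retries solely on update conflicts and returns every other API error unwrapped, so a caller scaling to zero can treat a NotFound as "already gone", which is what the reapers now do.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
)

// scaleOnce is an illustrative stand-in for a single ScaleSimple call:
// it returns the API error from the underlying get/update, unwrapped.
type scaleOnce func() error

// scaleCondition mirrors the retry policy in the patched ScaleCondition:
// retry only on update conflicts, fail on any other error, stop on success.
func scaleCondition(scale scaleOnce) wait.ConditionFunc {
	return func() (bool, error) {
		err := scale()
		if errors.IsConflict(err) {
			return false, nil // conflict: poll again
		}
		if err != nil {
			return false, err // any other error ends the retry loop
		}
		return true, nil
	}
}

func main() {
	gr := schema.GroupResource{Group: "apps", Resource: "replicasets"}

	// Simulate the object disappearing while it is being scaled to zero.
	attempt := scaleOnce(func() error {
		return errors.NewNotFound(gr, "foo")
	})

	err := wait.PollImmediate(10*time.Millisecond, time.Second, scaleCondition(attempt))

	// As in the patched reapers: a NotFound while scaling down is not fatal,
	// because the object the reaper wanted to shrink is already gone.
	if err != nil && !errors.IsNotFound(err) {
		fmt.Println("scale down failed:", err)
		return
	}
	fmt.Println("scaled down (or already deleted)")
}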