mirror of https://github.com/k3s-io/k3s
Merge pull request #61790 from deads2k/cli-21-scaler
Automatic merge from submit-queue (batch tested with PRs 61790, 61808, 60339, 61615, 61757). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. make reapers tolerate 404s on scaling down fixes https://github.com/kubernetes/kubernetes/issues/61748 This fixes the scale client to return the actual API error, not a wrapped one. It also updates scalers to do the same. Then it fixes the reapers to tolerate 404s, since that means they achieved their objective. /assign @janetkuo /assign @p0lyn0mial ```release-note NONE ```pull/8/head
commit
dea3c0a610
|
@ -206,7 +206,7 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
|
|||
// No overlapping controllers.
|
||||
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
|
||||
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -276,7 +276,7 @@ func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Durati
|
|||
// No overlapping ReplicaSets.
|
||||
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
|
||||
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -340,7 +340,7 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat
|
|||
|
||||
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
|
||||
waitForStatefulSet := NewRetryParams(reaper.pollInterval, timeout)
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet); err != nil {
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet); err != nil && !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -368,7 +368,7 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
|
|||
// TODO: handle overlapping jobs
|
||||
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
|
||||
waitForJobs := NewRetryParams(reaper.pollInterval, timeout)
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil {
|
||||
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil && !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
// at this point only dead pods are left, that should be removed
|
||||
|
@ -444,8 +444,7 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
|
|||
errList := []error{}
|
||||
for _, rs := range rss {
|
||||
if err := rsReaper.Stop(rs.Namespace, rs.Name, timeout, gracePeriod); err != nil {
|
||||
scaleGetErr, ok := err.(ScaleError)
|
||||
if errors.IsNotFound(err) || (ok && errors.IsNotFound(scaleGetErr.ActualError)) {
|
||||
if errors.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
errList = append(errList, err)
|
||||
|
|
|
@ -210,54 +210,54 @@ func TestReplicationControllerStop(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, test := range tests {
|
||||
copiedForWatch := test.Objs[0].DeepCopyObject()
|
||||
scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 3, nil)
|
||||
fake := fake.NewSimpleClientset(test.Objs...)
|
||||
fakeWatch := watch.NewFake()
|
||||
fake.PrependWatchReactor("replicationcontrollers", testcore.DefaultWatchReactor(fakeWatch, nil))
|
||||
t.Run(test.Name, func(t *testing.T) {
|
||||
copiedForWatch := test.Objs[0].DeepCopyObject()
|
||||
scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 3, nil)
|
||||
fake := fake.NewSimpleClientset(test.Objs...)
|
||||
fakeWatch := watch.NewFake()
|
||||
fake.PrependWatchReactor("replicationcontrollers", testcore.DefaultWatchReactor(fakeWatch, nil))
|
||||
|
||||
go func() {
|
||||
fakeWatch.Add(copiedForWatch)
|
||||
}()
|
||||
go func() {
|
||||
fakeWatch.Add(copiedForWatch)
|
||||
}()
|
||||
|
||||
reaper := ReplicationControllerReaper{fake.Core(), time.Millisecond, time.Millisecond, scaleClient}
|
||||
err := reaper.Stop(ns, name, 0, nil)
|
||||
if !reflect.DeepEqual(err, test.StopError) {
|
||||
t.Errorf("%s unexpected error: %v", test.Name, err)
|
||||
continue
|
||||
}
|
||||
reaper := ReplicationControllerReaper{fake.Core(), time.Millisecond, time.Millisecond, scaleClient}
|
||||
err := reaper.Stop(ns, name, 0, nil)
|
||||
if !reflect.DeepEqual(err, test.StopError) {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
actions := fake.Actions()
|
||||
if len(actions) != len(test.ExpectedActions) {
|
||||
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
|
||||
continue
|
||||
}
|
||||
for i, verb := range test.ExpectedActions {
|
||||
if actions[i].GetResource().GroupResource() != api.Resource("replicationcontrollers") {
|
||||
t.Errorf("%s unexpected action: %+v, expected %s-replicationController", test.Name, actions[i], verb)
|
||||
actions := fake.Actions()
|
||||
if len(actions) != len(test.ExpectedActions) {
|
||||
t.Fatalf("unexpected actions: %v, expected %d actions got %d", actions, len(test.ExpectedActions), len(actions))
|
||||
}
|
||||
if actions[i].GetVerb() != verb {
|
||||
t.Errorf("%s unexpected action: %+v, expected %s-replicationController", test.Name, actions[i], verb)
|
||||
}
|
||||
}
|
||||
if test.ScaledDown {
|
||||
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, name)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if scale.Spec.Replicas != 0 {
|
||||
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
|
||||
}
|
||||
actions := scaleClient.Actions()
|
||||
if len(actions) != len(test.ScaleClientExpectedAction) {
|
||||
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ScaleClientExpectedAction), len(actions))
|
||||
}
|
||||
for i, verb := range test.ScaleClientExpectedAction {
|
||||
for i, verb := range test.ExpectedActions {
|
||||
if actions[i].GetResource().GroupResource() != api.Resource("replicationcontrollers") {
|
||||
t.Errorf("unexpected action: %+v, expected %s-replicationController", actions[i], verb)
|
||||
}
|
||||
if actions[i].GetVerb() != verb {
|
||||
t.Errorf("%s unexpected action: %+v, expected %s", test.Name, actions[i].GetVerb(), verb)
|
||||
t.Errorf("unexpected action: %+v, expected %s-replicationController", actions[i], verb)
|
||||
}
|
||||
}
|
||||
}
|
||||
if test.ScaledDown {
|
||||
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, name)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if scale.Spec.Replicas != 0 {
|
||||
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
|
||||
}
|
||||
actions := scaleClient.Actions()
|
||||
if len(actions) != len(test.ScaleClientExpectedAction) {
|
||||
t.Errorf("unexpected actions: %v, expected %d actions got %d", actions, len(test.ScaleClientExpectedAction), len(actions))
|
||||
}
|
||||
for i, verb := range test.ScaleClientExpectedAction {
|
||||
if actions[i].GetVerb() != verb {
|
||||
t.Errorf("unexpected action: %+v, expected %s", actions[i].GetVerb(), verb)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -776,7 +776,7 @@ func TestDeploymentNotFoundError(t *testing.T) {
|
|||
},
|
||||
)
|
||||
fake.AddReactor("get", "replicasets", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
|
||||
return true, nil, ScaleError{ActualError: errors.NewNotFound(api.Resource("replicaset"), "doesn't-matter")}
|
||||
return true, nil, errors.NewNotFound(api.Resource("replicaset"), "doesn't-matter")
|
||||
})
|
||||
|
||||
reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond, nil, schema.GroupResource{}}
|
||||
|
|
|
@ -87,30 +87,6 @@ func (pe PreconditionError) Error() string {
|
|||
return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
|
||||
}
|
||||
|
||||
type ScaleErrorType int
|
||||
|
||||
const (
|
||||
ScaleGetFailure ScaleErrorType = iota
|
||||
ScaleUpdateFailure
|
||||
ScaleUpdateConflictFailure
|
||||
)
|
||||
|
||||
// A ScaleError is returned when a scale request passes
|
||||
// preconditions but fails to actually scale the controller.
|
||||
type ScaleError struct {
|
||||
FailureType ScaleErrorType
|
||||
ResourceVersion string
|
||||
ActualError error
|
||||
}
|
||||
|
||||
func (c ScaleError) Error() string {
|
||||
msg := fmt.Sprintf("Scaling the resource failed with: %v", c.ActualError)
|
||||
if len(c.ResourceVersion) > 0 {
|
||||
msg += fmt.Sprintf("; Current resource version %s", c.ResourceVersion)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
// RetryParams encapsulates the retry parameters used by kubectl's scaler.
|
||||
type RetryParams struct {
|
||||
Interval, Timeout time.Duration
|
||||
|
@ -127,16 +103,14 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
|
|||
if updatedResourceVersion != nil {
|
||||
*updatedResourceVersion = rv
|
||||
}
|
||||
switch e, _ := err.(ScaleError); err.(type) {
|
||||
case nil:
|
||||
return true, nil
|
||||
case ScaleError:
|
||||
// Retry only on update conflicts.
|
||||
if e.FailureType == ScaleUpdateConflictFailure {
|
||||
return false, nil
|
||||
}
|
||||
// Retry only on update conflicts.
|
||||
if errors.IsConflict(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -163,7 +137,7 @@ type jobScaler struct {
|
|||
func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
|
||||
job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return "", ScaleError{ScaleGetFailure, "", err}
|
||||
return "", err
|
||||
}
|
||||
if preconditions != nil {
|
||||
if err := preconditions.ValidateJob(job); err != nil {
|
||||
|
@ -174,10 +148,7 @@ func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *Scal
|
|||
job.Spec.Parallelism = ¶llelism
|
||||
updatedJob, err := scaler.c.Jobs(namespace).Update(job)
|
||||
if err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
return "", ScaleError{ScaleUpdateConflictFailure, job.ResourceVersion, err}
|
||||
}
|
||||
return "", ScaleError{ScaleUpdateFailure, job.ResourceVersion, err}
|
||||
return "", err
|
||||
}
|
||||
return updatedJob.ObjectMeta.ResourceVersion, nil
|
||||
}
|
||||
|
@ -234,7 +205,7 @@ var _ Scaler = &genericScaler{}
|
|||
func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) {
|
||||
scale, err := s.scaleNamespacer.Scales(namespace).Get(s.targetGR, name)
|
||||
if err != nil {
|
||||
return "", ScaleError{ScaleGetFailure, "", err}
|
||||
return "", err
|
||||
}
|
||||
if preconditions != nil {
|
||||
if err := preconditions.validate(scale); err != nil {
|
||||
|
@ -245,10 +216,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale
|
|||
scale.Spec.Replicas = int32(newSize)
|
||||
updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(s.targetGR, scale)
|
||||
if err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
return "", ScaleError{ScaleUpdateConflictFailure, scale.ResourceVersion, err}
|
||||
}
|
||||
return "", ScaleError{ScaleUpdateFailure, scale.ResourceVersion, err}
|
||||
return "", err
|
||||
}
|
||||
return updatedScale.ResourceVersion, nil
|
||||
}
|
||||
|
|
|
@ -87,8 +87,7 @@ func TestReplicationControllerScaleInvalid(t *testing.T) {
|
|||
if pass {
|
||||
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
|
||||
}
|
||||
e, ok := err.(ScaleError)
|
||||
if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
|
||||
if err == nil {
|
||||
t.Errorf("Expected error on invalid update failure, got %v", err)
|
||||
}
|
||||
actions := scaleClient.Actions()
|
||||
|
@ -252,8 +251,7 @@ func TestJobScaleInvalid(t *testing.T) {
|
|||
if pass {
|
||||
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
|
||||
}
|
||||
e, ok := err.(ScaleError)
|
||||
if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
|
||||
if err == nil {
|
||||
t.Errorf("Expected error on invalid update failure, got %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -486,8 +484,7 @@ func TestDeploymentScaleInvalid(t *testing.T) {
|
|||
if pass {
|
||||
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
|
||||
}
|
||||
e, ok := err.(ScaleError)
|
||||
if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
|
||||
if err == nil {
|
||||
t.Errorf("Expected error on invalid update failure, got %v", err)
|
||||
}
|
||||
actions := scaleClient.Actions()
|
||||
|
@ -599,8 +596,7 @@ func TestStatefulSetScaleInvalid(t *testing.T) {
|
|||
if pass {
|
||||
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
|
||||
}
|
||||
e, ok := err.(ScaleError)
|
||||
if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
|
||||
if err == nil {
|
||||
t.Errorf("Expected error on invalid update failure, got %v", err)
|
||||
}
|
||||
actions := scaleClient.Actions()
|
||||
|
@ -712,8 +708,7 @@ func TestReplicaSetScaleInvalid(t *testing.T) {
|
|||
if pass {
|
||||
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
|
||||
}
|
||||
e, ok := err.(ScaleError)
|
||||
if err == nil || !ok || e.FailureType != ScaleUpdateFailure {
|
||||
if err == nil {
|
||||
t.Errorf("Expected error on invalid update failure, got %v", err)
|
||||
}
|
||||
actions := scaleClient.Actions()
|
||||
|
@ -859,7 +854,7 @@ func TestGenericScale(t *testing.T) {
|
|||
resName: "abc",
|
||||
scaleGetter: scaleClient,
|
||||
},
|
||||
// scenario 2: a resource name cannot be empty
|
||||
//scenario 2: a resource name cannot be empty
|
||||
{
|
||||
name: "a resource name cannot be empty",
|
||||
precondition: ScalePrecondition{10, ""},
|
||||
|
@ -883,8 +878,8 @@ func TestGenericScale(t *testing.T) {
|
|||
}
|
||||
|
||||
// act
|
||||
for index, scenario := range scenarios {
|
||||
t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) {
|
||||
for _, scenario := range scenarios {
|
||||
t.Run(scenario.name, func(t *testing.T) {
|
||||
target := NewScaler(scenario.scaleGetter, scenario.targetGR)
|
||||
|
||||
err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas)
|
||||
|
|
|
@ -138,7 +138,7 @@ func (c *namespacedScaleClient) Get(resource schema.GroupResource, name string)
|
|||
SubResource("scale").
|
||||
Do()
|
||||
if err := result.Error(); err != nil {
|
||||
return nil, fmt.Errorf("could not fetch the scale for %s %s: %v", resource.String(), name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
scaleBytes, err := result.Raw()
|
||||
|
|
Loading…
Reference in New Issue