From 5fd19d3766b0bd07d279c943d5b64b233585d434 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Fri, 19 Aug 2016 21:08:23 -0700 Subject: [PATCH] fix RC watch in scale --- pkg/kubectl/scale.go | 115 +++++++++++++++++++++++--------------- pkg/kubectl/scale_test.go | 18 +++--- 2 files changed, 78 insertions(+), 55 deletions(-) diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index a5aca04f00..25b8a0ee5e 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -41,7 +41,7 @@ type Scaler interface { Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams) error // ScaleSimple does a simple one-shot attempt at scaling - not useful on its own, but // a necessary building block for Scale - ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error + ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) } func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) { @@ -113,9 +113,12 @@ func NewRetryParams(interval, timeout time.Duration) *RetryParams { } // ScaleCondition is a closure around Scale that facilitates retries via util.wait -func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint) wait.ConditionFunc { +func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc { return func() (bool, error) { - err := r.ScaleSimple(namespace, name, precondition, count) + rv, err := r.ScaleSimple(namespace, name, precondition, count) + if updatedResourceVersion != nil { + *updatedResourceVersion = rv + } switch e, _ := err.(ScaleError); err.(type) { case nil: return true, nil @@ -155,24 +158,27 @@ type ReplicationControllerScaler struct { c client.Interface } -func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, 
newSize uint) error { +// ScaleSimple does a simple one-shot attempt at scaling. It returns the +// resourceVersion of the replication controller if the update is successful. +func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { controller, err := scaler.c.ReplicationControllers(namespace).Get(name) if err != nil { - return ScaleError{ScaleGetFailure, "Unknown", err} + return "", ScaleError{ScaleGetFailure, "Unknown", err} } if preconditions != nil { if err := preconditions.ValidateReplicationController(controller); err != nil { - return err + return "", err } } controller.Spec.Replicas = int32(newSize) - if _, err := scaler.c.ReplicationControllers(namespace).Update(controller); err != nil { + updatedRC, err := scaler.c.ReplicationControllers(namespace).Update(controller) + if err != nil { if errors.IsConflict(err) { - return ScaleError{ScaleUpdateConflictFailure, controller.ResourceVersion, err} + return "", ScaleError{ScaleUpdateConflictFailure, controller.ResourceVersion, err} } - return ScaleError{ScaleUpdateFailure, controller.ResourceVersion, err} + return "", ScaleError{ScaleUpdateFailure, controller.ResourceVersion, err} } - return nil + return updatedRC.ObjectMeta.ResourceVersion, nil } // Scale updates a ReplicationController to a new size, with optional precondition check (if preconditions is not nil), @@ -186,12 +192,13 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize // Make it try only once, immediately retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } - cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + var updatedResourceVersion string + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, &updatedResourceVersion) if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } if waitForReplicas != nil { - watchOptions := 
api.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", name), ResourceVersion: "0"} + watchOptions := api.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", name), ResourceVersion: updatedResourceVersion} watcher, err := scaler.c.ReplicationControllers(namespace).Watch(watchOptions) if err != nil { return err @@ -202,6 +209,10 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize } rc := event.Object.(*api.ReplicationController) + if uint(rc.Spec.Replicas) != newSize { + // the size is changed by other party. Don't need to wait for the new change to complete. + return true, nil + } return rc.Status.ObservedGeneration >= rc.Generation && rc.Status.Replicas == rc.Spec.Replicas, nil }) if err == wait.ErrWaitTimeout { @@ -227,24 +238,27 @@ type ReplicaSetScaler struct { c client.ExtensionsInterface } -func (scaler *ReplicaSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { +// ScaleSimple does a simple one-shot attempt at scaling. It returns the +// resourceVersion of the replicaset if the update is successful. 
+func (scaler *ReplicaSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { rs, err := scaler.c.ReplicaSets(namespace).Get(name) if err != nil { - return ScaleError{ScaleGetFailure, "Unknown", err} + return "", ScaleError{ScaleGetFailure, "Unknown", err} } if preconditions != nil { if err := preconditions.ValidateReplicaSet(rs); err != nil { - return err + return "", err } } rs.Spec.Replicas = int32(newSize) - if _, err := scaler.c.ReplicaSets(namespace).Update(rs); err != nil { + updatedRS, err := scaler.c.ReplicaSets(namespace).Update(rs) + if err != nil { if errors.IsConflict(err) { - return ScaleError{ScaleUpdateConflictFailure, rs.ResourceVersion, err} + return "", ScaleError{ScaleUpdateConflictFailure, rs.ResourceVersion, err} } - return ScaleError{ScaleUpdateFailure, rs.ResourceVersion, err} + return "", ScaleError{ScaleUpdateFailure, rs.ResourceVersion, err} } - return nil + return updatedRS.ObjectMeta.ResourceVersion, nil } // Scale updates a ReplicaSet to a new size, with optional precondition check (if preconditions is @@ -258,7 +272,7 @@ func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, prec // Make it try only once, immediately retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } - cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { return err } @@ -268,6 +282,7 @@ func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, prec return err } err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.ReplicaSetHasDesiredReplicas(scaler.c, rs)) + if err == wait.ErrWaitTimeout { return fmt.Errorf("timed out waiting for %q to be synced", name) } @@ -294,24 +309,27 @@ type PetSetScaler struct { c client.AppsInterface } -func (scaler *PetSetScaler) 
ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { +// ScaleSimple does a simple one-shot attempt at scaling. It returns the +// resourceVersion of the petset if the update is successful. +func (scaler *PetSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { ps, err := scaler.c.PetSets(namespace).Get(name) if err != nil { - return ScaleError{ScaleGetFailure, "Unknown", err} + return "", ScaleError{ScaleGetFailure, "Unknown", err} } if preconditions != nil { if err := preconditions.ValidatePetSet(ps); err != nil { - return err + return "", err } } ps.Spec.Replicas = int(newSize) - if _, err := scaler.c.PetSets(namespace).Update(ps); err != nil { + updatedPetSet, err := scaler.c.PetSets(namespace).Update(ps) + if err != nil { if errors.IsConflict(err) { - return ScaleError{ScaleUpdateConflictFailure, ps.ResourceVersion, err} + return "", ScaleError{ScaleUpdateConflictFailure, ps.ResourceVersion, err} } - return ScaleError{ScaleUpdateFailure, ps.ResourceVersion, err} + return "", ScaleError{ScaleUpdateFailure, ps.ResourceVersion, err} } - return nil + return updatedPetSet.ResourceVersion, nil } func (scaler *PetSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { @@ -322,7 +340,7 @@ func (scaler *PetSetScaler) Scale(namespace, name string, newSize uint, precondi // Make it try only once, immediately retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } - cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { return err } @@ -344,26 +362,28 @@ type JobScaler struct { c client.BatchInterface } -// ScaleSimple is responsible for updating job's parallelism. 
-func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { +// ScaleSimple is responsible for updating job's parallelism. It returns the +// resourceVersion of the job if the update is successful. +func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { job, err := scaler.c.Jobs(namespace).Get(name) if err != nil { - return ScaleError{ScaleGetFailure, "Unknown", err} + return "", ScaleError{ScaleGetFailure, "Unknown", err} } if preconditions != nil { if err := preconditions.ValidateJob(job); err != nil { - return err + return "", err } } parallelism := int32(newSize) job.Spec.Parallelism = &parallelism - if _, err := scaler.c.Jobs(namespace).Update(job); err != nil { + udpatedJob, err := scaler.c.Jobs(namespace).Update(job) + if err != nil { if errors.IsConflict(err) { - return ScaleError{ScaleUpdateConflictFailure, job.ResourceVersion, err} + return "", ScaleError{ScaleUpdateConflictFailure, job.ResourceVersion, err} } - return ScaleError{ScaleUpdateFailure, job.ResourceVersion, err} + return "", ScaleError{ScaleUpdateFailure, job.ResourceVersion, err} } - return nil + return udpatedJob.ObjectMeta.ResourceVersion, nil } // Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil), @@ -377,7 +397,7 @@ func (scaler *JobScaler) Scale(namespace, name string, newSize uint, preconditio // Make it try only once, immediately retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } - cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { return err } @@ -410,28 +430,31 @@ type DeploymentScaler struct { c client.ExtensionsInterface } -// ScaleSimple is responsible for updating a deployment's desired replicas count. 
-func (scaler *DeploymentScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { +// ScaleSimple is responsible for updating a deployment's desired replicas +// count. It returns the resourceVersion of the deployment if the update is +// successful. +func (scaler *DeploymentScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { deployment, err := scaler.c.Deployments(namespace).Get(name) if err != nil { - return ScaleError{ScaleGetFailure, "Unknown", err} + return "", ScaleError{ScaleGetFailure, "Unknown", err} } if preconditions != nil { if err := preconditions.ValidateDeployment(deployment); err != nil { - return err + return "", err } } // TODO(madhusudancs): Fix this when Scale group issues are resolved (see issue #18528). // For now I'm falling back to regular Deployment update operation. deployment.Spec.Replicas = int32(newSize) - if _, err := scaler.c.Deployments(namespace).Update(deployment); err != nil { + updatedDeployment, err := scaler.c.Deployments(namespace).Update(deployment) + if err != nil { if errors.IsConflict(err) { - return ScaleError{ScaleUpdateConflictFailure, deployment.ResourceVersion, err} + return "", ScaleError{ScaleUpdateConflictFailure, deployment.ResourceVersion, err} } - return ScaleError{ScaleUpdateFailure, deployment.ResourceVersion, err} + return "", ScaleError{ScaleUpdateFailure, deployment.ResourceVersion, err} } - return nil + return updatedDeployment.ObjectMeta.ResourceVersion, nil } // Scale updates a deployment to a new size, with optional precondition check (if preconditions is not nil), @@ -444,7 +467,7 @@ func (scaler *DeploymentScaler) Scale(namespace, name string, newSize uint, prec // Make it try only once, immediately retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } - cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + cond := ScaleCondition(scaler, preconditions, 
namespace, name, newSize, nil) if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { return err } diff --git a/pkg/kubectl/scale_test.go b/pkg/kubectl/scale_test.go index 734eb3d2f2..9306a1828e 100644 --- a/pkg/kubectl/scale_test.go +++ b/pkg/kubectl/scale_test.go @@ -66,7 +66,7 @@ func TestReplicationControllerScaleRetry(t *testing.T) { name := "foo" namespace := "default" - scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count) + scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count, nil) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -75,7 +75,7 @@ func TestReplicationControllerScaleRetry(t *testing.T) { t.Errorf("Did not expect an error on update conflict failure, got %v", err) } preconditions = ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(&scaler, &preconditions, namespace, name, count) + scaleFunc = ScaleCondition(&scaler, &preconditions, namespace, name, count, nil) pass, err = scaleFunc() if err == nil { t.Errorf("Expected error on precondition failure") @@ -90,7 +90,7 @@ func TestReplicationControllerScaleInvalid(t *testing.T) { name := "foo" namespace := "default" - scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count) + scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count, nil) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -304,7 +304,7 @@ func TestJobScaleRetry(t *testing.T) { name := "foo" namespace := "default" - scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count) + scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count, nil) pass, err := scaleFunc() if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -313,7 +313,7 @@ func TestJobScaleRetry(t *testing.T) { t.Errorf("Did not expect an error on 
update failure, got %v", err) } preconditions = ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(&scaler, &preconditions, namespace, name, count) + scaleFunc = ScaleCondition(&scaler, &preconditions, namespace, name, count, nil) pass, err = scaleFunc() if err == nil { t.Errorf("Expected error on precondition failure") @@ -348,7 +348,7 @@ func TestJobScaleInvalid(t *testing.T) { name := "foo" namespace := "default" - scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count) + scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count, nil) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -553,7 +553,7 @@ func TestDeploymentScaleRetry(t *testing.T) { name := "foo" namespace := "default" - scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count) + scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil) pass, err := scaleFunc() if pass != false { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) @@ -562,7 +562,7 @@ func TestDeploymentScaleRetry(t *testing.T) { t.Errorf("Did not expect an error on update failure, got %v", err) } preconditions = &ScalePrecondition{3, ""} - scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count) + scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil) pass, err = scaleFunc() if err == nil { t.Errorf("Expected error on precondition failure") @@ -597,7 +597,7 @@ func TestDeploymentScaleInvalid(t *testing.T) { name := "foo" namespace := "default" - scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count) + scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count, nil) pass, err := scaleFunc() if pass { t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)