diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 648ab62cd6..4226ef5d4f 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -27,7 +27,9 @@ import ( "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" ) // Scaler provides an interface for resources that can be scaled. @@ -171,15 +173,23 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) - if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } if waitForReplicas != nil { - rc, err := scaler.c.ReplicationControllers(namespace).Get(name) + watchOptions := api.ListOptions{FieldSelector: fields.OneTermEqualSelector("metadata.name", name), ResourceVersion: "0"} + watcher, err := scaler.c.ReplicationControllers(namespace).Watch(watchOptions) if err != nil { return err } - err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, client.ControllerHasDesiredReplicas(scaler.c, rc)) + _, err = watch.Until(waitForReplicas.Timeout, watcher, func(event watch.Event) (bool, error) { + if event.Type != watch.Added && event.Type != watch.Modified { + return false, nil + } + + rc := event.Object.(*api.ReplicationController) + return rc.Status.ObservedGeneration >= rc.Generation && rc.Status.Replicas == rc.Spec.Replicas, nil + }) if err == wait.ErrWaitTimeout { return fmt.Errorf("timed out waiting for %q to be synced", name) } diff --git a/pkg/kubectl/stop_test.go b/pkg/kubectl/stop_test.go index 05b5955b94..aa116f3c1f 100644 --- a/pkg/kubectl/stop_test.go +++ b/pkg/kubectl/stop_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/testclient" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" ) func TestReplicationControllerStop(t *testing.T) { @@ -69,7 +70,7 @@ func TestReplicationControllerStop(t *testing.T) { }, }, StopError: nil, - ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"}, + ExpectedActions: []string{"get", "list", "get", "update", "watch", "delete"}, }, { Name: "NoOverlapping", @@ -107,7 +108,7 @@ func TestReplicationControllerStop(t *testing.T) { }, }, StopError: nil, - ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"}, + ExpectedActions: []string{"get", "list", "get", "update", "watch", "delete"}, }, { Name: "OverlappingError", @@ -242,9 +243,20 @@ func TestReplicationControllerStop(t *testing.T) { } for _, test := range tests { + copiedForWatch, err := api.Scheme.Copy(test.Objs[0]) + if err != nil { + t.Fatalf("%s unexpected error: %v", test.Name, err) + } fake := testclient.NewSimpleFake(test.Objs...) + fakeWatch := watch.NewFake() + fake.PrependWatchReactor("replicationcontrollers", testclient.DefaultWatchReactor(fakeWatch, nil)) + + go func() { + fakeWatch.Add(copiedForWatch) + }() + reaper := ReplicationControllerReaper{fake, time.Millisecond, time.Millisecond} - err := reaper.Stop(ns, name, 0, nil) + err = reaper.Stop(ns, name, 0, nil) if !reflect.DeepEqual(err, test.StopError) { t.Errorf("%s unexpected error: %v", test.Name, err) continue