From fd2ea3ff3109f7685b289a4c7b2634beaea71d97 Mon Sep 17 00:00:00 2001 From: Shyam Jeedigunta Date: Mon, 20 Nov 2017 18:30:32 +0100 Subject: [PATCH] Make Scale() for RC poll-based until #31345 is fixed --- pkg/kubectl/BUILD | 1 - pkg/kubectl/cmd/apply_test.go | 19 +++++- pkg/kubectl/delete_test.go | 4 +- pkg/kubectl/scale.go | 67 ++++++--------------- test/fixtures/pkg/kubectl/cmd/apply/rc.yaml | 1 + 5 files changed, 40 insertions(+), 52 deletions(-) diff --git a/pkg/kubectl/BUILD b/pkg/kubectl/BUILD index 02f53b8440..7fe68ccde1 100644 --- a/pkg/kubectl/BUILD +++ b/pkg/kubectl/BUILD @@ -173,7 +173,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", diff --git a/pkg/kubectl/cmd/apply_test.go b/pkg/kubectl/cmd/apply_test.go index 08d07dd1e5..5397f8dc27 100644 --- a/pkg/kubectl/cmd/apply_test.go +++ b/pkg/kubectl/cmd/apply_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net/http" "os" @@ -1147,7 +1148,7 @@ func TestForceApply(t *testing.T) { pathRC := "/namespaces/test/replicationcontrollers/" + nameRC pathRCList := "/namespaces/test/replicationcontrollers" expected := map[string]int{ - "getOk": 9, + "getOk": 10, "getNotFound": 1, "getList": 1, "patch": 6, @@ -1158,6 +1159,7 @@ func TestForceApply(t *testing.T) { for _, fn := range testingOpenAPISchemaFns { deleted := false + isScaledDownToZero := false counts := map[string]int{} tf := cmdtesting.NewTestFactory() tf.UnstructuredClient = &fake.RESTClient{ @@ -1170,7 +1172,18 @@ func TestForceApply(t *testing.T) { return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte{}))}, nil } counts["getOk"]++ - bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC)) + var bodyRC io.ReadCloser + if isScaledDownToZero { + rcObj := readReplicationControllerFromFile(t, filenameRC) + rcObj.Spec.Replicas = 0 + rcBytes, err := runtime.Encode(testapi.Default.Codec(), rcObj) + if err != nil { + t.Fatal(err) + } + bodyRC = ioutil.NopCloser(bytes.NewReader(rcBytes)) + } else { + bodyRC = ioutil.NopCloser(bytes.NewReader(currentRC)) + } return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil case strings.HasSuffix(p, pathRCList) && m == "GET": counts["getList"]++ @@ -1205,10 +1218,12 @@ func TestForceApply(t *testing.T) { case strings.HasSuffix(p, pathRC) && m == "PUT": counts["put"]++ bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC)) + isScaledDownToZero = true return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil case strings.HasSuffix(p, pathRCList) && m == "POST": counts["post"]++ deleted = false + isScaledDownToZero = false bodyRC := ioutil.NopCloser(bytes.NewReader(currentRC)) return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: bodyRC}, nil default: diff --git a/pkg/kubectl/delete_test.go b/pkg/kubectl/delete_test.go index 379023fc7f..38fb1e0412 100644 --- a/pkg/kubectl/delete_test.go +++ b/pkg/kubectl/delete_test.go @@ -64,7 +64,7 @@ func TestReplicationControllerStop(t *testing.T) { }, }, StopError: nil, - ExpectedActions: []string{"get", "list", "get", "update", "get", "delete"}, + ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"}, }, { Name: "NoOverlapping", @@ -93,7 +93,7 @@ func TestReplicationControllerStop(t *testing.T) { }, }, StopError: nil, - ExpectedActions: []string{"get", "list", "get", "update", "get", "delete"}, + ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"}, }, { Name: "OverlappingError", diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index d92f21427f..99e4b365db 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -24,10 +24,8 @@ import ( autoscalingapi "k8s.io/api/autoscaling/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" @@ -40,11 +38,14 @@ import ( extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion" ) +// TODO: Figure out if we should be waiting on initializers in the Scale() functions below. + // Scaler provides an interface for resources that can be scaled. type Scaler interface { // Scale scales the named resource after checking preconditions. It optionally // retries in the event of resource version mismatch (if retry is not nil), // and optionally waits until the status of the resource matches newSize (if wait is not nil) + // TODO: Make the implementation of this watch-based (#56075) once #31345 is fixed. Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams) error // ScaleSimple does a simple one-shot attempt at scaling - not useful on its own, but // a necessary building block for Scale @@ -199,51 +200,23 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize // Make it try only once, immediately retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } - var updatedResourceVersion string - cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, &updatedResourceVersion) + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } if waitForReplicas != nil { - checkRC := func(rc *api.ReplicationController) bool { - if uint(rc.Spec.Replicas) != newSize { - // the size is changed by other party. Don't need to wait for the new change to complete. - return true - } - return rc.Status.ObservedGeneration >= rc.Generation && rc.Status.Replicas == rc.Spec.Replicas - } - // If number of replicas doesn't change, then the update may not event - // be sent to underlying database (we don't send no-op changes). - // In such case, will have value of the most - // recent update (which may be far in the past) so we may get "too old - // RV" error from watch or potentially no ReplicationController events - // will be deliver, since it may already be in the expected state. - // To protect from these two, we first issue Get() to ensure that we - // are not already in the expected state. - currentRC, err := scaler.c.ReplicationControllers(namespace).Get(name, metav1.GetOptions{}) + rc, err := scaler.c.ReplicationControllers(namespace).Get(name, metav1.GetOptions{}) if err != nil { return err } - if !checkRC(currentRC) { - watchOptions := metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(), - ResourceVersion: updatedResourceVersion, - } - watcher, err := scaler.c.ReplicationControllers(namespace).Watch(watchOptions) - if err != nil { - return err - } - _, err = watch.Until(waitForReplicas.Timeout, watcher, func(event watch.Event) (bool, error) { - if event.Type != watch.Added && event.Type != watch.Modified { - return false, nil - } - return checkRC(event.Object.(*api.ReplicationController)), nil - }) - if err == wait.ErrWaitTimeout { - return fmt.Errorf("timed out waiting for %q to be synced", name) - } - return err + if rc.Initializers != nil { + return nil } + err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, ControllerHasDesiredReplicas(scaler.c, rc)) + if err == wait.ErrWaitTimeout { + return fmt.Errorf("timed out waiting for %q to be synced", name) + } + return err } return nil } @@ -299,7 +272,7 @@ func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, prec retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) - if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } if waitForReplicas != nil { @@ -310,7 +283,7 @@ func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, prec if rs.Initializers != nil { return nil } - err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, ReplicaSetHasDesiredReplicas(scaler.c, rs)) + err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, ReplicaSetHasDesiredReplicas(scaler.c, rs)) if err == wait.ErrWaitTimeout { return fmt.Errorf("timed out waiting for %q to be synced", name) @@ -371,7 +344,7 @@ func (scaler *StatefulSetScaler) Scale(namespace, name string, newSize uint, pre retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) - if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } if waitForReplicas != nil { @@ -382,7 +355,7 @@ func (scaler *StatefulSetScaler) Scale(namespace, name string, newSize uint, pre if job.Initializers != nil { return nil } - err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, StatefulSetHasDesiredReplicas(scaler.c, job)) + err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, StatefulSetHasDesiredReplicas(scaler.c, job)) if err == wait.ErrWaitTimeout { return fmt.Errorf("timed out waiting for %q to be synced", name) } @@ -431,7 +404,7 @@ func (scaler *jobScaler) Scale(namespace, name string, newSize uint, preconditio retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) - if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } if waitForReplicas != nil { @@ -439,7 +412,7 @@ func (scaler *jobScaler) Scale(namespace, name string, newSize uint, preconditio if err != nil { return err } - err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, JobHasDesiredParallelism(scaler.c, job)) + err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, JobHasDesiredParallelism(scaler.c, job)) if err == wait.ErrWaitTimeout { return fmt.Errorf("timed out waiting for %q to be synced", name) } @@ -502,7 +475,7 @@ func (scaler *DeploymentScaler) Scale(namespace, name string, newSize uint, prec retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} } cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil) - if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil { return err } if waitForReplicas != nil { @@ -510,7 +483,7 @@ func (scaler *DeploymentScaler) Scale(namespace, name string, newSize uint, prec if err != nil { return err } - err = wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, DeploymentHasDesiredReplicas(scaler.c, deployment)) + err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, DeploymentHasDesiredReplicas(scaler.c, deployment)) if err == wait.ErrWaitTimeout { return fmt.Errorf("timed out waiting for %q to be synced", name) } diff --git a/test/fixtures/pkg/kubectl/cmd/apply/rc.yaml b/test/fixtures/pkg/kubectl/cmd/apply/rc.yaml index 1c1bf15be7..24dc67ceb5 100644 --- a/test/fixtures/pkg/kubectl/cmd/apply/rc.yaml +++ b/test/fixtures/pkg/kubectl/cmd/apply/rc.yaml @@ -2,6 +2,7 @@ apiVersion: v1 kind: ReplicationController metadata: name: test-rc + namespace: test labels: name: test-rc spec: