From 48775319d937cca12d2a8770a3f72d73c205b4c6 Mon Sep 17 00:00:00 2001 From: Maciej Szulik Date: Wed, 16 Sep 2015 17:32:59 +0200 Subject: [PATCH] Reaper and scaler for jobs --- pkg/client/unversioned/conditions.go | 21 ++ pkg/kubectl/cmd/util/factory.go | 2 +- pkg/kubectl/rolling_updater.go | 6 +- pkg/kubectl/scale.go | 123 ++++++--- pkg/kubectl/scale_test.go | 274 +++++++++++++++++++-- pkg/kubectl/stop.go | 36 ++- pkg/kubectl/stop_test.go | 73 ++++++ test/e2e/util.go | 2 +- test/integration/framework/master_utils.go | 2 +- 9 files changed, 473 insertions(+), 66 deletions(-) diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index c6901a37cb..cc6fed4d9d 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -18,6 +18,7 @@ package unversioned import ( "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/util/wait" ) @@ -41,3 +42,23 @@ func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationContro return ctrl.Status.ObservedGeneration >= desiredGeneration && ctrl.Status.Replicas == ctrl.Spec.Replicas, nil } } + +// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count +// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. +func JobHasDesiredParallelism(c Interface, job *experimental.Job) wait.ConditionFunc { + + return func() (bool, error) { + job, err := c.Experimental().Jobs(job.Namespace).Get(job.Name) + if err != nil { + return false, err + } + + // desired parallelism can be either the exact number, in which case return immediately + if job.Status.Active == *job.Spec.Parallelism { + return true, nil + } + // otherwise count successful + progress := *job.Spec.Completions - job.Status.Active - job.Status.Successful + return progress == 0, nil + } +} diff --git a/pkg/kubectl/cmd/util/factory.go b/pkg/kubectl/cmd/util/factory.go index 06f90b4551..c09f4bb436 100644 --- a/pkg/kubectl/cmd/util/factory.go +++ b/pkg/kubectl/cmd/util/factory.go @@ -212,7 +212,7 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) *Factory { if err != nil { return nil, err } - return kubectl.ScalerFor(mapping.Kind, kubectl.NewScalerClient(client)) + return kubectl.ScalerFor(mapping.Kind, client) }, Reaper: func(mapping *meta.RESTMapping) (kubectl.Reaper, error) { client, err := clients.ClientForVersion(mapping.APIVersion) diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index 0d86b4294d..25ae11e573 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -340,8 +340,7 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi // scalerScaleAndWait scales a controller using a Scaler and a real client. func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) { - scalerClient := NewScalerClient(r.c) - scaler, err := ScalerFor("ReplicationController", scalerClient) + scaler, err := ScalerFor("ReplicationController", r.c) if err != nil { return nil, fmt.Errorf("Couldn't make scaler: %s", err) } @@ -452,8 +451,7 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl if err != nil { return err } - scalerClient := NewScalerClient(r.c) - if err = wait.Poll(config.Interval, config.Timeout, scalerClient.ControllerHasDesiredReplicas(newRc)); err != nil { + if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil { return err } newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name) diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index d16540147c..98ca4c7419 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/apis/experimental" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/util/wait" ) @@ -70,8 +71,8 @@ func (c ControllerScaleError) Error() string { c.ActualError, c.ResourceVersion) } -// Validate ensures that the preconditions match. Returns nil if they are valid, an error otherwise -func (precondition *ScalePrecondition) Validate(controller *api.ReplicationController) error { +// ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise +func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error { if precondition.Size != -1 && controller.Spec.Replicas != precondition.Size { return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(controller.Spec.Replicas)} } @@ -81,6 +82,20 @@ func (precondition *ScalePrecondition) Validate(controller *api.ReplicationContr return nil } +// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise +func (precondition *ScalePrecondition) ValidateJob(job *experimental.Job) error { + if precondition.Size != -1 && job.Spec.Parallelism == nil { + return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"} + } + if precondition.Size != -1 && *job.Spec.Parallelism != precondition.Size { + return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(*job.Spec.Parallelism)} + } + if precondition.ResourceVersion != "" && job.ResourceVersion != precondition.ResourceVersion { + return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion} + } + return nil +} + type Scaler interface { // Scale scales the named resource after checking preconditions. It optionally // retries in the event of resource version mismatch (if retry is not nil), @@ -88,19 +103,24 @@ type Scaler interface { Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams) error // ScaleSimple does a simple one-shot attempt at scaling - not useful on it's own, but // a necessary building block for Scale - ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) + ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error } -func ScalerFor(kind string, c ScalerClient) (Scaler, error) { +func ScalerFor(kind string, c client.Interface) (Scaler, error) { switch kind { case "ReplicationController": return &ReplicationControllerScaler{c}, nil + case "Job": + return &JobScaler{c}, nil } return nil, fmt.Errorf("no scaler has been implemented for %q", kind) } type ReplicationControllerScaler struct { - c ScalerClient + c client.Interface +} +type JobScaler struct { + c client.Interface } // RetryParams encapsulates the retry parameters used by kubectl's scaler. @@ -115,7 +135,7 @@ func NewRetryParams(interval, timeout time.Duration) *RetryParams { // ScaleCondition is a closure around Scale that facilitates retries via util.wait func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint) wait.ConditionFunc { return func() (bool, error) { - _, err := r.ScaleSimple(namespace, name, precondition, count) + err := r.ScaleSimple(namespace, name, precondition, count) switch e, _ := err.(ControllerScaleError); err.(type) { case nil: return true, nil @@ -132,26 +152,26 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s } } -func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) { - controller, err := scaler.c.GetReplicationController(namespace, name) +func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { + controller, err := scaler.c.ReplicationControllers(namespace).Get(name) if err != nil { - return "", ControllerScaleError{ControllerScaleGetFailure, "Unknown", err} + return ControllerScaleError{ControllerScaleGetFailure, "Unknown", err} } if preconditions != nil { - if err := preconditions.Validate(controller); err != nil { - return "", err + if err := preconditions.ValidateReplicationController(controller); err != nil { + return err } } controller.Spec.Replicas = int(newSize) // TODO: do retry on 409 errors here? - if _, err := scaler.c.UpdateReplicationController(namespace, controller); err != nil { + if _, err := scaler.c.ReplicationControllers(namespace).Update(controller); err != nil { if errors.IsInvalid(err) { - return "", ControllerScaleError{ControllerScaleUpdateInvalidFailure, controller.ResourceVersion, err} + return ControllerScaleError{ControllerScaleUpdateInvalidFailure, controller.ResourceVersion, err} } - return "", ControllerScaleError{ControllerScaleUpdateFailure, controller.ResourceVersion, err} + return ControllerScaleError{ControllerScaleUpdateFailure, controller.ResourceVersion, err} } // TODO: do a better job of printing objects here. - return "scaled", nil + return nil } // Scale updates a ReplicationController to a new size, with optional precondition check (if preconditions is not nil), @@ -170,40 +190,61 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize return err } if waitForReplicas != nil { - rc, err := scaler.c.GetReplicationController(namespace, name) + rc, err := scaler.c.ReplicationControllers(namespace).Get(name) if err != nil { return err } return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, - scaler.c.ControllerHasDesiredReplicas(rc)) + client.ControllerHasDesiredReplicas(scaler.c, rc)) } return nil } -// ScalerClient abstracts access to ReplicationControllers. -type ScalerClient interface { - GetReplicationController(namespace, name string) (*api.ReplicationController, error) - UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) - ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc +// ScaleSimple is responsible for updating job's parallelism. +func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { + job, err := scaler.c.Experimental().Jobs(namespace).Get(name) + if err != nil { + return ControllerScaleError{ControllerScaleGetFailure, "Unknown", err} + } + if preconditions != nil { + if err := preconditions.ValidateJob(job); err != nil { + return err + } + } + parallelism := int(newSize) + job.Spec.Parallelism = ¶llelism + if _, err := scaler.c.Experimental().Jobs(namespace).Update(job); err != nil { + if errors.IsInvalid(err) { + return ControllerScaleError{ControllerScaleUpdateInvalidFailure, job.ResourceVersion, err} + } + return ControllerScaleError{ControllerScaleUpdateFailure, job.ResourceVersion, err} + + } + return nil } -func NewScalerClient(c client.Interface) ScalerClient { - return &realScalerClient{c} -} - -// realScalerClient is a ScalerClient which uses a Kube client. -type realScalerClient struct { - client client.Interface -} - -func (c *realScalerClient) GetReplicationController(namespace, name string) (*api.ReplicationController, error) { - return c.client.ReplicationControllers(namespace).Get(name) -} - -func (c *realScalerClient) UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) { - return c.client.ReplicationControllers(namespace).Update(rc) -} - -func (c *realScalerClient) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc { - return client.ControllerHasDesiredReplicas(c.client, rc) +// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil), +// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired +// number, which can be less than requested based on job's current progress. +func (scaler *JobScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { + if preconditions == nil { + preconditions = &ScalePrecondition{-1, ""} + } + if retry == nil { + // Make it try only once, immediately + retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} + } + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + return err + } + if waitForReplicas != nil { + job, err := scaler.c.Experimental().Jobs(namespace).Get(name) + if err != nil { + return err + } + return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, + client.JobHasDesiredParallelism(scaler.c, job)) + } + return nil } diff --git a/pkg/kubectl/scale_test.go b/pkg/kubectl/scale_test.go index 6516ab7803..21015010c7 100644 --- a/pkg/kubectl/scale_test.go +++ b/pkg/kubectl/scale_test.go @@ -22,45 +22,74 @@ import ( "k8s.io/kubernetes/pkg/api" kerrors "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/apis/experimental" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" ) type ErrorReplicationControllers struct { testclient.FakeReplicationControllers + invalid bool } func (c *ErrorReplicationControllers) Update(controller *api.ReplicationController) (*api.ReplicationController, error) { + if c.invalid { + return nil, kerrors.NewInvalid(controller.Kind, controller.Name, nil) + } return nil, errors.New("Replication controller update failure") } type ErrorReplicationControllerClient struct { testclient.Fake + invalid bool } func (c *ErrorReplicationControllerClient) ReplicationControllers(namespace string) client.ReplicationControllerInterface { - return &ErrorReplicationControllers{testclient.FakeReplicationControllers{Fake: &c.Fake, Namespace: namespace}} + return &ErrorReplicationControllers{testclient.FakeReplicationControllers{Fake: &c.Fake, Namespace: namespace}, c.invalid} } -type InvalidReplicationControllers struct { - testclient.FakeReplicationControllers +type ErrorJobs struct { + testclient.FakeJobs + invalid bool } -func (c *InvalidReplicationControllers) Update(controller *api.ReplicationController) (*api.ReplicationController, error) { - return nil, kerrors.NewInvalid(controller.Kind, controller.Name, nil) +func (c *ErrorJobs) Update(job *experimental.Job) (*experimental.Job, error) { + if c.invalid { + return nil, kerrors.NewInvalid(job.Kind, job.Name, nil) + } + return nil, errors.New("Job update failure") } -type InvalidReplicationControllerClient struct { +func (c *ErrorJobs) Get(name string) (*experimental.Job, error) { + zero := 0 + return &experimental.Job{ + Spec: experimental.JobSpec{ + Parallelism: &zero, + }, + }, nil +} + +type ErrorJobClient struct { + testclient.FakeExperimental + invalid bool +} + +func (c *ErrorJobClient) Jobs(namespace string) client.JobInterface { + return &ErrorJobs{testclient.FakeJobs{Fake: &c.FakeExperimental, Namespace: namespace}, c.invalid} +} + +type ErrorExperimentalClient struct { testclient.Fake + invalid bool } -func (c *InvalidReplicationControllerClient) ReplicationControllers(namespace string) client.ReplicationControllerInterface { - return &InvalidReplicationControllers{testclient.FakeReplicationControllers{Fake: &c.Fake, Namespace: namespace}} +func (c *ErrorExperimentalClient) Experimental() client.ExperimentalInterface { + return &ErrorJobClient{testclient.FakeExperimental{&c.Fake}, c.invalid} } func TestReplicationControllerScaleRetry(t *testing.T) { - fake := &ErrorReplicationControllerClient{Fake: testclient.Fake{}} - scaler := ReplicationControllerScaler{NewScalerClient(fake)} + fake := &ErrorReplicationControllerClient{Fake: testclient.Fake{}, invalid: false} + scaler := ReplicationControllerScaler{fake} preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" @@ -83,8 +112,8 @@ func TestReplicationControllerScaleRetry(t *testing.T) { } func TestReplicationControllerScaleInvalid(t *testing.T) { - fake := &InvalidReplicationControllerClient{Fake: testclient.Fake{}} - scaler := ReplicationControllerScaler{NewScalerClient(fake)} + fake := &ErrorReplicationControllerClient{Fake: testclient.Fake{}, invalid: true} + scaler := ReplicationControllerScaler{fake} preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" @@ -103,7 +132,7 @@ func TestReplicationControllerScaleInvalid(t *testing.T) { func TestReplicationControllerScale(t *testing.T) { fake := &testclient.Fake{} - scaler := ReplicationControllerScaler{NewScalerClient(fake)} + scaler := ReplicationControllerScaler{fake} preconditions := ScalePrecondition{-1, ""} count := uint(3) name := "foo" @@ -127,7 +156,7 @@ func TestReplicationControllerScaleFailsPreconditions(t *testing.T) { Replicas: 10, }, }) - scaler := ReplicationControllerScaler{NewScalerClient(fake)} + scaler := ReplicationControllerScaler{fake} preconditions := ScalePrecondition{2, ""} count := uint(3) name := "foo" @@ -135,14 +164,14 @@ func TestReplicationControllerScaleFailsPreconditions(t *testing.T) { actions := fake.Actions() if len(actions) != 1 { - t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions) + t.Errorf("unexpected actions: %v, expected 1 action (get)", actions) } if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "replicationcontrollers" || action.GetName() != name { t.Errorf("unexpected action: %v, expected get-replicationController %s", actions[0], name) } } -func TestPreconditionValidate(t *testing.T) { +func TestValidateReplicationController(t *testing.T) { tests := []struct { preconditions ScalePrecondition controller api.ReplicationController @@ -247,7 +276,218 @@ func TestPreconditionValidate(t *testing.T) { }, } for _, test := range tests { - err := test.preconditions.Validate(&test.controller) + err := test.preconditions.ValidateReplicationController(&test.controller) + if err != nil && !test.expectError { + t.Errorf("unexpected error: %v (%s)", err, test.test) + } + if err == nil && test.expectError { + t.Errorf("unexpected non-error: %v (%s)", err, test.test) + } + } +} + +func TestJobScaleRetry(t *testing.T) { + fake := &ErrorExperimentalClient{Fake: testclient.Fake{}, invalid: false} + scaler := JobScaler{fake} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + namespace := "default" + + scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count) + pass, err := scaleFunc() + if pass != false { + t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) + } + if err != nil { + t.Errorf("Did not expect an error on update failure, got %v", err) + } + preconditions = ScalePrecondition{3, ""} + scaleFunc = ScaleCondition(&scaler, &preconditions, namespace, name, count) + pass, err = scaleFunc() + if err == nil { + t.Errorf("Expected error on precondition failure") + } +} + +func TestJobScale(t *testing.T) { + fake := &testclient.Fake{} + scaler := JobScaler{fake} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + scaler.Scale("default", name, count, &preconditions, nil, nil) + + actions := fake.Actions() + if len(actions) != 2 { + t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions) + } + if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "jobs" || action.GetName() != name { + t.Errorf("unexpected action: %v, expected get-replicationController %s", actions[0], name) + } + if action, ok := actions[1].(testclient.UpdateAction); !ok || action.GetResource() != "jobs" || *action.GetObject().(*experimental.Job).Spec.Parallelism != int(count) { + t.Errorf("unexpected action %v, expected update-job with parallelism = %d", actions[1], count) + } +} + +func TestJobScaleInvalid(t *testing.T) { + fake := &ErrorExperimentalClient{Fake: testclient.Fake{}, invalid: true} + scaler := JobScaler{fake} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + namespace := "default" + + scaleFunc := ScaleCondition(&scaler, &preconditions, namespace, name, count) + pass, err := scaleFunc() + if pass { + t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) + } + e, ok := err.(ControllerScaleError) + if err == nil || !ok || e.FailureType != ControllerScaleUpdateInvalidFailure { + t.Errorf("Expected error on invalid update failure, got %v", err) + } +} + +func TestJobScaleFailsPreconditions(t *testing.T) { + ten := 10 + fake := testclient.NewSimpleFake(&experimental.Job{ + Spec: experimental.JobSpec{ + Parallelism: &ten, + }, + }) + scaler := JobScaler{fake} + preconditions := ScalePrecondition{2, ""} + count := uint(3) + name := "foo" + scaler.Scale("default", name, count, &preconditions, nil, nil) + + actions := fake.Actions() + if len(actions) != 1 { + t.Errorf("unexpected actions: %v, expected 1 actions (get)", actions) + } + if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "jobs" || action.GetName() != name { + t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name) + } +} + +func TestValidateJob(t *testing.T) { + zero, ten, twenty := 0, 10, 20 + tests := []struct { + preconditions ScalePrecondition + job experimental.Job + expectError bool + test string + }{ + { + preconditions: ScalePrecondition{-1, ""}, + expectError: false, + test: "defaults", + }, + { + preconditions: ScalePrecondition{-1, ""}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: experimental.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "defaults 2", + }, + { + preconditions: ScalePrecondition{0, ""}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: experimental.JobSpec{ + Parallelism: &zero, + }, + }, + expectError: false, + test: "size matches", + }, + { + preconditions: ScalePrecondition{-1, "foo"}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: experimental.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "resource version matches", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: experimental.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "both match", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: experimental.JobSpec{ + Parallelism: &twenty, + }, + }, + expectError: true, + test: "size different", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "foo", + }, + }, + expectError: true, + test: "parallelism nil", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "bar", + }, + Spec: experimental.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: true, + test: "version different", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: experimental.Job{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "bar", + }, + Spec: experimental.JobSpec{ + Parallelism: &twenty, + }, + }, + expectError: true, + test: "both different", + }, + } + for _, test := range tests { + err := test.preconditions.ValidateJob(&test.job) if err != nil && !test.expectError { t.Errorf("unexpected error: %v (%s)", err, test.test) } diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index fa4a371c65..88333dd8ce 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -65,6 +65,8 @@ func ReaperFor(kind string, c client.Interface) (Reaper, error) { return &PodReaper{c}, nil case "Service": return &ServiceReaper{c}, nil + case "Job": + return &JobReaper{c, Interval, Timeout}, nil } return nil, &NoSuchReaperError{kind} } @@ -81,6 +83,10 @@ type DaemonSetReaper struct { client.Interface pollInterval, timeout time.Duration } +type JobReaper struct { + client.Interface + pollInterval, timeout time.Duration +} type PodReaper struct { client.Interface } @@ -112,7 +118,7 @@ func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api. func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) (string, error) { rc := reaper.ReplicationControllers(namespace) - scaler, err := ScalerFor("ReplicationController", NewScalerClient(*reaper)) + scaler, err := ScalerFor("ReplicationController", *reaper) if err != nil { return "", err } @@ -223,6 +229,34 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio return fmt.Sprintf("%s stopped", name), nil } +func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) (string, error) { + jobs := reaper.Experimental().Jobs(namespace) + scaler, err := ScalerFor("Job", *reaper) + if err != nil { + return "", err + } + job, err := jobs.Get(name) + if err != nil { + return "", err + } + if timeout == 0 { + // we will never have more active pods than job.Spec.Parallelism + parallelism := *job.Spec.Parallelism + timeout = Timeout + time.Duration(10*parallelism)*time.Second + } + + // TODO: handle overlapping jobs + retry := NewRetryParams(reaper.pollInterval, reaper.timeout) + waitForJobs := NewRetryParams(reaper.pollInterval, timeout) + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil { + return "", err + } + if err := jobs.Delete(name, gracePeriod); err != nil { + return "", err + } + return fmt.Sprintf("%s stopped", name), nil +} + func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) (string, error) { pods := reaper.Pods(namespace) _, err := pods.Get(name) diff --git a/pkg/kubectl/stop_test.go b/pkg/kubectl/stop_test.go index 705d7def23..3e6a85e188 100644 --- a/pkg/kubectl/stop_test.go +++ b/pkg/kubectl/stop_test.go @@ -23,6 +23,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/runtime" @@ -271,6 +272,78 @@ func TestReplicationControllerStop(t *testing.T) { } } +func TestJobStop(t *testing.T) { + name := "foo" + ns := "default" + zero := 0 + tests := []struct { + Name string + Objs []runtime.Object + StopError error + StopMessage string + ExpectedActions []string + }{ + { + Name: "OnlyOneJob", + Objs: []runtime.Object{ + &experimental.Job{ // GET + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: experimental.JobSpec{ + Parallelism: &zero, + Selector: map[string]string{"k1": "v1"}}, + }, + &experimental.JobList{ // LIST + Items: []experimental.Job{ + { + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: experimental.JobSpec{ + Parallelism: &zero, + Selector: map[string]string{"k1": "v1"}}, + }, + }, + }, + }, + StopError: nil, + StopMessage: "foo stopped", + ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"}, + }, + } + + for _, test := range tests { + fake := testclient.NewSimpleFake(test.Objs...) + reaper := JobReaper{fake, time.Millisecond, time.Millisecond} + s, err := reaper.Stop(ns, name, 0, nil) + if !reflect.DeepEqual(err, test.StopError) { + t.Errorf("%s unexpected error: %v", test.Name, err) + continue + } + + if s != test.StopMessage { + t.Errorf("%s expected '%s', got '%s'", test.Name, test.StopMessage, s) + continue + } + actions := fake.Actions() + if len(actions) != len(test.ExpectedActions) { + t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions)) + continue + } + for i, verb := range test.ExpectedActions { + if actions[i].GetResource() != "jobs" { + t.Errorf("%s unexpected action: %+v, expected %s-job", test.Name, actions[i], verb) + } + if actions[i].GetVerb() != verb { + t.Errorf("%s unexpected action: %+v, expected %s-job", test.Name, actions[i], verb) + } + } + } +} + type noSuchPod struct { *testclient.FakePods } diff --git a/test/e2e/util.go b/test/e2e/util.go index a3ddbe8712..293769f128 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1427,7 +1427,7 @@ func getNodeEvents(c *client.Client, nodeName string) []api.Event { func ScaleRC(c *client.Client, ns, name string, size uint, wait bool) error { By(fmt.Sprintf("%v Scaling replication controller %s in namespace %s to %d", time.Now(), name, ns, size)) - scaler, err := kubectl.ScalerFor("ReplicationController", kubectl.NewScalerClient(c)) + scaler, err := kubectl.ScalerFor("ReplicationController", c) if err != nil { return err } diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 604186c31e..bb95339349 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -208,7 +208,7 @@ func StopRC(rc *api.ReplicationController, restClient *client.Client) error { // ScaleRC scales the given rc to the given replicas. func ScaleRC(name, ns string, replicas int, restClient *client.Client) (*api.ReplicationController, error) { - scaler, err := kubectl.ScalerFor("ReplicationController", kubectl.NewScalerClient(restClient)) + scaler, err := kubectl.ScalerFor("ReplicationController", restClient) if err != nil { return nil, err }