From 1fec6980b6cb733b75cee3cdc8be7065e2d2442b Mon Sep 17 00:00:00 2001
From: David Eads
Date: Wed, 4 Apr 2018 08:57:07 -0400
Subject: [PATCH] segregate job scaling from everything else

---
 pkg/kubectl/BUILD                         |   2 +
 pkg/kubectl/cmd/BUILD                     |   3 +
 pkg/kubectl/cmd/scale.go                  |  71 ++++--
 pkg/kubectl/cmd/scalejob/BUILD            |  47 ++++
 pkg/kubectl/cmd/scalejob/doc.go           |  18 ++
 pkg/kubectl/cmd/scalejob/scalejob.go      | 162 ++++++++++++
 pkg/kubectl/cmd/scalejob/scalejob_test.go | 292 ++++++++++++++++++++++
 pkg/kubectl/conditions.go                 |  26 --
 pkg/kubectl/delete.go                     |  58 +++++
 pkg/kubectl/delete_test.go                |  98 ++++++++
 pkg/kubectl/scale.go                      |  15 --
 11 files changed, 734 insertions(+), 58 deletions(-)
 create mode 100644 pkg/kubectl/cmd/scalejob/BUILD
 create mode 100644 pkg/kubectl/cmd/scalejob/doc.go
 create mode 100644 pkg/kubectl/cmd/scalejob/scalejob.go
 create mode 100644 pkg/kubectl/cmd/scalejob/scalejob_test.go

diff --git a/pkg/kubectl/BUILD b/pkg/kubectl/BUILD
index 4d50353475..f6cd4a553f 100644
--- a/pkg/kubectl/BUILD
+++ b/pkg/kubectl/BUILD
@@ -40,6 +40,7 @@ go_test(
         "//pkg/api/legacyscheme:go_default_library",
         "//pkg/api/testapi:go_default_library",
         "//pkg/api/testing:go_default_library",
+        "//pkg/apis/batch:go_default_library",
         "//pkg/apis/core:go_default_library",
         "//pkg/apis/extensions:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
@@ -135,6 +136,7 @@ go_library(
         "//pkg/controller/statefulset:go_default_library",
         "//pkg/credentialprovider:go_default_library",
         "//pkg/kubectl/apps:go_default_library",
+        "//pkg/kubectl/cmd/scalejob:go_default_library",
         "//pkg/kubectl/resource:go_default_library",
         "//pkg/kubectl/util:go_default_library",
         "//pkg/kubectl/util/hash:go_default_library",
diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD
index ab1708b29b..c77ee06570 100644
--- a/pkg/kubectl/cmd/BUILD
+++ b/pkg/kubectl/cmd/BUILD
@@ -75,6 +75,7 @@ go_library(
         "//pkg/apis/core:go_default_library",
         "//pkg/apis/core/validation:go_default_library",
         "//pkg/client/clientset_generated/internalclientset:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
         "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
         "//pkg/kubectl:go_default_library",
         "//pkg/kubectl/apply/parse:go_default_library",
@@ -83,6 +84,7 @@ go_library(
         "//pkg/kubectl/cmd/config:go_default_library",
         "//pkg/kubectl/cmd/resource:go_default_library",
         "//pkg/kubectl/cmd/rollout:go_default_library",
+        "//pkg/kubectl/cmd/scalejob:go_default_library",
         "//pkg/kubectl/cmd/set:go_default_library",
         "//pkg/kubectl/cmd/templates:go_default_library",
         "//pkg/kubectl/cmd/util:go_default_library",
@@ -287,6 +289,7 @@ filegroup(
         "//pkg/kubectl/cmd/config:all-srcs",
         "//pkg/kubectl/cmd/resource:all-srcs",
         "//pkg/kubectl/cmd/rollout:all-srcs",
+        "//pkg/kubectl/cmd/scalejob:all-srcs",
         "//pkg/kubectl/cmd/set:all-srcs",
         "//pkg/kubectl/cmd/templates:all-srcs",
         "//pkg/kubectl/cmd/testdata/edit:all-srcs",
diff --git a/pkg/kubectl/cmd/scale.go b/pkg/kubectl/cmd/scale.go
index 35d1bcf9a7..c8cd0dcf34 100644
--- a/pkg/kubectl/cmd/scale.go
+++ b/pkg/kubectl/cmd/scale.go
@@ -22,7 +22,9 @@ import (
 
 	"github.com/spf13/cobra"
 
+	batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
 	"k8s.io/kubernetes/pkg/kubectl"
+	"k8s.io/kubernetes/pkg/kubectl/cmd/scalejob"
 	"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
 	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
 	"k8s.io/kubernetes/pkg/kubectl/resource"
@@ -139,31 +141,46 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
 		return fmt.Errorf("cannot use --resource-version with multiple resources")
 	}
 
+	currentSize := cmdutil.GetFlagInt(cmd, "current-replicas")
+	precondition := &kubectl.ScalePrecondition{Size: currentSize, ResourceVersion: resourceVersion}
+	retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout)
+
+	var waitForReplicas *kubectl.RetryParams
+	if timeout := cmdutil.GetFlagDuration(cmd, "timeout"); timeout != 0 {
+		waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout)
+	}
+
 	counter := 0
 	err = r.Visit(func(info *resource.Info, err error) error {
 		if err != nil {
 			return err
 		}
 
-		scaler, err := f.Scaler()
-		if err != nil {
-			return err
-		}
-
-		currentSize := cmdutil.GetFlagInt(cmd, "current-replicas")
-		precondition := &kubectl.ScalePrecondition{Size: currentSize, ResourceVersion: resourceVersion}
-		retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout)
-
-		var waitForReplicas *kubectl.RetryParams
-		if timeout := cmdutil.GetFlagDuration(cmd, "timeout"); timeout != 0 {
-			waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout)
-		}
-
 		mapping := info.ResourceMapping()
-		gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
-		if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas, gvk.GroupResource()); err != nil {
-			return err
+		if mapping.Resource == "jobs" {
+			// go down the legacy jobs path. This can be removed in 3.14. For now, contain it.
+			fmt.Fprintf(errOut, "%s scale job is DEPRECATED and will be removed in a future version.\n", cmd.Parent().Name())
+
+			clientset, err := f.ClientSet()
+			if err != nil {
+				return err
+			}
+			if err := ScaleJob(info, clientset.Batch(), uint(count), precondition, retry, waitForReplicas); err != nil {
+				return err
+			}
+
+		} else {
+			scaler, err := f.Scaler()
+			if err != nil {
+				return err
+			}
+
+			gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
+			if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas, gvk.GroupResource()); err != nil {
+				return err
+			}
 		}
+
 		if cmdutil.ShouldRecord(cmd, info) {
 			patchBytes, patchType, err := cmdutil.ChangeResourcePatch(info, f.Command(cmd, true))
 			if err != nil {
@@ -192,3 +209,23 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
 	}
 	return nil
 }
+
+func ScaleJob(info *resource.Info, jobsClient batchclient.JobsGetter, count uint, preconditions *kubectl.ScalePrecondition, retry, waitForReplicas *kubectl.RetryParams) error {
+	scaler := scalejob.JobPsuedoScaler{
+		JobsClient: jobsClient,
+	}
+	var jobPreconditions *scalejob.ScalePrecondition
+	if preconditions != nil {
+		jobPreconditions = &scalejob.ScalePrecondition{Size: preconditions.Size, ResourceVersion: preconditions.ResourceVersion}
+	}
+	var jobRetry *scalejob.RetryParams
+	if retry != nil {
+		jobRetry = &scalejob.RetryParams{Interval: retry.Interval, Timeout: retry.Timeout}
+	}
+	var jobWaitForReplicas *scalejob.RetryParams
+	if waitForReplicas != nil {
+		jobWaitForReplicas = &scalejob.RetryParams{Interval: waitForReplicas.Interval, Timeout: waitForReplicas.Timeout}
+	}
+
+	return scaler.Scale(info.Namespace, info.Name, count, jobPreconditions, jobRetry, jobWaitForReplicas)
+}
diff --git a/pkg/kubectl/cmd/scalejob/BUILD b/pkg/kubectl/cmd/scalejob/BUILD
new file mode 100644
index 0000000000..f95519c780
--- /dev/null
+++ b/pkg/kubectl/cmd/scalejob/BUILD
@@ -0,0 +1,47 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "doc.go",
+        "scalejob.go",
+    ],
+    importpath = "k8s.io/kubernetes/pkg/kubectl/cmd/scalejob",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/apis/batch:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["scalejob_test.go"],
+    embed = [":go_default_library"],
+    deps = [
+        "//pkg/apis/batch:go_default_library",
+        "//pkg/apis/core:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/client-go/testing:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
diff --git a/pkg/kubectl/cmd/scalejob/doc.go b/pkg/kubectl/cmd/scalejob/doc.go
new file mode 100644
index 0000000000..589fa1a64a
--- /dev/null
+++ b/pkg/kubectl/cmd/scalejob/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package scalejob is deprecated. This package contains deprecated functions used to "scale" jobs in a way inconsistent with normal scaling rules.
+package scalejob
diff --git a/pkg/kubectl/cmd/scalejob/scalejob.go b/pkg/kubectl/cmd/scalejob/scalejob.go
new file mode 100644
index 0000000000..70264cd9fa
--- /dev/null
+++ b/pkg/kubectl/cmd/scalejob/scalejob.go
@@ -0,0 +1,162 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scalejob
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/apis/batch"
+
+	batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
+)
+
+// ScalePrecondition is a deprecated precondition
+type ScalePrecondition struct {
+	Size            int
+	ResourceVersion string
+}
+
+// RetryParams is a deprecated retry struct
+type RetryParams struct {
+	Interval, Timeout time.Duration
+}
+
+// PreconditionError is a deprecated error
+type PreconditionError struct {
+	Precondition  string
+	ExpectedValue string
+	ActualValue   string
+}
+
+func (pe PreconditionError) Error() string {
+	return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
+}
+
+// scaleCondition is a closure around ScaleSimple that facilitates retries via util.wait
+func scaleCondition(r *JobPsuedoScaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc {
+	return func() (bool, error) {
+		rv, err := r.ScaleSimple(namespace, name, precondition, count)
+		if updatedResourceVersion != nil {
+			*updatedResourceVersion = rv
+		}
+		// Retry only on update conflicts.
+		if errors.IsConflict(err) {
+			return false, nil
+		}
+		if err != nil {
+			return false, err
+		}
+		return true, nil
+	}
+}
+
+// JobPsuedoScaler is a deprecated scale-similar thing that doesn't obey scale semantics
+type JobPsuedoScaler struct {
+	JobsClient batchclient.JobsGetter
+}
+
+// ScaleSimple is responsible for updating job's parallelism. It returns the
+// resourceVersion of the job if the update is successful.
+func (scaler *JobPsuedoScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
+	job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{})
+	if err != nil {
+		return "", err
+	}
+	if preconditions != nil {
+		if err := validateJob(job, preconditions); err != nil {
+			return "", err
+		}
+	}
+	parallelism := int32(newSize)
+	job.Spec.Parallelism = &parallelism
+	updatedJob, err := scaler.JobsClient.Jobs(namespace).Update(job)
+	if err != nil {
+		return "", err
+	}
+	return updatedJob.ObjectMeta.ResourceVersion, nil
+}
+
+// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil),
+// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired
+// number, which can be less than requested based on job's current progress.
+func (scaler *JobPsuedoScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
+	if preconditions == nil {
+		preconditions = &ScalePrecondition{-1, ""}
+	}
+	if retry == nil {
+		// Make it try only once, immediately
+		retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
+	}
+	cond := scaleCondition(scaler, preconditions, namespace, name, newSize, nil)
+	if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
+		return err
+	}
+	if waitForReplicas != nil {
+		job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+		err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, jobHasDesiredParallelism(scaler.JobsClient, job))
+		if err == wait.ErrWaitTimeout {
+			return fmt.Errorf("timed out waiting for %q to be synced", name)
+		}
+		return err
+	}
+	return nil
+}
+
+// jobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
+// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count.
+func jobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc {
+	return func() (bool, error) {
+		job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		// desired parallelism can be either the exact number, in which case return immediately
+		if job.Status.Active == *job.Spec.Parallelism {
+			return true, nil
+		}
+		if job.Spec.Completions == nil {
+			// A job without specified completions needs to wait for Active to reach Parallelism.
+			return false, nil
+		}
+
+		// otherwise count successful
+		progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded
+		return progress == 0, nil
+	}
+}
+
+func validateJob(job *batch.Job, precondition *ScalePrecondition) error {
+	if precondition.Size != -1 && job.Spec.Parallelism == nil {
+		return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
+	}
+	if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size {
+		return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))}
+	}
+	if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion {
+		return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
+	}
+	return nil
+}
diff --git a/pkg/kubectl/cmd/scalejob/scalejob_test.go b/pkg/kubectl/cmd/scalejob/scalejob_test.go
new file mode 100644
index 0000000000..d8a22f88ef
--- /dev/null
+++ b/pkg/kubectl/cmd/scalejob/scalejob_test.go
@@ -0,0 +1,292 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package scalejob + +import ( + "errors" + "testing" + + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testcore "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/apis/batch" + api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" +) + +type errorJobs struct { + batchclient.JobInterface + conflict bool + invalid bool +} + +func (c *errorJobs) Update(job *batch.Job) (*batch.Job, error) { + switch { + case c.invalid: + return nil, kerrors.NewInvalid(api.Kind(job.Kind), job.Name, nil) + case c.conflict: + return nil, kerrors.NewConflict(api.Resource(job.Kind), job.Name, nil) + } + return nil, errors.New("Job update failure") +} + +func (c *errorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) { + zero := int32(0) + return &batch.Job{ + Spec: batch.JobSpec{ + Parallelism: &zero, + }, + }, nil +} + +type errorJobClient struct { + batchclient.JobsGetter + conflict bool + invalid bool +} + +func (c *errorJobClient) Jobs(namespace string) batchclient.JobInterface { + return &errorJobs{ + JobInterface: c.JobsGetter.Jobs(namespace), + conflict: c.conflict, + invalid: c.invalid, + } +} + +func TestJobScaleRetry(t *testing.T) { + fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true} + scaler := &JobPsuedoScaler{JobsClient: fake} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + namespace := "default" + + scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil) + pass, err := scaleFunc() + if pass != false { + t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) + } + if err != nil { + t.Errorf("Did not expect an error on update failure, got %v", err) + } + preconditions = ScalePrecondition{3, ""} + scaleFunc = scaleCondition(scaler, &preconditions, namespace, name, count, nil) + pass, err = scaleFunc() + if err == nil { + t.Error("Expected error on precondition failure") + } +} + +func job() *batch.Job { + return &batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "foo", + }, + } +} + +func TestJobScale(t *testing.T) { + fakeClientset := fake.NewSimpleClientset(job()) + scaler := &JobPsuedoScaler{JobsClient: fakeClientset.Batch()} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + scaler.Scale("default", name, count, &preconditions, nil, nil) + + actions := fakeClientset.Actions() + if len(actions) != 2 { + t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions) + } + if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name { + t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name) + } + if action, ok := actions[1].(testcore.UpdateAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || *action.GetObject().(*batch.Job).Spec.Parallelism != int32(count) { + t.Errorf("unexpected action %v, expected update-job with parallelism = %d", actions[1], count) + } +} + +func TestJobScaleInvalid(t *testing.T) { + fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true} + scaler := &JobPsuedoScaler{JobsClient: fake} + preconditions := ScalePrecondition{-1, ""} + count := uint(3) + name := "foo" + 
namespace := "default" + + scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil) + pass, err := scaleFunc() + if pass { + t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass) + } + if err == nil { + t.Errorf("Expected error on invalid update failure, got %v", err) + } +} + +func TestJobScaleFailsPreconditions(t *testing.T) { + ten := int32(10) + fake := fake.NewSimpleClientset(&batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }) + scaler := &JobPsuedoScaler{JobsClient: fake.Batch()} + preconditions := ScalePrecondition{2, ""} + count := uint(3) + name := "foo" + scaler.Scale("default", name, count, &preconditions, nil, nil) + + actions := fake.Actions() + if len(actions) != 1 { + t.Errorf("unexpected actions: %v, expected 1 actions (get)", actions) + } + if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name { + t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name) + } +} + +func TestValidateJob(t *testing.T) { + zero, ten, twenty := int32(0), int32(10), int32(20) + tests := []struct { + preconditions ScalePrecondition + job batch.Job + expectError bool + test string + }{ + { + preconditions: ScalePrecondition{-1, ""}, + expectError: false, + test: "defaults", + }, + { + preconditions: ScalePrecondition{-1, ""}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "defaults 2", + }, + { + preconditions: ScalePrecondition{0, ""}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &zero, + }, + }, + expectError: false, + test: "size matches", + }, + { + preconditions: ScalePrecondition{-1, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "resource version matches", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: false, + test: "both match", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + Spec: batch.JobSpec{ + Parallelism: &twenty, + }, + }, + expectError: true, + test: "size different", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "foo", + }, + }, + expectError: true, + test: "parallelism nil", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "bar", + }, + Spec: batch.JobSpec{ + Parallelism: &ten, + }, + }, + expectError: true, + test: "version different", + }, + { + preconditions: ScalePrecondition{10, "foo"}, + job: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "bar", + }, + Spec: batch.JobSpec{ + Parallelism: &twenty, + }, + }, + expectError: true, + test: "both different", + }, + } + for _, test := range tests { + err := validateJob(&test.job, &test.preconditions) + if err != nil && !test.expectError { + t.Errorf("unexpected error: %v (%s)", err, test.test) + } + if err == nil && test.expectError { + 
t.Errorf("expected an error: %v (%s)", err, test.test) + } + } +} diff --git a/pkg/kubectl/conditions.go b/pkg/kubectl/conditions.go index 666b248f41..771ccbc4fb 100644 --- a/pkg/kubectl/conditions.go +++ b/pkg/kubectl/conditions.go @@ -26,11 +26,9 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/kubernetes/pkg/api/pod" "k8s.io/kubernetes/pkg/apis/apps" - "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/extensions" appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion" - batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion" ) @@ -99,30 +97,6 @@ func StatefulSetHasDesiredReplicas(ssClient appsclient.StatefulSetsGetter, ss *a } } -// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count -// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count. -func JobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc { - return func() (bool, error) { - job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{}) - if err != nil { - return false, err - } - - // desired parallelism can be either the exact number, in which case return immediately - if job.Status.Active == *job.Spec.Parallelism { - return true, nil - } - if job.Spec.Completions == nil { - // A job without specified completions needs to wait for Active to reach Parallelism. - return false, nil - } - - // otherwise count successful - progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded - return progress == 0, nil - } -} - // DeploymentHasDesiredReplicas returns a condition that will be true if and only if // the desired replica count for a deployment equals its updated replicas count. // (non-terminated pods that have the desired template spec). 
diff --git a/pkg/kubectl/delete.go b/pkg/kubectl/delete.go index f6c824de82..0d1f8a46ea 100644 --- a/pkg/kubectl/delete.go +++ b/pkg/kubectl/delete.go @@ -30,13 +30,16 @@ import ( "k8s.io/apimachinery/pkg/util/wait" scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/apis/apps" + "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion" + batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/kubectl/cmd/scalejob" ) const ( @@ -80,6 +83,9 @@ func ReaperFor(kind schema.GroupKind, c internalclientset.Interface, sc scalecli case api.Kind("Pod"): return &PodReaper{c.Core()}, nil + case batch.Kind("Job"): + return &JobReaper{c.Batch(), c.Core(), Interval, Timeout}, nil + case apps.Kind("StatefulSet"): return &StatefulSetReaper{c.Apps(), c.Core(), Interval, Timeout, sc}, nil @@ -109,6 +115,11 @@ type DaemonSetReaper struct { client extensionsclient.DaemonSetsGetter pollInterval, timeout time.Duration } +type JobReaper struct { + client batchclient.JobsGetter + podClient coreclient.PodsGetter + pollInterval, timeout time.Duration +} type DeploymentReaper struct { dClient extensionsclient.DeploymentsGetter rsClient extensionsclient.ReplicaSetsGetter @@ -341,6 +352,53 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat return statefulsets.Delete(name, deleteOptions) } +func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error { + jobs := reaper.client.Jobs(namespace) + pods := reaper.podClient.Pods(namespace) + scaler := &scalejob.JobPsuedoScaler{ + JobsClient: reaper.client, + } + job, err := jobs.Get(name, metav1.GetOptions{}) + if err != nil { + return err + } + if timeout == 0 { + // we will never have more active pods than job.Spec.Parallelism + parallelism := *job.Spec.Parallelism + timeout = Timeout + time.Duration(10*parallelism)*time.Second + } + + // TODO: handle overlapping jobs + retry := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout} + waitForJobs := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout} + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil && !errors.IsNotFound(err) { + return err + } + // at this point only dead pods are left, that should be removed + selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector) + options := metav1.ListOptions{LabelSelector: selector.String()} + podList, err := pods.List(options) + if err != nil { + return err + } + errList := []error{} + for _, pod := range podList.Items { + if err := pods.Delete(pod.Name, gracePeriod); err != nil { + // ignores the error when the pod isn't found + if !errors.IsNotFound(err) { + errList = append(errList, err) + } + } + } + if len(errList) > 0 { + return utilerrors.NewAggregate(errList) + } + // once we have all the pods removed we can safely remove the job itself. 
+ falseVar := false + deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar} + return jobs.Delete(name, deleteOptions) +} + func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error { deployments := reaper.dClient.Deployments(namespace) rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout, reaper.scaleClient, schema.GroupResource{Group: reaper.gr.Group, Resource: "replicasets"}} diff --git a/pkg/kubectl/delete_test.go b/pkg/kubectl/delete_test.go index ebbebb64ba..6bc06d1404 100644 --- a/pkg/kubectl/delete_test.go +++ b/pkg/kubectl/delete_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/watch" fakescale "k8s.io/client-go/scale/fake" testcore "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" @@ -381,6 +382,103 @@ func TestReplicaSetStop(t *testing.T) { } } +func TestJobStop(t *testing.T) { + name := "foo" + ns := "default" + zero := int32(0) + tests := []struct { + Name string + Objs []runtime.Object + StopError error + ExpectedActions []string + }{ + { + Name: "OnlyOneJob", + Objs: []runtime.Object{ + &batch.JobList{ // LIST + Items: []batch.Job{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: batch.JobSpec{ + Parallelism: &zero, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"k1": "v1"}, + }, + }, + }, + }, + }, + }, + StopError: nil, + ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs", + "get:jobs", "get:jobs", "list:pods", "delete:jobs"}, + }, + { + Name: "JobWithDeadPods", + Objs: []runtime.Object{ + &batch.JobList{ // LIST + Items: []batch.Job{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: batch.JobSpec{ + Parallelism: &zero, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"k1": "v1"}, + }, + }, + }, + }, + }, + &api.PodList{ // LIST + Items: []api.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: ns, + Labels: map[string]string{"k1": "v1"}, + }, + }, + }, + }, + }, + StopError: nil, + ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs", + "get:jobs", "get:jobs", "list:pods", "delete:pods", "delete:jobs"}, + }, + } + + for _, test := range tests { + fake := fake.NewSimpleClientset(test.Objs...) 
+ reaper := JobReaper{fake.Batch(), fake.Core(), time.Millisecond, time.Millisecond} + err := reaper.Stop(ns, name, 0, nil) + if !reflect.DeepEqual(err, test.StopError) { + t.Errorf("%s unexpected error: %v", test.Name, err) + continue + } + + actions := fake.Actions() + if len(actions) != len(test.ExpectedActions) { + t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions)) + continue + } + for i, expAction := range test.ExpectedActions { + action := strings.Split(expAction, ":") + if actions[i].GetVerb() != action[0] { + t.Errorf("%s unexpected verb: %+v, expected %s", test.Name, actions[i], expAction) + } + if actions[i].GetResource().Resource != action[1] { + t.Errorf("%s unexpected resource: %+v, expected %s", test.Name, actions[i], expAction) + } + } + } +} + func TestDeploymentStop(t *testing.T) { name := "foo" ns := "default" diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index b7b8cd0855..590b600fd4 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -25,7 +25,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/kubernetes/pkg/apis/batch" scaleclient "k8s.io/client-go/scale" ) @@ -97,20 +96,6 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s } } -// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise. -func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error { - if precondition.Size != -1 && job.Spec.Parallelism == nil { - return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"} - } - if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size { - return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))} - } - if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion { - return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion} - } - return nil -} - // validateGeneric ensures that the preconditions match. Returns nil if they are valid, otherwise an error func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) error { if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size {