segregate job scaling from everything else

pull/8/head
David Eads 2018-04-04 08:57:07 -04:00
parent 3b00b4a86f
commit 1fec6980b6
11 changed files with 734 additions and 58 deletions

View File

@@ -40,6 +40,7 @@ go_test(
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/api/testing:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
@@ -135,6 +136,7 @@ go_library(
"//pkg/controller/statefulset:go_default_library",
"//pkg/credentialprovider:go_default_library",
"//pkg/kubectl/apps:go_default_library",
"//pkg/kubectl/cmd/scalejob:go_default_library",
"//pkg/kubectl/resource:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//pkg/kubectl/util/hash:go_default_library",

View File

@@ -75,6 +75,7 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/validation:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubectl/apply/parse:go_default_library",
@@ -83,6 +84,7 @@
"//pkg/kubectl/cmd/config:go_default_library",
"//pkg/kubectl/cmd/resource:go_default_library",
"//pkg/kubectl/cmd/rollout:go_default_library",
"//pkg/kubectl/cmd/scalejob:go_default_library",
"//pkg/kubectl/cmd/set:go_default_library",
"//pkg/kubectl/cmd/templates:go_default_library",
"//pkg/kubectl/cmd/util:go_default_library",
@@ -287,6 +289,7 @@ filegroup(
"//pkg/kubectl/cmd/config:all-srcs",
"//pkg/kubectl/cmd/resource:all-srcs",
"//pkg/kubectl/cmd/rollout:all-srcs",
"//pkg/kubectl/cmd/scalejob:all-srcs",
"//pkg/kubectl/cmd/set:all-srcs",
"//pkg/kubectl/cmd/templates:all-srcs",
"//pkg/kubectl/cmd/testdata/edit:all-srcs",

View File

@@ -22,7 +22,9 @@ import (
"github.com/spf13/cobra"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/scalejob"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
@@ -139,17 +141,6 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
return fmt.Errorf("cannot use --resource-version with multiple resources")
}
counter := 0
err = r.Visit(func(info *resource.Info, err error) error {
if err != nil {
return err
}
scaler, err := f.Scaler()
if err != nil {
return err
}
currentSize := cmdutil.GetFlagInt(cmd, "current-replicas")
precondition := &kubectl.ScalePrecondition{Size: currentSize, ResourceVersion: resourceVersion}
retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout)
@@ -159,11 +150,37 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout)
}
counter := 0
err = r.Visit(func(info *resource.Info, err error) error {
if err != nil {
return err
}
mapping := info.ResourceMapping()
if mapping.Resource == "jobs" {
// go down the legacy jobs path. This can be removed in 3.14. For now, contain it.
fmt.Fprintf(errOut, "%s scale job is DEPRECATED and will be removed in a future version.\n", cmd.Parent().Name())
clientset, err := f.ClientSet()
if err != nil {
return err
}
if err := ScaleJob(info, clientset.Batch(), uint(count), precondition, retry, waitForReplicas); err != nil {
return err
}
} else {
scaler, err := f.Scaler()
if err != nil {
return err
}
gvr := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas, gvr.GroupResource()); err != nil {
return err
}
}
if cmdutil.ShouldRecord(cmd, info) {
patchBytes, patchType, err := cmdutil.ChangeResourcePatch(info, f.Command(cmd, true))
if err != nil {
@@ -192,3 +209,23 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
}
return nil
}
// ScaleJob is the deprecated path for scaling jobs: it translates the generic
// kubectl scale parameters into their scalejob equivalents and delegates to a
// JobPsuedoScaler.
func ScaleJob(info *resource.Info, jobsClient batchclient.JobsGetter, count uint, preconditions *kubectl.ScalePrecondition, retry, waitForReplicas *kubectl.RetryParams) error {
scaler := scalejob.JobPsuedoScaler{
JobsClient: jobsClient,
}
var jobPreconditions *scalejob.ScalePrecondition
if preconditions != nil {
jobPreconditions = &scalejob.ScalePrecondition{Size: preconditions.Size, ResourceVersion: preconditions.ResourceVersion}
}
var jobRetry *scalejob.RetryParams
if retry != nil {
jobRetry = &scalejob.RetryParams{Interval: retry.Interval, Timeout: retry.Timeout}
}
var jobWaitForReplicas *scalejob.RetryParams
if waitForReplicas != nil {
jobWaitForReplicas = &scalejob.RetryParams{Interval: waitForReplicas.Interval, Timeout: waitForReplicas.Timeout}
}
return scaler.Scale(info.Namespace, info.Name, count, jobPreconditions, jobRetry, jobWaitForReplicas)
}
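A minimal sketch of the legacy branch as a caller would exercise it (hypothetical, not part of the commit; `f`, `info`, and `count` stand in for values RunScale already has in scope, and the precondition values are illustrative):

clientset, err := f.ClientSet()
if err != nil {
	return err
}
// Size -1 skips the size precondition; an empty ResourceVersion skips that check too.
precondition := &kubectl.ScalePrecondition{Size: -1, ResourceVersion: ""}
retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout)
// nil waitForReplicas: return as soon as the update lands, without polling pods.
if err := ScaleJob(info, clientset.Batch(), uint(count), precondition, retry, nil); err != nil {
	return err
}

Every other resource keeps going through f.Scaler(), which leaves jobs as the only kind still scaled through the internal clientset.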

View File

@@ -0,0 +1,47 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"scalejob.go",
],
importpath = "k8s.io/kubernetes/pkg/kubectl/cmd/scalejob",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/batch:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["scalejob_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,18 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package scalejob is deprecated. It contains deprecated functions used to "scale" jobs in a way that is inconsistent with normal scaling rules.
package scalejob

View File

@@ -0,0 +1,162 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalejob
import (
"fmt"
"strconv"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/apis/batch"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
)
// ScalePrecondition is a deprecated precondition
type ScalePrecondition struct {
Size int
ResourceVersion string
}
// RetryParams is a deprecated retry struct
type RetryParams struct {
Interval, Timeout time.Duration
}
// PreconditionError is a deprecated error
type PreconditionError struct {
Precondition string
ExpectedValue string
ActualValue string
}
func (pe PreconditionError) Error() string {
return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
}
// scaleCondition is a closure around ScaleSimple that facilitates retries via util.wait
func scaleCondition(r *JobPsuedoScaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc {
return func() (bool, error) {
rv, err := r.ScaleSimple(namespace, name, precondition, count)
if updatedResourceVersion != nil {
*updatedResourceVersion = rv
}
// Retry only on update conflicts.
if errors.IsConflict(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
}
// JobPsuedoScaler is a deprecated scaler-like type for jobs that does not obey normal scale semantics
type JobPsuedoScaler struct {
JobsClient batchclient.JobsGetter
}
// ScaleSimple is responsible for updating a job's parallelism. It returns the
// resourceVersion of the job if the update is successful.
func (scaler *JobPsuedoScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", err
}
if preconditions != nil {
if err := validateJob(job, preconditions); err != nil {
return "", err
}
}
parallelism := int32(newSize)
job.Spec.Parallelism = &parallelism
updatedJob, err := scaler.JobsClient.Jobs(namespace).Update(job)
if err != nil {
return "", err
}
return updatedJob.ObjectMeta.ResourceVersion, nil
}
// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for parallelism to reach the desired
// number, which can be less than requested based on the job's current progress.
func (scaler *JobPsuedoScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := scaleCondition(scaler, preconditions, namespace, name, newSize, nil)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, jobHasDesiredParallelism(scaler.JobsClient, job))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name)
}
return err
}
return nil
}
// jobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
// for a job equals the current active count, or is less by an appropriate successful/unsuccessful count.
func jobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc {
return func() (bool, error) {
job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// the active count can already equal the desired parallelism exactly, in which case return immediately
if job.Status.Active == *job.Spec.Parallelism {
return true, nil
}
if job.Spec.Completions == nil {
// A job without specified completions needs to wait for Active to reach Parallelism.
return false, nil
}
// otherwise, the job is synced once active plus succeeded pods account for every requested completion
progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded
return progress == 0, nil
}
}
func validateJob(job *batch.Job, precondition *ScalePrecondition) error {
if precondition.Size != -1 && job.Spec.Parallelism == nil {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
}
if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))}
}
if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
}
return nil
}
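Putting the pieces together, a minimal usage sketch of this package (the caller's wiring, namespace, job name, and timings below are assumptions for illustration, not part of the commit):

import (
	"time"

	batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
	"k8s.io/kubernetes/pkg/kubectl/cmd/scalejob"
)

// scaleJobTo updates a job's parallelism, insisting it currently be 2, retrying
// update conflicts for up to a minute, then waiting for active pods to settle.
func scaleJobTo(jobs batchclient.JobsGetter, namespace, name string, size uint) error {
	scaler := &scalejob.JobPsuedoScaler{JobsClient: jobs}
	pre := &scalejob.ScalePrecondition{Size: 2, ResourceVersion: ""}
	retry := &scalejob.RetryParams{Interval: time.Second, Timeout: time.Minute}
	waitFor := &scalejob.RetryParams{Interval: time.Second, Timeout: 5 * time.Minute}
	return scaler.Scale(namespace, name, size, pre, retry, waitFor)
}

Passing nil for the preconditions or retry parameters falls back to the defaults in Scale: no precondition checks and a single immediate update attempt.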

View File

@@ -0,0 +1,292 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalejob
import (
"errors"
"testing"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
)
type errorJobs struct {
batchclient.JobInterface
conflict bool
invalid bool
}
func (c *errorJobs) Update(job *batch.Job) (*batch.Job, error) {
switch {
case c.invalid:
return nil, kerrors.NewInvalid(api.Kind(job.Kind), job.Name, nil)
case c.conflict:
return nil, kerrors.NewConflict(api.Resource(job.Kind), job.Name, nil)
}
return nil, errors.New("Job update failure")
}
func (c *errorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) {
zero := int32(0)
return &batch.Job{
Spec: batch.JobSpec{
Parallelism: &zero,
},
}, nil
}
type errorJobClient struct {
batchclient.JobsGetter
conflict bool
invalid bool
}
func (c *errorJobClient) Jobs(namespace string) batchclient.JobInterface {
return &errorJobs{
JobInterface: c.JobsGetter.Jobs(namespace),
conflict: c.conflict,
invalid: c.invalid,
}
}
func TestJobScaleRetry(t *testing.T) {
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true}
scaler := &JobPsuedoScaler{JobsClient: fake}
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
}
if err != nil {
t.Errorf("Did not expect an error on update failure, got %v", err)
}
preconditions = ScalePrecondition{3, ""}
scaleFunc = scaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err = scaleFunc()
if err == nil {
t.Error("Expected error on precondition failure")
}
}
func job() *batch.Job {
return &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
}
}
func TestJobScale(t *testing.T) {
fakeClientset := fake.NewSimpleClientset(job())
scaler := &JobPsuedoScaler{JobsClient: fakeClientset.Batch()}
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
scaler.Scale("default", name, count, &preconditions, nil, nil)
actions := fakeClientset.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions)
}
if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
if action, ok := actions[1].(testcore.UpdateAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || *action.GetObject().(*batch.Job).Spec.Parallelism != int32(count) {
t.Errorf("unexpected action %v, expected update-job with parallelism = %d", actions[1], count)
}
}
func TestJobScaleInvalid(t *testing.T) {
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true}
scaler := &JobPsuedoScaler{JobsClient: fake}
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
}
if err == nil {
t.Errorf("Expected error on invalid update failure, got %v", err)
}
}
func TestJobScaleFailsPreconditions(t *testing.T) {
ten := int32(10)
fake := fake.NewSimpleClientset(&batch.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
})
scaler := &JobPsuedoScaler{JobsClient: fake.Batch()}
preconditions := ScalePrecondition{2, ""}
count := uint(3)
name := "foo"
scaler.Scale("default", name, count, &preconditions, nil, nil)
actions := fake.Actions()
if len(actions) != 1 {
t.Errorf("unexpected actions: %v, expected 1 actions (get)", actions)
}
if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
}
func TestValidateJob(t *testing.T) {
zero, ten, twenty := int32(0), int32(10), int32(20)
tests := []struct {
preconditions ScalePrecondition
job batch.Job
expectError bool
test string
}{
{
preconditions: ScalePrecondition{-1, ""},
expectError: false,
test: "defaults",
},
{
preconditions: ScalePrecondition{-1, ""},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "defaults 2",
},
{
preconditions: ScalePrecondition{0, ""},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &zero,
},
},
expectError: false,
test: "size matches",
},
{
preconditions: ScalePrecondition{-1, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "resource version matches",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "both match",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &twenty,
},
},
expectError: true,
test: "size different",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
},
expectError: true,
test: "parallelism nil",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "bar",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: true,
test: "version different",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "bar",
},
Spec: batch.JobSpec{
Parallelism: &twenty,
},
},
expectError: true,
test: "both different",
},
}
for _, test := range tests {
err := validateJob(&test.job, &test.preconditions)
if err != nil && !test.expectError {
t.Errorf("unexpected error: %v (%s)", err, test.test)
}
if err == nil && test.expectError {
t.Errorf("expected an error: %v (%s)", err, test.test)
}
}
}

View File

@@ -26,11 +26,9 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
)
@@ -99,30 +97,6 @@ func StatefulSetHasDesiredReplicas(ssClient appsclient.StatefulSetsGetter, ss *a
}
}
// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count.
func JobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc {
return func() (bool, error) {
job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// desired parallelism can be either the exact number, in which case return immediately
if job.Status.Active == *job.Spec.Parallelism {
return true, nil
}
if job.Spec.Completions == nil {
// A job without specified completions needs to wait for Active to reach Parallelism.
return false, nil
}
// otherwise count successful
progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded
return progress == 0, nil
}
}
// DeploymentHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a deployment equals its updated replicas count.
// (non-terminated pods that have the desired template spec).

View File

@@ -30,13 +30,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/kubectl/cmd/scalejob"
)
const (
@@ -80,6 +83,9 @@ func ReaperFor(kind schema.GroupKind, c internalclientset.Interface, sc scalecli
case api.Kind("Pod"):
return &PodReaper{c.Core()}, nil
case batch.Kind("Job"):
return &JobReaper{c.Batch(), c.Core(), Interval, Timeout}, nil
case apps.Kind("StatefulSet"):
return &StatefulSetReaper{c.Apps(), c.Core(), Interval, Timeout, sc}, nil
@@ -109,6 +115,11 @@ type DaemonSetReaper struct {
client extensionsclient.DaemonSetsGetter
pollInterval, timeout time.Duration
}
// JobReaper scales a job down to zero, removes its pods, and then deletes the job itself.
type JobReaper struct {
client batchclient.JobsGetter
podClient coreclient.PodsGetter
pollInterval, timeout time.Duration
}
type DeploymentReaper struct {
dClient extensionsclient.DeploymentsGetter
rsClient extensionsclient.ReplicaSetsGetter
@@ -341,6 +352,53 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat
return statefulsets.Delete(name, deleteOptions)
}
func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
jobs := reaper.client.Jobs(namespace)
pods := reaper.podClient.Pods(namespace)
scaler := &scalejob.JobPsuedoScaler{
JobsClient: reaper.client,
}
job, err := jobs.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if timeout == 0 {
// we will never have more active pods than job.Spec.Parallelism
parallelism := *job.Spec.Parallelism
timeout = Timeout + time.Duration(10*parallelism)*time.Second
}
// TODO: handle overlapping jobs
retry := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout}
waitForJobs := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout}
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil && !errors.IsNotFound(err) {
return err
}
// at this point only dead pods are left, and those should be removed
selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := pods.List(options)
if err != nil {
return err
}
errList := []error{}
for _, pod := range podList.Items {
if err := pods.Delete(pod.Name, gracePeriod); err != nil {
// ignore the error when the pod isn't found
if !errors.IsNotFound(err) {
errList = append(errList, err)
}
}
}
if len(errList) > 0 {
return utilerrors.NewAggregate(errList)
}
// once we have all the pods removed we can safely remove the job itself.
falseVar := false
deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
return jobs.Delete(name, deleteOptions)
}
func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
deployments := reaper.dClient.Deployments(namespace)
rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout, reaper.scaleClient, schema.GroupResource{Group: reaper.gr.Group, Resource: "replicasets"}}

View File

@@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
fakescale "k8s.io/client-go/scale/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
@@ -381,6 +382,103 @@ func TestReplicaSetStop(t *testing.T) {
}
}
func TestJobStop(t *testing.T) {
name := "foo"
ns := "default"
zero := int32(0)
tests := []struct {
Name string
Objs []runtime.Object
StopError error
ExpectedActions []string
}{
{
Name: "OnlyOneJob",
Objs: []runtime.Object{
&batch.JobList{ // LIST
Items: []batch.Job{
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: batch.JobSpec{
Parallelism: &zero,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"k1": "v1"},
},
},
},
},
},
},
StopError: nil,
ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs",
"get:jobs", "get:jobs", "list:pods", "delete:jobs"},
},
{
Name: "JobWithDeadPods",
Objs: []runtime.Object{
&batch.JobList{ // LIST
Items: []batch.Job{
{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: batch.JobSpec{
Parallelism: &zero,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"k1": "v1"},
},
},
},
},
},
&api.PodList{ // LIST
Items: []api.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: ns,
Labels: map[string]string{"k1": "v1"},
},
},
},
},
},
StopError: nil,
ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs",
"get:jobs", "get:jobs", "list:pods", "delete:pods", "delete:jobs"},
},
}
for _, test := range tests {
fake := fake.NewSimpleClientset(test.Objs...)
reaper := JobReaper{fake.Batch(), fake.Core(), time.Millisecond, time.Millisecond}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err)
continue
}
actions := fake.Actions()
if len(actions) != len(test.ExpectedActions) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
continue
}
for i, expAction := range test.ExpectedActions {
action := strings.Split(expAction, ":")
if actions[i].GetVerb() != action[0] {
t.Errorf("%s unexpected verb: %+v, expected %s", test.Name, actions[i], expAction)
}
if actions[i].GetResource().Resource != action[1] {
t.Errorf("%s unexpected resource: %+v, expected %s", test.Name, actions[i], expAction)
}
}
}
}
func TestDeploymentStop(t *testing.T) {
name := "foo"
ns := "default"

View File

@@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/apis/batch"
scaleclient "k8s.io/client-go/scale"
)
@@ -97,20 +96,6 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
}
}
// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
if precondition.Size != -1 && job.Spec.Parallelism == nil {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
}
if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))}
}
if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
}
return nil
}
// validateGeneric ensures that the preconditions match. Returns nil if they are valid, otherwise an error
func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) error {
if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size {