Merge pull request #62114 from deads2k/cli-22-jobs

Automatic merge from submit-queue (batch tested with PRs 62208, 62114, 62144, 60460, 62214). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

removes job scaler, continued

Builds on https://github.com/kubernetes/kubernetes/pull/61912 (original commit is there for credit/blame)

This keeps all the updates to the scaler building and all the test and reaper cleanup.  It just keeps a fake job scaler around for a different command path and the reaper.

/assign @p0lyn0mial 
/assign @soltysh 

```release-note
NONE
```
pull/8/head
Kubernetes Submit Queue 2018-04-06 17:06:09 -07:00 committed by GitHub
commit cba403024f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 666 additions and 497 deletions

View File

@ -45,7 +45,6 @@ go_test(
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//pkg/util/pointer:go_default_library",
@ -137,6 +136,7 @@ go_library(
"//pkg/controller/statefulset:go_default_library",
"//pkg/credentialprovider:go_default_library",
"//pkg/kubectl/apps:go_default_library",
"//pkg/kubectl/cmd/scalejob:go_default_library",
"//pkg/kubectl/resource:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//pkg/kubectl/util/hash:go_default_library",

View File

@ -75,6 +75,7 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/validation:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubectl/apply/parse:go_default_library",
@ -83,6 +84,7 @@ go_library(
"//pkg/kubectl/cmd/config:go_default_library",
"//pkg/kubectl/cmd/resource:go_default_library",
"//pkg/kubectl/cmd/rollout:go_default_library",
"//pkg/kubectl/cmd/scalejob:go_default_library",
"//pkg/kubectl/cmd/set:go_default_library",
"//pkg/kubectl/cmd/templates:go_default_library",
"//pkg/kubectl/cmd/util:go_default_library",
@ -287,6 +289,7 @@ filegroup(
"//pkg/kubectl/cmd/config:all-srcs",
"//pkg/kubectl/cmd/resource:all-srcs",
"//pkg/kubectl/cmd/rollout:all-srcs",
"//pkg/kubectl/cmd/scalejob:all-srcs",
"//pkg/kubectl/cmd/set:all-srcs",
"//pkg/kubectl/cmd/templates:all-srcs",
"//pkg/kubectl/cmd/testdata/edit:all-srcs",

View File

@ -22,7 +22,9 @@ import (
"github.com/spf13/cobra"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/scalejob"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
@ -60,7 +62,7 @@ var (
func NewCmdScale(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
options := &resource.FilenameOptions{}
validArgs := []string{"deployment", "replicaset", "replicationcontroller", "job", "statefulset"}
validArgs := []string{"deployment", "replicaset", "replicationcontroller", "statefulset"}
argAliases := kubectl.ResourceAliases(validArgs)
cmd := &cobra.Command{
@ -139,6 +141,15 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
return fmt.Errorf("cannot use --resource-version with multiple resources")
}
currentSize := cmdutil.GetFlagInt(cmd, "current-replicas")
precondition := &kubectl.ScalePrecondition{Size: currentSize, ResourceVersion: resourceVersion}
retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout)
var waitForReplicas *kubectl.RetryParams
if timeout := cmdutil.GetFlagDuration(cmd, "timeout"); timeout != 0 {
waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout)
}
counter := 0
err = r.Visit(func(info *resource.Info, err error) error {
if err != nil {
@ -147,26 +158,29 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
mapping := info.ResourceMapping()
if mapping.Resource == "jobs" {
// go down the legacy jobs path. This can be removed in 3.14 For now, contain it.
fmt.Fprintf(errOut, "%s scale job is DEPRECATED and will be removed in a future version.\n", cmd.Parent().Name())
clientset, err := f.ClientSet()
if err != nil {
return err
}
if err := ScaleJob(info, clientset.Batch(), uint(count), precondition, retry, waitForReplicas); err != nil {
return err
}
} else {
scaler, err := f.Scaler()
if err != nil {
return err
}
gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas, gvk.GroupResource()); err != nil {
return err
}
}
scaler, err := f.Scaler(mapping)
if err != nil {
return err
}
currentSize := cmdutil.GetFlagInt(cmd, "current-replicas")
precondition := &kubectl.ScalePrecondition{Size: currentSize, ResourceVersion: resourceVersion}
retry := kubectl.NewRetryParams(kubectl.Interval, kubectl.Timeout)
var waitForReplicas *kubectl.RetryParams
if timeout := cmdutil.GetFlagDuration(cmd, "timeout"); timeout != 0 {
waitForReplicas = kubectl.NewRetryParams(kubectl.Interval, timeout)
}
if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas); err != nil {
return err
}
if cmdutil.ShouldRecord(cmd, info) {
patchBytes, patchType, err := cmdutil.ChangeResourcePatch(info, f.Command(cmd, true))
if err != nil {
@ -195,3 +209,23 @@ func RunScale(f cmdutil.Factory, out, errOut io.Writer, cmd *cobra.Command, args
}
return nil
}
func ScaleJob(info *resource.Info, jobsClient batchclient.JobsGetter, count uint, preconditions *kubectl.ScalePrecondition, retry, waitForReplicas *kubectl.RetryParams) error {
scaler := scalejob.JobPsuedoScaler{
JobsClient: jobsClient,
}
var jobPreconditions *scalejob.ScalePrecondition
if preconditions != nil {
jobPreconditions = &scalejob.ScalePrecondition{Size: preconditions.Size, ResourceVersion: preconditions.ResourceVersion}
}
var jobRetry *scalejob.RetryParams
if retry != nil {
jobRetry = &scalejob.RetryParams{Interval: retry.Interval, Timeout: retry.Timeout}
}
var jobWaitForReplicas *scalejob.RetryParams
if waitForReplicas != nil {
jobWaitForReplicas = &scalejob.RetryParams{Interval: waitForReplicas.Interval, Timeout: waitForReplicas.Timeout}
}
return scaler.Scale(info.Namespace, info.Name, count, jobPreconditions, jobRetry, jobWaitForReplicas)
}

View File

@ -0,0 +1,47 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"scalejob.go",
],
importpath = "k8s.io/kubernetes/pkg/kubectl/cmd/scalejob",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/batch:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["scalejob_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,18 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package scalejob is deprecated This package contains deprecated functions used to "scale" jobs in a way inconsistent with normal scaling rules
package scalejob

View File

@ -0,0 +1,162 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalejob
import (
"fmt"
"strconv"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/apis/batch"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
)
// ScalePrecondition is a deprecated precondition
type ScalePrecondition struct {
Size int
ResourceVersion string
}
// RetryParams is a deprecated retry struct
type RetryParams struct {
Interval, Timeout time.Duration
}
// PreconditionError is a deprecated error
type PreconditionError struct {
Precondition string
ExpectedValue string
ActualValue string
}
func (pe PreconditionError) Error() string {
return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
}
// ScaleCondition is a closure around Scale that facilitates retries via util.wait
func scaleCondition(r *JobPsuedoScaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc {
return func() (bool, error) {
rv, err := r.ScaleSimple(namespace, name, precondition, count)
if updatedResourceVersion != nil {
*updatedResourceVersion = rv
}
// Retry only on update conflicts.
if errors.IsConflict(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
}
// JobPsuedoScaler is a deprecated scale-similar thing that doesn't obey scale semantics
type JobPsuedoScaler struct {
JobsClient batchclient.JobsGetter
}
// ScaleSimple is responsible for updating job's parallelism. It returns the
// resourceVersion of the job if the update is successful.
func (scaler *JobPsuedoScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", err
}
if preconditions != nil {
if err := validateJob(job, preconditions); err != nil {
return "", err
}
}
parallelism := int32(newSize)
job.Spec.Parallelism = &parallelism
updatedJob, err := scaler.JobsClient.Jobs(namespace).Update(job)
if err != nil {
return "", err
}
return updatedJob.ObjectMeta.ResourceVersion, nil
}
// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired
// number, which can be less than requested based on job's current progress.
func (scaler *JobPsuedoScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := scaleCondition(scaler, preconditions, namespace, name, newSize, nil)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
job, err := scaler.JobsClient.Jobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, jobHasDesiredParallelism(scaler.JobsClient, job))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name)
}
return err
}
return nil
}
// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count.
func jobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc {
return func() (bool, error) {
job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// desired parallelism can be either the exact number, in which case return immediately
if job.Status.Active == *job.Spec.Parallelism {
return true, nil
}
if job.Spec.Completions == nil {
// A job without specified completions needs to wait for Active to reach Parallelism.
return false, nil
}
// otherwise count successful
progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded
return progress == 0, nil
}
}
func validateJob(job *batch.Job, precondition *ScalePrecondition) error {
if precondition.Size != -1 && job.Spec.Parallelism == nil {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
}
if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))}
}
if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
}
return nil
}

View File

@ -0,0 +1,292 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scalejob
import (
"errors"
"testing"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
)
type errorJobs struct {
batchclient.JobInterface
conflict bool
invalid bool
}
func (c *errorJobs) Update(job *batch.Job) (*batch.Job, error) {
switch {
case c.invalid:
return nil, kerrors.NewInvalid(api.Kind(job.Kind), job.Name, nil)
case c.conflict:
return nil, kerrors.NewConflict(api.Resource(job.Kind), job.Name, nil)
}
return nil, errors.New("Job update failure")
}
func (c *errorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) {
zero := int32(0)
return &batch.Job{
Spec: batch.JobSpec{
Parallelism: &zero,
},
}, nil
}
type errorJobClient struct {
batchclient.JobsGetter
conflict bool
invalid bool
}
func (c *errorJobClient) Jobs(namespace string) batchclient.JobInterface {
return &errorJobs{
JobInterface: c.JobsGetter.Jobs(namespace),
conflict: c.conflict,
invalid: c.invalid,
}
}
func TestJobScaleRetry(t *testing.T) {
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true}
scaler := &JobPsuedoScaler{JobsClient: fake}
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err := scaleFunc()
if pass != false {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
}
if err != nil {
t.Errorf("Did not expect an error on update failure, got %v", err)
}
preconditions = ScalePrecondition{3, ""}
scaleFunc = scaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err = scaleFunc()
if err == nil {
t.Error("Expected error on precondition failure")
}
}
func job() *batch.Job {
return &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
}
}
func TestJobScale(t *testing.T) {
fakeClientset := fake.NewSimpleClientset(job())
scaler := &JobPsuedoScaler{JobsClient: fakeClientset.Batch()}
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
scaler.Scale("default", name, count, &preconditions, nil, nil)
actions := fakeClientset.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions)
}
if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
if action, ok := actions[1].(testcore.UpdateAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || *action.GetObject().(*batch.Job).Spec.Parallelism != int32(count) {
t.Errorf("unexpected action %v, expected update-job with parallelism = %d", actions[1], count)
}
}
func TestJobScaleInvalid(t *testing.T) {
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true}
scaler := &JobPsuedoScaler{JobsClient: fake}
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := scaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
}
if err == nil {
t.Errorf("Expected error on invalid update failure, got %v", err)
}
}
func TestJobScaleFailsPreconditions(t *testing.T) {
ten := int32(10)
fake := fake.NewSimpleClientset(&batch.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
})
scaler := &JobPsuedoScaler{JobsClient: fake.Batch()}
preconditions := ScalePrecondition{2, ""}
count := uint(3)
name := "foo"
scaler.Scale("default", name, count, &preconditions, nil, nil)
actions := fake.Actions()
if len(actions) != 1 {
t.Errorf("unexpected actions: %v, expected 1 actions (get)", actions)
}
if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
}
func TestValidateJob(t *testing.T) {
zero, ten, twenty := int32(0), int32(10), int32(20)
tests := []struct {
preconditions ScalePrecondition
job batch.Job
expectError bool
test string
}{
{
preconditions: ScalePrecondition{-1, ""},
expectError: false,
test: "defaults",
},
{
preconditions: ScalePrecondition{-1, ""},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "defaults 2",
},
{
preconditions: ScalePrecondition{0, ""},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &zero,
},
},
expectError: false,
test: "size matches",
},
{
preconditions: ScalePrecondition{-1, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "resource version matches",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "both match",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &twenty,
},
},
expectError: true,
test: "size different",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
},
expectError: true,
test: "parallelism nil",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "bar",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: true,
test: "version different",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "bar",
},
Spec: batch.JobSpec{
Parallelism: &twenty,
},
},
expectError: true,
test: "both different",
},
}
for _, test := range tests {
err := validateJob(&test.job, &test.preconditions)
if err != nil && !test.expectError {
t.Errorf("unexpected error: %v (%s)", err, test.test)
}
if err == nil && test.expectError {
t.Errorf("expected an error: %v (%s)", err, test.test)
}
}
}

View File

@ -211,7 +211,7 @@ type BuilderFactory interface {
// PluginRunner provides the implementation to be used to run cli plugins.
PluginRunner() plugins.PluginRunner
// Returns a Scaler for changing the size of the specified RESTMapping type or an error
Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error)
Scaler() (kubectl.Scaler, error)
// ScaleClient gives you back scale getter
ScaleClient() (scaleclient.ScalesGetter, error)
// Returns a Reaper for gracefully shutting down resources.

View File

@ -103,19 +103,13 @@ func (f *ring2Factory) ScaleClient() (scaleclient.ScalesGetter, error) {
return scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver), nil
}
func (f *ring2Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error) {
clientset, err := f.clientAccessFactory.ClientSet()
if err != nil {
return nil, err
}
func (f *ring2Factory) Scaler() (kubectl.Scaler, error) {
scalesGetter, err := f.ScaleClient()
if err != nil {
return nil, err
}
gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset.Batch(), scalesGetter, gvk.GroupResource()), nil
return kubectl.NewScaler(scalesGetter), nil
}
func (f *ring2Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {

View File

@ -26,11 +26,9 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
)
@ -99,30 +97,6 @@ func StatefulSetHasDesiredReplicas(ssClient appsclient.StatefulSetsGetter, ss *a
}
}
// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count.
func JobHasDesiredParallelism(jobClient batchclient.JobsGetter, job *batch.Job) wait.ConditionFunc {
return func() (bool, error) {
job, err := jobClient.Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
// desired parallelism can be either the exact number, in which case return immediately
if job.Status.Active == *job.Spec.Parallelism {
return true, nil
}
if job.Spec.Completions == nil {
// A job without specified completions needs to wait for Active to reach Parallelism.
return false, nil
}
// otherwise count successful
progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded
return progress == 0, nil
}
}
// DeploymentHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a deployment equals its updated replicas count.
// (non-terminated pods that have the desired template spec).

View File

@ -39,6 +39,7 @@ import (
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/kubectl/cmd/scalejob"
)
const (
@ -155,7 +156,7 @@ func getOverlappingControllers(rcClient coreclient.ReplicationControllerInterfac
func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
rc := reaper.client.ReplicationControllers(namespace)
scaler := NewScaler(reaper.scaleClient, schema.GroupResource{Resource: "replicationcontrollers"})
scaler := NewScaler(reaper.scaleClient)
ctrl, err := rc.Get(name, metav1.GetOptions{})
if err != nil {
return err
@ -206,7 +207,7 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
// No overlapping controllers.
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) {
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil && !errors.IsNotFound(err) {
return err
}
}
@ -224,7 +225,7 @@ func getOverlappingReplicaSets(c extensionsclient.ReplicaSetInterface, rs *exten
func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
rsc := reaper.client.ReplicaSets(namespace)
scaler := NewScaler(reaper.scaleClient, reaper.gr)
scaler := NewScaler(reaper.scaleClient)
rs, err := rsc.Get(name, metav1.GetOptions{})
if err != nil {
return err
@ -276,7 +277,7 @@ func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Durati
// No overlapping ReplicaSets.
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil && !errors.IsNotFound(err) {
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas, reaper.gr); err != nil && !errors.IsNotFound(err) {
return err
}
}
@ -325,7 +326,7 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio
func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
statefulsets := reaper.client.StatefulSets(namespace)
scaler := NewScaler(reaper.scaleClient, apps.Resource("statefulsets"))
scaler := NewScaler(reaper.scaleClient)
ss, err := statefulsets.Get(name, metav1.GetOptions{})
if err != nil {
return err
@ -340,7 +341,7 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForStatefulSet := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet); err != nil && !errors.IsNotFound(err) {
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet, apps.Resource("statefulsets")); err != nil && !errors.IsNotFound(err) {
return err
}
@ -354,7 +355,9 @@ func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Durat
func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
jobs := reaper.client.Jobs(namespace)
pods := reaper.podClient.Pods(namespace)
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, reaper.client, nil, schema.GroupResource{})
scaler := &scalejob.JobPsuedoScaler{
JobsClient: reaper.client,
}
job, err := jobs.Get(name, metav1.GetOptions{})
if err != nil {
return err
@ -366,8 +369,8 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
}
// TODO: handle overlapping jobs
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForJobs := NewRetryParams(reaper.pollInterval, timeout)
retry := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout}
waitForJobs := &scalejob.RetryParams{Interval: reaper.pollInterval, Timeout: reaper.timeout}
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil && !errors.IsNotFound(err) {
return err
}

View File

@ -400,8 +400,8 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi
// scalerScaleAndWait scales a controller using a Scaler and a real client.
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
scaler := NewScaler(r.scaleClient, schema.GroupResource{Resource: "replicationcontrollers"})
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil {
scaler := NewScaler(r.scaleClient)
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait, schema.GroupResource{Resource: "replicationcontrollers"}); err != nil {
return nil, err
}
return r.rcClient.ReplicationControllers(rc.Namespace).Get(rc.Name, metav1.GetOptions{})

View File

@ -23,13 +23,10 @@ import (
autoscalingapi "k8s.io/api/autoscaling/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/apis/batch"
scaleclient "k8s.io/client-go/scale"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
)
// TODO: Figure out if we should be waiting on initializers in the Scale() functions below.
@ -40,30 +37,15 @@ type Scaler interface {
// retries in the event of resource version mismatch (if retry is not nil),
// and optionally waits until the status of the resource matches newSize (if wait is not nil)
// TODO: Make the implementation of this watch-based (#56075) once #31345 is fixed.
Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams) error
Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams, gr schema.GroupResource) error
// ScaleSimple does a simple one-shot attempt at scaling - not useful on its own, but
// a necessary building block for Scale
ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error)
}
// ScalerFor gets a scaler for a given resource
func ScalerFor(kind schema.GroupKind, jobsClient batchclient.JobsGetter, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler {
// it seems like jobs dont't follow "normal" scale semantics.
// For example it is not clear whether HPA could make use of it or not.
// For more details see: https://github.com/kubernetes/kubernetes/pull/58468
switch kind {
case batch.Kind("Job"):
return &jobScaler{jobsClient} // Either kind of job can be scaled with Batch interface.
default:
return NewScaler(scalesGetter, gr)
}
ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gr schema.GroupResource) (updatedResourceVersion string, err error)
}
// NewScaler get a scaler for a given resource
// Note that if you are trying to crate create a scaler for "job" then stop and use ScalerFor instead.
// When scaling jobs is dead, we'll remove ScalerFor method.
func NewScaler(scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler {
return &genericScaler{scalesGetter, gr}
func NewScaler(scalesGetter scaleclient.ScalesGetter) Scaler {
return &genericScaler{scalesGetter}
}
// ScalePrecondition describes a condition that must be true for the scale to take place
@ -97,9 +79,9 @@ func NewRetryParams(interval, timeout time.Duration) *RetryParams {
}
// ScaleCondition is a closure around Scale that facilitates retries via util.wait
func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string) wait.ConditionFunc {
func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string, gr schema.GroupResource) wait.ConditionFunc {
return func() (bool, error) {
rv, err := r.ScaleSimple(namespace, name, precondition, count)
rv, err := r.ScaleSimple(namespace, name, precondition, count, gr)
if updatedResourceVersion != nil {
*updatedResourceVersion = rv
}
@ -114,74 +96,6 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
}
}
// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
if precondition.Size != -1 && job.Spec.Parallelism == nil {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), "nil"}
}
if precondition.Size != -1 && int(*job.Spec.Parallelism) != precondition.Size {
return PreconditionError{"parallelism", strconv.Itoa(precondition.Size), strconv.Itoa(int(*job.Spec.Parallelism))}
}
if len(precondition.ResourceVersion) != 0 && job.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, job.ResourceVersion}
}
return nil
}
type jobScaler struct {
c batchclient.JobsGetter
}
// ScaleSimple is responsible for updating job's parallelism. It returns the
// resourceVersion of the job if the update is successful.
func (scaler *jobScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", err
}
if preconditions != nil {
if err := preconditions.ValidateJob(job); err != nil {
return "", err
}
}
parallelism := int32(newSize)
job.Spec.Parallelism = &parallelism
updatedJob, err := scaler.c.Jobs(namespace).Update(job)
if err != nil {
return "", err
}
return updatedJob.ObjectMeta.ResourceVersion, nil
}
// Scale updates a Job to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for parallelism to reach desired
// number, which can be less than requested based on job's current progress.
func (scaler *jobScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
job, err := scaler.c.Jobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, JobHasDesiredParallelism(scaler.c, job))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name)
}
return err
}
return nil
}
// validateGeneric ensures that the preconditions match. Returns nil if they are valid, otherwise an error
func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) error {
if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size {
@ -196,14 +110,13 @@ func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) err
// genericScaler can update scales for resources in a particular namespace
type genericScaler struct {
scaleNamespacer scaleclient.ScalesGetter
targetGR schema.GroupResource
}
var _ Scaler = &genericScaler{}
// ScaleSimple updates a scale of a given resource. It returns the resourceVersion of the scale if the update was successful.
func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) {
scale, err := s.scaleNamespacer.Scales(namespace).Get(s.targetGR, name)
func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gr schema.GroupResource) (updatedResourceVersion string, err error) {
scale, err := s.scaleNamespacer.Scales(namespace).Get(gr, name)
if err != nil {
return "", err
}
@ -214,7 +127,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale
}
scale.Spec.Replicas = int32(newSize)
updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(s.targetGR, scale)
updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(gr, scale)
if err != nil {
return "", err
}
@ -223,7 +136,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale
// Scale updates a scale of a given resource to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for the status to reach desired count.
func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams, gr schema.GroupResource) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
@ -231,7 +144,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec
// make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil)
cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil, gr)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
@ -239,7 +152,7 @@ func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, prec
err := wait.PollImmediate(
waitForReplicas.Interval,
waitForReplicas.Timeout,
scaleHasDesiredReplicas(s.scaleNamespacer, s.targetGR, resourceName, namespace, int32(newSize)))
scaleHasDesiredReplicas(s.scaleNamespacer, gr, resourceName, namespace, int32(newSize)))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", resourceName)
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package kubectl
import (
"errors"
"fmt"
"testing"
"time"
@ -26,11 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/scale"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
)
func TestReplicationControllerScaleRetry(t *testing.T) {
@ -39,13 +34,13 @@ func TestReplicationControllerScaleRetry(t *testing.T) {
}
scaleClientExpectedAction := []string{"get", "update", "get"}
scaleClient := createFakeScaleClient("replicationcontrollers", "foo-v1", 2, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo-v1"
namespace := metav1.NamespaceDefault
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -54,7 +49,7 @@ func TestReplicationControllerScaleRetry(t *testing.T) {
t.Errorf("Did not expect an error on update conflict failure, got %v", err)
}
preconditions = ScalePrecondition{3, ""}
scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
pass, err = scaleFunc()
if err == nil {
t.Errorf("Expected error on precondition failure")
@ -76,13 +71,13 @@ func TestReplicationControllerScaleInvalid(t *testing.T) {
}
scaleClientExpectedAction := []string{"get", "update"}
scaleClient := createFakeScaleClient("replicationcontrollers", "foo-v1", 1, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo-v1"
namespace := "default"
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -104,11 +99,11 @@ func TestReplicationControllerScaleInvalid(t *testing.T) {
func TestReplicationControllerScale(t *testing.T) {
scaleClientExpectedAction := []string{"get", "update"}
scaleClient := createFakeScaleClient("replicationcontrollers", "foo-v1", 2, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo-v1"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
if err != nil {
t.Fatalf("unexpected error occurred = %v while scaling the resource", err)
@ -127,11 +122,11 @@ func TestReplicationControllerScale(t *testing.T) {
func TestReplicationControllerScaleFailsPreconditions(t *testing.T) {
scaleClientExpectedAction := []string{"get"}
scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 10, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{2, ""}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "", Resource: "replicationcontrollers"})
if err == nil {
t.Fatal("expected to get an error but none was returned")
}
@ -146,281 +141,19 @@ func TestReplicationControllerScaleFailsPreconditions(t *testing.T) {
}
}
type errorJobs struct {
batchclient.JobInterface
conflict bool
invalid bool
}
func (c *errorJobs) Update(job *batch.Job) (*batch.Job, error) {
switch {
case c.invalid:
return nil, kerrors.NewInvalid(api.Kind(job.Kind), job.Name, nil)
case c.conflict:
return nil, kerrors.NewConflict(api.Resource(job.Kind), job.Name, nil)
}
return nil, errors.New("Job update failure")
}
func (c *errorJobs) Get(name string, options metav1.GetOptions) (*batch.Job, error) {
zero := int32(0)
return &batch.Job{
Spec: batch.JobSpec{
Parallelism: &zero,
},
}, nil
}
type errorJobClient struct {
batchclient.JobsGetter
conflict bool
invalid bool
}
func (c *errorJobClient) Jobs(namespace string) batchclient.JobInterface {
return &errorJobs{
JobInterface: c.JobsGetter.Jobs(namespace),
conflict: c.conflict,
invalid: c.invalid,
}
}
func TestJobScaleRetry(t *testing.T) {
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), conflict: true}
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake, nil, schema.GroupResource{})
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err := scaleFunc()
if pass != false {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
}
if err != nil {
t.Errorf("Did not expect an error on update failure, got %v", err)
}
preconditions = ScalePrecondition{3, ""}
scaleFunc = ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err = scaleFunc()
if err == nil {
t.Error("Expected error on precondition failure")
}
}
func job() *batch.Job {
return &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
}
}
func TestJobScale(t *testing.T) {
fakeClientset := fake.NewSimpleClientset(job())
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fakeClientset.Batch(), nil, schema.GroupResource{})
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
scaler.Scale("default", name, count, &preconditions, nil, nil)
actions := fakeClientset.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v, expected 2 actions (get, update)", actions)
}
if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
if action, ok := actions[1].(testcore.UpdateAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || *action.GetObject().(*batch.Job).Spec.Parallelism != int32(count) {
t.Errorf("unexpected action %v, expected update-job with parallelism = %d", actions[1], count)
}
}
func TestJobScaleInvalid(t *testing.T) {
fake := &errorJobClient{JobsGetter: fake.NewSimpleClientset().Batch(), invalid: true}
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake, nil, schema.GroupResource{})
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
}
if err == nil {
t.Errorf("Expected error on invalid update failure, got %v", err)
}
}
func TestJobScaleFailsPreconditions(t *testing.T) {
ten := int32(10)
fake := fake.NewSimpleClientset(&batch.Job{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
})
scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, fake.Batch(), nil, schema.GroupResource{})
preconditions := ScalePrecondition{2, ""}
count := uint(3)
name := "foo"
scaler.Scale("default", name, count, &preconditions, nil, nil)
actions := fake.Actions()
if len(actions) != 1 {
t.Errorf("unexpected actions: %v, expected 1 actions (get)", actions)
}
if action, ok := actions[0].(testcore.GetAction); !ok || action.GetResource().GroupResource() != batch.Resource("jobs") || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
}
func TestValidateJob(t *testing.T) {
zero, ten, twenty := int32(0), int32(10), int32(20)
tests := []struct {
preconditions ScalePrecondition
job batch.Job
expectError bool
test string
}{
{
preconditions: ScalePrecondition{-1, ""},
expectError: false,
test: "defaults",
},
{
preconditions: ScalePrecondition{-1, ""},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "defaults 2",
},
{
preconditions: ScalePrecondition{0, ""},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &zero,
},
},
expectError: false,
test: "size matches",
},
{
preconditions: ScalePrecondition{-1, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "resource version matches",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: false,
test: "both match",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
Spec: batch.JobSpec{
Parallelism: &twenty,
},
},
expectError: true,
test: "size different",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "foo",
},
},
expectError: true,
test: "parallelism nil",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "bar",
},
Spec: batch.JobSpec{
Parallelism: &ten,
},
},
expectError: true,
test: "version different",
},
{
preconditions: ScalePrecondition{10, "foo"},
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "bar",
},
Spec: batch.JobSpec{
Parallelism: &twenty,
},
},
expectError: true,
test: "both different",
},
}
for _, test := range tests {
err := test.preconditions.ValidateJob(&test.job)
if err != nil && !test.expectError {
t.Errorf("unexpected error: %v (%s)", err, test.test)
}
if err == nil && test.expectError {
t.Errorf("expected an error: %v (%s)", err, test.test)
}
}
}
func TestDeploymentScaleRetry(t *testing.T) {
verbsOnError := map[string]*kerrors.StatusError{
"update": kerrors.NewConflict(api.Resource("Status"), "foo", nil),
}
scaleClientExpectedAction := []string{"get", "update", "get"}
scaleClient := createFakeScaleClient("deployments", "foo", 2, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"})
scaler := NewScaler(scaleClient)
preconditions := &ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "deployments"})
pass, err := scaleFunc()
if pass != false {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -429,7 +162,7 @@ func TestDeploymentScaleRetry(t *testing.T) {
t.Errorf("Did not expect an error on update failure, got %v", err)
}
preconditions = &ScalePrecondition{3, ""}
scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil)
scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "deployments"})
pass, err = scaleFunc()
if err == nil {
t.Error("Expected error on precondition failure")
@ -448,11 +181,11 @@ func TestDeploymentScaleRetry(t *testing.T) {
func TestDeploymentScale(t *testing.T) {
scaleClientExpectedAction := []string{"get", "update"}
scaleClient := createFakeScaleClient("deployments", "foo", 2, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "deployments"})
if err != nil {
t.Fatal(err)
}
@ -473,13 +206,13 @@ func TestDeploymentScaleInvalid(t *testing.T) {
"update": kerrors.NewInvalid(api.Kind("Status"), "foo", nil),
}
scaleClient := createFakeScaleClient("deployments", "foo", 2, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "deployments"})
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -501,11 +234,11 @@ func TestDeploymentScaleInvalid(t *testing.T) {
func TestDeploymentScaleFailsPreconditions(t *testing.T) {
scaleClientExpectedAction := []string{"get"}
scaleClient := createFakeScaleClient("deployments", "foo", 10, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "deployments"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{2, ""}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "deployments"})
if err == nil {
t.Fatal("exptected to get an error but none was returned")
}
@ -523,11 +256,11 @@ func TestDeploymentScaleFailsPreconditions(t *testing.T) {
func TestStatefulSetScale(t *testing.T) {
scaleClientExpectedAction := []string{"get", "update"}
scaleClient := createFakeScaleClient("statefulsets", "foo", 2, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefullset"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "statefullset"})
if err != nil {
t.Fatal(err)
}
@ -548,13 +281,13 @@ func TestStatefulSetScaleRetry(t *testing.T) {
"update": kerrors.NewConflict(api.Resource("Status"), "foo", nil),
}
scaleClient := createFakeScaleClient("statefulsets", "foo", 2, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefulsets"})
scaler := NewScaler(scaleClient)
preconditions := &ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"})
pass, err := scaleFunc()
if pass != false {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -563,7 +296,7 @@ func TestStatefulSetScaleRetry(t *testing.T) {
t.Errorf("Did not expect an error on update failure, got %v", err)
}
preconditions = &ScalePrecondition{3, ""}
scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil)
scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"})
pass, err = scaleFunc()
if err == nil {
t.Error("Expected error on precondition failure")
@ -585,13 +318,13 @@ func TestStatefulSetScaleInvalid(t *testing.T) {
"update": kerrors.NewInvalid(api.Kind("Status"), "foo", nil),
}
scaleClient := createFakeScaleClient("statefulsets", "foo", 2, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefulsets"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"})
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -613,11 +346,11 @@ func TestStatefulSetScaleInvalid(t *testing.T) {
func TestStatefulSetScaleFailsPreconditions(t *testing.T) {
scaleClientExpectedAction := []string{"get"}
scaleClient := createFakeScaleClient("statefulsets", "foo", 10, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "apps", Resource: "statefulsets"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{2, ""}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "apps", Resource: "statefulsets"})
if err == nil {
t.Fatal("expected to get an error but none was returned")
}
@ -635,11 +368,11 @@ func TestStatefulSetScaleFailsPreconditions(t *testing.T) {
func TestReplicaSetScale(t *testing.T) {
scaleClientExpectedAction := []string{"get", "update"}
scaleClient := createFakeScaleClient("replicasets", "foo", 10, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
if err != nil {
t.Fatal(err)
}
@ -660,13 +393,13 @@ func TestReplicaSetScaleRetry(t *testing.T) {
}
scaleClientExpectedAction := []string{"get", "update", "get"}
scaleClient := createFakeScaleClient("replicasets", "foo", 2, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
scaler := NewScaler(scaleClient)
preconditions := &ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
pass, err := scaleFunc()
if pass != false {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -675,7 +408,7 @@ func TestReplicaSetScaleRetry(t *testing.T) {
t.Errorf("Did not expect an error on update failure, got %v", err)
}
preconditions = &ScalePrecondition{3, ""}
scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil)
scaleFunc = ScaleCondition(scaler, preconditions, namespace, name, count, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
pass, err = scaleFunc()
if err == nil {
t.Error("Expected error on precondition failure")
@ -697,13 +430,13 @@ func TestReplicaSetScaleInvalid(t *testing.T) {
}
scaleClientExpectedAction := []string{"get", "update"}
scaleClient := createFakeScaleClient("replicasets", "foo", 2, verbsOnError)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{-1, ""}
count := uint(3)
name := "foo"
namespace := "default"
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil)
scaleFunc := ScaleCondition(scaler, &preconditions, namespace, name, count, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
pass, err := scaleFunc()
if pass {
t.Errorf("Expected an update failure to return pass = false, got pass = %v", pass)
@ -725,11 +458,11 @@ func TestReplicaSetScaleInvalid(t *testing.T) {
func TestReplicaSetsGetterFailsPreconditions(t *testing.T) {
scaleClientExpectedAction := []string{"get"}
scaleClient := createFakeScaleClient("replicasets", "foo", 10, nil)
scaler := NewScaler(scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
scaler := NewScaler(scaleClient)
preconditions := ScalePrecondition{2, ""}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, &preconditions, nil, nil)
err := scaler.Scale("default", name, count, &preconditions, nil, nil, schema.GroupResource{Group: "extensions", Resource: "replicasets"})
if err == nil {
t.Fatal("expected to get an error but non was returned")
}
@ -812,9 +545,9 @@ func TestGenericScaleSimple(t *testing.T) {
// act
for index, scenario := range scenarios {
t.Run(fmt.Sprintf("running scenario %d: %s", index+1, scenario.name), func(t *testing.T) {
target := NewScaler(scenario.scaleGetter, scenario.targetGR)
target := NewScaler(scenario.scaleGetter)
resVersion, err := target.ScaleSimple("default", scenario.resName, &scenario.precondition, uint(scenario.newSize))
resVersion, err := target.ScaleSimple("default", scenario.resName, &scenario.precondition, uint(scenario.newSize), scenario.targetGR)
if scenario.expectError && err == nil {
t.Fatal("expected an error but was not returned")
@ -880,9 +613,9 @@ func TestGenericScale(t *testing.T) {
// act
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
target := NewScaler(scenario.scaleGetter, scenario.targetGR)
target := NewScaler(scenario.scaleGetter)
err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas)
err := target.Scale("default", scenario.resName, uint(scenario.newSize), &scenario.precondition, nil, scenario.waitForReplicas, scenario.targetGR)
if scenario.expectError && err == nil {
t.Fatal("expected an error but was not returned")

View File

@ -258,7 +258,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() {
// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
// to the same size achieves this, because the scale operation advances the RC's sequence number
// and awaits it to be observed and reported back in the RC's status.
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods, true)
framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rcName, numPods, true)
// Only check the keys, the pods can be different if the kubelet updated it.
// TODO: Can it really?
@ -289,9 +289,9 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() {
restarter.kill()
// This is best effort to try and create pods while the scheduler is down,
// since we don't know exactly when it is restarted after the kill signal.
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, false))
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, false))
restarter.waitUp()
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, true))
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, true))
})
It("Kubelet should not restart containers across restart", func() {

View File

@ -521,7 +521,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
Expect(err).NotTo(HaveOccurred())
By("scaling rethinkdb")
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "rethinkdb-rc", 2, true)
framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, "rethinkdb-rc", 2, true)
checkDbInstances()
By("starting admin")
@ -564,7 +564,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
Expect(err).NotTo(HaveOccurred())
By("scaling hazelcast")
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "hazelcast", 2, true)
framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, "hazelcast", 2, true)
forEachPod("name", "hazelcast", func(pod v1.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())

View File

@ -31,7 +31,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
testutils "k8s.io/kubernetes/test/utils"
)
@ -179,8 +178,8 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er
return err
}
func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments"))
func ScaleDeployment(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments"))
}
func RunDeployment(config testutils.DeploymentConfig) error {

View File

@ -85,9 +85,7 @@ func RcByNameContainer(name string, replicas int32, image string, labels map[str
// ScaleRCByLabels scales an RC via ns/label lookup. If replicas == 0 it waits till
// none are running, otherwise it does what a synchronous scale operation would do.
//TODO(p0lyn0mial): remove internalClientset.
//TODO(p0lyn0mial): update the callers.
func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error {
func ScaleRCByLabels(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error {
listOpts := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(l)).String()}
rcs, err := clientset.CoreV1().ReplicationControllers(ns).List(listOpts)
if err != nil {
@ -99,7 +97,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl
Logf("Scaling %v RCs with labels %v in ns %v to %v replicas.", len(rcs.Items), l, ns, replicas)
for _, labelRC := range rcs.Items {
name := labelRC.Name
if err := ScaleRC(clientset, internalClientset, scalesGetter, ns, name, replicas, false); err != nil {
if err := ScaleRC(clientset, scalesGetter, ns, name, replicas, false); err != nil {
return err
}
rc, err := clientset.CoreV1().ReplicationControllers(ns).Get(name, metav1.GetOptions{})
@ -159,8 +157,8 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl
return DeleteResourceAndPods(clientset, internalClientset, scaleClient, api.Kind("ReplicationController"), ns, name)
}
func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers"))
func ScaleRC(clientset clientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers"))
}
func RunRC(config testutils.RCConfig) error {

View File

@ -2803,7 +2803,6 @@ func RemoveAvoidPodsOffNode(c clientset.Interface, nodeName string) {
func ScaleResource(
clientset clientset.Interface,
internalClientset internalclientset.Interface,
scalesGetter scaleclient.ScalesGetter,
ns, name string,
size uint,
@ -2812,8 +2811,8 @@ func ScaleResource(
gr schema.GroupResource,
) error {
By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
scaler := kubectl.ScalerFor(kind, internalClientset.Batch(), scalesGetter, gr)
if err := testutils.ScaleResourceWithRetries(scaler, ns, name, size); err != nil {
scaler := kubectl.NewScaler(scalesGetter)
if err := testutils.ScaleResourceWithRetries(scaler, ns, name, size, gr); err != nil {
return fmt.Errorf("error while scaling RC %s to %d replicas: %v", name, size, err)
}
if !wait {

View File

@ -1291,7 +1291,7 @@ var _ = SIGDescribe("Services", func() {
}
By("Scaling down replication controller to zero")
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false)
framework.ScaleRC(f.ClientSet, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false)
By("Update service to not tolerate unready services")
_, err = framework.UpdateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {

View File

@ -649,7 +649,6 @@ func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scaling
newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2)
framework.ExpectNoError(framework.ScaleResource(
config.GetClient(),
config.GetInternalClient(),
config.GetScalesGetter(),
config.GetNamespace(),
config.GetName(),

View File

@ -156,7 +156,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
By("Trying to schedule another equivalent Pod should fail due to node label has been removed.")
// use scale to create another equivalent pod and wait for failure event
WaitForSchedulerAfterAction(f, func() error {
err := framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false)
err := framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false)
return err
}, ns, affinityRCName, false)
// and this new pod should be rejected since node label has been updated

View File

@ -195,7 +195,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
By(fmt.Sprintf("Scale the RC: %s to len(nodeList.Item)-1 : %v.", rc.Name, len(nodeList.Items)-1))
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true)
framework.ScaleRC(f.ClientSet, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true)
testPods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: "name=scheduler-priority-avoid-pod",
})

View File

@ -69,8 +69,8 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() {
deployment := deployments.Items[0]
replicas := uint(*(deployment.Spec.Replicas))
err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true)
defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true))
err = framework.ScaleDeployment(f.ClientSet, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true)
defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true))
framework.ExpectNoError(err)
})
@ -81,7 +81,7 @@ func reserveAllCpu(f *framework.Framework, id string, millicores int) error {
replicas := millicores / 100
reserveCpu(f, id, 1, 100)
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false))
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false))
for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) {
pods, err := framework.GetPodsInNamespace(f.ClientSet, f.Namespace.Name, framework.ImagePullerLabels)

View File

@ -20,6 +20,7 @@ import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/kubectl"
)
@ -32,10 +33,10 @@ const (
waitRetryTImeout = 5 * time.Minute
)
func ScaleResourceWithRetries(scaler kubectl.Scaler, namespace, name string, size uint) error {
func ScaleResourceWithRetries(scaler kubectl.Scaler, namespace, name string, size uint, gr schema.GroupResource) error {
waitForScale := kubectl.NewRetryParams(updateRetryInterval, updateRetryTimeout)
waitForReplicas := kubectl.NewRetryParams(waitRetryInterval, waitRetryTImeout)
if err := scaler.Scale(namespace, name, size, nil, waitForScale, waitForReplicas); err != nil {
if err := scaler.Scale(namespace, name, size, nil, waitForScale, waitForReplicas, gr); err != nil {
return fmt.Errorf("Error while scaling %s to %d replicas: %v", name, size, err)
}
return nil