removes custom scalers from kubectl

pull/8/head
p0lyn0mial 2018-02-26 21:23:33 +01:00
parent 9bd4f12c33
commit 1f1d24005a
46 changed files with 685 additions and 1445 deletions

View File

@ -40,20 +40,16 @@ go_test(
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/api/testing:go_default_library",
"//pkg/apis/apps:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/apps/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/batch/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion:go_default_library",
"//pkg/kubectl/util:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/k8s.io/api/apps/v1beta1:go_default_library",
"//vendor/k8s.io/api/apps/v1beta2:go_default_library",
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/api/batch/v1beta1:go_default_library",
@ -66,24 +62,20 @@ go_test(
"//vendor/k8s.io/api/scheduling/v1alpha1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/discovery/fake:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/rest/fake:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/scale/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],

View File

@ -146,6 +146,7 @@ go_library(
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/portforward:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
@ -234,6 +235,7 @@ go_test(
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/gopkg.in/yaml.v2:go_default_library",
"//vendor/k8s.io/api/autoscaling/v1:go_default_library",
"//vendor/k8s.io/api/batch/v1:go_default_library",
"//vendor/k8s.io/api/batch/v1beta1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
@ -260,6 +262,7 @@ go_test(
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/rest/fake:go_default_library",
"//vendor/k8s.io/client-go/scale/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library",

View File

@ -37,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
scaleclient "k8s.io/client-go/scale"
oapi "k8s.io/kube-openapi/pkg/util/proto"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@ -324,6 +325,10 @@ func RunApply(f cmdutil.Factory, cmd *cobra.Command, out, errOut io.Writer, opti
if _, ok := annotationMap[api.LastAppliedConfigAnnotation]; !ok {
fmt.Fprintf(errOut, warningNoLastAppliedConfigAnnotation, options.cmdBaseName)
}
scaler, err := f.ScaleClient()
if err != nil {
return err
}
helper := resource.NewHelper(info.Client, info.Mapping)
patcher := &patcher{
encoder: encoder,
@ -339,6 +344,7 @@ func RunApply(f cmdutil.Factory, cmd *cobra.Command, out, errOut io.Writer, opti
timeout: options.Timeout,
gracePeriod: options.GracePeriod,
openapiSchema: openapiSchema,
scaleClient: scaler,
}
patchBytes, patchedObject, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name, errOut)
@ -504,6 +510,10 @@ func (p *pruner) prune(f cmdutil.Factory, namespace string, mapping *meta.RESTMa
if err != nil {
return err
}
scaler, err := f.ScaleClient()
if err != nil {
return err
}
for _, obj := range objs {
annots, err := mapping.MetadataAccessor.Annotations(obj)
@ -527,7 +537,7 @@ func (p *pruner) prune(f cmdutil.Factory, namespace string, mapping *meta.RESTMa
return err
}
if !p.dryRun {
if err := p.delete(namespace, name, mapping); err != nil {
if err := p.delete(namespace, name, mapping, scaler); err != nil {
return err
}
}
@ -536,16 +546,16 @@ func (p *pruner) prune(f cmdutil.Factory, namespace string, mapping *meta.RESTMa
return nil
}
func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping) error {
func (p *pruner) delete(namespace, name string, mapping *meta.RESTMapping, scaleClient scaleclient.ScalesGetter) error {
c, err := p.clientFunc(mapping)
if err != nil {
return err
}
return runDelete(namespace, name, mapping, c, nil, p.cascade, p.gracePeriod, p.clientsetFunc)
return runDelete(namespace, name, mapping, c, nil, p.cascade, p.gracePeriod, p.clientsetFunc, scaleClient)
}
func runDelete(namespace, name string, mapping *meta.RESTMapping, c resource.RESTClient, helper *resource.Helper, cascade bool, gracePeriod int, clientsetFunc func() (internalclientset.Interface, error)) error {
func runDelete(namespace, name string, mapping *meta.RESTMapping, c resource.RESTClient, helper *resource.Helper, cascade bool, gracePeriod int, clientsetFunc func() (internalclientset.Interface, error), scaleClient scaleclient.ScalesGetter) error {
if !cascade {
if helper == nil {
helper = resource.NewHelper(c, mapping)
@ -556,7 +566,7 @@ func runDelete(namespace, name string, mapping *meta.RESTMapping, c resource.RES
if err != nil {
return err
}
r, err := kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), cs)
r, err := kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), cs, scaleClient)
if err != nil {
if _, ok := err.(*kubectl.NoSuchReaperError); !ok {
return err
@ -578,7 +588,7 @@ func (p *patcher) delete(namespace, name string) error {
if err != nil {
return err
}
return runDelete(namespace, name, p.mapping, c, p.helper, p.cascade, p.gracePeriod, p.clientsetFunc)
return runDelete(namespace, name, p.mapping, c, p.helper, p.cascade, p.gracePeriod, p.clientsetFunc, p.scaleClient)
}
type patcher struct {
@ -599,6 +609,7 @@ type patcher struct {
gracePeriod int
openapiSchema openapi.Resources
scaleClient scaleclient.ScalesGetter
}
func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {

View File

@ -31,14 +31,18 @@ import (
"github.com/spf13/cobra"
autoscalingv1 "k8s.io/api/autoscaling/v1"
kubeerr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
sptest "k8s.io/apimachinery/pkg/util/strategicpatch/testing"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
fakescale "k8s.io/client-go/scale/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/api/testapi"
api "k8s.io/kubernetes/pkg/apis/core"
@ -1190,14 +1194,14 @@ func TestForceApply(t *testing.T) {
pathRC := "/namespaces/test/replicationcontrollers/" + nameRC
pathRCList := "/namespaces/test/replicationcontrollers"
expected := map[string]int{
"getOk": 10,
"getOk": 7,
"getNotFound": 1,
"getList": 1,
"patch": 6,
"delete": 1,
"put": 1,
"post": 1,
}
scaleClientExpected := []string{"get", "update", "get", "get"}
for _, fn := range testingOpenAPISchemaFns {
t.Run("test apply with --force", func(t *testing.T) {
@ -1277,10 +1281,48 @@ func TestForceApply(t *testing.T) {
}
}),
}
newReplicas := int32(3)
scaleClient := &fakescale.FakeScaleClient{}
scaleClient.AddReactor("get", "replicationcontrollers", func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.GetAction)
if action.GetName() != "test-rc" {
return true, nil, fmt.Errorf("expected = test-rc, got = %s", action.GetName())
}
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: action.GetName(),
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}
return true, obj, nil
})
scaleClient.AddReactor("update", "replicationcontrollers", func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.UpdateAction)
obj := action.GetObject().(*autoscalingv1.Scale)
if obj.Name != "test-rc" {
return true, nil, fmt.Errorf("expected = test-rc, got = %s", obj.Name)
}
newReplicas = obj.Spec.Replicas
return true, &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}, nil
})
tf.ScaleGetter = scaleClient
tf.OpenAPISchemaFunc = fn
tf.Client = tf.UnstructuredClient
tf.ClientConfigVal = &restclient.Config{}
tf.Namespace = "test"
buf := bytes.NewBuffer([]byte{})
errBuf := bytes.NewBuffer([]byte{})
@ -1302,6 +1344,22 @@ func TestForceApply(t *testing.T) {
if errBuf.String() != "" {
t.Fatalf("unexpected error output: %s", errBuf.String())
}
scale, err := scaleClient.Scales(tf.Namespace).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, nameRC)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
if len(scaleClient.Actions()) != len(scaleClientExpected) {
t.Fatalf("a fake scale client has unexpected amout of API calls, wanted = %d, got = %d", len(scaleClientExpected), len(scaleClient.Actions()))
}
for index, action := range scaleClient.Actions() {
if scaleClientExpected[index] != action.GetVerb() {
t.Errorf("unexpected API method called on a fake scale client, wanted = %s, got = %s at index = %d", scaleClientExpected[index], action.GetVerb(), index)
}
}
})
}
}

View File

@ -293,7 +293,12 @@ func RunRollingUpdate(f cmdutil.Factory, out io.Writer, cmd *cobra.Command, args
filename, oldName)
}
updater := kubectl.NewRollingUpdater(newRc.Namespace, coreClient, coreClient)
scalesGetter, err := f.ScaleClient()
if err != nil {
return err
}
updater := kubectl.NewRollingUpdater(newRc.Namespace, coreClient, coreClient, scalesGetter)
// To successfully pull off a rolling update the new and old rc have to differ
// by at least one selector. Every new pod should have the selector and every

View File

@ -1,7 +1,4 @@
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
@ -10,9 +7,7 @@ go_library(
"zz_generated.deepcopy.go",
],
importpath = "k8s.io/kubernetes/pkg/kubectl/cmd/testing",
visibility = [
"//build/visible_to:pkg_kubectl_cmd_testing_CONSUMERS",
],
visibility = ["//build/visible_to:pkg_kubectl_cmd_testing_CONSUMERS"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
@ -37,6 +32,7 @@ go_library(
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/rest/fake:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
],
@ -46,13 +42,12 @@ filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//build/visible_to:pkg_kubectl_cmd_testing_CONSUMERS"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = [
"//build/visible_to:pkg_kubectl_cmd_testing_CONSUMERS",
],
visibility = ["//build/visible_to:pkg_kubectl_cmd_testing_CONSUMERS"],
)

View File

@ -37,6 +37,7 @@ import (
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -239,6 +240,7 @@ type TestFactory struct {
cmdutil.Factory
Client kubectl.RESTClient
ScaleGetter scaleclient.ScalesGetter
UnstructuredClient kubectl.RESTClient
DescriberVal printers.Describer
Namespace string
@ -483,6 +485,10 @@ func (f *TestFactory) LogsForObject(object, options runtime.Object, timeout time
}
}
func (f *TestFactory) ScaleClient() (scaleclient.ScalesGetter, error) {
return f.ScaleGetter, nil
}
func testDynamicResources() []*discovery.APIGroupResources {
return []*discovery.APIGroupResources{
{

View File

@ -1,8 +1,4 @@
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@ -18,9 +14,7 @@ go_library(
"shortcut_restmapper.go",
],
importpath = "k8s.io/kubernetes/pkg/kubectl/cmd/util",
visibility = [
"//build/visible_to:pkg_kubectl_cmd_util_CONSUMERS",
],
visibility = ["//build/visible_to:pkg_kubectl_cmd_util_CONSUMERS"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/apps:go_default_library",
@ -93,13 +87,7 @@ go_test(
"helpers_test.go",
"shortcut_restmapper_test.go",
],
data = [
"//api/swagger-spec",
],
embed = [":go_default_library"],
visibility = [
"//build/visible_to:COMMON_testing",
],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/testapi:go_default_library",
@ -144,6 +132,7 @@ filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(

View File

@ -35,6 +35,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/clientcmd"
api "k8s.io/kubernetes/pkg/apis/core"
apiv1 "k8s.io/kubernetes/pkg/apis/core/v1"
@ -178,10 +179,6 @@ type ObjectMappingFactory interface {
// LogsForObject returns a request for the logs associated with the provided object
LogsForObject(object, options runtime.Object, timeout time.Duration) (*restclient.Request, error)
// Returns a Scaler for changing the size of the specified RESTMapping type or an error
Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error)
// Returns a Reaper for gracefully shutting down resources.
Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error)
// Returns a HistoryViewer for viewing change history
HistoryViewer(mapping *meta.RESTMapping) (kubectl.HistoryViewer, error)
// Returns a Rollbacker for changing the rollback version of the specified RESTMapping type or an error
@ -213,6 +210,12 @@ type BuilderFactory interface {
PluginLoader() plugins.PluginLoader
// PluginRunner provides the implementation to be used to run cli plugins.
PluginRunner() plugins.PluginRunner
// Returns a Scaler for changing the size of the specified RESTMapping type or an error
Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error)
// ScaleClient gives you back scale getter
ScaleClient() (scaleclient.ScalesGetter, error)
// Returns a Reaper for gracefully shutting down resources.
Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error)
}
type factory struct {

View File

@ -21,7 +21,11 @@ package util
import (
"os"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/plugins"
"k8s.io/kubernetes/pkg/kubectl/resource"
)
@ -84,3 +88,49 @@ func (f *ring2Factory) PluginLoader() plugins.PluginLoader {
func (f *ring2Factory) PluginRunner() plugins.PluginRunner {
return &plugins.ExecPluginRunner{}
}
func (f *ring2Factory) ScaleClient() (scaleclient.ScalesGetter, error) {
discoClient, err := f.clientAccessFactory.DiscoveryClient()
if err != nil {
return nil, err
}
restClient, err := f.clientAccessFactory.RESTClient()
if err != nil {
return nil, err
}
resolver := scaleclient.NewDiscoveryScaleKindResolver(discoClient)
mapper, _ := f.objectMappingFactory.Object()
return scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver), nil
}
func (f *ring2Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error) {
clientset, err := f.clientAccessFactory.ClientSet()
if err != nil {
return nil, err
}
scalesGetter, err := f.ScaleClient()
if err != nil {
return nil, err
}
gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset.Batch(), scalesGetter, gvk.GroupResource()), nil
}
func (f *ring2Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {
clientset, clientsetErr := f.clientAccessFactory.ClientSet()
if clientsetErr != nil {
return nil, clientsetErr
}
scaler, err := f.ScaleClient()
if err != nil {
return nil, err
}
reaper, reaperErr := kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), clientset, scaler)
if kubectl.IsNoSuchReaperError(reaperErr) {
return nil, reaperErr
}
return reaper, reaperErr
}

View File

@ -37,7 +37,6 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
@ -278,43 +277,6 @@ func (f *ring1Factory) LogsForObject(object, options runtime.Object, timeout tim
return clientset.Core().Pods(pod.Namespace).GetLogs(pod.Name, opts), nil
}
func (f *ring1Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error) {
clientset, err := f.clientAccessFactory.ClientSet()
if err != nil {
return nil, err
}
// create scales getter
// TODO(p0lyn0mial): put scalesGetter to a factory
discoClient, err := f.clientAccessFactory.DiscoveryClient()
if err != nil {
return nil, err
}
restClient, err := f.clientAccessFactory.RESTClient()
if err != nil {
return nil, err
}
mapper, _ := f.Object()
resolver := scaleclient.NewDiscoveryScaleKindResolver(discoClient)
scalesGetter := scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset.Batch(), scalesGetter, gvk.GroupResource()), nil
}
func (f *ring1Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {
clientset, clientsetErr := f.clientAccessFactory.ClientSet()
reaper, reaperErr := kubectl.ReaperFor(mapping.GroupVersionKind.GroupKind(), clientset)
if kubectl.IsNoSuchReaperError(reaperErr) {
return nil, reaperErr
}
if clientsetErr != nil {
return nil, clientsetErr
}
return reaper, reaperErr
}
func (f *ring1Factory) HistoryViewer(mapping *meta.RESTMapping) (kubectl.HistoryViewer, error) {
external, err := f.clientAccessFactory.KubernetesClientSet()
if err != nil {

View File

@ -28,6 +28,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
@ -67,13 +68,13 @@ func IsNoSuchReaperError(err error) bool {
return ok
}
func ReaperFor(kind schema.GroupKind, c internalclientset.Interface) (Reaper, error) {
func ReaperFor(kind schema.GroupKind, c internalclientset.Interface, sc scaleclient.ScalesGetter) (Reaper, error) {
switch kind {
case api.Kind("ReplicationController"):
return &ReplicationControllerReaper{c.Core(), Interval, Timeout}, nil
return &ReplicationControllerReaper{c.Core(), Interval, Timeout, sc}, nil
case extensions.Kind("ReplicaSet"), apps.Kind("ReplicaSet"):
return &ReplicaSetReaper{c.Extensions(), Interval, Timeout}, nil
return &ReplicaSetReaper{c.Extensions(), Interval, Timeout, sc, schema.GroupResource{Group: kind.Group, Resource: "replicasets"}}, nil
case extensions.Kind("DaemonSet"), apps.Kind("DaemonSet"):
return &DaemonSetReaper{c.Extensions(), Interval, Timeout}, nil
@ -85,26 +86,29 @@ func ReaperFor(kind schema.GroupKind, c internalclientset.Interface) (Reaper, er
return &JobReaper{c.Batch(), c.Core(), Interval, Timeout}, nil
case apps.Kind("StatefulSet"):
return &StatefulSetReaper{c.Apps(), c.Core(), Interval, Timeout}, nil
return &StatefulSetReaper{c.Apps(), c.Core(), Interval, Timeout, sc}, nil
case extensions.Kind("Deployment"), apps.Kind("Deployment"):
return &DeploymentReaper{c.Extensions(), c.Extensions(), Interval, Timeout}, nil
return &DeploymentReaper{c.Extensions(), c.Extensions(), Interval, Timeout, sc, schema.GroupResource{Group: kind.Group, Resource: "deployments"}}, nil
}
return nil, &NoSuchReaperError{kind}
}
func ReaperForReplicationController(rcClient coreclient.ReplicationControllersGetter, timeout time.Duration) (Reaper, error) {
return &ReplicationControllerReaper{rcClient, Interval, timeout}, nil
func ReaperForReplicationController(rcClient coreclient.ReplicationControllersGetter, scaleClient scaleclient.ScalesGetter, timeout time.Duration) (Reaper, error) {
return &ReplicationControllerReaper{rcClient, Interval, timeout, scaleClient}, nil
}
type ReplicationControllerReaper struct {
client coreclient.ReplicationControllersGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
}
type ReplicaSetReaper struct {
client extensionsclient.ReplicaSetsGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
gr schema.GroupResource
}
type DaemonSetReaper struct {
client extensionsclient.DaemonSetsGetter
@ -119,6 +123,8 @@ type DeploymentReaper struct {
dClient extensionsclient.DeploymentsGetter
rsClient extensionsclient.ReplicaSetsGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
gr schema.GroupResource
}
type PodReaper struct {
client coreclient.PodsGetter
@ -127,6 +133,7 @@ type StatefulSetReaper struct {
client appsclient.StatefulSetsGetter
podClient coreclient.PodsGetter
pollInterval, timeout time.Duration
scaleClient scaleclient.ScalesGetter
}
// getOverlappingControllers finds rcs that this controller overlaps, as well as rcs overlapping this controller.
@ -148,7 +155,7 @@ func getOverlappingControllers(rcClient coreclient.ReplicationControllerInterfac
func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
rc := reaper.client.ReplicationControllers(namespace)
scaler := &ReplicationControllerScaler{reaper.client}
scaler := NewScaler(reaper.scaleClient, schema.GroupResource{Resource: "replicationcontrollers"})
ctrl, err := rc.Get(name, metav1.GetOptions{})
if err != nil {
return err
@ -217,7 +224,7 @@ func getOverlappingReplicaSets(c extensionsclient.ReplicaSetInterface, rs *exten
func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
rsc := reaper.client.ReplicaSets(namespace)
scaler := &ReplicaSetScaler{reaper.client}
scaler := NewScaler(reaper.scaleClient, reaper.gr)
rs, err := rsc.Get(name, metav1.GetOptions{})
if err != nil {
return err
@ -318,7 +325,7 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio
func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
statefulsets := reaper.client.StatefulSets(namespace)
scaler := &StatefulSetScaler{reaper.client}
scaler := NewScaler(reaper.scaleClient, apps.Resource("statefulsets"))
ss, err := statefulsets.Get(name, metav1.GetOptions{})
if err != nil {
return err
@ -391,7 +398,7 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
deployments := reaper.dClient.Deployments(namespace)
rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout}
rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout, reaper.scaleClient, schema.GroupResource{Group: reaper.gr.Group, Resource: "replicasets"}}
deployment, err := reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
// set deployment's history and scale to 0

View File

@ -23,12 +23,15 @@ import (
"testing"
"time"
autoscalingv1 "k8s.io/api/autoscaling/v1"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/watch"
fakescale "k8s.io/client-go/scale/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
@ -43,8 +46,10 @@ func TestReplicationControllerStop(t *testing.T) {
tests := []struct {
Name string
Objs []runtime.Object
ScaledDown bool
StopError error
ExpectedActions []string
ScaleClientExpectedAction []string
}{
{
Name: "OnlyOneRC",
@ -63,8 +68,10 @@ func TestReplicationControllerStop(t *testing.T) {
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"},
ExpectedActions: []string{"get", "list", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
{
Name: "NoOverlapping",
@ -92,8 +99,10 @@ func TestReplicationControllerStop(t *testing.T) {
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "list", "get", "update", "get", "get", "delete"},
ExpectedActions: []string{"get", "list", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
{
Name: "OverlappingError",
@ -122,10 +131,10 @@ func TestReplicationControllerStop(t *testing.T) {
},
},
},
ScaledDown: false, // scale resource was not scaled down due to overlapping controllers
StopError: fmt.Errorf("Detected overlapping controllers for rc foo: baz, please manage deletion individually with --cascade=false."),
ExpectedActions: []string{"get", "list"},
},
{
Name: "OverlappingButSafeDelete",
Objs: []runtime.Object{
@ -162,7 +171,7 @@ func TestReplicationControllerStop(t *testing.T) {
},
},
},
ScaledDown: false, // scale resource was not scaled down due to overlapping controllers
StopError: fmt.Errorf("Detected overlapping controllers for rc foo: baz,zaz, please manage deletion individually with --cascade=false."),
ExpectedActions: []string{"get", "list"},
},
@ -194,7 +203,7 @@ func TestReplicationControllerStop(t *testing.T) {
},
},
},
ScaledDown: false, // scale resource was not scaled down because there is still an additional replica
StopError: nil,
ExpectedActions: []string{"get", "list", "delete"},
},
@ -202,6 +211,7 @@ func TestReplicationControllerStop(t *testing.T) {
for _, test := range tests {
copiedForWatch := test.Objs[0].DeepCopyObject()
scaleClient := createFakeScaleClient("replicationcontrollers", "foo", 3, nil)
fake := fake.NewSimpleClientset(test.Objs...)
fakeWatch := watch.NewFake()
fake.PrependWatchReactor("replicationcontrollers", testcore.DefaultWatchReactor(fakeWatch, nil))
@ -210,7 +220,7 @@ func TestReplicationControllerStop(t *testing.T) {
fakeWatch.Add(copiedForWatch)
}()
reaper := ReplicationControllerReaper{fake.Core(), time.Millisecond, time.Millisecond}
reaper := ReplicationControllerReaper{fake.Core(), time.Millisecond, time.Millisecond, scaleClient}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err)
@ -230,6 +240,24 @@ func TestReplicationControllerStop(t *testing.T) {
t.Errorf("%s unexpected action: %+v, expected %s-replicationController", test.Name, actions[i], verb)
}
}
if test.ScaledDown {
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "", Resource: "replicationcontrollers"}, name)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
actions := scaleClient.Actions()
if len(actions) != len(test.ScaleClientExpectedAction) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ScaleClientExpectedAction), len(actions))
}
for i, verb := range test.ScaleClientExpectedAction {
if actions[i].GetVerb() != verb {
t.Errorf("%s unexpected action: %+v, expected %s", test.Name, actions[i].GetVerb(), verb)
}
}
}
}
}
@ -239,13 +267,20 @@ func TestReplicaSetStop(t *testing.T) {
tests := []struct {
Name string
Objs []runtime.Object
DiscoveryResources []*metav1.APIResourceList
PathsResources map[string]runtime.Object
ScaledDown bool
StopError error
ExpectedActions []string
ScaleClientExpectedAction []string
}{
{
Name: "OnlyOneRS",
Objs: []runtime.Object{
&extensions.ReplicaSetList{ // LIST
TypeMeta: metav1.TypeMeta{
APIVersion: extensions.SchemeGroupVersion.String(),
},
Items: []extensions.ReplicaSet{
{
ObjectMeta: metav1.ObjectMeta{
@ -260,8 +295,10 @@ func TestReplicaSetStop(t *testing.T) {
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"},
ExpectedActions: []string{"get", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
{
Name: "NoOverlapping",
@ -291,8 +328,10 @@ func TestReplicaSetStop(t *testing.T) {
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"},
ExpectedActions: []string{"get", "delete"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
// TODO: Implement tests for overlapping replica sets, similar to replication controllers,
// when the overlapping checks are implemented for replica sets.
@ -300,7 +339,9 @@ func TestReplicaSetStop(t *testing.T) {
for _, test := range tests {
fake := fake.NewSimpleClientset(test.Objs...)
reaper := ReplicaSetReaper{fake.Extensions(), time.Millisecond, time.Millisecond}
scaleClient := createFakeScaleClient("replicasets", "foo", 3, nil)
reaper := ReplicaSetReaper{fake.Extensions(), time.Millisecond, time.Millisecond, scaleClient, schema.GroupResource{Group: "extensions", Resource: "replicasets"}}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err)
@ -320,6 +361,24 @@ func TestReplicaSetStop(t *testing.T) {
t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb)
}
}
if test.ScaledDown {
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "extensions", Resource: "replicasets"}, name)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
actions := scaleClient.Actions()
if len(actions) != len(test.ScaleClientExpectedAction) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ScaleClientExpectedAction), len(actions))
}
for i, verb := range test.ScaleClientExpectedAction {
if actions[i].GetVerb() != verb {
t.Errorf("%s unexpected action: %+v, expected %s", test.Name, actions[i].GetVerb(), verb)
}
}
}
}
}
@ -441,8 +500,10 @@ func TestDeploymentStop(t *testing.T) {
tests := []struct {
Name string
Objs []runtime.Object
ScaledDown bool
StopError error
ExpectedActions []string
ScaleClientExpectedAction []string
}{
{
Name: "SimpleDeployment",
@ -510,17 +571,20 @@ func TestDeploymentStop(t *testing.T) {
},
},
},
ScaledDown: true,
StopError: nil,
ExpectedActions: []string{"get:deployments", "update:deployments",
"get:deployments", "list:replicasets", "get:replicasets",
"get:replicasets", "update:replicasets", "get:replicasets",
"get:replicasets", "delete:replicasets", "delete:deployments"},
"delete:replicasets", "delete:deployments"},
ScaleClientExpectedAction: []string{"get", "update", "get", "get"},
},
}
for _, test := range tests {
scaleClient := createFakeScaleClient("deployments", "foo", 3, nil)
fake := fake.NewSimpleClientset(test.Objs...)
reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond}
reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond, scaleClient, schema.GroupResource{Group: "extensions", Resource: "deployments"}}
err := reaper.Stop(ns, name, 0, nil)
if !reflect.DeepEqual(err, test.StopError) {
t.Errorf("%s unexpected error: %v", test.Name, err)
@ -544,6 +608,24 @@ func TestDeploymentStop(t *testing.T) {
t.Errorf("%s unexpected subresource: %+v, expected %s", test.Name, actions[i], expAction)
}
}
if test.ScaledDown {
scale, err := scaleClient.Scales(ns).Get(schema.GroupResource{Group: "extensions", Resource: "replicaset"}, name)
if err != nil {
t.Error(err)
}
if scale.Spec.Replicas != 0 {
t.Errorf("a scale subresource has unexpected number of replicas, got %d expected 0", scale.Spec.Replicas)
}
actions := scaleClient.Actions()
if len(actions) != len(test.ScaleClientExpectedAction) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ScaleClientExpectedAction), len(actions))
}
for i, verb := range test.ScaleClientExpectedAction {
if actions[i].GetVerb() != verb {
t.Errorf("%s unexpected action: %+v, expected %s", test.Name, actions[i].GetVerb(), verb)
}
}
}
}
}
@ -637,7 +719,7 @@ func TestSimpleStop(t *testing.T) {
}
for _, test := range tests {
fake := test.fake
reaper, err := ReaperFor(test.kind, fake)
reaper, err := ReaperFor(test.kind, fake, nil)
if err != nil {
t.Errorf("unexpected error: %v (%s)", err, test.test)
}
@ -697,8 +779,59 @@ func TestDeploymentNotFoundError(t *testing.T) {
return true, nil, ScaleError{ActualError: errors.NewNotFound(api.Resource("replicaset"), "doesn't-matter")}
})
reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond}
reaper := DeploymentReaper{fake.Extensions(), fake.Extensions(), time.Millisecond, time.Millisecond, nil, schema.GroupResource{}}
if err := reaper.Stop(ns, name, 0, nil); err != nil {
t.Fatalf("unexpected error: %#v", err)
}
}
func createFakeScaleClient(resource string, resourceName string, replicas int, errorsOnVerb map[string]*kerrors.StatusError) *fakescale.FakeScaleClient {
shouldReturnAnError := func(verb string) (*kerrors.StatusError, bool) {
if anError, anErrorExists := errorsOnVerb[verb]; anErrorExists {
return anError, true
}
return &kerrors.StatusError{}, false
}
newReplicas := int32(replicas)
scaleClient := &fakescale.FakeScaleClient{}
scaleClient.AddReactor("get", resource, func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.GetAction)
if action.GetName() != resourceName {
return true, nil, fmt.Errorf("expected = %s, got = %s", resourceName, action.GetName())
}
if anError, should := shouldReturnAnError("get"); should {
return true, nil, anError
}
obj := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: action.GetName(),
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}
return true, obj, nil
})
scaleClient.AddReactor("update", resource, func(rawAction testcore.Action) (handled bool, ret runtime.Object, err error) {
action := rawAction.(testcore.UpdateAction)
obj := action.GetObject().(*autoscalingv1.Scale)
if obj.Name != resourceName {
return true, nil, fmt.Errorf("expected = %s, got = %s", resourceName, obj.Name)
}
if anError, should := shouldReturnAnError("update"); should {
return true, nil, anError
}
newReplicas = obj.Spec.Replicas
return true, &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name,
Namespace: action.GetNamespace(),
},
Spec: autoscalingv1.ScaleSpec{
Replicas: newReplicas,
},
}, nil
})
return scaleClient
}

View File

@ -28,8 +28,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/util/integer"
"k8s.io/client-go/util/retry"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -116,6 +118,7 @@ const (
type RollingUpdater struct {
rcClient coreclient.ReplicationControllersGetter
podClient coreclient.PodsGetter
scaleClient scaleclient.ScalesGetter
// Namespace for resources
ns string
// scaleAndWait scales a controller and returns its updated state.
@ -132,10 +135,11 @@ type RollingUpdater struct {
}
// NewRollingUpdater creates a RollingUpdater from a client.
func NewRollingUpdater(namespace string, rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter) *RollingUpdater {
func NewRollingUpdater(namespace string, rcClient coreclient.ReplicationControllersGetter, podClient coreclient.PodsGetter, sc scaleclient.ScalesGetter) *RollingUpdater {
updater := &RollingUpdater{
rcClient: rcClient,
podClient: podClient,
scaleClient: sc,
ns: namespace,
}
// Inject real implementations.
@ -396,7 +400,7 @@ func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desi
// scalerScaleAndWait scales a controller using a Scaler and a real client.
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
scaler := &ReplicationControllerScaler{r.rcClient}
scaler := NewScaler(r.scaleClient, schema.GroupResource{Resource: "replicationcontrollers"})
if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil {
return nil, err
}

View File

@ -26,16 +26,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
scaleclient "k8s.io/client-go/scale"
appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
)
// TODO: Figure out if we should be waiting on initializers in the Scale() functions below.
@ -61,10 +55,17 @@ func ScalerFor(kind schema.GroupKind, jobsClient batchclient.JobsGetter, scalesG
case batch.Kind("Job"):
return &jobScaler{jobsClient} // Either kind of job can be scaled with Batch interface.
default:
return &genericScaler{scalesGetter, gr}
return NewScaler(scalesGetter, gr)
}
}
// NewScaler get a scaler for a given resource
// Note that if you are trying to crate create a scaler for "job" then stop and use ScalerFor instead.
// When scaling jobs is dead, we'll remove ScalerFor method.
func NewScaler(scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) Scaler {
return &genericScaler{scalesGetter, gr}
}
// ScalePrecondition describes a condition that must be true for the scale to take place
// If CurrentSize == -1, it is ignored.
// If CurrentResourceVersion is the empty string, it is ignored.
@ -139,160 +140,6 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
}
}
// ValidateStatefulSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
func (precondition *ScalePrecondition) ValidateStatefulSet(ps *apps.StatefulSet) error {
if precondition.Size != -1 && int(ps.Spec.Replicas) != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(ps.Spec.Replicas))}
}
if len(precondition.ResourceVersion) != 0 && ps.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, ps.ResourceVersion}
}
return nil
}
// ValidateReplicationController ensures that the preconditions match. Returns nil if they are valid, an error otherwise
func (precondition *ScalePrecondition) ValidateReplicationController(controller *api.ReplicationController) error {
if precondition.Size != -1 && int(controller.Spec.Replicas) != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(controller.Spec.Replicas))}
}
if len(precondition.ResourceVersion) != 0 && controller.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, controller.ResourceVersion}
}
return nil
}
// TODO(p0lyn0mial): remove ReplicationControllerScaler
type ReplicationControllerScaler struct {
c coreclient.ReplicationControllersGetter
}
// ScaleSimple does a simple one-shot attempt at scaling. It returns the
// resourceVersion of the replication controller if the update is successful.
func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
controller, err := scaler.c.ReplicationControllers(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", ScaleError{ScaleGetFailure, "", err}
}
if preconditions != nil {
if err := preconditions.ValidateReplicationController(controller); err != nil {
return "", err
}
}
controller.Spec.Replicas = int32(newSize)
updatedRC, err := scaler.c.ReplicationControllers(namespace).Update(controller)
if err != nil {
if errors.IsConflict(err) {
return "", ScaleError{ScaleUpdateConflictFailure, controller.ResourceVersion, err}
}
return "", ScaleError{ScaleUpdateFailure, controller.ResourceVersion, err}
}
return updatedRC.ObjectMeta.ResourceVersion, nil
}
// Scale updates a ReplicationController to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for it's replica count to reach the new value
// (if wait is not nil).
func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
rc, err := scaler.c.ReplicationControllers(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if rc.Initializers != nil {
return nil
}
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, ControllerHasDesiredReplicas(scaler.c, rc))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name)
}
return err
}
return nil
}
// ValidateReplicaSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise
func (precondition *ScalePrecondition) ValidateReplicaSet(replicaSet *extensions.ReplicaSet) error {
if precondition.Size != -1 && int(replicaSet.Spec.Replicas) != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(replicaSet.Spec.Replicas))}
}
if len(precondition.ResourceVersion) != 0 && replicaSet.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, replicaSet.ResourceVersion}
}
return nil
}
// TODO(p0lyn0mial): remove ReplicaSetScaler
type ReplicaSetScaler struct {
c extensionsclient.ReplicaSetsGetter
}
// ScaleSimple does a simple one-shot attempt at scaling. It returns the
// resourceVersion of the replicaset if the update is successful.
func (scaler *ReplicaSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
rs, err := scaler.c.ReplicaSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", ScaleError{ScaleGetFailure, "", err}
}
if preconditions != nil {
if err := preconditions.ValidateReplicaSet(rs); err != nil {
return "", err
}
}
rs.Spec.Replicas = int32(newSize)
updatedRS, err := scaler.c.ReplicaSets(namespace).Update(rs)
if err != nil {
if errors.IsConflict(err) {
return "", ScaleError{ScaleUpdateConflictFailure, rs.ResourceVersion, err}
}
return "", ScaleError{ScaleUpdateFailure, rs.ResourceVersion, err}
}
return updatedRS.ObjectMeta.ResourceVersion, nil
}
// Scale updates a ReplicaSet to a new size, with optional precondition check (if preconditions is
// not nil), optional retries (if retry is not nil), and then optionally waits for it's replica
// count to reach the new value (if wait is not nil).
func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
rs, err := scaler.c.ReplicaSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if rs.Initializers != nil {
return nil
}
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, ReplicaSetHasDesiredReplicas(scaler.c, rs))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name)
}
return err
}
return nil
}
// ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
if precondition.Size != -1 && job.Spec.Parallelism == nil {
@ -307,63 +154,6 @@ func (precondition *ScalePrecondition) ValidateJob(job *batch.Job) error {
return nil
}
// TODO(p0lyn0mial): remove StatefulSetsGetter
type StatefulSetScaler struct {
c appsclient.StatefulSetsGetter
}
// ScaleSimple does a simple one-shot attempt at scaling. It returns the
// resourceVersion of the statefulset if the update is successful.
func (scaler *StatefulSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
ss, err := scaler.c.StatefulSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", ScaleError{ScaleGetFailure, "", err}
}
if preconditions != nil {
if err := preconditions.ValidateStatefulSet(ss); err != nil {
return "", err
}
}
ss.Spec.Replicas = int32(newSize)
updatedStatefulSet, err := scaler.c.StatefulSets(namespace).Update(ss)
if err != nil {
if errors.IsConflict(err) {
return "", ScaleError{ScaleUpdateConflictFailure, ss.ResourceVersion, err}
}
return "", ScaleError{ScaleUpdateFailure, ss.ResourceVersion, err}
}
return updatedStatefulSet.ResourceVersion, nil
}
func (scaler *StatefulSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
job, err := scaler.c.StatefulSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
if job.Initializers != nil {
return nil
}
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, StatefulSetHasDesiredReplicas(scaler.c, job))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name)
}
return err
}
return nil
}
type jobScaler struct {
c batchclient.JobsGetter
}
@ -421,80 +211,8 @@ func (scaler *jobScaler) Scale(namespace, name string, newSize uint, preconditio
return nil
}
// ValidateDeployment ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
func (precondition *ScalePrecondition) ValidateDeployment(deployment *extensions.Deployment) error {
if precondition.Size != -1 && int(deployment.Spec.Replicas) != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(deployment.Spec.Replicas))}
}
if len(precondition.ResourceVersion) != 0 && deployment.ResourceVersion != precondition.ResourceVersion {
return PreconditionError{"resource version", precondition.ResourceVersion, deployment.ResourceVersion}
}
return nil
}
// TODO(p0lyn0mial): remove DeploymentScaler
type DeploymentScaler struct {
c extensionsclient.DeploymentsGetter
}
// ScaleSimple is responsible for updating a deployment's desired replicas
// count. It returns the resourceVersion of the deployment if the update is
// successful.
func (scaler *DeploymentScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (string, error) {
deployment, err := scaler.c.Deployments(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return "", ScaleError{ScaleGetFailure, "", err}
}
if preconditions != nil {
if err := preconditions.ValidateDeployment(deployment); err != nil {
return "", err
}
}
// TODO(madhusudancs): Fix this when Scale group issues are resolved (see issue #18528).
// For now I'm falling back to regular Deployment update operation.
deployment.Spec.Replicas = int32(newSize)
updatedDeployment, err := scaler.c.Deployments(namespace).Update(deployment)
if err != nil {
if errors.IsConflict(err) {
return "", ScaleError{ScaleUpdateConflictFailure, deployment.ResourceVersion, err}
}
return "", ScaleError{ScaleUpdateFailure, deployment.ResourceVersion, err}
}
return updatedDeployment.ObjectMeta.ResourceVersion, nil
}
// Scale updates a deployment to a new size, with optional precondition check (if preconditions is not nil),
// optional retries (if retry is not nil), and then optionally waits for the status to reach desired count.
func (scaler *DeploymentScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
if preconditions == nil {
preconditions = &ScalePrecondition{-1, ""}
}
if retry == nil {
// Make it try only once, immediately
retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
}
cond := ScaleCondition(scaler, preconditions, namespace, name, newSize, nil)
if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
return err
}
if waitForReplicas != nil {
deployment, err := scaler.c.Deployments(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
err = wait.PollImmediate(waitForReplicas.Interval, waitForReplicas.Timeout, DeploymentHasDesiredReplicas(scaler.c, deployment))
if err == wait.ErrWaitTimeout {
return fmt.Errorf("timed out waiting for %q to be synced", name)
}
return err
}
return nil
}
// validateGeneric ensures that the preconditions match. Returns nil if they are valid, otherwise an error
// TODO(p0lyn0mial): when the work on GenericScaler is done, rename validateGeneric to validate
func (precondition *ScalePrecondition) validateGeneric(scale *autoscalingapi.Scale) error {
func (precondition *ScalePrecondition) validate(scale *autoscalingapi.Scale) error {
if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size {
return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(scale.Spec.Replicas))}
}
@ -519,7 +237,7 @@ func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *Scale
return "", ScaleError{ScaleGetFailure, "", err}
}
if preconditions != nil {
if err := preconditions.validateGeneric(scale); err != nil {
if err := preconditions.validate(scale); err != nil {
return "", err
}
}

File diff suppressed because it is too large Load Diff

View File

@ -196,7 +196,10 @@ func (c *namespacedScaleClient) Update(resource schema.GroupResource, scale *aut
Body(scaleUpdateBytes).
Do()
if err := result.Error(); err != nil {
return nil, fmt.Errorf("could not update the scale for %s %s: %v", resource.String(), scale.Name, err)
// propagate "raw" error from the API
// this allows callers to interpret underlying Reason field
// for example: errors.IsConflict(err)
return nil, err
}
scaleBytes, err := result.Raw()

View File

@ -66,6 +66,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -207,7 +207,7 @@ var _ = SIGDescribe("CronJob", func() {
By("Deleting the job")
job := cronJob.Status.Active[0]
reaper, err := kubectl.ReaperFor(batchinternal.Kind("Job"), f.InternalClientset)
reaper, err := kubectl.ReaperFor(batchinternal.Kind("Job"), f.InternalClientset, f.ScalesGetter)
Expect(err).NotTo(HaveOccurred())
timeout := 1 * time.Minute
err = reaper.Stop(f.Namespace.Name, job.Name, timeout, metav1.NewDeleteOptions(0))

View File

@ -70,7 +70,7 @@ var _ = SIGDescribe("Daemon set [Serial]", func() {
if daemonsets != nil && len(daemonsets.Items) > 0 {
for _, ds := range daemonsets.Items {
By(fmt.Sprintf("Deleting DaemonSet %q with reaper", ds.Name))
dsReaper, err := kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), f.InternalClientset)
dsReaper, err := kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), f.InternalClientset, f.ScalesGetter)
Expect(err).NotTo(HaveOccurred())
err = dsReaper.Stop(f.Namespace.Name, ds.Name, 0, nil)
Expect(err).NotTo(HaveOccurred())

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
@ -158,12 +159,12 @@ func newDeploymentRollback(name string, annotations map[string]string, revision
}
}
func stopDeployment(c clientset.Interface, internalClient internalclientset.Interface, ns, deploymentName string) {
func stopDeployment(c clientset.Interface, internalClient internalclientset.Interface, scaleClient scaleclient.ScalesGetter, ns, deploymentName string) {
deployment, err := c.ExtensionsV1beta1().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
framework.Logf("Deleting deployment %s", deploymentName)
reaper, err := kubectl.ReaperFor(extensionsinternal.Kind("Deployment"), internalClient)
reaper, err := kubectl.ReaperFor(extensionsinternal.Kind("Deployment"), internalClient, scaleClient)
Expect(err).NotTo(HaveOccurred())
timeout := 1 * time.Minute
@ -224,7 +225,7 @@ func testDeleteDeployment(f *framework.Framework) {
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c.ExtensionsV1beta1())
Expect(err).NotTo(HaveOccurred())
Expect(newRS).NotTo(Equal(nilRs))
stopDeployment(c, internalClient, ns, deploymentName)
stopDeployment(c, internalClient, f.ScalesGetter, ns, deploymentName)
}
func testRollingUpdateDeployment(f *framework.Framework) {

View File

@ -111,7 +111,7 @@ var _ = SIGDescribe("Job", func() {
Expect(err).NotTo(HaveOccurred())
By("delete a job")
reaper, err := kubectl.ReaperFor(batchinternal.Kind("Job"), f.InternalClientset)
reaper, err := kubectl.ReaperFor(batchinternal.Kind("Job"), f.InternalClientset, f.ScalesGetter)
Expect(err).NotTo(HaveOccurred())
timeout := 1 * time.Minute
err = reaper.Stop(f.Namespace.Name, job.Name, timeout, metav1.NewDeleteOptions(0))

View File

@ -93,7 +93,7 @@ var _ = SIGDescribe("[Feature:ClusterSizeAutoscalingScaleUp] [Slow] Autoscaling"
nodeMemoryMB := (&nodeMemoryBytes).Value() / 1024 / 1024
memRequestMB := nodeMemoryMB / 10 // Ensure each pod takes not more than 10% of node's allocatable memory.
replicas := 1
resourceConsumer := common.NewDynamicResourceConsumer("resource-consumer", f.Namespace.Name, common.KindDeployment, replicas, 0, 0, 0, cpuRequestMillis, memRequestMB, f.ClientSet, f.InternalClientset)
resourceConsumer := common.NewDynamicResourceConsumer("resource-consumer", f.Namespace.Name, common.KindDeployment, replicas, 0, 0, 0, cpuRequestMillis, memRequestMB, f.ClientSet, f.InternalClientset, f.ScalesGetter)
defer resourceConsumer.CleanUp()
resourceConsumer.WaitForReplicas(replicas, 1*time.Minute) // Should finish ~immediately, so 1 minute is more than enough.

View File

@ -347,7 +347,7 @@ var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", fun
timeToWait := 5 * time.Minute
podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
framework.RunRC(*podsConfig) // Ignore error (it will occur because pods are unschedulable)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, podsConfig.Name)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, podsConfig.Name)
// Ensure that no new nodes have been added so far.
Expect(framework.NumberOfReadyNodes(f.ClientSet)).To(Equal(nodeCount))
@ -417,7 +417,7 @@ func simpleScaleUpTestWithTolerance(f *framework.Framework, config *scaleUpTestC
}
timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, config.extraPods.Name)
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, config.extraPods.Name)
}
}
@ -500,7 +500,7 @@ func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, p
err := framework.RunRC(*config)
framework.ExpectNoError(err)
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, id)
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id)
}
}
@ -540,7 +540,7 @@ func distributeLoad(f *framework.Framework, namespace string, id string, podDist
framework.ExpectNoError(framework.RunRC(*rcConfig))
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, id)
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id)
}
}

View File

@ -168,7 +168,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
It("shouldn't increase cluster size if pending pod is too large [Feature:ClusterSizeAutoscalingScaleUp]", func() {
By("Creating unschedulable pod")
ReserveMemory(f, "memory-reservation", 1, int(1.1*float64(memAllocatableMb)), false, defaultTimeout)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
By("Waiting for scale up hoping it won't happen")
// Verify that the appropriate event was generated
@ -195,7 +195,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
simpleScaleUpTest := func(unready int) {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(f.ClientSet,
@ -222,7 +222,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
By("Schedule more pods than can fit and wait for cluster to scale-up")
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
status, err = waitForScaleUpStatus(c, func(s *scaleUpStatus) bool {
return s.status == caOngoingScaleUpStatus
@ -265,8 +265,8 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
By("Reserving 0.1x more memory than the cluster holds to trigger scale up")
totalMemoryReservation := int(1.1 * float64(nodeCount*memAllocatableMb+extraMemMb))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
ReserveMemory(f, "memory-reservation", 100, totalMemoryReservation, false, defaultTimeout)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
// Verify, that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
@ -289,7 +289,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
It("should increase cluster size if pods are pending due to host port conflict [Feature:ClusterSizeAutoscalingScaleUp]", func() {
scheduling.CreateHostPortPods(f, "host-port", nodeCount+2, false)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "host-port")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "host-port")
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
func(size int) bool { return size >= nodeCount+2 }, scaleUpTimeout))
@ -304,12 +304,12 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
}
By("starting a pod with anti-affinity on each node")
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "some-pod")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "some-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("scheduling extra pods with anti-affinity to existing ones")
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, newPods, "extra-pod", labels, labels))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "extra-pod")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "extra-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+newPods, scaleUpTimeout))
@ -323,14 +323,14 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
"anti-affinity": "yes",
}
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "some-pod")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "some-pod")
By("waiting for all pods before triggering scale up")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
By("creating a pod requesting EmptyDir")
framework.ExpectNoError(runVolumeAntiAffinityPods(f, f.Namespace.Name, newPods, "extra-pod", labels, labels, emptyDirVolumes))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "extra-pod")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "extra-pod")
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount+newPods, scaleUpTimeout))
@ -388,7 +388,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
}
framework.ExpectNoError(runAntiAffinityPods(f, f.Namespace.Name, pods, "some-pod", labels, labels))
defer func() {
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "some-pod")
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "some-pod")
glog.Infof("RC and pods not using volume deleted")
}()
@ -401,7 +401,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
volumes := buildVolumes(pv, pvc)
framework.ExpectNoError(runVolumeAntiAffinityPods(f, f.Namespace.Name, newPods, pvcPodName, labels, labels, volumes))
defer func() {
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, pvcPodName)
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, pvcPodName)
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
}()
@ -506,7 +506,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
defer removeLabels(registeredNodes)
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
framework.ExpectNoError(framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "node-selector"))
framework.ExpectNoError(framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "node-selector"))
})
It("should scale up correct target pool [Feature:ClusterSizeAutoscalingScaleUp]", func() {
@ -524,8 +524,8 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
extraPods := extraNodes + 1
totalMemoryReservation := int(float64(extraPods) * 1.5 * float64(memAllocatableMb))
By(fmt.Sprintf("Creating rc with %v pods too big to fit default-pool but fitting extra-pool", extraPods))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
ReserveMemory(f, "memory-reservation", extraPods, totalMemoryReservation, false, defaultTimeout)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
// Apparently GKE master is restarted couple minutes after the node pool is added
// reseting all the timers in scale down code. Adding 5 extra minutes to workaround
@ -663,7 +663,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
By("Run a scale-up test")
ReserveMemory(f, "memory-reservation", 1, 100, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
// Verify that cluster size is increased
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
@ -776,7 +776,7 @@ var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
framework.TestUnderTemporaryNetworkFailure(c, "default", ntb, testFunction)
} else {
ReserveMemory(f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, defaultTimeout)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, "memory-reservation")
time.Sleep(scaleUpTimeout)
currentNodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
framework.Logf("Currently available nodes: %v, nodes available at the start of test: %v, disabled nodes: %v", len(currentNodes.Items), len(nodes.Items), nodesToBreakCount)
@ -974,7 +974,7 @@ func runDrainTest(f *framework.Framework, migSizes map[string]int, namespace str
labelMap := map[string]string{"test_id": testID}
framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap, 0))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, "reschedulable-pods")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, namespace, "reschedulable-pods")
By("Create a PodDisruptionBudget")
minAvailable := intstr.FromInt(numPods - pdbSize)
@ -1404,7 +1404,7 @@ func reserveMemory(f *framework.Framework, id string, replicas, megabytes int, e
framework.ExpectNoError(err)
}
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, id)
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id)
}
}
framework.Failf("Failed to reserve memory within timeout")
@ -1790,7 +1790,7 @@ func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespa
func runReplicatedPodOnEachNodeWithCleanup(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) (func(), error) {
err := runReplicatedPodOnEachNode(f, nodes, namespace, podsPerNode, id, labels, memRequest)
return func() {
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, id)
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, namespace, id)
}, err
}

View File

@ -116,7 +116,7 @@ type HPAScaleTest struct {
// TODO The use of 3 states is arbitrary, we could eventually make this test handle "n" states once this test stabilizes.
func (scaleTest *HPAScaleTest) run(name string, kind schema.GroupVersionKind, rc *common.ResourceConsumer, f *framework.Framework) {
const timeToWait = 15 * time.Minute
rc = common.NewDynamicResourceConsumer(name, f.Namespace.Name, kind, int(scaleTest.initPods), int(scaleTest.totalInitialCPUUsage), 0, 0, scaleTest.perPodCPURequest, 200, f.ClientSet, f.InternalClientset)
rc = common.NewDynamicResourceConsumer(name, f.Namespace.Name, kind, int(scaleTest.initPods), int(scaleTest.totalInitialCPUUsage), 0, 0, scaleTest.perPodCPURequest, 200, f.ClientSet, f.InternalClientset, f.ScalesGetter)
defer rc.CleanUp()
hpa := common.CreateCPUHorizontalPodAutoscaler(rc, scaleTest.targetCPUUtilizationPercent, scaleTest.minPods, scaleTest.maxPods)
defer common.DeleteHorizontalPodAutoscaler(rc, hpa.Name)

View File

@ -65,6 +65,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -36,6 +36,7 @@ import (
testutils "k8s.io/kubernetes/test/utils"
. "github.com/onsi/ginkgo"
scaleclient "k8s.io/client-go/scale"
imageutils "k8s.io/kubernetes/test/utils/image"
)
@ -86,6 +87,7 @@ type ResourceConsumer struct {
nsName string
clientSet clientset.Interface
internalClientset *internalclientset.Clientset
scaleClient scaleclient.ScalesGetter
cpu chan int
mem chan int
customMetric chan int
@ -104,15 +106,15 @@ func GetResourceConsumerImage() string {
return resourceConsumerImage
}
func NewDynamicResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
func NewDynamicResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
return newResourceConsumer(name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset)
dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
}
// TODO this still defaults to replication controller
func NewStaticResourceConsumer(name, nsName string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
func NewStaticResourceConsumer(name, nsName string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
return newResourceConsumer(name, nsName, KindRC, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, staticConsumptionTimeInSeconds,
initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset)
initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, clientset, internalClientset, scaleClient)
}
/*
@ -123,7 +125,7 @@ memLimit argument is in megabytes, memLimit is a maximum amount of memory that c
cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
*/
func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, internalClientset *internalclientset.Clientset, scaleClient scaleclient.ScalesGetter) *ResourceConsumer {
runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuLimit, memLimit)
rc := &ResourceConsumer{
@ -133,6 +135,7 @@ func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, repl
nsName: nsName,
clientSet: clientset,
internalClientset: internalClientset,
scaleClient: scaleClient,
cpu: make(chan int),
mem: make(chan int),
customMetric: make(chan int),
@ -401,9 +404,9 @@ func (rc *ResourceConsumer) CleanUp() {
// Wait some time to ensure all child goroutines are finished.
time.Sleep(10 * time.Second)
kind := rc.kind.GroupKind()
framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, kind, rc.nsName, rc.name))
framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, rc.scaleClient, kind, rc.nsName, rc.name))
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.name, nil))
framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, api.Kind("ReplicationController"), rc.nsName, rc.controllerName))
framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, rc.scaleClient, api.Kind("ReplicationController"), rc.nsName, rc.controllerName))
framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.controllerName, nil))
}

View File

@ -155,8 +155,8 @@ func DeleteRCAndWaitForGC(c clientset.Interface, ns, name string) error {
return DeleteResourceAndWaitForGC(c, api.Kind("ReplicationController"), ns, name)
}
func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string) error {
return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name)
func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, scaleClient scaleclient.ScalesGetter, ns, name string) error {
return DeleteResourceAndPods(clientset, internalClientset, scaleClient, api.Kind("ReplicationController"), ns, name)
}
func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {

View File

@ -47,6 +47,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
scaleclient "k8s.io/client-go/scale"
)
const (
@ -1269,8 +1270,8 @@ func StartServeHostnameService(c clientset.Interface, internalClient internalcli
return podNames, serviceIP, nil
}
func StopServeHostnameService(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string) error {
if err := DeleteRCAndPods(clientset, internalClientset, ns, name); err != nil {
func StopServeHostnameService(clientset clientset.Interface, internalClientset internalclientset.Interface, scaleClient scaleclient.ScalesGetter, ns, name string) error {
if err := DeleteRCAndPods(clientset, internalClientset, scaleClient, ns, name); err != nil {
return err
}
if err := clientset.CoreV1().Services(ns).Delete(name, nil); err != nil {

View File

@ -2984,7 +2984,7 @@ func getReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
}
// DeleteResourceAndPods deletes a given resource and all pods it spawned
func DeleteResourceAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, kind schema.GroupKind, ns, name string) error {
func DeleteResourceAndPods(clientset clientset.Interface, internalClientset internalclientset.Interface, scaleClient scaleclient.ScalesGetter, kind schema.GroupKind, ns, name string) error {
By(fmt.Sprintf("deleting %v %s in namespace %s", kind, name, ns))
rtObject, err := getRuntimeObjectForKind(clientset, kind, ns, name)
@ -3005,7 +3005,7 @@ func DeleteResourceAndPods(clientset clientset.Interface, internalClientset inte
}
defer ps.Stop()
startTime := time.Now()
if err := testutils.DeleteResourceUsingReaperWithRetries(internalClientset, kind, ns, name, nil); err != nil {
if err := testutils.DeleteResourceUsingReaperWithRetries(internalClientset, kind, ns, name, nil, scaleClient); err != nil {
return fmt.Errorf("error while stopping %v: %s: %v", kind, name, err)
}
deleteTime := time.Now().Sub(startTime)

View File

@ -101,7 +101,7 @@ func testStackdriverMonitoring(f *framework.Framework, pods, allPodsCPU int, per
framework.ExpectNoError(err)
rc := common.NewDynamicResourceConsumer(rcName, f.Namespace.Name, common.KindDeployment, pods, allPodsCPU, memoryUsed, 0, perPodCPU, memoryLimit, f.ClientSet, f.InternalClientset)
rc := common.NewDynamicResourceConsumer(rcName, f.Namespace.Name, common.KindDeployment, pods, allPodsCPU, memoryUsed, 0, perPodCPU, memoryLimit, f.ClientSet, f.InternalClientset, f.ScalesGetter)
defer rc.CleanUp()
rc.WaitForReplicas(pods, 15*time.Minute)

View File

@ -163,7 +163,7 @@ var _ = SIGDescribe("Proxy", func() {
CreatedPods: &pods,
}
Expect(framework.RunRC(cfg)).NotTo(HaveOccurred())
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, cfg.Name)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, cfg.Name)
Expect(framework.WaitForEndpoint(f.ClientSet, f.Namespace.Name, service.Name)).NotTo(HaveOccurred())

View File

@ -309,7 +309,7 @@ var _ = SIGDescribe("Services", func() {
// Stop service 1 and make sure it is gone.
By("stopping service1")
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, ns, "service1"))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service1"))
By("verifying service1 is not up")
framework.ExpectNoError(framework.VerifyServeHostnameServiceDown(cs, host, svc1IP, servicePort))
@ -343,13 +343,13 @@ var _ = SIGDescribe("Services", func() {
svc2 := "service2"
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, ns, svc1))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc1))
}()
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, svc1, servicePort, numPods)
Expect(err).NotTo(HaveOccurred())
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, ns, svc2))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, svc2))
}()
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, svc2, servicePort, numPods)
Expect(err).NotTo(HaveOccurred())
@ -396,7 +396,7 @@ var _ = SIGDescribe("Services", func() {
numPods, servicePort := 3, 80
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, ns, "service1"))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service1"))
}()
podNames1, svc1IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service1", servicePort, numPods)
Expect(err).NotTo(HaveOccurred())
@ -423,7 +423,7 @@ var _ = SIGDescribe("Services", func() {
// Create a new service and check if it's not reusing IP.
defer func() {
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, ns, "service2"))
framework.ExpectNoError(framework.StopServeHostnameService(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "service2"))
}()
podNames2, svc2IP, err := framework.StartServeHostnameService(cs, internalClientset, ns, "service2", servicePort, numPods)
Expect(err).NotTo(HaveOccurred())
@ -1667,7 +1667,7 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
framework.Logf("Health checking %s, http://%s%s, expectedSuccess %v", nodes.Items[n].Name, ipPort, path, expectedSuccess)
Expect(jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, framework.KubeProxyEndpointLagTimeout, expectedSuccess, threshold)).NotTo(HaveOccurred())
}
framework.ExpectNoError(framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, serviceName))
framework.ExpectNoError(framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, namespace, serviceName))
}
})

View File

@ -334,7 +334,7 @@ var _ = SIGDescribe("kubelet", func() {
}
By("Deleting the RC")
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, rcName)
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rcName)
// Check that the pods really are gone by querying /runningpods on the
// node. The /runningpods handler checks the container runtime (or its
// cache) and returns a list of running pods. Some possible causes of

View File

@ -117,7 +117,7 @@ func runResourceTrackingTest(f *framework.Framework, podsPerNode int, nodeNames
verifyCPULimits(expectedCPU, cpuSummary)
By("Deleting the RC")
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, rcName)
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rcName)
}
func verifyMemoryLimits(c clientset.Interface, expected framework.ResourceUsagePerContainer, actual framework.ResourceUsagePerNode) {

View File

@ -35,6 +35,7 @@ import (
utiluuid "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/apis/batch"
@ -66,6 +67,7 @@ type DensityTestConfig struct {
Configs []testutils.RunObjectConfig
ClientSets []clientset.Interface
InternalClientsets []internalclientset.Interface
ScaleClients []scaleclient.ScalesGetter
PollInterval time.Duration
PodCount int
// What kind of resource we want to create
@ -116,6 +118,7 @@ func (dtc *DensityTestConfig) deleteDaemonSets(numberOfClients int, testPhase *t
framework.ExpectNoError(framework.DeleteResourceAndPods(
dtc.ClientSets[i%numberOfClients],
dtc.InternalClientsets[i%numberOfClients],
dtc.ScaleClients[i%numberOfClients],
extensions.Kind("DaemonSet"),
dtc.DaemonConfigs[i].Namespace,
dtc.DaemonConfigs[i].Name,
@ -320,7 +323,7 @@ func cleanupDensityTest(dtc DensityTestConfig, testPhaseDurations *timer.TestPha
framework.ExpectNoError(err)
} else {
By(fmt.Sprintf("Cleaning up the %v and pods", kind))
err := framework.DeleteResourceAndPods(dtc.ClientSets[i%numberOfClients], dtc.InternalClientsets[i%numberOfClients], kind, namespace, name)
err := framework.DeleteResourceAndPods(dtc.ClientSets[i%numberOfClients], dtc.InternalClientsets[i%numberOfClients], dtc.ScaleClients[i%numberOfClients], kind, namespace, name)
framework.ExpectNoError(err)
}
}
@ -613,11 +616,12 @@ var _ = SIGDescribe("Density", func() {
}
// Single client is running out of http2 connections in delete phase, hence we need more.
clients, internalClients, _, err = createClients(2)
clients, internalClients, scalesClients, err = createClients(2)
dConfig := DensityTestConfig{
ClientSets: clients,
InternalClientsets: internalClients,
ScaleClients: scalesClients,
Configs: configs,
PodCount: totalPods,
PollInterval: DensityPollInterval,

View File

@ -289,6 +289,7 @@ var _ = SIGDescribe("Load capacity", func() {
framework.ExpectNoError(framework.DeleteResourceAndPods(
f.ClientSet,
f.InternalClientset,
f.ScalesGetter,
extensions.Kind("DaemonSet"),
config.Namespace,
config.Name,
@ -700,7 +701,7 @@ func deleteResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, deleti
fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName()))
} else {
framework.ExpectNoError(framework.DeleteResourceAndPods(
config.GetClient(), config.GetInternalClient(), config.GetKind(), config.GetNamespace(), config.GetName()),
config.GetClient(), config.GetInternalClient(), config.GetScalesGetter(), config.GetKind(), config.GetNamespace(), config.GetName()),
fmt.Sprintf("deleting %v %s", config.GetKind(), config.GetName()))
}
}

View File

@ -92,7 +92,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
err := CreateNodeSelectorPods(f, rcName, 2, nodeSelector, false)
return err
}, rcName, false)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, rcName)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName)
// the first replica pod is scheduled, and the second pod will be rejected.
verifyResult(cs, 1, 1, ns)
})
@ -140,7 +140,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
},
}
rc := getRCWithInterPodAffinity(affinityRCName, labelsMap, replica, affinity, framework.GetPauseImageName(f.ClientSet))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, affinityRCName)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName)
// RC should be running successfully
// TODO: WaitForSchedulerAfterAction() can on be used to wait for failure event,
@ -166,7 +166,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
It("validates pod anti-affinity works properly when new replica pod is scheduled", func() {
By("Launching two pods on two distinct nodes to get two node names")
CreateHostPortPods(f, "host-port", 2, true)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "host-port")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "host-port")
podList, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{})
framework.ExpectNoError(err)
Expect(len(podList.Items)).To(Equal(2))
@ -217,7 +217,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
}
rc := getRCWithInterPodAffinityNodeSelector(labelRCName, labelsMap, replica, affinity,
framework.GetPauseImageName(f.ClientSet), map[string]string{k: v})
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, labelRCName)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, labelRCName)
WaitForSchedulerAfterAction(f, func() error {
_, err := cs.CoreV1().ReplicationControllers(ns).Create(rc)

View File

@ -71,7 +71,7 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
rc, err := cs.CoreV1().ReplicationControllers(ns).Get(RCName, metav1.GetOptions{})
if err == nil && *(rc.Spec.Replicas) != 0 {
By("Cleaning up the replication controller")
err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, RCName)
err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, RCName)
framework.ExpectNoError(err)
}
})

View File

@ -152,7 +152,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
// Cleanup the replication controller when we are done.
defer func() {
// Resize the replication controller to zero to get rid of pods.
if err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, rc.Name); err != nil {
if err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rc.Name); err != nil {
framework.Logf("Failed to cleanup replication controller %v: %v.", rc.Name, err)
}
}()

View File

@ -55,7 +55,7 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() {
It("should ensure that critical pod is scheduled in case there is no resources available", func() {
By("reserving all available cpu")
err := reserveAllCpu(f, "reserve-all-cpu", totalMillicores)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, ns, "reserve-all-cpu")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "reserve-all-cpu")
framework.ExpectNoError(err)
By("creating a new instance of Dashboard and waiting for Dashboard to be scheduled")

View File

@ -223,7 +223,7 @@ func SpreadRCOrFail(f *framework.Framework, replicaCount int32, image string) {
// Cleanup the replication controller when we are done.
defer func() {
// Resize the replication controller to zero to get rid of pods.
if err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, controller.Name); err != nil {
if err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, controller.Name); err != nil {
framework.Logf("Failed to cleanup replication controller %v: %v.", controller.Name, err)
}
}()

View File

@ -374,7 +374,7 @@ func testNoWrappedVolumeRace(f *framework.Framework, volumes []v1.Volume, volume
Expect(err).NotTo(HaveOccurred(), "error creating replication controller")
defer func() {
err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, rcName)
err := framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, rcName)
framework.ExpectNoError(err)
}()

View File

@ -48,7 +48,8 @@ func (t *HPAUpgradeTest) Setup(f *framework.Framework) {
500, /* cpuLimit */
200, /* memLimit */
f.ClientSet,
f.InternalClientset)
f.InternalClientset,
f.ScalesGetter)
t.hpa = common.CreateCPUHorizontalPodAutoscaler(
t.rc,
20, /* targetCPUUtilizationPercent */

View File

@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
@ -58,10 +59,6 @@ func deleteResource(c clientset.Interface, kind schema.GroupKind, namespace, nam
}
}
func getReaperForKind(c internalclientset.Interface, kind schema.GroupKind) (kubectl.Reaper, error) {
return kubectl.ReaperFor(kind, c)
}
func DeleteResourceWithRetries(c clientset.Interface, kind schema.GroupKind, namespace, name string, options *metav1.DeleteOptions) error {
deleteFunc := func() (bool, error) {
err := deleteResource(c, kind, namespace, name, options)
@ -76,8 +73,8 @@ func DeleteResourceWithRetries(c clientset.Interface, kind schema.GroupKind, nam
return RetryWithExponentialBackOff(deleteFunc)
}
func DeleteResourceUsingReaperWithRetries(c internalclientset.Interface, kind schema.GroupKind, namespace, name string, options *metav1.DeleteOptions) error {
reaper, err := getReaperForKind(c, kind)
func DeleteResourceUsingReaperWithRetries(c internalclientset.Interface, kind schema.GroupKind, namespace, name string, options *metav1.DeleteOptions, scaleClient scaleclient.ScalesGetter) error {
reaper, err := kubectl.ReaperFor(kind, c, scaleClient)
if err != nil {
return err
}