From dd9de90b0ad1aa78c2a8dd7d5238d8f769ffe771 Mon Sep 17 00:00:00 2001 From: p0lyn0mial Date: Thu, 4 Jan 2018 14:52:25 +0100 Subject: [PATCH] the changes introduced in this commit plumbs in the generic scaler into kubectl. note that we don't change the behaviour of kubectl. For example it won't scale new resources. That's the end goal. The first step is to retrofit existing code to use the generic scaler. --- pkg/kubectl/cmd/util/BUILD | 1 + .../cmd/util/factory_object_mapping.go | 19 ++++++- pkg/kubectl/scale.go | 12 ++-- staging/src/k8s.io/client-go/scale/client.go | 1 - test/e2e/apps/daemon_restart.go | 6 +- test/e2e/examples.go | 4 +- test/e2e/framework/BUILD | 3 + test/e2e/framework/deployment_util.go | 7 ++- test/e2e/framework/framework.go | 25 +++++++++ test/e2e/framework/rc_util.go | 13 +++-- test/e2e/framework/util.go | 12 +++- test/e2e/network/service.go | 2 +- test/e2e/scalability/BUILD | 15 +++-- test/e2e/scalability/density.go | 5 +- test/e2e/scalability/load.go | 55 ++++++++++++++++--- .../equivalence_cache_predicates.go | 2 +- test/e2e/scheduling/priorities.go | 2 +- test/e2e/scheduling/rescheduler.go | 6 +- test/integration/framework/BUILD | 3 - test/integration/framework/util.go | 54 ------------------ test/utils/BUILD | 1 + test/utils/runners.go | 29 ++++++++++ 22 files changed, 177 insertions(+), 100 deletions(-) diff --git a/pkg/kubectl/cmd/util/BUILD b/pkg/kubectl/cmd/util/BUILD index 6541d3953e..ed3d59475c 100644 --- a/pkg/kubectl/cmd/util/BUILD +++ b/pkg/kubectl/cmd/util/BUILD @@ -78,6 +78,7 @@ go_library( "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/util/homedir:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", diff --git a/pkg/kubectl/cmd/util/factory_object_mapping.go b/pkg/kubectl/cmd/util/factory_object_mapping.go index dfd82d406c..5c9f01f7d5 100644 --- a/pkg/kubectl/cmd/util/factory_object_mapping.go +++ b/pkg/kubectl/cmd/util/factory_object_mapping.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" restclient "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" @@ -286,7 +287,23 @@ func (f *ring1Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error) if err != nil { return nil, err } - return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset) + + // create scales getter + // TODO(p0lyn0mial): put scalesGetter to a factory + discoClient, err := f.clientAccessFactory.DiscoveryClient() + if err != nil { + return nil, err + } + restClient, err := f.clientAccessFactory.RESTClient() + if err != nil { + return nil, err + } + mapper, _ := f.Object() + resolver := scaleclient.NewDiscoveryScaleKindResolver(discoClient) + scalesGetter := scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver) + gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource) + + return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset, scalesGetter, gvk.GroupResource()) } func (f *ring1Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) { diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 1d4165f962..511514df6b 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -53,7 +53,10 @@ type Scaler interface { ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error) } -func ScalerFor(kind schema.GroupKind, c internalclientset.Interface) (Scaler, error) { +// ScalerFor gets a scaler for a given resource +// TODO(p0lyn0mial): remove kind and internalclientset +// TODO(p0lyn0mial): once we have only one scaler, there is no need to return an error anymore. +func ScalerFor(kind schema.GroupKind, c internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (Scaler, error) { switch kind { case api.Kind("ReplicationController"): return &ReplicationControllerScaler{c.Core()}, nil @@ -63,10 +66,9 @@ func ScalerFor(kind schema.GroupKind, c internalclientset.Interface) (Scaler, er return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface. case apps.Kind("StatefulSet"): return &StatefulSetScaler{c.Apps()}, nil - case extensions.Kind("Deployment"), apps.Kind("Deployment"): - return &DeploymentScaler{c.Extensions()}, nil + default: + return &GenericScaler{scalesGetter, gr}, nil } - return nil, fmt.Errorf("no scaler has been implemented for %q", kind) } // ScalePrecondition describes a condition that must be true for the scale to take place @@ -533,7 +535,7 @@ func (precondition *ScalePrecondition) validateGeneric(scale *autoscalingapi.Sca } // GenericScaler can update scales for resources in a particular namespace -// TODO(o0lyn0mial): when the work on GenericScaler is done, don't +// TODO(po0lyn0mial): when the work on GenericScaler is done, don't // export the GenericScaler. Instead use ScalerFor method for getting the Scaler // also update the UTs type GenericScaler struct { diff --git a/staging/src/k8s.io/client-go/scale/client.go b/staging/src/k8s.io/client-go/scale/client.go index 07c6098620..a8c903d9ea 100644 --- a/staging/src/k8s.io/client-go/scale/client.go +++ b/staging/src/k8s.io/client-go/scale/client.go @@ -196,7 +196,6 @@ func (c *namespacedScaleClient) Update(resource schema.GroupResource, scale *aut Body(scaleUpdateBytes). Do() if err := result.Error(); err != nil { - panic(err) return nil, fmt.Errorf("could not update the scale for %s %s: %v", resource.String(), scale.Name, err) } diff --git a/test/e2e/apps/daemon_restart.go b/test/e2e/apps/daemon_restart.go index 85266680ee..2319dfe573 100644 --- a/test/e2e/apps/daemon_restart.go +++ b/test/e2e/apps/daemon_restart.go @@ -257,7 +257,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { // that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC // to the same size achieves this, because the scale operation advances the RC's sequence number // and awaits it to be observed and reported back in the RC's status. - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods, true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods, true) // Only check the keys, the pods can be different if the kubelet updated it. // TODO: Can it really? @@ -288,9 +288,9 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() { restarter.kill() // This is best effort to try and create pods while the scheduler is down, // since we don't know exactly when it is restarted after the kill signal. - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, false)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, false)) restarter.waitUp() - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, true)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, true)) }) It("Kubelet should not restart containers across restart", func() { diff --git a/test/e2e/examples.go b/test/e2e/examples.go index 7e377e203e..6fa937c89f 100644 --- a/test/e2e/examples.go +++ b/test/e2e/examples.go @@ -521,7 +521,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { Expect(err).NotTo(HaveOccurred()) By("scaling rethinkdb") - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "rethinkdb-rc", 2, true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "rethinkdb-rc", 2, true) checkDbInstances() By("starting admin") @@ -564,7 +564,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { Expect(err).NotTo(HaveOccurred()) By("scaling hazelcast") - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "hazelcast", 2, true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "hazelcast", 2, true) forEachPod("name", "hazelcast", func(pod v1.Pod) { _, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout) Expect(err).NotTo(HaveOccurred()) diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 0ecad57686..6a773a3957 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -110,6 +110,7 @@ go_library( "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/api/rbac/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", @@ -132,6 +133,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", @@ -139,6 +141,7 @@ go_library( "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1beta1:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", diff --git a/test/e2e/framework/deployment_util.go b/test/e2e/framework/deployment_util.go index 23feda770d..d5544e1998 100644 --- a/test/e2e/framework/deployment_util.go +++ b/test/e2e/framework/deployment_util.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" @@ -178,8 +179,10 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er return err } -func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { - return ScaleResource(clientset, internalClientset, ns, name, size, wait, extensionsinternal.Kind("Deployment")) +//TODO(p0lyn0mial): remove internalClientset and kind. +//TODO(p0lyn0mial): update the callers. +func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments")) } func RunDeployment(config testutils.DeploymentConfig) error { diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index e628accaa2..f5341d79c0 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -28,14 +28,19 @@ import ( "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/tools/clientcmd" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -67,6 +72,8 @@ type Framework struct { AggregatorClient *aggregatorclient.Clientset ClientPool dynamic.ClientPool + ScalesGetter scaleclient.ScalesGetter + SkipNamespaceCreation bool // Whether to skip creating a namespace Namespace *v1.Namespace // Every test has at least one namespace unless creation is skipped namespacesToDelete []*v1.Namespace // Some tests have more than one. @@ -161,6 +168,24 @@ func (f *Framework) BeforeEach() { f.AggregatorClient, err = aggregatorclient.NewForConfig(config) Expect(err).NotTo(HaveOccurred()) f.ClientPool = dynamic.NewClientPool(config, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) + + // create scales getter, set GroupVersion and NegotiatedSerializer to default values + // as they are required when creating a REST client. + if config.GroupVersion == nil { + config.GroupVersion = &schema.GroupVersion{} + } + if config.NegotiatedSerializer == nil { + config.NegotiatedSerializer = legacyscheme.Codecs + } + restClient, err := rest.RESTClientFor(config) + Expect(err).NotTo(HaveOccurred()) + discoClient, err := discovery.NewDiscoveryClientForConfig(config) + Expect(err).NotTo(HaveOccurred()) + cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured) + resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient) + f.ScalesGetter = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver) + if ProviderIs("kubemark") && TestContext.KubemarkExternalKubeConfig != "" && TestContext.CloudConfig.KubemarkController == nil { externalConfig, err := clientcmd.BuildConfigFromFlags("", TestContext.KubemarkExternalKubeConfig) externalConfig.QPS = f.Options.ClientQPS diff --git a/test/e2e/framework/rc_util.go b/test/e2e/framework/rc_util.go index d0d1982b53..8bbdb6f4a9 100644 --- a/test/e2e/framework/rc_util.go +++ b/test/e2e/framework/rc_util.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/api/testapi" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -84,7 +85,9 @@ func RcByNameContainer(name string, replicas int32, image string, labels map[str // ScaleRCByLabels scales an RC via ns/label lookup. If replicas == 0 it waits till // none are running, otherwise it does what a synchronous scale operation would do. -func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, ns string, l map[string]string, replicas uint) error { +//TODO(p0lyn0mial): remove internalClientset. +//TODO(p0lyn0mial): update the callers. +func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error { listOpts := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(l)).String()} rcs, err := clientset.CoreV1().ReplicationControllers(ns).List(listOpts) if err != nil { @@ -96,7 +99,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl Logf("Scaling %v RCs with labels %v in ns %v to %v replicas.", len(rcs.Items), l, ns, replicas) for _, labelRC := range rcs.Items { name := labelRC.Name - if err := ScaleRC(clientset, internalClientset, ns, name, replicas, false); err != nil { + if err := ScaleRC(clientset, internalClientset, scalesGetter, ns, name, replicas, false); err != nil { return err } rc, err := clientset.CoreV1().ReplicationControllers(ns).Get(name, metav1.GetOptions{}) @@ -156,8 +159,10 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name) } -func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error { - return ScaleResource(clientset, internalClientset, ns, name, size, wait, api.Kind("ReplicationController")) +//TODO(p0lyn0mial): remove internalClientset. +//TODO(p0lyn0mial): update the callers. +func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error { + return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers")) } func RunRC(config testutils.RCConfig) error { diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 637a9eace0..8384c774af 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -74,6 +74,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/testapi" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -2682,20 +2683,25 @@ func RemoveAvoidPodsOffNode(c clientset.Interface, nodeName string) { ExpectNoError(err) } -func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) { - return kubectl.ScalerFor(kind, internalClientset) +//TODO(p0lyn0mial): remove internalClientset and kind +func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (kubectl.Scaler, error) { + return kubectl.ScalerFor(kind, internalClientset, scalesGetter, gr) } +//TODO(p0lyn0mial): remove internalClientset and kind. +//TODO(p0lyn0mial): update the callers. func ScaleResource( clientset clientset.Interface, internalClientset internalclientset.Interface, + scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool, kind schema.GroupKind, + gr schema.GroupResource, ) error { By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size)) - scaler, err := getScalerForKind(internalClientset, kind) + scaler, err := getScalerForKind(internalClientset, kind, scalesGetter, gr) if err != nil { return err } diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 759599a993..44712d0d4c 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -1265,7 +1265,7 @@ var _ = SIGDescribe("Services", func() { } By("Scaling down replication controller to zero") - framework.ScaleRC(f.ClientSet, f.InternalClientset, t.Namespace, rcSpec.Name, 0, false) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false) By("Update service to not tolerate unready services") _, err = framework.UpdateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) { diff --git a/test/e2e/scalability/BUILD b/test/e2e/scalability/BUILD index 34dff1f866..fc6e3cee36 100644 --- a/test/e2e/scalability/BUILD +++ b/test/e2e/scalability/BUILD @@ -1,9 +1,4 @@ -package(default_visibility = ["//visibility:public"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", @@ -14,7 +9,9 @@ go_library( "load.go", ], importpath = "k8s.io/kubernetes/test/e2e/scalability", + visibility = ["//visibility:public"], deps = [ + "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/batch:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/apis/extensions:go_default_library", @@ -26,6 +23,7 @@ go_library( "//vendor/github.com/onsi/gomega:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", @@ -38,8 +36,12 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", + "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/transport:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", @@ -57,4 +59,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go index f10671e10b..6e49d06859 100644 --- a/test/e2e/scalability/density.go +++ b/test/e2e/scalability/density.go @@ -528,7 +528,7 @@ var _ = SIGDescribe("Density", func() { podThroughput := 20 timeout := time.Duration(totalPods/podThroughput)*time.Second + 3*time.Minute // createClients is defined in load.go - clients, internalClients, err := createClients(numberOfCollections) + clients, internalClients, scalesClients, err := createClients(numberOfCollections) for i := 0; i < numberOfCollections; i++ { nsName := namespaces[i].Name secretNames := []string{} @@ -559,6 +559,7 @@ var _ = SIGDescribe("Density", func() { baseConfig := &testutils.RCConfig{ Client: clients[i], InternalClient: internalClients[i], + ScalesGetter: scalesClients[i], Image: framework.GetPauseImageName(f.ClientSet), Name: name, Namespace: nsName, @@ -590,7 +591,7 @@ var _ = SIGDescribe("Density", func() { } // Single client is running out of http2 connections in delete phase, hence we need more. - clients, internalClients, err = createClients(2) + clients, internalClients, _, err = createClients(2) dConfig := DensityTestConfig{ ClientSets: clients, diff --git a/test/e2e/scalability/load.go b/test/e2e/scalability/load.go index c696de4272..6e15dbc880 100644 --- a/test/e2e/scalability/load.go +++ b/test/e2e/scalability/load.go @@ -28,14 +28,18 @@ import ( "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/apis/batch" @@ -48,6 +52,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "k8s.io/client-go/dynamic" + "k8s.io/kubernetes/pkg/api/legacyscheme" ) const ( @@ -309,9 +315,11 @@ var _ = SIGDescribe("Load capacity", func() { } }) -func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, error) { +func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, []scaleclient.ScalesGetter, error) { clients := make([]clientset.Interface, numberOfClients) internalClients := make([]internalclientset.Interface, numberOfClients) + scalesClients := make([]scaleclient.ScalesGetter, numberOfClients) + for i := 0; i < numberOfClients; i++ { config, err := framework.LoadConfig() Expect(err).NotTo(HaveOccurred()) @@ -327,11 +335,11 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient // each client here. transportConfig, err := config.TransportConfig() if err != nil { - return nil, nil, err + return nil, nil, nil, err } tlsConfig, err := transport.TLSConfigFor(transportConfig) if err != nil { - return nil, nil, err + return nil, nil, nil, err } config.Transport = utilnet.SetTransportDefaults(&http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -349,16 +357,37 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient c, err := clientset.NewForConfig(config) if err != nil { - return nil, nil, err + return nil, nil, nil, err } clients[i] = c internalClient, err := internalclientset.NewForConfig(config) if err != nil { - return nil, nil, err + return nil, nil, nil, err } internalClients[i] = internalClient + + // create scale client, if GroupVersion or NegotiatedSerializer are not set + // assign default values - these fields are mandatory (required by RESTClientFor). + if config.GroupVersion == nil { + config.GroupVersion = &schema.GroupVersion{} + } + if config.NegotiatedSerializer == nil { + config.NegotiatedSerializer = legacyscheme.Codecs + } + restClient, err := restclient.RESTClientFor(config) + if err != nil { + return nil, nil, nil, err + } + discoClient, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return nil, nil, nil, err + } + cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured) + resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient) + scalesClients[i] = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver) } - return clients, internalClients, nil + return clients, internalClients, scalesClients, nil } func computePodCounts(total int) (int, int, int) { @@ -405,12 +434,13 @@ func generateConfigs( // Create a number of clients to better simulate real usecase // where not everyone is using exactly the same client. rcsPerClient := 20 - clients, internalClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient) + clients, internalClients, scalesClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient) framework.ExpectNoError(err) for i := 0; i < len(configs); i++ { configs[i].SetClient(clients[i%len(clients)]) configs[i].SetInternalClient(internalClients[i%len(internalClients)]) + configs[i].SetScalesClient(scalesClients[i%len(clients)]) } for i := 0; i < len(secretConfigs); i++ { secretConfigs[i].Client = clients[i%len(clients)] @@ -590,7 +620,16 @@ func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scaling sleepUpTo(scalingTime) newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2) framework.ExpectNoError(framework.ScaleResource( - config.GetClient(), config.GetInternalClient(), config.GetNamespace(), config.GetName(), newSize, true, config.GetKind()), + config.GetClient(), + config.GetInternalClient(), + config.GetScalesGetter(), + config.GetNamespace(), + config.GetName(), + newSize, + true, + config.GetKind(), + config.GetGroupResource(), + ), fmt.Sprintf("scaling %v %v", config.GetKind(), config.GetName())) selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.GetName()})) diff --git a/test/e2e/scheduling/equivalence_cache_predicates.go b/test/e2e/scheduling/equivalence_cache_predicates.go index 79aaf5d9d8..3d55147639 100644 --- a/test/e2e/scheduling/equivalence_cache_predicates.go +++ b/test/e2e/scheduling/equivalence_cache_predicates.go @@ -155,7 +155,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() { By("Trying to schedule another equivalent Pod should fail due to node label has been removed.") // use scale to create another equivalent pod and wait for failure event WaitForSchedulerAfterAction(f, func() error { - err := framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, affinityRCName, uint(replica+1), false) + err := framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false) return err }, affinityRCName, false) // and this new pod should be rejected since node label has been updated diff --git a/test/e2e/scheduling/priorities.go b/test/e2e/scheduling/priorities.go index 1c34ea8998..f3643b6cb1 100644 --- a/test/e2e/scheduling/priorities.go +++ b/test/e2e/scheduling/priorities.go @@ -196,7 +196,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() { By(fmt.Sprintf("Scale the RC: %s to len(nodeList.Item)-1 : %v.", rc.Name, len(nodeList.Items)-1)) - framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rc.Name, uint(len(nodeList.Items)-1), true) + framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true) testPods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{ LabelSelector: "name=scheduler-priority-avoid-pod", }) diff --git a/test/e2e/scheduling/rescheduler.go b/test/e2e/scheduling/rescheduler.go index 512e8b3c6f..0d1107ccbe 100644 --- a/test/e2e/scheduling/rescheduler.go +++ b/test/e2e/scheduling/rescheduler.go @@ -68,8 +68,8 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() { deployment := deployments.Items[0] replicas := uint(*(deployment.Spec.Replicas)) - err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas+1, true) - defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas, true)) + err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true) + defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true)) framework.ExpectNoError(err) }) @@ -80,7 +80,7 @@ func reserveAllCpu(f *framework.Framework, id string, millicores int) error { replicas := millicores / 100 reserveCpu(f, id, 1, 100) - framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.Namespace.Name, id, uint(replicas), false)) + framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false)) for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) { pods, err := framework.GetPodsInNamespace(f.ClientSet, f.Namespace.Name, framework.ImagePullerLabels) diff --git a/test/integration/framework/BUILD b/test/integration/framework/BUILD index 07da3bc114..27a3fb8e41 100644 --- a/test/integration/framework/BUILD +++ b/test/integration/framework/BUILD @@ -22,13 +22,10 @@ go_library( "//pkg/api/legacyscheme:go_default_library", "//pkg/api/testapi:go_default_library", "//pkg/apis/batch:go_default_library", - "//pkg/apis/core:go_default_library", "//pkg/apis/policy/v1beta1:go_default_library", - "//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/replication:go_default_library", "//pkg/generated/openapi:go_default_library", - "//pkg/kubectl:go_default_library", "//pkg/kubelet/client:go_default_library", "//pkg/master:go_default_library", "//pkg/util/env:go_default_library", diff --git a/test/integration/framework/util.go b/test/integration/framework/util.go index afb1d68961..c9d42a99c4 100644 --- a/test/integration/framework/util.go +++ b/test/integration/framework/util.go @@ -19,22 +19,13 @@ limitations under the License. package framework import ( - "io/ioutil" "net/http/httptest" "strings" "testing" - "time" - - "github.com/golang/glog" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" clientset "k8s.io/client-go/kubernetes" - "k8s.io/kubernetes/pkg/api/testapi" - api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - "k8s.io/kubernetes/pkg/kubectl" ) const ( @@ -80,48 +71,3 @@ func CreateTestingNamespace(baseName string, apiserver *httptest.Server, t *test func DeleteTestingNamespace(ns *v1.Namespace, apiserver *httptest.Server, t *testing.T) { // TODO: Remove all resources from a given namespace once we implement CreateTestingNamespace. } - -// RCFromManifest reads a .json file and returns the rc in it. -func RCFromManifest(fileName string) *v1.ReplicationController { - data, err := ioutil.ReadFile(fileName) - if err != nil { - glog.Fatalf("Unexpected error reading rc manifest %v", err) - } - var controller v1.ReplicationController - if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil { - glog.Fatalf("Unexpected error reading rc manifest %v", err) - } - return &controller -} - -// StopRC stops the rc via kubectl's stop library -func StopRC(rc *v1.ReplicationController, clientset internalclientset.Interface) error { - reaper, err := kubectl.ReaperFor(api.Kind("ReplicationController"), clientset) - if err != nil || reaper == nil { - return err - } - err = reaper.Stop(rc.Namespace, rc.Name, 0, nil) - if err != nil { - return err - } - return nil -} - -// ScaleRC scales the given rc to the given replicas. -func ScaleRC(name, ns string, replicas int32, clientset internalclientset.Interface) (*api.ReplicationController, error) { - scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), clientset) - if err != nil { - return nil, err - } - retry := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout} - waitForReplicas := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout} - err = scaler.Scale(ns, name, uint(replicas), nil, retry, waitForReplicas) - if err != nil { - return nil, err - } - scaled, err := clientset.Core().ReplicationControllers(ns).Get(name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return scaled, nil -} diff --git a/test/utils/BUILD b/test/utils/BUILD index a2e6045933..da7eeadab9 100644 --- a/test/utils/BUILD +++ b/test/utils/BUILD @@ -44,6 +44,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], diff --git a/test/utils/runners.go b/test/utils/runners.go index 2eaf28e48e..1d71a3eeb6 100644 --- a/test/utils/runners.go +++ b/test/utils/runners.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/util/workqueue" batchinternal "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" @@ -105,16 +106,20 @@ type RunObjectConfig interface { GetKind() schema.GroupKind GetClient() clientset.Interface GetInternalClient() internalclientset.Interface + GetScalesGetter() scaleclient.ScalesGetter SetClient(clientset.Interface) SetInternalClient(internalclientset.Interface) + SetScalesClient(scaleclient.ScalesGetter) GetReplicas() int GetLabelValue(string) (string, bool) + GetGroupResource() schema.GroupResource } type RCConfig struct { Affinity *v1.Affinity Client clientset.Interface InternalClient internalclientset.Interface + ScalesGetter scaleclient.ScalesGetter Image string Command []string Name string @@ -277,6 +282,10 @@ func (config *DeploymentConfig) GetKind() schema.GroupKind { return extensionsinternal.Kind("Deployment") } +func (config *DeploymentConfig) GetGroupResource() schema.GroupResource { + return extensionsinternal.Resource("deployments") +} + func (config *DeploymentConfig) create() error { deployment := &extensions.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -344,6 +353,10 @@ func (config *ReplicaSetConfig) GetKind() schema.GroupKind { return extensionsinternal.Kind("ReplicaSet") } +func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource { + return extensionsinternal.Resource("replicasets") +} + func (config *ReplicaSetConfig) create() error { rs := &extensions.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ @@ -411,6 +424,10 @@ func (config *JobConfig) GetKind() schema.GroupKind { return batchinternal.Kind("Job") } +func (config *JobConfig) GetGroupResource() schema.GroupResource { + return batchinternal.Resource("jobs") +} + func (config *JobConfig) create() error { job := &batch.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -482,6 +499,10 @@ func (config *RCConfig) GetKind() schema.GroupKind { return api.Kind("ReplicationController") } +func (config *RCConfig) GetGroupResource() schema.GroupResource { + return api.Resource("replicationcontrollers") +} + func (config *RCConfig) GetClient() clientset.Interface { return config.Client } @@ -490,6 +511,10 @@ func (config *RCConfig) GetInternalClient() internalclientset.Interface { return config.InternalClient } +func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter { + return config.ScalesGetter +} + func (config *RCConfig) SetClient(c clientset.Interface) { config.Client = c } @@ -498,6 +523,10 @@ func (config *RCConfig) SetInternalClient(c internalclientset.Interface) { config.InternalClient = c } +func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) { + config.ScalesGetter = getter +} + func (config *RCConfig) GetReplicas() int { return config.Replicas }