the changes introduced in this commit plumbs in the generic scaler into kubectl.

note that we don't change the behaviour of kubectl.
For example it won't scale new resources. That's the end goal.
The first step is to retrofit existing code to use the generic scaler.
pull/6/head
p0lyn0mial 2018-01-04 14:52:25 +01:00
parent 1a817b1507
commit dd9de90b0a
22 changed files with 177 additions and 100 deletions

View File

@ -78,6 +78,7 @@ go_library(
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/util/homedir:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",

View File

@ -37,6 +37,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
@ -286,7 +287,23 @@ func (f *ring1Factory) Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error)
if err != nil {
return nil, err
}
return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset)
// create scales getter
// TODO(p0lyn0mial): put scalesGetter to a factory
discoClient, err := f.clientAccessFactory.DiscoveryClient()
if err != nil {
return nil, err
}
restClient, err := f.clientAccessFactory.RESTClient()
if err != nil {
return nil, err
}
mapper, _ := f.Object()
resolver := scaleclient.NewDiscoveryScaleKindResolver(discoClient)
scalesGetter := scaleclient.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
gvk := mapping.GroupVersionKind.GroupVersion().WithResource(mapping.Resource)
return kubectl.ScalerFor(mapping.GroupVersionKind.GroupKind(), clientset, scalesGetter, gvk.GroupResource())
}
func (f *ring1Factory) Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error) {

View File

@ -53,7 +53,10 @@ type Scaler interface {
ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) (updatedResourceVersion string, err error)
}
func ScalerFor(kind schema.GroupKind, c internalclientset.Interface) (Scaler, error) {
// ScalerFor gets a scaler for a given resource
// TODO(p0lyn0mial): remove kind and internalclientset
// TODO(p0lyn0mial): once we have only one scaler, there is no need to return an error anymore.
func ScalerFor(kind schema.GroupKind, c internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (Scaler, error) {
switch kind {
case api.Kind("ReplicationController"):
return &ReplicationControllerScaler{c.Core()}, nil
@ -63,10 +66,9 @@ func ScalerFor(kind schema.GroupKind, c internalclientset.Interface) (Scaler, er
return &JobScaler{c.Batch()}, nil // Either kind of job can be scaled with Batch interface.
case apps.Kind("StatefulSet"):
return &StatefulSetScaler{c.Apps()}, nil
case extensions.Kind("Deployment"), apps.Kind("Deployment"):
return &DeploymentScaler{c.Extensions()}, nil
default:
return &GenericScaler{scalesGetter, gr}, nil
}
return nil, fmt.Errorf("no scaler has been implemented for %q", kind)
}
// ScalePrecondition describes a condition that must be true for the scale to take place
@ -533,7 +535,7 @@ func (precondition *ScalePrecondition) validateGeneric(scale *autoscalingapi.Sca
}
// GenericScaler can update scales for resources in a particular namespace
// TODO(o0lyn0mial): when the work on GenericScaler is done, don't
// TODO(po0lyn0mial): when the work on GenericScaler is done, don't
// export the GenericScaler. Instead use ScalerFor method for getting the Scaler
// also update the UTs
type GenericScaler struct {

View File

@ -196,7 +196,6 @@ func (c *namespacedScaleClient) Update(resource schema.GroupResource, scale *aut
Body(scaleUpdateBytes).
Do()
if err := result.Error(); err != nil {
panic(err)
return nil, fmt.Errorf("could not update the scale for %s %s: %v", resource.String(), scale.Name, err)
}

View File

@ -257,7 +257,7 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() {
// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
// to the same size achieves this, because the scale operation advances the RC's sequence number
// and awaits it to be observed and reported back in the RC's status.
framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods, true)
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods, true)
// Only check the keys, the pods can be different if the kubelet updated it.
// TODO: Can it really?
@ -288,9 +288,9 @@ var _ = SIGDescribe("DaemonRestart [Disruptive]", func() {
restarter.kill()
// This is best effort to try and create pods while the scheduler is down,
// since we don't know exactly when it is restarted after the kill signal.
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, false))
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, false))
restarter.waitUp()
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rcName, numPods+5, true))
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rcName, numPods+5, true))
})
It("Kubelet should not restart containers across restart", func() {

View File

@ -521,7 +521,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
Expect(err).NotTo(HaveOccurred())
By("scaling rethinkdb")
framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "rethinkdb-rc", 2, true)
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "rethinkdb-rc", 2, true)
checkDbInstances()
By("starting admin")
@ -564,7 +564,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
Expect(err).NotTo(HaveOccurred())
By("scaling hazelcast")
framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, "hazelcast", 2, true)
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, "hazelcast", 2, true)
forEachPod("name", "hazelcast", func(pod v1.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())

View File

@ -110,6 +110,7 @@ go_library(
"//vendor/k8s.io/api/policy/v1beta1:go_default_library",
"//vendor/k8s.io/api/rbac/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
@ -132,6 +133,7 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/discovery/cached:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
@ -139,6 +141,7 @@ go_library(
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd/api:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
@ -178,8 +179,10 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er
return err
}
func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, internalClientset, ns, name, size, wait, extensionsinternal.Kind("Deployment"))
//TODO(p0lyn0mial): remove internalClientset and kind.
//TODO(p0lyn0mial): update the callers.
func ScaleDeployment(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, extensionsinternal.Kind("Deployment"), extensionsinternal.Resource("deployments"))
}
func RunDeployment(config testutils.DeploymentConfig) error {

View File

@ -28,14 +28,19 @@ import (
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/tools/clientcmd"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -67,6 +72,8 @@ type Framework struct {
AggregatorClient *aggregatorclient.Clientset
ClientPool dynamic.ClientPool
ScalesGetter scaleclient.ScalesGetter
SkipNamespaceCreation bool // Whether to skip creating a namespace
Namespace *v1.Namespace // Every test has at least one namespace unless creation is skipped
namespacesToDelete []*v1.Namespace // Some tests have more than one.
@ -161,6 +168,24 @@ func (f *Framework) BeforeEach() {
f.AggregatorClient, err = aggregatorclient.NewForConfig(config)
Expect(err).NotTo(HaveOccurred())
f.ClientPool = dynamic.NewClientPool(config, legacyscheme.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
// create scales getter, set GroupVersion and NegotiatedSerializer to default values
// as they are required when creating a REST client.
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if config.NegotiatedSerializer == nil {
config.NegotiatedSerializer = legacyscheme.Codecs
}
restClient, err := rest.RESTClientFor(config)
Expect(err).NotTo(HaveOccurred())
discoClient, err := discovery.NewDiscoveryClientForConfig(config)
Expect(err).NotTo(HaveOccurred())
cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient)
restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured)
resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient)
f.ScalesGetter = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver)
if ProviderIs("kubemark") && TestContext.KubemarkExternalKubeConfig != "" && TestContext.CloudConfig.KubemarkController == nil {
externalConfig, err := clientcmd.BuildConfigFromFlags("", TestContext.KubemarkExternalKubeConfig)
externalConfig.QPS = f.Options.ClientQPS

View File

@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/api/testapi"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@ -84,7 +85,9 @@ func RcByNameContainer(name string, replicas int32, image string, labels map[str
// ScaleRCByLabels scales an RC via ns/label lookup. If replicas == 0 it waits till
// none are running, otherwise it does what a synchronous scale operation would do.
func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, ns string, l map[string]string, replicas uint) error {
//TODO(p0lyn0mial): remove internalClientset.
//TODO(p0lyn0mial): update the callers.
func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns string, l map[string]string, replicas uint) error {
listOpts := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labels.Set(l)).String()}
rcs, err := clientset.CoreV1().ReplicationControllers(ns).List(listOpts)
if err != nil {
@ -96,7 +99,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl
Logf("Scaling %v RCs with labels %v in ns %v to %v replicas.", len(rcs.Items), l, ns, replicas)
for _, labelRC := range rcs.Items {
name := labelRC.Name
if err := ScaleRC(clientset, internalClientset, ns, name, replicas, false); err != nil {
if err := ScaleRC(clientset, internalClientset, scalesGetter, ns, name, replicas, false); err != nil {
return err
}
rc, err := clientset.CoreV1().ReplicationControllers(ns).Get(name, metav1.GetOptions{})
@ -156,8 +159,10 @@ func DeleteRCAndPods(clientset clientset.Interface, internalClientset internalcl
return DeleteResourceAndPods(clientset, internalClientset, api.Kind("ReplicationController"), ns, name)
}
func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, internalClientset, ns, name, size, wait, api.Kind("ReplicationController"))
//TODO(p0lyn0mial): remove internalClientset.
//TODO(p0lyn0mial): update the callers.
func ScaleRC(clientset clientset.Interface, internalClientset internalclientset.Interface, scalesGetter scaleclient.ScalesGetter, ns, name string, size uint, wait bool) error {
return ScaleResource(clientset, internalClientset, scalesGetter, ns, name, size, wait, api.Kind("ReplicationController"), api.Resource("replicationcontrollers"))
}
func RunRC(config testutils.RCConfig) error {

View File

@ -74,6 +74,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/api/testapi"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@ -2682,20 +2683,25 @@ func RemoveAvoidPodsOffNode(c clientset.Interface, nodeName string) {
ExpectNoError(err)
}
func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) {
return kubectl.ScalerFor(kind, internalClientset)
//TODO(p0lyn0mial): remove internalClientset and kind
func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind, scalesGetter scaleclient.ScalesGetter, gr schema.GroupResource) (kubectl.Scaler, error) {
return kubectl.ScalerFor(kind, internalClientset, scalesGetter, gr)
}
//TODO(p0lyn0mial): remove internalClientset and kind.
//TODO(p0lyn0mial): update the callers.
func ScaleResource(
clientset clientset.Interface,
internalClientset internalclientset.Interface,
scalesGetter scaleclient.ScalesGetter,
ns, name string,
size uint,
wait bool,
kind schema.GroupKind,
gr schema.GroupResource,
) error {
By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
scaler, err := getScalerForKind(internalClientset, kind)
scaler, err := getScalerForKind(internalClientset, kind, scalesGetter, gr)
if err != nil {
return err
}

View File

@ -1265,7 +1265,7 @@ var _ = SIGDescribe("Services", func() {
}
By("Scaling down replication controller to zero")
framework.ScaleRC(f.ClientSet, f.InternalClientset, t.Namespace, rcSpec.Name, 0, false)
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false)
By("Update service to not tolerate unready services")
_, err = framework.UpdateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {

View File

@ -1,9 +1,4 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
@ -14,7 +9,9 @@ go_library(
"load.go",
],
importpath = "k8s.io/kubernetes/test/e2e/scalability",
visibility = ["//visibility:public"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/extensions:go_default_library",
@ -26,6 +23,7 @@ go_library(
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
@ -38,8 +36,12 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/discovery/cached:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/transport:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
@ -57,4 +59,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -528,7 +528,7 @@ var _ = SIGDescribe("Density", func() {
podThroughput := 20
timeout := time.Duration(totalPods/podThroughput)*time.Second + 3*time.Minute
// createClients is defined in load.go
clients, internalClients, err := createClients(numberOfCollections)
clients, internalClients, scalesClients, err := createClients(numberOfCollections)
for i := 0; i < numberOfCollections; i++ {
nsName := namespaces[i].Name
secretNames := []string{}
@ -559,6 +559,7 @@ var _ = SIGDescribe("Density", func() {
baseConfig := &testutils.RCConfig{
Client: clients[i],
InternalClient: internalClients[i],
ScalesGetter: scalesClients[i],
Image: framework.GetPauseImageName(f.ClientSet),
Name: name,
Namespace: nsName,
@ -590,7 +591,7 @@ var _ = SIGDescribe("Density", func() {
}
// Single client is running out of http2 connections in delete phase, hence we need more.
clients, internalClients, err = createClients(2)
clients, internalClients, _, err = createClients(2)
dConfig := DensityTestConfig{
ClientSets: clients,

View File

@ -28,14 +28,18 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
cacheddiscovery "k8s.io/client-go/discovery/cached"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/apis/batch"
@ -48,6 +52,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/dynamic"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
const (
@ -309,9 +315,11 @@ var _ = SIGDescribe("Load capacity", func() {
}
})
func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, error) {
func createClients(numberOfClients int) ([]clientset.Interface, []internalclientset.Interface, []scaleclient.ScalesGetter, error) {
clients := make([]clientset.Interface, numberOfClients)
internalClients := make([]internalclientset.Interface, numberOfClients)
scalesClients := make([]scaleclient.ScalesGetter, numberOfClients)
for i := 0; i < numberOfClients; i++ {
config, err := framework.LoadConfig()
Expect(err).NotTo(HaveOccurred())
@ -327,11 +335,11 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient
// each client here.
transportConfig, err := config.TransportConfig()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
tlsConfig, err := transport.TLSConfigFor(transportConfig)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
config.Transport = utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
@ -349,16 +357,37 @@ func createClients(numberOfClients int) ([]clientset.Interface, []internalclient
c, err := clientset.NewForConfig(config)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
clients[i] = c
internalClient, err := internalclientset.NewForConfig(config)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
internalClients[i] = internalClient
// create scale client, if GroupVersion or NegotiatedSerializer are not set
// assign default values - these fields are mandatory (required by RESTClientFor).
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{}
}
if config.NegotiatedSerializer == nil {
config.NegotiatedSerializer = legacyscheme.Codecs
}
restClient, err := restclient.RESTClientFor(config)
if err != nil {
return nil, nil, nil, err
}
discoClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, nil, nil, err
}
cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoClient)
restMapper := discovery.NewDeferredDiscoveryRESTMapper(cachedDiscoClient, meta.InterfacesForUnstructured)
resolver := scaleclient.NewDiscoveryScaleKindResolver(cachedDiscoClient)
scalesClients[i] = scaleclient.New(restClient, restMapper, dynamic.LegacyAPIPathResolverFunc, resolver)
}
return clients, internalClients, nil
return clients, internalClients, scalesClients, nil
}
func computePodCounts(total int) (int, int, int) {
@ -405,12 +434,13 @@ func generateConfigs(
// Create a number of clients to better simulate real usecase
// where not everyone is using exactly the same client.
rcsPerClient := 20
clients, internalClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient)
clients, internalClients, scalesClients, err := createClients((len(configs) + rcsPerClient - 1) / rcsPerClient)
framework.ExpectNoError(err)
for i := 0; i < len(configs); i++ {
configs[i].SetClient(clients[i%len(clients)])
configs[i].SetInternalClient(internalClients[i%len(internalClients)])
configs[i].SetScalesClient(scalesClients[i%len(clients)])
}
for i := 0; i < len(secretConfigs); i++ {
secretConfigs[i].Client = clients[i%len(clients)]
@ -590,7 +620,16 @@ func scaleResource(wg *sync.WaitGroup, config testutils.RunObjectConfig, scaling
sleepUpTo(scalingTime)
newSize := uint(rand.Intn(config.GetReplicas()) + config.GetReplicas()/2)
framework.ExpectNoError(framework.ScaleResource(
config.GetClient(), config.GetInternalClient(), config.GetNamespace(), config.GetName(), newSize, true, config.GetKind()),
config.GetClient(),
config.GetInternalClient(),
config.GetScalesGetter(),
config.GetNamespace(),
config.GetName(),
newSize,
true,
config.GetKind(),
config.GetGroupResource(),
),
fmt.Sprintf("scaling %v %v", config.GetKind(), config.GetName()))
selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.GetName()}))

View File

@ -155,7 +155,7 @@ var _ = framework.KubeDescribe("EquivalenceCache [Serial]", func() {
By("Trying to schedule another equivalent Pod should fail due to node label has been removed.")
// use scale to create another equivalent pod and wait for failure event
WaitForSchedulerAfterAction(f, func() error {
err := framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, affinityRCName, uint(replica+1), false)
err := framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, affinityRCName, uint(replica+1), false)
return err
}, affinityRCName, false)
// and this new pod should be rejected since node label has been updated

View File

@ -196,7 +196,7 @@ var _ = SIGDescribe("SchedulerPriorities [Serial]", func() {
By(fmt.Sprintf("Scale the RC: %s to len(nodeList.Item)-1 : %v.", rc.Name, len(nodeList.Items)-1))
framework.ScaleRC(f.ClientSet, f.InternalClientset, ns, rc.Name, uint(len(nodeList.Items)-1), true)
framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, ns, rc.Name, uint(len(nodeList.Items)-1), true)
testPods, err := cs.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: "name=scheduler-priority-avoid-pod",
})

View File

@ -68,8 +68,8 @@ var _ = SIGDescribe("Rescheduler [Serial]", func() {
deployment := deployments.Items[0]
replicas := uint(*(deployment.Spec.Replicas))
err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas+1, true)
defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, metav1.NamespaceSystem, deployment.Name, replicas, true))
err = framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas+1, true)
defer framework.ExpectNoError(framework.ScaleDeployment(f.ClientSet, f.InternalClientset, f.ScalesGetter, metav1.NamespaceSystem, deployment.Name, replicas, true))
framework.ExpectNoError(err)
})
@ -80,7 +80,7 @@ func reserveAllCpu(f *framework.Framework, id string, millicores int) error {
replicas := millicores / 100
reserveCpu(f, id, 1, 100)
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.Namespace.Name, id, uint(replicas), false))
framework.ExpectNoError(framework.ScaleRC(f.ClientSet, f.InternalClientset, f.ScalesGetter, f.Namespace.Name, id, uint(replicas), false))
for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) {
pods, err := framework.GetPodsInNamespace(f.ClientSet, f.Namespace.Name, framework.ImagePullerLabels)

View File

@ -22,13 +22,10 @@ go_library(
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/testapi:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/policy/v1beta1:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/replication:go_default_library",
"//pkg/generated/openapi:go_default_library",
"//pkg/kubectl:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/master:go_default_library",
"//pkg/util/env:go_default_library",

View File

@ -19,22 +19,13 @@ limitations under the License.
package framework
import (
"io/ioutil"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/api/testapi"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/kubectl"
)
const (
@ -80,48 +71,3 @@ func CreateTestingNamespace(baseName string, apiserver *httptest.Server, t *test
func DeleteTestingNamespace(ns *v1.Namespace, apiserver *httptest.Server, t *testing.T) {
// TODO: Remove all resources from a given namespace once we implement CreateTestingNamespace.
}
// RCFromManifest reads a .json file and returns the rc in it.
func RCFromManifest(fileName string) *v1.ReplicationController {
data, err := ioutil.ReadFile(fileName)
if err != nil {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
var controller v1.ReplicationController
if err := runtime.DecodeInto(testapi.Default.Codec(), data, &controller); err != nil {
glog.Fatalf("Unexpected error reading rc manifest %v", err)
}
return &controller
}
// StopRC stops the rc via kubectl's stop library
func StopRC(rc *v1.ReplicationController, clientset internalclientset.Interface) error {
reaper, err := kubectl.ReaperFor(api.Kind("ReplicationController"), clientset)
if err != nil || reaper == nil {
return err
}
err = reaper.Stop(rc.Namespace, rc.Name, 0, nil)
if err != nil {
return err
}
return nil
}
// ScaleRC scales the given rc to the given replicas.
func ScaleRC(name, ns string, replicas int32, clientset internalclientset.Interface) (*api.ReplicationController, error) {
scaler, err := kubectl.ScalerFor(api.Kind("ReplicationController"), clientset)
if err != nil {
return nil, err
}
retry := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout}
waitForReplicas := &kubectl.RetryParams{Interval: 50 * time.Millisecond, Timeout: DefaultTimeout}
err = scaler.Scale(ns, name, uint(replicas), nil, retry, waitForReplicas)
if err != nil {
return nil, err
}
scaled, err := clientset.Core().ReplicationControllers(ns).Get(name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return scaled, nil
}

View File

@ -44,6 +44,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/util/workqueue"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
@ -105,16 +106,20 @@ type RunObjectConfig interface {
GetKind() schema.GroupKind
GetClient() clientset.Interface
GetInternalClient() internalclientset.Interface
GetScalesGetter() scaleclient.ScalesGetter
SetClient(clientset.Interface)
SetInternalClient(internalclientset.Interface)
SetScalesClient(scaleclient.ScalesGetter)
GetReplicas() int
GetLabelValue(string) (string, bool)
GetGroupResource() schema.GroupResource
}
type RCConfig struct {
Affinity *v1.Affinity
Client clientset.Interface
InternalClient internalclientset.Interface
ScalesGetter scaleclient.ScalesGetter
Image string
Command []string
Name string
@ -277,6 +282,10 @@ func (config *DeploymentConfig) GetKind() schema.GroupKind {
return extensionsinternal.Kind("Deployment")
}
func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
return extensionsinternal.Resource("deployments")
}
func (config *DeploymentConfig) create() error {
deployment := &extensions.Deployment{
ObjectMeta: metav1.ObjectMeta{
@ -344,6 +353,10 @@ func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
return extensionsinternal.Kind("ReplicaSet")
}
func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
return extensionsinternal.Resource("replicasets")
}
func (config *ReplicaSetConfig) create() error {
rs := &extensions.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
@ -411,6 +424,10 @@ func (config *JobConfig) GetKind() schema.GroupKind {
return batchinternal.Kind("Job")
}
func (config *JobConfig) GetGroupResource() schema.GroupResource {
return batchinternal.Resource("jobs")
}
func (config *JobConfig) create() error {
job := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
@ -482,6 +499,10 @@ func (config *RCConfig) GetKind() schema.GroupKind {
return api.Kind("ReplicationController")
}
func (config *RCConfig) GetGroupResource() schema.GroupResource {
return api.Resource("replicationcontrollers")
}
func (config *RCConfig) GetClient() clientset.Interface {
return config.Client
}
@ -490,6 +511,10 @@ func (config *RCConfig) GetInternalClient() internalclientset.Interface {
return config.InternalClient
}
func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
return config.ScalesGetter
}
func (config *RCConfig) SetClient(c clientset.Interface) {
config.Client = c
}
@ -498,6 +523,10 @@ func (config *RCConfig) SetInternalClient(c internalclientset.Interface) {
config.InternalClient = c
}
func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
config.ScalesGetter = getter
}
func (config *RCConfig) GetReplicas() int {
return config.Replicas
}