From a98d14d2c5275d822889adb5e1b8dcbca8703335 Mon Sep 17 00:00:00 2001 From: jayunit100 Date: Fri, 13 Jan 2017 18:51:38 -0500 Subject: [PATCH] [scheduler] interface for configuration factory, configurator. --- plugin/pkg/scheduler/BUILD | 5 + plugin/pkg/scheduler/factory/BUILD | 3 +- plugin/pkg/scheduler/factory/factory.go | 276 +++++++----------- plugin/pkg/scheduler/factory/factory_test.go | 83 +----- plugin/pkg/scheduler/scheduler.go | 33 +++ plugin/pkg/scheduler/util/BUILD | 40 +++ plugin/pkg/scheduler/util/backoff_utils.go | 135 +++++++++ .../pkg/scheduler/util/backoff_utils_test.go | 85 ++++++ test/integration/scheduler/scheduler_test.go | 2 +- test/integration/scheduler_perf/BUILD | 2 +- .../scheduler_perf/scheduler_bench_test.go | 6 +- .../scheduler_perf/scheduler_test.go | 30 +- test/integration/scheduler_perf/util.go | 2 +- 13 files changed, 430 insertions(+), 272 deletions(-) create mode 100644 plugin/pkg/scheduler/util/BUILD create mode 100644 plugin/pkg/scheduler/util/backoff_utils.go create mode 100644 plugin/pkg/scheduler/util/backoff_utils_test.go diff --git a/plugin/pkg/scheduler/BUILD b/plugin/pkg/scheduler/BUILD index ec056fda75..0535c94ac5 100644 --- a/plugin/pkg/scheduler/BUILD +++ b/plugin/pkg/scheduler/BUILD @@ -19,6 +19,8 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/client/cache:go_default_library", + "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/record:go_default_library", "//pkg/client/restclient:go_default_library", "//pkg/util:go_default_library", @@ -29,11 +31,13 @@ go_library( "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/metrics:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", + "//plugin/pkg/scheduler/util:go_default_library", "//vendor:github.com/golang/glog", "//vendor:github.com/golang/groupcache/lru", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/net", + "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/wait", ], ) @@ -87,6 +91,7 @@ filegroup( "//plugin/pkg/scheduler/metrics:all-srcs", "//plugin/pkg/scheduler/schedulercache:all-srcs", "//plugin/pkg/scheduler/testing:all-srcs", + "//plugin/pkg/scheduler/util:all-srcs", ], tags = ["automanaged"], ) diff --git a/plugin/pkg/scheduler/factory/BUILD b/plugin/pkg/scheduler/factory/BUILD index 7bf4bd921c..e18fc41e7d 100644 --- a/plugin/pkg/scheduler/factory/BUILD +++ b/plugin/pkg/scheduler/factory/BUILD @@ -29,6 +29,7 @@ go_library( "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api/validation:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", + "//plugin/pkg/scheduler/util:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/errors", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", @@ -61,9 +62,9 @@ go_test( "//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api/latest:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", + "//plugin/pkg/scheduler/util:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", - "//vendor:k8s.io/apimachinery/pkg/types", ], ) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 1140d85eba..1fc40ef411 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -21,8 +21,6 @@ package factory import ( "fmt" "strings" - "sync" - "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/errors" @@ -46,6 +44,7 @@ import ( "github.com/golang/glog" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + "k8s.io/kubernetes/plugin/pkg/scheduler/util" ) const ( @@ -54,27 +53,28 @@ const ( maximalGetBackoff = time.Minute ) -// ConfigFactory knows how to fill out a scheduler config with its support functions. +// ConfigFactory is the default implementation of the scheduler.Configurator interface. +// TODO make this private if possible, so that only its interface is externally used. type ConfigFactory struct { - Client clientset.Interface + client clientset.Interface // queue for pods that need scheduling - PodQueue *cache.FIFO + podQueue *cache.FIFO // a means to list all known scheduled pods. - ScheduledPodLister *cache.StoreToPodLister + scheduledPodLister *cache.StoreToPodLister // a means to list all known scheduled pods and pods assumed to have been scheduled. - PodLister algorithm.PodLister + podLister algorithm.PodLister // a means to list all nodes - NodeLister *cache.StoreToNodeLister + nodeLister *cache.StoreToNodeLister // a means to list all PersistentVolumes - PVLister *cache.StoreToPVFetcher + pVLister *cache.StoreToPVFetcher // a means to list all PersistentVolumeClaims - PVCLister *cache.StoreToPersistentVolumeClaimLister + pVCLister *cache.StoreToPersistentVolumeClaimLister // a means to list all services - ServiceLister *cache.StoreToServiceLister + serviceLister *cache.StoreToServiceLister // a means to list all controllers - ControllerLister *cache.StoreToReplicationControllerLister + controllerLister *cache.StoreToReplicationControllerLister // a means to list all replicasets - ReplicaSetLister *cache.StoreToReplicaSetLister + replicaSetLister *cache.StoreToReplicaSetLister // Close this to stop all reflectors StopEverything chan struct{} @@ -92,22 +92,23 @@ type ConfigFactory struct { // SchedulerName of a scheduler is used to select which pods will be // processed by this scheduler, based on pods's annotation key: // 'scheduler.alpha.kubernetes.io/name' - SchedulerName string + schedulerName string // RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule // corresponding to every RequiredDuringScheduling affinity rule. // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100. - HardPodAffinitySymmetricWeight int + hardPodAffinitySymmetricWeight int // Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity. - FailureDomains string + failureDomains []string // Equivalence class cache - EquivalencePodCache *scheduler.EquivalenceCache + equivalencePodCache *scheduler.EquivalenceCache } -// Initializes the factory. -func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory { +// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only +// return the interface. +func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) scheduler.Configurator { stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) @@ -116,32 +117,32 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA pvcInformer := informerFactory.PersistentVolumeClaims() c := &ConfigFactory{ - Client: client, - PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), - ScheduledPodLister: &cache.StoreToPodLister{}, + client: client, + podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), + scheduledPodLister: &cache.StoreToPodLister{}, informerFactory: informerFactory, // Only nodes in the "Ready" condition with status == "True" are schedulable - NodeLister: &cache.StoreToNodeLister{}, - PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, - PVCLister: pvcInformer.Lister(), + nodeLister: &cache.StoreToNodeLister{}, + pVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + pVCLister: pvcInformer.Lister(), pvcPopulator: pvcInformer.Informer().GetController(), - ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, - ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, + serviceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, + controllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, + replicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, schedulerCache: schedulerCache, StopEverything: stopEverything, - SchedulerName: schedulerName, - HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight, - FailureDomains: failureDomains, + schedulerName: schedulerName, + hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight, + failureDomains: strings.Split(failureDomains, ","), } - c.PodLister = schedulerCache + c.podLister = schedulerCache // On add/delete to the scheduled pods, remove from the assumed pods. // We construct this here instead of in CreateFromKeys because // ScheduledPodLister is something we provide to plug in functions that // they may need to call. - c.ScheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer( + c.scheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer( c.createAssignedNonTerminatedPodLW(), &v1.Pod{}, 0, @@ -153,7 +154,7 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - c.NodeLister.Store, c.nodePopulator = cache.NewInformer( + c.nodeLister.Store, c.nodePopulator = cache.NewInformer( c.createNodeLW(), &v1.Node{}, 0, @@ -165,14 +166,14 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA ) // TODO(harryz) need to fill all the handlers here and below for equivalence cache - c.PVLister.Store, c.pvPopulator = cache.NewInformer( + c.pVLister.Store, c.pvPopulator = cache.NewInformer( c.createPersistentVolumeLW(), &v1.PersistentVolume{}, 0, cache.ResourceEventHandlerFuncs{}, ) - c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer( + c.serviceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer( c.createServiceLW(), &v1.Service{}, 0, @@ -180,7 +181,7 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, ) - c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer( + c.controllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer( c.createControllerLW(), &v1.ReplicationController{}, 0, @@ -191,6 +192,33 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA return c } +// GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests. +func (c *ConfigFactory) GetNodeStore() cache.Store { + return c.nodeLister.Store +} + +func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int { + return c.hardPodAffinitySymmetricWeight +} + +func (c *ConfigFactory) GetFailureDomains() []string { + return c.failureDomains +} + +func (f *ConfigFactory) GetSchedulerName() string { + return f.schedulerName +} + +// GetClient provides a kubernetes client, mostly internal use, but may also be called by mock-tests. +func (f *ConfigFactory) GetClient() clientset.Interface { + return f.client +} + +// GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests. +func (c *ConfigFactory) GetScheduledPodListerIndexer() cache.Indexer { + return c.scheduledPodLister.Indexer +} + // TODO(harryz) need to update all the handlers here and below for equivalence cache func (c *ConfigFactory) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) @@ -347,8 +375,8 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) { glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) - if f.HardPodAffinitySymmetricWeight < 0 || f.HardPodAffinitySymmetricWeight > 100 { - return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.HardPodAffinitySymmetricWeight) + if f.GetHardPodAffinitySymmetricWeight() < 0 || f.GetHardPodAffinitySymmetricWeight() > 100 { + return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.GetHardPodAffinitySymmetricWeight()) } predicateFuncs, err := f.GetPredicates(predicateKeys) @@ -373,25 +401,18 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, f.Run() algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) - podBackoff := podBackoff{ - perPodBackoff: map[types.NamespacedName]*backoffEntry{}, - clock: realClock{}, - - defaultDuration: 1 * time.Second, - maxDuration: 60 * time.Second, - } - + podBackoff := util.CreateDefaultPodBackoff() return &scheduler.Config{ SchedulerCache: f.schedulerCache, // The scheduler only needs to consider schedulable nodes. - NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), + NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()), Algorithm: algo, - Binder: &binder{f.Client}, - PodConditionUpdater: &podConditionUpdater{f.Client}, + Binder: &binder{f.client}, + PodConditionUpdater: &podConditionUpdater{f.client}, NextPod: func() *v1.Pod { return f.getNextPod() }, - Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue), + Error: f.MakeDefaultErrorFunc(podBackoff, f.podQueue), StopEverything: f.StopEverything, }, nil } @@ -432,31 +453,30 @@ func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]alg } func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { - failureDomainArgs := strings.Split(f.FailureDomains, ",") - for _, failureDomain := range failureDomainArgs { + for _, failureDomain := range f.failureDomains { if errs := utilvalidation.IsQualifiedName(failureDomain); len(errs) != 0 { return nil, fmt.Errorf("invalid failure domain: %q: %s", failureDomain, strings.Join(errs, ";")) } } return &PluginFactoryArgs{ - PodLister: f.PodLister, - ServiceLister: f.ServiceLister, - ControllerLister: f.ControllerLister, - ReplicaSetLister: f.ReplicaSetLister, + PodLister: f.podLister, + ServiceLister: f.serviceLister, + ControllerLister: f.controllerLister, + ReplicaSetLister: f.replicaSetLister, // All fit predicates only need to consider schedulable nodes. - NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), - NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.NodeLister}, - PVInfo: f.PVLister, - PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.PVCLister}, - HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight, - FailureDomains: sets.NewString(failureDomainArgs...).List(), + NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()), + NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.nodeLister}, + PVInfo: f.pVLister, + PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.pVCLister}, + HardPodAffinitySymmetricWeight: f.hardPodAffinitySymmetricWeight, + FailureDomains: sets.NewString(f.failureDomains...).List(), }, nil } func (f *ConfigFactory) Run() { // Watch and queue pods that need scheduling. - cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &v1.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything) + cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &v1.Pod{}, f.podQueue, 0).RunUntil(f.StopEverything) // Begin populating scheduled pods. go f.scheduledPodPopulator.Run(f.StopEverything) @@ -480,24 +500,24 @@ func (f *ConfigFactory) Run() { // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally. - cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Indexer, 0).RunUntil(f.StopEverything) + cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.replicaSetLister.Indexer, 0).RunUntil(f.StopEverything) } func (f *ConfigFactory) getNextPod() *v1.Pod { for { - pod := cache.Pop(f.PodQueue).(*v1.Pod) - if f.responsibleForPod(pod) { + pod := cache.Pop(f.podQueue).(*v1.Pod) + if f.ResponsibleForPod(pod) { glog.V(4).Infof("About to try and schedule pod %v", pod.Name) return pod } } } -func (f *ConfigFactory) responsibleForPod(pod *v1.Pod) bool { - if f.SchedulerName == v1.DefaultSchedulerName { - return pod.Annotations[SchedulerAnnotationKey] == f.SchedulerName || pod.Annotations[SchedulerAnnotationKey] == "" +func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool { + if f.schedulerName == v1.DefaultSchedulerName { + return pod.Annotations[SchedulerAnnotationKey] == f.schedulerName || pod.Annotations[SchedulerAnnotationKey] == "" } else { - return pod.Annotations[SchedulerAnnotationKey] == f.SchedulerName + return pod.Annotations[SchedulerAnnotationKey] == f.schedulerName } } @@ -533,7 +553,7 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate { // scheduled. func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWatch { selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed)) - return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "pods", v1.NamespaceAll, selector) + return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", v1.NamespaceAll, selector) } // Returns a cache.ListWatch that finds all pods that are @@ -541,7 +561,7 @@ func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWa // TODO: return a ListerWatcher interface instead? func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatch { selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed)) - return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "pods", v1.NamespaceAll, selector) + return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", v1.NamespaceAll, selector) } // createNodeLW returns a cache.ListWatch that gets all changes to nodes. @@ -549,42 +569,42 @@ func (factory *ConfigFactory) createNodeLW() *cache.ListWatch { // all nodes are considered to ensure that the scheduler cache has access to all nodes for lookups // the NodeCondition is used to filter out the nodes that are not ready or unschedulable // the filtered list is used as the super set of nodes to consider for scheduling - return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "nodes", v1.NamespaceAll, fields.ParseSelectorOrDie("")) + return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "nodes", v1.NamespaceAll, fields.ParseSelectorOrDie("")) } // createPersistentVolumeLW returns a cache.ListWatch that gets all changes to persistentVolumes. func (factory *ConfigFactory) createPersistentVolumeLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "persistentVolumes", v1.NamespaceAll, fields.ParseSelectorOrDie("")) + return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumes", v1.NamespaceAll, fields.ParseSelectorOrDie("")) } // createPersistentVolumeClaimLW returns a cache.ListWatch that gets all changes to persistentVolumeClaims. func (factory *ConfigFactory) createPersistentVolumeClaimLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "persistentVolumeClaims", v1.NamespaceAll, fields.ParseSelectorOrDie("")) + return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumeClaims", v1.NamespaceAll, fields.ParseSelectorOrDie("")) } // Returns a cache.ListWatch that gets all changes to services. func (factory *ConfigFactory) createServiceLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "services", v1.NamespaceAll, fields.ParseSelectorOrDie("")) + return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "services", v1.NamespaceAll, fields.ParseSelectorOrDie("")) } // Returns a cache.ListWatch that gets all changes to controllers. func (factory *ConfigFactory) createControllerLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "replicationControllers", v1.NamespaceAll, fields.ParseSelectorOrDie("")) + return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "replicationControllers", v1.NamespaceAll, fields.ParseSelectorOrDie("")) } // Returns a cache.ListWatch that gets all changes to replicasets. func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch { - return cache.NewListWatchFromClient(factory.Client.Extensions().RESTClient(), "replicasets", v1.NamespaceAll, fields.ParseSelectorOrDie("")) + return cache.NewListWatchFromClient(factory.client.Extensions().RESTClient(), "replicasets", v1.NamespaceAll, fields.ParseSelectorOrDie("")) } -func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { +func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) { return func(pod *v1.Pod, err error) { if err == scheduler.ErrNoNodesAvailable { glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name) } else { glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err) } - backoff.gc() + backoff.Gc() // Retry asynchronously. // Note that this is extremely rudimentary and we need a more real error handling path. go func() { @@ -594,15 +614,15 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue Name: pod.Name, } - entry := backoff.getEntry(podID) - if !entry.TryWait(backoff.maxDuration) { + entry := backoff.GetEntry(podID) + if !entry.TryWait(backoff.MaxDuration()) { glog.Warningf("Request for pod %v already in flight, abandoning", podID) return } // Get the pod again; it may have changed/been scheduled already. getBackoff := initialGetBackoff for { - pod, err := factory.Client.Core().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{}) + pod, err := factory.client.Core().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{}) if err == nil { if len(pod.Spec.NodeName) == 0 { podQueue.AddIfNotPresent(pod) @@ -666,91 +686,3 @@ func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) er } return nil } - -type clock interface { - Now() time.Time -} - -type realClock struct{} - -func (realClock) Now() time.Time { - return time.Now() -} - -// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. -// It is expected that all users will only use the public TryWait(...) method -// It is also not safe to copy this object. -type backoffEntry struct { - backoff time.Duration - lastUpdate time.Time - reqInFlight int32 -} - -// tryLock attempts to acquire a lock via atomic compare and swap. -// returns true if the lock was acquired, false otherwise -func (b *backoffEntry) tryLock() bool { - return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1) -} - -// unlock returns the lock. panics if the lock isn't held -func (b *backoffEntry) unlock() { - if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) { - panic(fmt.Sprintf("unexpected state on unlocking: %+v", b)) - } -} - -// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for. -func (b *backoffEntry) TryWait(maxDuration time.Duration) bool { - if !b.tryLock() { - return false - } - defer b.unlock() - b.wait(maxDuration) - return true -} - -func (entry *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration { - duration := entry.backoff - newDuration := time.Duration(duration) * 2 - if newDuration > maxDuration { - newDuration = maxDuration - } - entry.backoff = newDuration - glog.V(4).Infof("Backing off %s for pod %+v", duration.String(), entry) - return duration -} - -func (entry *backoffEntry) wait(maxDuration time.Duration) { - time.Sleep(entry.getBackoff(maxDuration)) -} - -type podBackoff struct { - perPodBackoff map[types.NamespacedName]*backoffEntry - lock sync.Mutex - clock clock - defaultDuration time.Duration - maxDuration time.Duration -} - -func (p *podBackoff) getEntry(podID types.NamespacedName) *backoffEntry { - p.lock.Lock() - defer p.lock.Unlock() - entry, ok := p.perPodBackoff[podID] - if !ok { - entry = &backoffEntry{backoff: p.defaultDuration} - p.perPodBackoff[podID] = entry - } - entry.lastUpdate = p.clock.Now() - return entry -} - -func (p *podBackoff) gc() { - p.lock.Lock() - defer p.lock.Unlock() - now := p.clock.Now() - for podID, entry := range p.perPodBackoff { - if now.Sub(entry.lastUpdate) > p.maxDuration { - delete(p.perPodBackoff, podID) - } - } -} diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 004c63f464..6c7c6bb5b2 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -25,7 +25,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" apitesting "k8s.io/kubernetes/pkg/api/testing" @@ -38,6 +37,7 @@ import ( schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + "k8s.io/kubernetes/plugin/pkg/scheduler/util" ) func TestCreate(t *testing.T) { @@ -152,13 +152,8 @@ func TestDefaultErrorFunc(t *testing.T) { defer server.Close() factory := NewConfigFactory(clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains) queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - podBackoff := podBackoff{ - perPodBackoff: map[types.NamespacedName]*backoffEntry{}, - clock: &fakeClock{}, - defaultDuration: 1 * time.Millisecond, - maxDuration: 1 * time.Second, - } - errFunc := factory.makeDefaultErrorFunc(&podBackoff, queue) + podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second) + errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue) errFunc(testPod, nil) for { @@ -202,14 +197,6 @@ func TestNodeEnumerator(t *testing.T) { } } -type fakeClock struct { - t time.Time -} - -func (f *fakeClock) Now() time.Time { - return f.t -} - func TestBind(t *testing.T) { table := []struct { binding *v1.Binding @@ -245,66 +232,6 @@ func TestBind(t *testing.T) { } } -func TestBackoff(t *testing.T) { - clock := fakeClock{} - backoff := podBackoff{ - perPodBackoff: map[types.NamespacedName]*backoffEntry{}, - clock: &clock, - defaultDuration: 1 * time.Second, - maxDuration: 60 * time.Second, - } - - tests := []struct { - podID types.NamespacedName - expectedDuration time.Duration - advanceClock time.Duration - }{ - { - podID: types.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 1 * time.Second, - }, - { - podID: types.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 2 * time.Second, - }, - { - podID: types.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 4 * time.Second, - }, - { - podID: types.NamespacedName{Namespace: "default", Name: "bar"}, - expectedDuration: 1 * time.Second, - advanceClock: 120 * time.Second, - }, - // 'foo' should have been gc'd here. - { - podID: types.NamespacedName{Namespace: "default", Name: "foo"}, - expectedDuration: 1 * time.Second, - }, - } - - for _, test := range tests { - duration := backoff.getEntry(test.podID).getBackoff(backoff.maxDuration) - if duration != test.expectedDuration { - t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID) - } - clock.t = clock.t.Add(test.advanceClock) - backoff.gc() - } - fooID := types.NamespacedName{Namespace: "default", Name: "foo"} - backoff.perPodBackoff[fooID].backoff = 60 * time.Second - duration := backoff.getEntry(fooID).getBackoff(backoff.maxDuration) - if duration != 60*time.Second { - t.Errorf("expected: 60, got %s", duration.String()) - } - // Verify that we split on namespaces correctly, same name, different namespace - fooID.Namespace = "other" - duration = backoff.getEntry(fooID).getBackoff(backoff.maxDuration) - if duration != 1*time.Second { - t.Errorf("expected: 1, got %s", duration.String()) - } -} - // TestResponsibleForPod tests if a pod with an annotation that should cause it to // be picked up by the default scheduler, is in fact picked by the default scheduler // Two schedulers are made in the test: one is default scheduler and other scheduler @@ -363,8 +290,8 @@ func TestResponsibleForPod(t *testing.T) { } for _, test := range tests { - podOfDefault := factoryDefaultScheduler.responsibleForPod(test.pod) - podOfFoo := factoryFooScheduler.responsibleForPod(test.pod) + podOfDefault := factoryDefaultScheduler.ResponsibleForPod(test.pod) + podOfFoo := factoryFooScheduler.ResponsibleForPod(test.pod) results := []bool{podOfDefault, podOfFoo} expected := []bool{test.pickedByDefault, test.pickedByFoo} if !reflect.DeepEqual(results, expected) { diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 12e8c7f6cc..51858b7b79 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -24,10 +24,16 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + "k8s.io/kubernetes/plugin/pkg/scheduler/util" + + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/client/cache" ) // Binder knows how to write a binding. @@ -45,6 +51,33 @@ type Scheduler struct { config *Config } +// These are the functions which need to be provided in order to build a Scheduler configuration. +// An implementation of this can be seen in factory.go. +type Configurator interface { + GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error) + GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) + GetPredicateMetadataProducer() (algorithm.MetadataProducer, error) + GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) + GetHardPodAffinitySymmetricWeight() int + GetFailureDomains() []string + GetSchedulerName() string + MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) + + // Probably doesn't need to be public. But exposed for now in case. + ResponsibleForPod(pod *v1.Pod) bool + + // Needs to be exposed for things like integration tests where we want to make fake nodes. + GetNodeStore() cache.Store + GetClient() clientset.Interface + GetScheduledPodListerIndexer() cache.Indexer + Run() + + Create() (*Config, error) + CreateFromProvider(providerName string) (*Config, error) + CreateFromConfig(policy schedulerapi.Policy) (*Config, error) + CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error) +} + type Config struct { // It is expected that changes made via SchedulerCache will be observed // by NodeLister and Algorithm. diff --git a/plugin/pkg/scheduler/util/BUILD b/plugin/pkg/scheduler/util/BUILD new file mode 100644 index 0000000000..6f1b76e1f8 --- /dev/null +++ b/plugin/pkg/scheduler/util/BUILD @@ -0,0 +1,40 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = ["backoff_utils_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = ["//vendor:k8s.io/apimachinery/pkg/types"], +) + +go_library( + name = "go_default_library", + srcs = ["backoff_utils.go"], + tags = ["automanaged"], + deps = [ + "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/types", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/plugin/pkg/scheduler/util/backoff_utils.go b/plugin/pkg/scheduler/util/backoff_utils.go new file mode 100644 index 0000000000..90566bbfba --- /dev/null +++ b/plugin/pkg/scheduler/util/backoff_utils.go @@ -0,0 +1,135 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "github.com/golang/glog" + ktypes "k8s.io/apimachinery/pkg/types" + "sync" + "sync/atomic" + "time" +) + +type clock interface { + Now() time.Time +} + +type realClock struct{} + +func (realClock) Now() time.Time { + return time.Now() +} + +// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time. +// It is expected that all users will only use the public TryWait(...) method +// It is also not safe to copy this object. +type backoffEntry struct { + backoff time.Duration + lastUpdate time.Time + reqInFlight int32 +} + +// tryLock attempts to acquire a lock via atomic compare and swap. +// returns true if the lock was acquired, false otherwise +func (b *backoffEntry) tryLock() bool { + return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1) +} + +// unlock returns the lock. panics if the lock isn't held +func (b *backoffEntry) unlock() { + if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) { + panic(fmt.Sprintf("unexpected state on unlocking: %+v", b)) + } +} + +// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for. +func (b *backoffEntry) TryWait(maxDuration time.Duration) bool { + if !b.tryLock() { + return false + } + defer b.unlock() + b.wait(maxDuration) + return true +} + +func (entry *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration { + duration := entry.backoff + newDuration := time.Duration(duration) * 2 + if newDuration > maxDuration { + newDuration = maxDuration + } + entry.backoff = newDuration + glog.V(4).Infof("Backing off %s for pod %+v", duration.String(), entry) + return duration +} + +func (entry *backoffEntry) wait(maxDuration time.Duration) { + time.Sleep(entry.getBackoff(maxDuration)) +} + +type PodBackoff struct { + perPodBackoff map[ktypes.NamespacedName]*backoffEntry + lock sync.Mutex + clock clock + defaultDuration time.Duration + maxDuration time.Duration +} + +func (p *PodBackoff) MaxDuration() time.Duration { + return p.maxDuration +} + +func CreateDefaultPodBackoff() *PodBackoff { + return CreatePodBackoff(1*time.Second, 60*time.Second) +} + +func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff { + return CreatePodBackoffWithClock(defaultDuration, maxDuration, realClock{}) +} + +func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff { + return &PodBackoff{ + perPodBackoff: map[ktypes.NamespacedName]*backoffEntry{}, + clock: clock, + defaultDuration: defaultDuration, + maxDuration: maxDuration, + } +} + +func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *backoffEntry { + p.lock.Lock() + defer p.lock.Unlock() + entry, ok := p.perPodBackoff[podID] + if !ok { + entry = &backoffEntry{backoff: p.defaultDuration} + p.perPodBackoff[podID] = entry + } + entry.lastUpdate = p.clock.Now() + return entry +} + +func (p *PodBackoff) Gc() { + p.lock.Lock() + defer p.lock.Unlock() + now := p.clock.Now() + for podID, entry := range p.perPodBackoff { + if now.Sub(entry.lastUpdate) > p.maxDuration { + delete(p.perPodBackoff, podID) + } + } +} diff --git a/plugin/pkg/scheduler/util/backoff_utils_test.go b/plugin/pkg/scheduler/util/backoff_utils_test.go new file mode 100644 index 0000000000..11c6fd56f0 --- /dev/null +++ b/plugin/pkg/scheduler/util/backoff_utils_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + ktypes "k8s.io/apimachinery/pkg/types" + "testing" + "time" +) + +type fakeClock struct { + t time.Time +} + +func (f *fakeClock) Now() time.Time { + return f.t +} + +func TestBackoff(t *testing.T) { + clock := fakeClock{} + backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock) + tests := []struct { + podID ktypes.NamespacedName + expectedDuration time.Duration + advanceClock time.Duration + }{ + { + podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, + expectedDuration: 1 * time.Second, + }, + { + podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, + expectedDuration: 2 * time.Second, + }, + { + podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, + expectedDuration: 4 * time.Second, + }, + { + podID: ktypes.NamespacedName{Namespace: "default", Name: "bar"}, + expectedDuration: 1 * time.Second, + advanceClock: 120 * time.Second, + }, + // 'foo' should have been gc'd here. + { + podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"}, + expectedDuration: 1 * time.Second, + }, + } + + for _, test := range tests { + duration := backoff.GetEntry(test.podID).getBackoff(backoff.maxDuration) + if duration != test.expectedDuration { + t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID) + } + clock.t = clock.t.Add(test.advanceClock) + backoff.Gc() + } + fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"} + backoff.perPodBackoff[fooID].backoff = 60 * time.Second + duration := backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + if duration != 60*time.Second { + t.Errorf("expected: 60, got %s", duration.String()) + } + // Verify that we split on namespaces correctly, same name, different namespace + fooID.Namespace = "other" + duration = backoff.GetEntry(fooID).getBackoff(backoff.maxDuration) + if duration != 1*time.Second { + t.Errorf("expected: 1, got %s", duration.String()) + } +} diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 21c752e5d4..e69f49ae2d 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -71,7 +71,7 @@ func TestUnschedulableNodes(t *testing.T) { defer close(schedulerConfig.StopEverything) - DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.NodeLister.Store) + DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.GetNodeStore()) } func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc { diff --git a/test/integration/scheduler_perf/BUILD b/test/integration/scheduler_perf/BUILD index 0acf50a9c5..06bdd784a7 100644 --- a/test/integration/scheduler_perf/BUILD +++ b/test/integration/scheduler_perf/BUILD @@ -37,7 +37,7 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", - "//plugin/pkg/scheduler/factory:go_default_library", + "//plugin/pkg/scheduler:go_default_library", "//test/integration/framework:go_default_library", "//test/utils:go_default_library", "//vendor:github.com/golang/glog", diff --git a/test/integration/scheduler_perf/scheduler_bench_test.go b/test/integration/scheduler_perf/scheduler_bench_test.go index 44e289aa6b..b4f431906c 100644 --- a/test/integration/scheduler_perf/scheduler_bench_test.go +++ b/test/integration/scheduler_perf/scheduler_bench_test.go @@ -56,7 +56,7 @@ func BenchmarkScheduling1000Nodes1000Pods(b *testing.B) { func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) { schedulerConfigFactory, finalFunc := mustSetupScheduler() defer finalFunc() - c := schedulerConfigFactory.Client + c := schedulerConfigFactory.GetClient() nodePreparer := framework.NewIntegrationTestNodePreparer( c, @@ -74,7 +74,7 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) { podCreator.CreatePods() for { - scheduled := schedulerConfigFactory.ScheduledPodLister.Indexer.List() + scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List() if len(scheduled) >= numScheduledPods { break } @@ -89,7 +89,7 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) { for { // This can potentially affect performance of scheduler, since List() is done under mutex. // TODO: Setup watch on apiserver and wait until all pods scheduled. - scheduled := schedulerConfigFactory.ScheduledPodLister.Indexer.List() + scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List() if len(scheduled) >= numScheduledPods+b.N { break } diff --git a/test/integration/scheduler_perf/scheduler_test.go b/test/integration/scheduler_perf/scheduler_test.go index 4ff7e96666..46436d6492 100644 --- a/test/integration/scheduler_perf/scheduler_test.go +++ b/test/integration/scheduler_perf/scheduler_test.go @@ -24,12 +24,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/plugin/pkg/scheduler/factory" "k8s.io/kubernetes/test/integration/framework" testutils "k8s.io/kubernetes/test/utils" "github.com/golang/glog" "github.com/renstrom/dedent" + "k8s.io/kubernetes/plugin/pkg/scheduler" ) const ( @@ -74,7 +74,7 @@ func TestSchedule100Node3KNodeAffinityPods(t *testing.T) { }) } config.nodePreparer = framework.NewIntegrationTestNodePreparer( - config.schedulerConfigFactory.Client, + config.schedulerSupportFunctions.GetClient(), nodeStrategies, "scheduler-perf-", ) @@ -106,7 +106,7 @@ func TestSchedule100Node3KNodeAffinityPods(t *testing.T) { }), ) } - config.podCreator = testutils.NewTestPodCreator(config.schedulerConfigFactory.Client, podCreatorConfig) + config.podCreator = testutils.NewTestPodCreator(config.schedulerSupportFunctions.GetClient(), podCreatorConfig) if min := schedulePods(config); min < threshold30K { t.Errorf("Too small pod scheduling throughput for 30k pods. Expected %v got %v", threshold30K, min) @@ -144,19 +144,19 @@ func TestSchedule1000Node30KPods(t *testing.T) { // } type testConfig struct { - numPods int - numNodes int - nodePreparer testutils.TestNodePreparer - podCreator *testutils.TestPodCreator - schedulerConfigFactory *factory.ConfigFactory - destroyFunc func() + numPods int + numNodes int + nodePreparer testutils.TestNodePreparer + podCreator *testutils.TestPodCreator + schedulerSupportFunctions scheduler.Configurator + destroyFunc func() } func baseConfig() *testConfig { schedulerConfigFactory, destroyFunc := mustSetupScheduler() return &testConfig{ - schedulerConfigFactory: schedulerConfigFactory, - destroyFunc: destroyFunc, + schedulerSupportFunctions: schedulerConfigFactory, + destroyFunc: destroyFunc, } } @@ -164,14 +164,14 @@ func defaultSchedulerBenchmarkConfig(numNodes, numPods int) *testConfig { baseConfig := baseConfig() nodePreparer := framework.NewIntegrationTestNodePreparer( - baseConfig.schedulerConfigFactory.Client, + baseConfig.schedulerSupportFunctions.GetClient(), []testutils.CountToStrategy{{Count: numNodes, Strategy: &testutils.TrivialNodePrepareStrategy{}}}, "scheduler-perf-", ) config := testutils.NewTestPodCreatorConfig() config.AddStrategy("sched-test", numPods, testutils.NewSimpleWithControllerCreatePodStrategy("rc1")) - podCreator := testutils.NewTestPodCreator(baseConfig.schedulerConfigFactory.Client, config) + podCreator := testutils.NewTestPodCreator(baseConfig.schedulerSupportFunctions.GetClient(), config) baseConfig.nodePreparer = nodePreparer baseConfig.podCreator = podCreator @@ -203,7 +203,7 @@ func schedulePods(config *testConfig) int32 { // Bake in time for the first pod scheduling event. for { time.Sleep(50 * time.Millisecond) - scheduled := config.schedulerConfigFactory.ScheduledPodLister.Indexer.List() + scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List() // 30,000 pods -> wait till @ least 300 are scheduled to start measuring. // TODO Find out why sometimes there may be scheduling blips in the beggining. if len(scheduled) > config.numPods/100 { @@ -218,7 +218,7 @@ func schedulePods(config *testConfig) int32 { // This can potentially affect performance of scheduler, since List() is done under mutex. // Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler. // TODO: Setup watch on apiserver and wait until all pods scheduled. - scheduled := config.schedulerConfigFactory.ScheduledPodLister.Indexer.List() + scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List() // We will be completed when all pods are done being scheduled. // return the worst-case-scenario interval that was seen during this time. diff --git a/test/integration/scheduler_perf/util.go b/test/integration/scheduler_perf/util.go index aed8e15f04..59faa1fe80 100644 --- a/test/integration/scheduler_perf/util.go +++ b/test/integration/scheduler_perf/util.go @@ -40,7 +40,7 @@ import ( // remove resources after finished. // Notes on rate limiter: // - client rate limit is set to 5000. -func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destroyFunc func()) { +func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destroyFunc func()) { h := &framework.MasterHolder{Initialized: make(chan struct{})} s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {