[scheduler] interface for configuration factory, configurator.

pull/6/head
jayunit100 2017-01-13 18:51:38 -05:00
parent c14fa94a4a
commit a98d14d2c5
13 changed files with 430 additions and 272 deletions

View File

@@ -19,6 +19,8 @@ go_library(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
+ "//pkg/client/cache:go_default_library",
+ "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/record:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/util:go_default_library",
@@ -29,11 +31,13 @@ go_library(
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/metrics:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
+ "//plugin/pkg/scheduler/util:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/golang/groupcache/lru",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/net",
+ "//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
],
)
@@ -87,6 +91,7 @@ filegroup(
"//plugin/pkg/scheduler/metrics:all-srcs",
"//plugin/pkg/scheduler/schedulercache:all-srcs",
"//plugin/pkg/scheduler/testing:all-srcs",
+ "//plugin/pkg/scheduler/util:all-srcs",
],
tags = ["automanaged"],
)

View File

@@ -29,6 +29,7 @@ go_library(
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/validation:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
+ "//plugin/pkg/scheduler/util:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
@@ -61,9 +62,9 @@ go_test(
"//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/api/latest:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
+ "//plugin/pkg/scheduler/util:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
- "//vendor:k8s.io/apimachinery/pkg/types",
],
)

View File

@@ -21,8 +21,6 @@ package factory
import (
"fmt"
"strings"
- "sync"
- "sync/atomic"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@@ -46,6 +44,7 @@ import (
"github.com/golang/glog"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
+ "k8s.io/kubernetes/plugin/pkg/scheduler/util"
)
const (
@@ -54,27 +53,28 @@ const (
maximalGetBackoff = time.Minute
)
- // ConfigFactory knows how to fill out a scheduler config with its support functions.
+ // ConfigFactory is the default implementation of the scheduler.Configurator interface.
+ // TODO make this private if possible, so that only its interface is externally used.
type ConfigFactory struct {
- Client clientset.Interface
+ client clientset.Interface
// queue for pods that need scheduling
- PodQueue *cache.FIFO
+ podQueue *cache.FIFO
// a means to list all known scheduled pods.
- ScheduledPodLister *cache.StoreToPodLister
+ scheduledPodLister *cache.StoreToPodLister
// a means to list all known scheduled pods and pods assumed to have been scheduled.
- PodLister algorithm.PodLister
+ podLister algorithm.PodLister
// a means to list all nodes
- NodeLister *cache.StoreToNodeLister
+ nodeLister *cache.StoreToNodeLister
// a means to list all PersistentVolumes
- PVLister *cache.StoreToPVFetcher
+ pVLister *cache.StoreToPVFetcher
// a means to list all PersistentVolumeClaims
- PVCLister *cache.StoreToPersistentVolumeClaimLister
+ pVCLister *cache.StoreToPersistentVolumeClaimLister
// a means to list all services
- ServiceLister *cache.StoreToServiceLister
+ serviceLister *cache.StoreToServiceLister
// a means to list all controllers
- ControllerLister *cache.StoreToReplicationControllerLister
+ controllerLister *cache.StoreToReplicationControllerLister
// a means to list all replicasets
- ReplicaSetLister *cache.StoreToReplicaSetLister
+ replicaSetLister *cache.StoreToReplicaSetLister
// Close this to stop all reflectors
StopEverything chan struct{}
@@ -92,22 +92,23 @@ type ConfigFactory struct {
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's annotation key:
// 'scheduler.alpha.kubernetes.io/name'
- SchedulerName string
+ schedulerName string
// RequiredDuringScheduling affinity is not symmetric, but there is an implicit PreferredDuringScheduling affinity rule
// corresponding to every RequiredDuringScheduling affinity rule.
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
- HardPodAffinitySymmetricWeight int
+ hardPodAffinitySymmetricWeight int
// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
- FailureDomains string
+ failureDomains []string
// Equivalence class cache
- EquivalencePodCache *scheduler.EquivalenceCache
+ equivalencePodCache *scheduler.EquivalenceCache
}
- // Initializes the factory.
- func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) *ConfigFactory {
+ // NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
+ // return the interface.
+ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodAffinitySymmetricWeight int, failureDomains string) scheduler.Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
@@ -116,32 +117,32 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
pvcInformer := informerFactory.PersistentVolumeClaims()
c := &ConfigFactory{
- Client: client,
+ client: client,
- PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
+ podQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
- ScheduledPodLister: &cache.StoreToPodLister{},
+ scheduledPodLister: &cache.StoreToPodLister{},
informerFactory: informerFactory,
// Only nodes in the "Ready" condition with status == "True" are schedulable
- NodeLister: &cache.StoreToNodeLister{},
+ nodeLister: &cache.StoreToNodeLister{},
- PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
+ pVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
- PVCLister: pvcInformer.Lister(),
+ pVCLister: pvcInformer.Lister(),
pvcPopulator: pvcInformer.Informer().GetController(),
- ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
+ serviceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
- ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
+ controllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
- ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
+ replicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
schedulerCache: schedulerCache,
StopEverything: stopEverything,
- SchedulerName: schedulerName,
+ schedulerName: schedulerName,
- HardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
+ hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
- FailureDomains: failureDomains,
+ failureDomains: strings.Split(failureDomains, ","),
}
- c.PodLister = schedulerCache
+ c.podLister = schedulerCache
// On add/delete to the scheduled pods, remove from the assumed pods.
// We construct this here instead of in CreateFromKeys because
// ScheduledPodLister is something we provide to plug in functions that
// they may need to call.
- c.ScheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
+ c.scheduledPodLister.Indexer, c.scheduledPodPopulator = cache.NewIndexerInformer(
c.createAssignedNonTerminatedPodLW(),
&v1.Pod{},
0,
@@ -153,7 +154,7 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
- c.NodeLister.Store, c.nodePopulator = cache.NewInformer(
+ c.nodeLister.Store, c.nodePopulator = cache.NewInformer(
c.createNodeLW(),
&v1.Node{},
0,
@@ -165,14 +166,14 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
)
// TODO(harryz) need to fill all the handlers here and below for equivalence cache
- c.PVLister.Store, c.pvPopulator = cache.NewInformer(
+ c.pVLister.Store, c.pvPopulator = cache.NewInformer(
c.createPersistentVolumeLW(),
&v1.PersistentVolume{},
0,
cache.ResourceEventHandlerFuncs{},
)
- c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
+ c.serviceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
c.createServiceLW(),
&v1.Service{},
0,
@@ -180,7 +181,7 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
- c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
+ c.controllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
c.createControllerLW(),
&v1.ReplicationController{},
0,
@@ -191,6 +192,33 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
return c
}
+ // GetNodeStore provides the cache to the nodes, mostly internal use, but may also be called by mock-tests.
+ func (c *ConfigFactory) GetNodeStore() cache.Store {
+ return c.nodeLister.Store
+ }
+ func (c *ConfigFactory) GetHardPodAffinitySymmetricWeight() int {
+ return c.hardPodAffinitySymmetricWeight
+ }
+ func (c *ConfigFactory) GetFailureDomains() []string {
+ return c.failureDomains
+ }
+ func (f *ConfigFactory) GetSchedulerName() string {
+ return f.schedulerName
+ }
+ // GetClient provides a kubernetes client, mostly internal use, but may also be called by mock-tests.
+ func (f *ConfigFactory) GetClient() clientset.Interface {
+ return f.client
+ }
+ // GetScheduledPodListerIndexer provides a pod lister, mostly internal use, but may also be called by mock-tests.
+ func (c *ConfigFactory) GetScheduledPodListerIndexer() cache.Indexer {
+ return c.scheduledPodLister.Indexer
+ }
// TODO(harryz) need to update all the handlers here and below for equivalence cache
func (c *ConfigFactory) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod)
@@ -347,8 +375,8 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
- if f.HardPodAffinitySymmetricWeight < 0 || f.HardPodAffinitySymmetricWeight > 100 {
+ if f.GetHardPodAffinitySymmetricWeight() < 0 || f.GetHardPodAffinitySymmetricWeight() > 100 {
- return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.HardPodAffinitySymmetricWeight)
+ return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 0-100", f.GetHardPodAffinitySymmetricWeight())
}
predicateFuncs, err := f.GetPredicates(predicateKeys)
@@ -373,25 +401,18 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
f.Run()
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
- podBackoff := podBackoff{
- perPodBackoff: map[types.NamespacedName]*backoffEntry{},
- clock: realClock{},
- defaultDuration: 1 * time.Second,
- maxDuration: 60 * time.Second,
- }
+ podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
- NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
+ NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
- Binder: &binder{f.Client},
+ Binder: &binder{f.client},
- PodConditionUpdater: &podConditionUpdater{f.Client},
+ PodConditionUpdater: &podConditionUpdater{f.client},
NextPod: func() *v1.Pod {
return f.getNextPod()
},
- Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
+ Error: f.MakeDefaultErrorFunc(podBackoff, f.podQueue),
StopEverything: f.StopEverything,
}, nil
}
@@ -432,31 +453,30 @@ func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]alg
}
func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) {
- failureDomainArgs := strings.Split(f.FailureDomains, ",")
- for _, failureDomain := range failureDomainArgs {
+ for _, failureDomain := range f.failureDomains {
if errs := utilvalidation.IsQualifiedName(failureDomain); len(errs) != 0 {
return nil, fmt.Errorf("invalid failure domain: %q: %s", failureDomain, strings.Join(errs, ";"))
}
}
return &PluginFactoryArgs{
- PodLister: f.PodLister,
+ PodLister: f.podLister,
- ServiceLister: f.ServiceLister,
+ ServiceLister: f.serviceLister,
- ControllerLister: f.ControllerLister,
+ ControllerLister: f.controllerLister,
- ReplicaSetLister: f.ReplicaSetLister,
+ ReplicaSetLister: f.replicaSetLister,
// All fit predicates only need to consider schedulable nodes.
- NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
+ NodeLister: f.nodeLister.NodeCondition(getNodeConditionPredicate()),
- NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.NodeLister},
+ NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.nodeLister},
- PVInfo: f.PVLister,
+ PVInfo: f.pVLister,
- PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.PVCLister},
+ PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.pVCLister},
- HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight,
+ HardPodAffinitySymmetricWeight: f.hardPodAffinitySymmetricWeight,
- FailureDomains: sets.NewString(failureDomainArgs...).List(),
+ FailureDomains: sets.NewString(f.failureDomains...).List(),
}, nil
}
func (f *ConfigFactory) Run() {
// Watch and queue pods that need scheduling.
- cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &v1.Pod{}, f.PodQueue, 0).RunUntil(f.StopEverything)
+ cache.NewReflector(f.createUnassignedNonTerminatedPodLW(), &v1.Pod{}, f.podQueue, 0).RunUntil(f.StopEverything)
// Begin populating scheduled pods.
go f.scheduledPodPopulator.Run(f.StopEverything)
@@ -480,24 +500,24 @@ func (f *ConfigFactory) Run() {
// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
- cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Indexer, 0).RunUntil(f.StopEverything)
+ cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.replicaSetLister.Indexer, 0).RunUntil(f.StopEverything)
}
func (f *ConfigFactory) getNextPod() *v1.Pod {
for {
- pod := cache.Pop(f.PodQueue).(*v1.Pod)
+ pod := cache.Pop(f.podQueue).(*v1.Pod)
- if f.responsibleForPod(pod) {
+ if f.ResponsibleForPod(pod) {
glog.V(4).Infof("About to try and schedule pod %v", pod.Name)
return pod
}
}
}
- func (f *ConfigFactory) responsibleForPod(pod *v1.Pod) bool {
+ func (f *ConfigFactory) ResponsibleForPod(pod *v1.Pod) bool {
- if f.SchedulerName == v1.DefaultSchedulerName {
+ if f.schedulerName == v1.DefaultSchedulerName {
- return pod.Annotations[SchedulerAnnotationKey] == f.SchedulerName || pod.Annotations[SchedulerAnnotationKey] == ""
+ return pod.Annotations[SchedulerAnnotationKey] == f.schedulerName || pod.Annotations[SchedulerAnnotationKey] == ""
} else {
- return pod.Annotations[SchedulerAnnotationKey] == f.SchedulerName
+ return pod.Annotations[SchedulerAnnotationKey] == f.schedulerName
}
}
@@ -533,7 +553,7 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
// scheduled.
func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWatch {
selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
- return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "pods", v1.NamespaceAll, selector)
+ return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", v1.NamespaceAll, selector)
}
// Returns a cache.ListWatch that finds all pods that are
@@ -541,7 +561,7 @@ func (factory *ConfigFactory) createUnassignedNonTerminatedPodLW() *cache.ListWa
// TODO: return a ListerWatcher interface instead?
func (factory *ConfigFactory) createAssignedNonTerminatedPodLW() *cache.ListWatch {
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" + string(v1.PodSucceeded) + ",status.phase!=" + string(v1.PodFailed))
- return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "pods", v1.NamespaceAll, selector)
+ return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "pods", v1.NamespaceAll, selector)
}
// createNodeLW returns a cache.ListWatch that gets all changes to nodes.
@@ -549,42 +569,42 @@ func (factory *ConfigFactory) createNodeLW() *cache.ListWatch {
// all nodes are considered to ensure that the scheduler cache has access to all nodes for lookups
// the NodeCondition is used to filter out the nodes that are not ready or unschedulable
// the filtered list is used as the super set of nodes to consider for scheduling
- return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "nodes", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
+ return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "nodes", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeLW returns a cache.ListWatch that gets all changes to persistentVolumes.
func (factory *ConfigFactory) createPersistentVolumeLW() *cache.ListWatch {
- return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "persistentVolumes", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
+ return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumes", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// createPersistentVolumeClaimLW returns a cache.ListWatch that gets all changes to persistentVolumeClaims.
func (factory *ConfigFactory) createPersistentVolumeClaimLW() *cache.ListWatch {
- return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "persistentVolumeClaims", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
+ return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "persistentVolumeClaims", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
- return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "services", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
+ return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "services", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to controllers.
func (factory *ConfigFactory) createControllerLW() *cache.ListWatch {
- return cache.NewListWatchFromClient(factory.Client.Core().RESTClient(), "replicationControllers", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
+ return cache.NewListWatchFromClient(factory.client.Core().RESTClient(), "replicationControllers", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
// Returns a cache.ListWatch that gets all changes to replicasets.
func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch {
- return cache.NewListWatchFromClient(factory.Client.Extensions().RESTClient(), "replicasets", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
+ return cache.NewListWatchFromClient(factory.client.Extensions().RESTClient(), "replicasets", v1.NamespaceAll, fields.ParseSelectorOrDie(""))
}
- func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
+ func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
return func(pod *v1.Pod, err error) {
if err == scheduler.ErrNoNodesAvailable {
glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
} else {
glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err)
}
- backoff.gc()
+ backoff.Gc()
// Retry asynchronously.
// Note that this is extremely rudimentary and we need a more real error handling path.
go func() {
@@ -594,15 +614,15 @@ func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue
Name: pod.Name,
}
- entry := backoff.getEntry(podID)
+ entry := backoff.GetEntry(podID)
- if !entry.TryWait(backoff.maxDuration) {
+ if !entry.TryWait(backoff.MaxDuration()) {
glog.Warningf("Request for pod %v already in flight, abandoning", podID)
return
}
// Get the pod again; it may have changed/been scheduled already.
getBackoff := initialGetBackoff
for {
- pod, err := factory.Client.Core().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
+ pod, err := factory.client.Core().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
if err == nil {
if len(pod.Spec.NodeName) == 0 {
podQueue.AddIfNotPresent(pod)
@@ -666,91 +686,3 @@ func (p *podConditionUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) er
}
return nil
}
- type clock interface {
- Now() time.Time
- }
- type realClock struct{}
- func (realClock) Now() time.Time {
- return time.Now()
- }
- // backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time.
- // It is expected that all users will only use the public TryWait(...) method
- // It is also not safe to copy this object.
- type backoffEntry struct {
- backoff time.Duration
- lastUpdate time.Time
- reqInFlight int32
- }
- // tryLock attempts to acquire a lock via atomic compare and swap.
- // returns true if the lock was acquired, false otherwise
- func (b *backoffEntry) tryLock() bool {
- return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1)
- }
- // unlock returns the lock. panics if the lock isn't held
- func (b *backoffEntry) unlock() {
- if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) {
- panic(fmt.Sprintf("unexpected state on unlocking: %+v", b))
- }
- }
- // TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for.
- func (b *backoffEntry) TryWait(maxDuration time.Duration) bool {
- if !b.tryLock() {
- return false
- }
- defer b.unlock()
- b.wait(maxDuration)
- return true
- }
- func (entry *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
- duration := entry.backoff
- newDuration := time.Duration(duration) * 2
- if newDuration > maxDuration {
- newDuration = maxDuration
- }
- entry.backoff = newDuration
- glog.V(4).Infof("Backing off %s for pod %+v", duration.String(), entry)
- return duration
- }
- func (entry *backoffEntry) wait(maxDuration time.Duration) {
- time.Sleep(entry.getBackoff(maxDuration))
- }
- type podBackoff struct {
- perPodBackoff map[types.NamespacedName]*backoffEntry
- lock sync.Mutex
- clock clock
- defaultDuration time.Duration
- maxDuration time.Duration
- }
- func (p *podBackoff) getEntry(podID types.NamespacedName) *backoffEntry {
- p.lock.Lock()
- defer p.lock.Unlock()
- entry, ok := p.perPodBackoff[podID]
- if !ok {
- entry = &backoffEntry{backoff: p.defaultDuration}
- p.perPodBackoff[podID] = entry
- }
- entry.lastUpdate = p.clock.Now()
- return entry
- }
- func (p *podBackoff) gc() {
- p.lock.Lock()
- defer p.lock.Unlock()
- now := p.clock.Now()
- for podID, entry := range p.perPodBackoff {
- if now.Sub(entry.lastUpdate) > p.maxDuration {
- delete(p.perPodBackoff, podID)
- }
- }
- }
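For orientation, a minimal sketch (not part of this commit) of how a caller could wire up a scheduler configuration through the refactored factory. The API server address and the "DefaultProvider" algorithm provider name are illustrative assumptions; the constructor arguments mirror the ones used in factory_test.go below.

package main

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api/v1"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
	"k8s.io/kubernetes/pkg/client/restclient"
	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
)

func main() {
	// Hypothetical API server address; a real scheduler binary builds this from flags or in-cluster config.
	client := clientset.NewForConfigOrDie(&restclient.Config{Host: "http://127.0.0.1:8080"})

	// NewConfigFactory now returns the scheduler.Configurator interface rather than *ConfigFactory.
	configurator := factory.NewConfigFactory(client, v1.DefaultSchedulerName,
		v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains)

	// Build a *scheduler.Config from a registered algorithm provider (assumed name).
	config, err := configurator.CreateFromProvider("DefaultProvider")
	if err != nil {
		glog.Fatalf("failed to create scheduler config: %v", err)
	}
	_ = config // hand the Config off to the scheduler's run loop in a real binary
}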

View File

@@ -25,7 +25,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
@@ -38,6 +37,7 @@ import (
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
+ "k8s.io/kubernetes/plugin/pkg/scheduler/util"
)
func TestCreate(t *testing.T) {
@@ -152,13 +152,8 @@ func TestDefaultErrorFunc(t *testing.T) {
defer server.Close()
factory := NewConfigFactory(clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}), v1.DefaultSchedulerName, v1.DefaultHardPodAffinitySymmetricWeight, v1.DefaultFailureDomains)
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
- podBackoff := podBackoff{
- perPodBackoff: map[types.NamespacedName]*backoffEntry{},
- clock: &fakeClock{},
- defaultDuration: 1 * time.Millisecond,
- maxDuration: 1 * time.Second,
- }
- errFunc := factory.makeDefaultErrorFunc(&podBackoff, queue)
+ podBackoff := util.CreatePodBackoff(1*time.Millisecond, 1*time.Second)
+ errFunc := factory.MakeDefaultErrorFunc(podBackoff, queue)
errFunc(testPod, nil)
for {
@@ -202,14 +197,6 @@ func TestNodeEnumerator(t *testing.T) {
}
}
- type fakeClock struct {
- t time.Time
- }
- func (f *fakeClock) Now() time.Time {
- return f.t
- }
func TestBind(t *testing.T) {
table := []struct {
binding *v1.Binding
@@ -245,66 +232,6 @@ func TestBind(t *testing.T) {
}
}
- func TestBackoff(t *testing.T) {
- clock := fakeClock{}
- backoff := podBackoff{
- perPodBackoff: map[types.NamespacedName]*backoffEntry{},
- clock: &clock,
- defaultDuration: 1 * time.Second,
- maxDuration: 60 * time.Second,
- }
- tests := []struct {
- podID types.NamespacedName
- expectedDuration time.Duration
- advanceClock time.Duration
- }{
- {
- podID: types.NamespacedName{Namespace: "default", Name: "foo"},
- expectedDuration: 1 * time.Second,
- },
- {
- podID: types.NamespacedName{Namespace: "default", Name: "foo"},
- expectedDuration: 2 * time.Second,
- },
- {
- podID: types.NamespacedName{Namespace: "default", Name: "foo"},
- expectedDuration: 4 * time.Second,
- },
- {
- podID: types.NamespacedName{Namespace: "default", Name: "bar"},
- expectedDuration: 1 * time.Second,
- advanceClock: 120 * time.Second,
- },
- // 'foo' should have been gc'd here.
- {
- podID: types.NamespacedName{Namespace: "default", Name: "foo"},
- expectedDuration: 1 * time.Second,
- },
- }
- for _, test := range tests {
- duration := backoff.getEntry(test.podID).getBackoff(backoff.maxDuration)
- if duration != test.expectedDuration {
- t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID)
- }
- clock.t = clock.t.Add(test.advanceClock)
- backoff.gc()
- }
- fooID := types.NamespacedName{Namespace: "default", Name: "foo"}
- backoff.perPodBackoff[fooID].backoff = 60 * time.Second
- duration := backoff.getEntry(fooID).getBackoff(backoff.maxDuration)
- if duration != 60*time.Second {
- t.Errorf("expected: 60, got %s", duration.String())
- }
- // Verify that we split on namespaces correctly, same name, different namespace
- fooID.Namespace = "other"
- duration = backoff.getEntry(fooID).getBackoff(backoff.maxDuration)
- if duration != 1*time.Second {
- t.Errorf("expected: 1, got %s", duration.String())
- }
- }
// TestResponsibleForPod tests if a pod with an annotation that should cause it to
// be picked up by the default scheduler, is in fact picked by the default scheduler
// Two schedulers are made in the test: one is default scheduler and other scheduler
@@ -363,8 +290,8 @@ func TestResponsibleForPod(t *testing.T) {
}
for _, test := range tests {
- podOfDefault := factoryDefaultScheduler.responsibleForPod(test.pod)
+ podOfDefault := factoryDefaultScheduler.ResponsibleForPod(test.pod)
- podOfFoo := factoryFooScheduler.responsibleForPod(test.pod)
+ podOfFoo := factoryFooScheduler.ResponsibleForPod(test.pod)
results := []bool{podOfDefault, podOfFoo}
expected := []bool{test.pickedByDefault, test.pickedByFoo}
if !reflect.DeepEqual(results, expected) {

View File

@@ -24,10 +24,16 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
+ schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
+ "k8s.io/kubernetes/plugin/pkg/scheduler/util"
+ clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"github.com/golang/glog"
+ "k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/kubernetes/pkg/client/cache"
)
// Binder knows how to write a binding.
@@ -45,6 +51,33 @@ type Scheduler struct {
config *Config
}
+ // These are the functions which need to be provided in order to build a Scheduler configuration.
+ // An implementation of this can be seen in factory.go.
+ type Configurator interface {
+ GetPriorityFunctionConfigs(priorityKeys sets.String) ([]algorithm.PriorityConfig, error)
+ GetPriorityMetadataProducer() (algorithm.MetadataProducer, error)
+ GetPredicateMetadataProducer() (algorithm.MetadataProducer, error)
+ GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error)
+ GetHardPodAffinitySymmetricWeight() int
+ GetFailureDomains() []string
+ GetSchedulerName() string
+ MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error)
+ // Probably doesn't need to be public. But exposed for now in case.
+ ResponsibleForPod(pod *v1.Pod) bool
+ // Needs to be exposed for things like integration tests where we want to make fake nodes.
+ GetNodeStore() cache.Store
+ GetClient() clientset.Interface
+ GetScheduledPodListerIndexer() cache.Indexer
+ Run()
+ Create() (*Config, error)
+ CreateFromProvider(providerName string) (*Config, error)
+ CreateFromConfig(policy schedulerapi.Policy) (*Config, error)
+ CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*Config, error)
+ }
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
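Because callers now receive the narrow Configurator interface, test and benchmark code can stay decoupled from factory.ConfigFactory. A small illustrative helper in the style of the scheduler_perf changes later in this commit (the name countScheduled is invented for this sketch):

package example

import (
	"k8s.io/kubernetes/plugin/pkg/scheduler"
)

// countScheduled reports how many pods the scheduler currently lists as scheduled,
// relying only on the Configurator interface rather than the concrete factory type.
func countScheduled(c scheduler.Configurator) int {
	return len(c.GetScheduledPodListerIndexer().List())
}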

View File

@@ -0,0 +1,40 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["backoff_utils_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor:k8s.io/apimachinery/pkg/types"],
)
go_library(
name = "go_default_library",
srcs = ["backoff_utils.go"],
tags = ["automanaged"],
deps = [
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/types",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -0,0 +1,135 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"github.com/golang/glog"
ktypes "k8s.io/apimachinery/pkg/types"
"sync"
"sync/atomic"
"time"
)
type clock interface {
Now() time.Time
}
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
}
// backoffEntry is single threaded. in particular, it only allows a single action to be waiting on backoff at a time.
// It is expected that all users will only use the public TryWait(...) method
// It is also not safe to copy this object.
type backoffEntry struct {
backoff time.Duration
lastUpdate time.Time
reqInFlight int32
}
// tryLock attempts to acquire a lock via atomic compare and swap.
// returns true if the lock was acquired, false otherwise
func (b *backoffEntry) tryLock() bool {
return atomic.CompareAndSwapInt32(&b.reqInFlight, 0, 1)
}
// unlock returns the lock. panics if the lock isn't held
func (b *backoffEntry) unlock() {
if !atomic.CompareAndSwapInt32(&b.reqInFlight, 1, 0) {
panic(fmt.Sprintf("unexpected state on unlocking: %+v", b))
}
}
// TryWait tries to acquire the backoff lock, maxDuration is the maximum allowed period to wait for.
func (b *backoffEntry) TryWait(maxDuration time.Duration) bool {
if !b.tryLock() {
return false
}
defer b.unlock()
b.wait(maxDuration)
return true
}
func (entry *backoffEntry) getBackoff(maxDuration time.Duration) time.Duration {
duration := entry.backoff
newDuration := time.Duration(duration) * 2
if newDuration > maxDuration {
newDuration = maxDuration
}
entry.backoff = newDuration
glog.V(4).Infof("Backing off %s for pod %+v", duration.String(), entry)
return duration
}
func (entry *backoffEntry) wait(maxDuration time.Duration) {
time.Sleep(entry.getBackoff(maxDuration))
}
type PodBackoff struct {
perPodBackoff map[ktypes.NamespacedName]*backoffEntry
lock sync.Mutex
clock clock
defaultDuration time.Duration
maxDuration time.Duration
}
func (p *PodBackoff) MaxDuration() time.Duration {
return p.maxDuration
}
func CreateDefaultPodBackoff() *PodBackoff {
return CreatePodBackoff(1*time.Second, 60*time.Second)
}
func CreatePodBackoff(defaultDuration, maxDuration time.Duration) *PodBackoff {
return CreatePodBackoffWithClock(defaultDuration, maxDuration, realClock{})
}
func CreatePodBackoffWithClock(defaultDuration, maxDuration time.Duration, clock clock) *PodBackoff {
return &PodBackoff{
perPodBackoff: map[ktypes.NamespacedName]*backoffEntry{},
clock: clock,
defaultDuration: defaultDuration,
maxDuration: maxDuration,
}
}
func (p *PodBackoff) GetEntry(podID ktypes.NamespacedName) *backoffEntry {
p.lock.Lock()
defer p.lock.Unlock()
entry, ok := p.perPodBackoff[podID]
if !ok {
entry = &backoffEntry{backoff: p.defaultDuration}
p.perPodBackoff[podID] = entry
}
entry.lastUpdate = p.clock.Now()
return entry
}
func (p *PodBackoff) Gc() {
p.lock.Lock()
defer p.lock.Unlock()
now := p.clock.Now()
for podID, entry := range p.perPodBackoff {
if now.Sub(entry.lastUpdate) > p.maxDuration {
delete(p.perPodBackoff, podID)
}
}
}
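As a usage sketch (the pod name and namespace are placeholders), the extracted PodBackoff is consumed the same way the factory's MakeDefaultErrorFunc uses it:

package main

import (
	"fmt"

	ktypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)

func main() {
	// 1s initial backoff, 60s maximum, real clock.
	backoff := util.CreateDefaultPodBackoff()

	podID := ktypes.NamespacedName{Namespace: "default", Name: "example-pod"}

	// TryWait sleeps for the entry's current backoff (doubling it up to the maximum)
	// and returns false if a retry for this pod is already in flight.
	if backoff.GetEntry(podID).TryWait(backoff.MaxDuration()) {
		fmt.Println("retrying pod", podID)
	}

	// Gc drops entries that have not been updated within MaxDuration.
	backoff.Gc()
}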

View File

@@ -0,0 +1,85 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
ktypes "k8s.io/apimachinery/pkg/types"
"testing"
"time"
)
type fakeClock struct {
t time.Time
}
func (f *fakeClock) Now() time.Time {
return f.t
}
func TestBackoff(t *testing.T) {
clock := fakeClock{}
backoff := CreatePodBackoffWithClock(1*time.Second, 60*time.Second, &clock)
tests := []struct {
podID ktypes.NamespacedName
expectedDuration time.Duration
advanceClock time.Duration
}{
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 1 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 2 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 4 * time.Second,
},
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "bar"},
expectedDuration: 1 * time.Second,
advanceClock: 120 * time.Second,
},
// 'foo' should have been gc'd here.
{
podID: ktypes.NamespacedName{Namespace: "default", Name: "foo"},
expectedDuration: 1 * time.Second,
},
}
for _, test := range tests {
duration := backoff.GetEntry(test.podID).getBackoff(backoff.maxDuration)
if duration != test.expectedDuration {
t.Errorf("expected: %s, got %s for %s", test.expectedDuration.String(), duration.String(), test.podID)
}
clock.t = clock.t.Add(test.advanceClock)
backoff.Gc()
}
fooID := ktypes.NamespacedName{Namespace: "default", Name: "foo"}
backoff.perPodBackoff[fooID].backoff = 60 * time.Second
duration := backoff.GetEntry(fooID).getBackoff(backoff.maxDuration)
if duration != 60*time.Second {
t.Errorf("expected: 60, got %s", duration.String())
}
// Verify that we split on namespaces correctly, same name, different namespace
fooID.Namespace = "other"
duration = backoff.GetEntry(fooID).getBackoff(backoff.maxDuration)
if duration != 1*time.Second {
t.Errorf("expected: 1, got %s", duration.String())
}
}

View File

@@ -71,7 +71,7 @@ func TestUnschedulableNodes(t *testing.T) {
defer close(schedulerConfig.StopEverything)
- DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.NodeLister.Store)
+ DoTestUnschedulableNodes(t, clientSet, ns, schedulerConfigFactory.GetNodeStore())
}
func podScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {

View File

@@ -37,7 +37,7 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
- "//plugin/pkg/scheduler/factory:go_default_library",
+ "//plugin/pkg/scheduler:go_default_library",
"//test/integration/framework:go_default_library",
"//test/utils:go_default_library",
"//vendor:github.com/golang/glog",

View File

@@ -56,7 +56,7 @@ func BenchmarkScheduling1000Nodes1000Pods(b *testing.B) {
func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) {
schedulerConfigFactory, finalFunc := mustSetupScheduler()
defer finalFunc()
- c := schedulerConfigFactory.Client
+ c := schedulerConfigFactory.GetClient()
nodePreparer := framework.NewIntegrationTestNodePreparer(
c,
@@ -74,7 +74,7 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) {
podCreator.CreatePods()
for {
- scheduled := schedulerConfigFactory.ScheduledPodLister.Indexer.List()
+ scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List()
if len(scheduled) >= numScheduledPods {
break
}
@@ -89,7 +89,7 @@ func benchmarkScheduling(numNodes, numScheduledPods int, b *testing.B) {
for {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// TODO: Setup watch on apiserver and wait until all pods scheduled.
- scheduled := schedulerConfigFactory.ScheduledPodLister.Indexer.List()
+ scheduled := schedulerConfigFactory.GetScheduledPodListerIndexer().List()
if len(scheduled) >= numScheduledPods+b.N {
break
}

View File

@@ -24,12 +24,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/v1"
- "k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/utils"
"github.com/golang/glog"
"github.com/renstrom/dedent"
+ "k8s.io/kubernetes/plugin/pkg/scheduler"
)
const (
@@ -74,7 +74,7 @@ func TestSchedule100Node3KNodeAffinityPods(t *testing.T) {
})
}
config.nodePreparer = framework.NewIntegrationTestNodePreparer(
- config.schedulerConfigFactory.Client,
+ config.schedulerSupportFunctions.GetClient(),
nodeStrategies,
"scheduler-perf-",
)
@@ -106,7 +106,7 @@ func TestSchedule100Node3KNodeAffinityPods(t *testing.T) {
}),
)
}
- config.podCreator = testutils.NewTestPodCreator(config.schedulerConfigFactory.Client, podCreatorConfig)
+ config.podCreator = testutils.NewTestPodCreator(config.schedulerSupportFunctions.GetClient(), podCreatorConfig)
if min := schedulePods(config); min < threshold30K {
t.Errorf("Too small pod scheduling throughput for 30k pods. Expected %v got %v", threshold30K, min)
@@ -144,19 +144,19 @@ func TestSchedule1000Node30KPods(t *testing.T) {
// }
type testConfig struct {
numPods int
numNodes int
nodePreparer testutils.TestNodePreparer
podCreator *testutils.TestPodCreator
- schedulerConfigFactory *factory.ConfigFactory
+ schedulerSupportFunctions scheduler.Configurator
destroyFunc func()
}
func baseConfig() *testConfig {
schedulerConfigFactory, destroyFunc := mustSetupScheduler()
return &testConfig{
- schedulerConfigFactory: schedulerConfigFactory,
+ schedulerSupportFunctions: schedulerConfigFactory,
destroyFunc: destroyFunc,
}
}
@@ -164,14 +164,14 @@ func defaultSchedulerBenchmarkConfig(numNodes, numPods int) *testConfig {
baseConfig := baseConfig()
nodePreparer := framework.NewIntegrationTestNodePreparer(
- baseConfig.schedulerConfigFactory.Client,
+ baseConfig.schedulerSupportFunctions.GetClient(),
[]testutils.CountToStrategy{{Count: numNodes, Strategy: &testutils.TrivialNodePrepareStrategy{}}},
"scheduler-perf-",
)
config := testutils.NewTestPodCreatorConfig()
config.AddStrategy("sched-test", numPods, testutils.NewSimpleWithControllerCreatePodStrategy("rc1"))
- podCreator := testutils.NewTestPodCreator(baseConfig.schedulerConfigFactory.Client, config)
+ podCreator := testutils.NewTestPodCreator(baseConfig.schedulerSupportFunctions.GetClient(), config)
baseConfig.nodePreparer = nodePreparer
baseConfig.podCreator = podCreator
@@ -203,7 +203,7 @@ func schedulePods(config *testConfig) int32 {
// Bake in time for the first pod scheduling event.
for {
time.Sleep(50 * time.Millisecond)
- scheduled := config.schedulerConfigFactory.ScheduledPodLister.Indexer.List()
+ scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List()
// 30,000 pods -> wait till @ least 300 are scheduled to start measuring.
// TODO Find out why sometimes there may be scheduling blips in the beggining.
if len(scheduled) > config.numPods/100 {
@@ -218,7 +218,7 @@ func schedulePods(config *testConfig) int32 {
// This can potentially affect performance of scheduler, since List() is done under mutex.
// Listing 10000 pods is an expensive operation, so running it frequently may impact scheduler.
// TODO: Setup watch on apiserver and wait until all pods scheduled.
- scheduled := config.schedulerConfigFactory.ScheduledPodLister.Indexer.List()
+ scheduled := config.schedulerSupportFunctions.GetScheduledPodListerIndexer().List()
// We will be completed when all pods are done being scheduled.
// return the worst-case-scenario interval that was seen during this time.

View File

@@ -40,7 +40,7 @@ import (
// remove resources after finished.
// Notes on rate limiter:
// - client rate limit is set to 5000.
- func mustSetupScheduler() (schedulerConfigFactory *factory.ConfigFactory, destroyFunc func()) {
+ func mustSetupScheduler() (schedulerConfigFactory scheduler.Configurator, destroyFunc func()) {
h := &framework.MasterHolder{Initialized: make(chan struct{})}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {