diff --git a/plugin/pkg/scheduler/algorithm/priorities/metadata.go b/plugin/pkg/scheduler/algorithm/priorities/metadata.go new file mode 100644 index 0000000000..5517e0a849 --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/priorities/metadata.go @@ -0,0 +1,49 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priorities + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" +) + +// priorityMetadata is a type that is passed as metadata for priority functions +type priorityMetadata struct { + nonZeroRequest *schedulercache.Resource + podTolerations []api.Toleration + affinity *api.Affinity +} + +func PriorityMetadata(pod *api.Pod) interface{} { + // If we cannot compute metadata, just return nil + if pod == nil { + return nil + } + tolerations, err := getTolerationListFromPod(pod) + if err != nil { + return nil + } + affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations) + if err != nil { + return nil + } + return &priorityMetadata{ + nonZeroRequest: getNonZeroRequests(pod), + podTolerations: tolerations, + affinity: affinity, + } +} diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities.go b/plugin/pkg/scheduler/algorithm/priorities/priorities.go index f798dd4d5e..52b92c8c4d 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities.go @@ -29,33 +29,6 @@ import ( 
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) -// priorityMetadata is a type that is passed as metadata for priority functions -type priorityMetadata struct { - nonZeroRequest *schedulercache.Resource - podTolerations []api.Toleration - affinity *api.Affinity -} - -func PriorityMetadata(pod *api.Pod) interface{} { - // If we cannot compute metadata, just return nil - if pod == nil { - return nil - } - tolerations, err := getTolerationListFromPod(pod) - if err != nil { - return nil - } - affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations) - if err != nil { - return nil - } - return &priorityMetadata{ - nonZeroRequest: getNonZeroRequests(pod), - podTolerations: tolerations, - affinity: affinity, - } -} - func getNonZeroRequests(pod *api.Pod) *schedulercache.Resource { result := &schedulercache.Resource{} for i := range pod.Spec.Containers { diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index 9e55208399..954c96f07c 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -38,6 +38,9 @@ type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedule // TODO: Change interface{} to a specific type. type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error +// MetadataProducer is a function that computes metadata for a given pod. +type MetadataProducer func(pod *api.Pod) interface{} + // DEPRECATED // Use Map-Reduce pattern for priority functions. 
type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) @@ -51,6 +54,10 @@ type PriorityConfig struct { Weight int } +func EmptyMetadataProducer(pod *api.Pod) interface{} { + return nil +} + type PredicateFailureReason interface { GetReason() string } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index ecb53bf912..487a0f9bb1 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -58,6 +58,12 @@ func init() { // Cluster autoscaler friendly scheduling algorithm. factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, defaultPredicates(), replace(defaultPriorities(), "LeastRequestedPriority", "MostRequestedPriority")) + + factory.RegisterPriorityMetadataProducerFactory( + func(args factory.PluginFactoryArgs) algorithm.MetadataProducer { + return priorities.PriorityMetadata + }) + // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes // Register the priority function so that its available // but do not include it as part of the default priorities diff --git a/plugin/pkg/scheduler/extender_test.go b/plugin/pkg/scheduler/extender_test.go index 3b06f06280..0dc897a4ea 100644 --- a/plugin/pkg/scheduler/extender_test.go +++ b/plugin/pkg/scheduler/extender_test.go @@ -282,7 +282,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { for ii := range test.extenders { extenders = append(extenders, &test.extenders[ii]) } - scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders) + scheduler := NewGenericScheduler( + schedulertesting.PodsToCache(test.pods), test.predicates, algorithm.EmptyMetadataProducer, + test.prioritizers, extenders) machine, err := scheduler.Schedule(test.pod, 
algorithm.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { if err == nil { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index f49cb99c4d..6dd62f52e6 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -315,9 +315,14 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return nil, err } + priorityMetaProducer, err := f.GetPriorityMetadataProducer() + if err != nil { + return nil, err + } + f.Run() - algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders) + algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityMetaProducer, priorityConfigs, extenders) podBackoff := podBackoff{ perPodBackoff: map[types.NamespacedName]*backoffEntry{}, @@ -351,6 +356,15 @@ func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([] return getPriorityFunctionConfigs(priorityKeys, *pluginArgs) } +func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) { + pluginArgs, err := f.getPluginArgs() + if err != nil { + return nil, err + } + + return getPriorityMetadataProducer(*pluginArgs) +} + func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { pluginArgs, err := f.getPluginArgs() if err != nil { diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go index 712f2b030f..4f00bba3a0 100644 --- a/plugin/pkg/scheduler/factory/plugins.go +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -46,6 +46,9 @@ type PluginFactoryArgs struct { FailureDomains []string } +// MetadataProducerFactory produces MetadataProducer from the given args. +type MetadataProducerFactory func(PluginFactoryArgs) algorithm.MetadataProducer + // A FitPredicateFactory produces a FitPredicate from the given args. 
type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate @@ -73,6 +76,8 @@ var ( fitPredicateMap = make(map[string]FitPredicateFactory) priorityFunctionMap = make(map[string]PriorityConfigFactory) algorithmProviderMap = make(map[string]AlgorithmProviderConfig) + // Registered factory for the priority MetadataProducer; nil until RegisterPriorityMetadataProducerFactory is called. + priorityMetadataProducer MetadataProducerFactory ) const ( @@ -148,6 +153,12 @@ func IsFitPredicateRegistered(name string) bool { return ok } +func RegisterPriorityMetadataProducerFactory(factory MetadataProducerFactory) { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + priorityMetadataProducer = factory +} + // DEPRECATED // Use Map-Reduce pattern for priority functions. // Registers a priority function with the algorithm registry. Returns the name, @@ -283,6 +294,16 @@ func getFitPredicateFunctions(names sets.String, args PluginFactoryArgs) (map[st return predicates, nil } +func getPriorityMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProducer, error) { + schedulerFactoryMutex.Lock() + defer schedulerFactoryMutex.Unlock() + + if priorityMetadataProducer == nil { + return algorithm.EmptyMetadataProducer, nil + } + return priorityMetadataProducer(args), nil +} + func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) { schedulerFactoryMutex.Lock() defer schedulerFactoryMutex.Unlock() diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index 0e3d53713a..24b45eb5bf 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" - "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ 
-62,13 +61,14 @@ func (f *FitError) Error() string { } type genericScheduler struct { - cache schedulercache.Cache - predicates map[string]algorithm.FitPredicate - prioritizers []algorithm.PriorityConfig - extenders []algorithm.SchedulerExtender - pods algorithm.PodLister - lastNodeIndexLock sync.Mutex - lastNodeIndex uint64 + cache schedulercache.Cache + predicates map[string]algorithm.FitPredicate + priorityMetaProducer algorithm.MetadataProducer + prioritizers []algorithm.PriorityConfig + extenders []algorithm.SchedulerExtender + pods algorithm.PodLister + lastNodeIndexLock sync.Mutex + lastNodeIndex uint64 cachedNodeInfoMap map[string]*schedulercache.NodeInfo } @@ -113,7 +113,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe } trace.Step("Prioritizing") - priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, g.prioritizers, filteredNodes, g.extenders) + meta := g.priorityMetaProducer(pod) + priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, meta, g.prioritizers, filteredNodes, g.extenders) if err != nil { return "", err } @@ -234,6 +235,7 @@ func podFitsOnNode(pod *api.Pod, meta interface{}, info *schedulercache.NodeInfo func PrioritizeNodes( pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, + meta interface{}, priorityConfigs []algorithm.PriorityConfig, nodes []*api.Node, extenders []algorithm.SchedulerExtender, @@ -255,7 +257,6 @@ func PrioritizeNodes( errs = append(errs, err) } - meta := priorities.PriorityMetadata(pod) results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs)) for range priorityConfigs { results = append(results, nil) @@ -365,12 +366,18 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf return result, nil } -func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { +func 
NewGenericScheduler( + cache schedulercache.Cache, + predicates map[string]algorithm.FitPredicate, + priorityMetaProducer algorithm.MetadataProducer, + prioritizers []algorithm.PriorityConfig, + extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { return &genericScheduler{ - cache: cache, - predicates: predicates, - prioritizers: prioritizers, - extenders: extenders, - cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), + cache: cache, + predicates: predicates, + priorityMetaProducer: priorityMetaProducer, + prioritizers: prioritizers, + extenders: extenders, + cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), } } diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index c47064ba7d..d55dad06ec 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -300,7 +300,9 @@ func TestGenericScheduler(t *testing.T) { for _, name := range test.nodes { cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}}) } - scheduler := NewGenericScheduler(cache, test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}) + scheduler := NewGenericScheduler( + cache, test.predicates, algorithm.EmptyMetadataProducer, + test.prioritizers, []algorithm.SchedulerExtender{}) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if !reflect.DeepEqual(err, test.wErr) { @@ -501,7 +503,8 @@ func TestZeroRequest(t *testing.T) { } nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) list, err := PrioritizeNodes( - test.pod, nodeNameToInfo, priorityConfigs, + // PrioritizeNodes takes the produced metadata value, not the producer function, so invoke the producer here. + test.pod, nodeNameToInfo, algorithm.EmptyMetadataProducer(test.pod), priorityConfigs, algorithm.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 
f6044f3abf..3aae5a211a 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -384,6 +384,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. algo := NewGenericScheduler( scache, predicateMap, + algorithm.EmptyMetadataProducer, []algorithm.PriorityConfig{}, []algorithm.SchedulerExtender{}) bindingChan := make(chan *api.Binding, 1)