Merge pull request #32862 from wojtek-t/scheduler_map_reduce_4

Automatic merge from submit-queue

Support metadata producer with underlying data.

Ref #24246
pull/6/head
Kubernetes Submit Queue 2016-09-30 01:30:54 -07:00 committed by GitHub
commit 448ceb3881
10 changed files with 129 additions and 47 deletions

View File

@ -0,0 +1,49 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// priorityMetadata is a type that is passed as metadata for priority functions.
// PriorityMetadata builds one value per pod; it returns nil instead when the pod
// is nil or when the pod's tolerations or affinity cannot be read.
type priorityMetadata struct {
// nonZeroRequest is the result of getNonZeroRequests for the pod.
nonZeroRequest *schedulercache.Resource
// podTolerations is the toleration list returned by getTolerationListFromPod.
podTolerations []api.Toleration
// affinity is obtained from the pod's annotations via
// api.GetAffinityFromPodAnnotations.
affinity *api.Affinity
}
// PriorityMetadata computes the metadata handed to priority functions for pod.
// On success the result is a *priorityMetadata stored in the interface{}.
// A nil result means the metadata could not be computed: pod is nil, or reading
// its tolerations or its affinity returned an error.
func PriorityMetadata(pod *api.Pod) interface{} {
	if pod == nil {
		return nil
	}

	podTolerations, tolerationErr := getTolerationListFromPod(pod)
	if tolerationErr != nil {
		// Metadata is best-effort: the error is dropped and nil is returned.
		return nil
	}

	podAffinity, affinityErr := api.GetAffinityFromPodAnnotations(pod.Annotations)
	if affinityErr != nil {
		return nil
	}

	meta := new(priorityMetadata)
	meta.nonZeroRequest = getNonZeroRequests(pod)
	meta.podTolerations = podTolerations
	meta.affinity = podAffinity
	return meta
}

View File

@ -29,33 +29,6 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
) )
// priorityMetadata is a type that is passed as metadata for priority functions.
// PriorityMetadata builds one value per pod; it returns nil instead when the pod
// is nil or when the pod's tolerations or affinity cannot be read.
type priorityMetadata struct {
// nonZeroRequest is the result of getNonZeroRequests for the pod.
nonZeroRequest *schedulercache.Resource
// podTolerations is the toleration list returned by getTolerationListFromPod.
podTolerations []api.Toleration
// affinity is obtained from the pod's annotations via
// api.GetAffinityFromPodAnnotations.
affinity *api.Affinity
}
// PriorityMetadata computes the metadata handed to priority functions for pod.
// On success the result is a *priorityMetadata stored in the interface{}.
// A nil result means the metadata could not be computed: pod is nil, or reading
// its tolerations or its affinity returned an error.
func PriorityMetadata(pod *api.Pod) interface{} {
	if pod == nil {
		return nil
	}

	podTolerations, tolerationErr := getTolerationListFromPod(pod)
	if tolerationErr != nil {
		// Metadata is best-effort: the error is dropped and nil is returned.
		return nil
	}

	podAffinity, affinityErr := api.GetAffinityFromPodAnnotations(pod.Annotations)
	if affinityErr != nil {
		return nil
	}

	meta := new(priorityMetadata)
	meta.nonZeroRequest = getNonZeroRequests(pod)
	meta.podTolerations = podTolerations
	meta.affinity = podAffinity
	return meta
}
func getNonZeroRequests(pod *api.Pod) *schedulercache.Resource { func getNonZeroRequests(pod *api.Pod) *schedulercache.Resource {
result := &schedulercache.Resource{} result := &schedulercache.Resource{}
for i := range pod.Spec.Containers { for i := range pod.Spec.Containers {

View File

@ -38,6 +38,9 @@ type PriorityMapFunction func(pod *api.Pod, meta interface{}, nodeInfo *schedule
// TODO: Change interface{} to a specific type. // TODO: Change interface{} to a specific type.
type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error type PriorityReduceFunction func(pod *api.Pod, result schedulerapi.HostPriorityList) error
// MetadataProducer is a function that computes metadata for a given pod.
type MetadataProducer func(pod *api.Pod) interface{}
// DEPRECATED // DEPRECATED
// Use Map-Reduce pattern for priority functions. // Use Map-Reduce pattern for priority functions.
type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error)
@ -51,6 +54,10 @@ type PriorityConfig struct {
Weight int Weight int
} }
// EmptyMetadataProducer is a MetadataProducer that ignores the pod and always
// returns nil.
func EmptyMetadataProducer(pod *api.Pod) interface{} {
return nil
}
type PredicateFailureReason interface { type PredicateFailureReason interface {
GetReason() string GetReason() string
} }

View File

@ -58,6 +58,12 @@ func init() {
// Cluster autoscaler friendly scheduling algorithm. // Cluster autoscaler friendly scheduling algorithm.
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, defaultPredicates(), factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, defaultPredicates(),
replace(defaultPriorities(), "LeastRequestedPriority", "MostRequestedPriority")) replace(defaultPriorities(), "LeastRequestedPriority", "MostRequestedPriority"))
factory.RegisterPriorityMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
return priorities.PriorityMetadata
})
// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes // EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
// Register the priority function so that its available // Register the priority function so that its available
// but do not include it as part of the default priorities // but do not include it as part of the default priorities

View File

@ -282,7 +282,9 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for ii := range test.extenders { for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii]) extenders = append(extenders, &test.extenders[ii])
} }
scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders) scheduler := NewGenericScheduler(
schedulertesting.PodsToCache(test.pods), test.predicates, algorithm.EmptyMetadataProducer,
test.prioritizers, extenders)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr { if test.expectsErr {
if err == nil { if err == nil {

View File

@ -315,9 +315,14 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return nil, err return nil, err
} }
priorityMetaProducer, err := f.GetPriorityMetadataProducer()
if err != nil {
return nil, err
}
f.Run() f.Run()
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders) algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityMetaProducer, priorityConfigs, extenders)
podBackoff := podBackoff{ podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{}, perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@ -351,6 +356,15 @@ func (f *ConfigFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]
return getPriorityFunctionConfigs(priorityKeys, *pluginArgs) return getPriorityFunctionConfigs(priorityKeys, *pluginArgs)
} }
// GetPriorityMetadataProducer returns the priority metadata producer obtained by
// passing this factory's plugin args to getPriorityMetadataProducer. If the
// plugin args cannot be obtained, it returns a nil producer and that error.
func (f *ConfigFactory) GetPriorityMetadataProducer() (algorithm.MetadataProducer, error) {
pluginArgs, err := f.getPluginArgs()
if err != nil {
return nil, err
}
return getPriorityMetadataProducer(*pluginArgs)
}
func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) { func (f *ConfigFactory) GetPredicates(predicateKeys sets.String) (map[string]algorithm.FitPredicate, error) {
pluginArgs, err := f.getPluginArgs() pluginArgs, err := f.getPluginArgs()
if err != nil { if err != nil {

View File

@ -46,6 +46,9 @@ type PluginFactoryArgs struct {
FailureDomains []string FailureDomains []string
} }
// A MetadataProducerFactory produces a MetadataProducer from the given args.
type MetadataProducerFactory func(PluginFactoryArgs) algorithm.MetadataProducer
// A FitPredicateFactory produces a FitPredicate from the given args. // A FitPredicateFactory produces a FitPredicate from the given args.
type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate
@ -73,6 +76,8 @@ var (
fitPredicateMap = make(map[string]FitPredicateFactory) fitPredicateMap = make(map[string]FitPredicateFactory)
priorityFunctionMap = make(map[string]PriorityConfigFactory) priorityFunctionMap = make(map[string]PriorityConfigFactory)
algorithmProviderMap = make(map[string]AlgorithmProviderConfig) algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
// Registered metadata producers
priorityMetadataProducer MetadataProducerFactory
) )
const ( const (
@ -148,6 +153,12 @@ func IsFitPredicateRegistered(name string) bool {
return ok return ok
} }
// RegisterPriorityMetadataProducerFactory sets the factory used to build the
// priority metadata producer, replacing any previously registered one. The
// assignment is made while holding schedulerFactoryMutex.
func RegisterPriorityMetadataProducerFactory(factory MetadataProducerFactory) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
priorityMetadataProducer = factory
}
// DEPRECATED // DEPRECATED
// Use Map-Reduce pattern for priority functions. // Use Map-Reduce pattern for priority functions.
// Registers a priority function with the algorithm registry. Returns the name, // Registers a priority function with the algorithm registry. Returns the name,
@ -283,6 +294,16 @@ func getFitPredicateFunctions(names sets.String, args PluginFactoryArgs) (map[st
return predicates, nil return predicates, nil
} }
// getPriorityMetadataProducer builds the priority MetadataProducer by calling the
// registered factory with args. When no factory has been registered it falls
// back to algorithm.EmptyMetadataProducer. The returned error is always nil.
// Both the lookup and the factory call run while schedulerFactoryMutex is held.
func getPriorityMetadataProducer(args PluginFactoryArgs) (algorithm.MetadataProducer, error) {
	schedulerFactoryMutex.Lock()
	defer schedulerFactoryMutex.Unlock()

	if factory := priorityMetadataProducer; factory != nil {
		return factory(args), nil
	}
	return algorithm.EmptyMetadataProducer, nil
}
func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) { func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]algorithm.PriorityConfig, error) {
schedulerFactoryMutex.Lock() schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock() defer schedulerFactoryMutex.Unlock()

View File

@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
) )
@ -62,13 +61,14 @@ func (f *FitError) Error() string {
} }
type genericScheduler struct { type genericScheduler struct {
cache schedulercache.Cache cache schedulercache.Cache
predicates map[string]algorithm.FitPredicate predicates map[string]algorithm.FitPredicate
prioritizers []algorithm.PriorityConfig priorityMetaProducer algorithm.MetadataProducer
extenders []algorithm.SchedulerExtender prioritizers []algorithm.PriorityConfig
pods algorithm.PodLister extenders []algorithm.SchedulerExtender
lastNodeIndexLock sync.Mutex pods algorithm.PodLister
lastNodeIndex uint64 lastNodeIndexLock sync.Mutex
lastNodeIndex uint64
cachedNodeInfoMap map[string]*schedulercache.NodeInfo cachedNodeInfoMap map[string]*schedulercache.NodeInfo
} }
@ -113,7 +113,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
} }
trace.Step("Prioritizing") trace.Step("Prioritizing")
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, g.prioritizers, filteredNodes, g.extenders) meta := g.priorityMetaProducer(pod)
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, meta, g.prioritizers, filteredNodes, g.extenders)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -234,6 +235,7 @@ func podFitsOnNode(pod *api.Pod, meta interface{}, info *schedulercache.NodeInfo
func PrioritizeNodes( func PrioritizeNodes(
pod *api.Pod, pod *api.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeNameToInfo map[string]*schedulercache.NodeInfo,
meta interface{},
priorityConfigs []algorithm.PriorityConfig, priorityConfigs []algorithm.PriorityConfig,
nodes []*api.Node, nodes []*api.Node,
extenders []algorithm.SchedulerExtender, extenders []algorithm.SchedulerExtender,
@ -255,7 +257,6 @@ func PrioritizeNodes(
errs = append(errs, err) errs = append(errs, err)
} }
meta := priorities.PriorityMetadata(pod)
results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs)) results := make([]schedulerapi.HostPriorityList, 0, len(priorityConfigs))
for range priorityConfigs { for range priorityConfigs {
results = append(results, nil) results = append(results, nil)
@ -365,12 +366,18 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf
return result, nil return result, nil
} }
func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { func NewGenericScheduler(
cache schedulercache.Cache,
predicates map[string]algorithm.FitPredicate,
priorityMetaProducer algorithm.MetadataProducer,
prioritizers []algorithm.PriorityConfig,
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
return &genericScheduler{ return &genericScheduler{
cache: cache, cache: cache,
predicates: predicates, predicates: predicates,
prioritizers: prioritizers, priorityMetaProducer: priorityMetaProducer,
extenders: extenders, prioritizers: prioritizers,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo), extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
} }
} }

View File

@ -300,7 +300,9 @@ func TestGenericScheduler(t *testing.T) {
for _, name := range test.nodes { for _, name := range test.nodes {
cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}}) cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
} }
scheduler := NewGenericScheduler(cache, test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}) scheduler := NewGenericScheduler(
cache, test.predicates, algorithm.EmptyMetadataProducer,
test.prioritizers, []algorithm.SchedulerExtender{})
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) { if !reflect.DeepEqual(err, test.wErr) {
@ -501,7 +503,7 @@ func TestZeroRequest(t *testing.T) {
} }
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
list, err := PrioritizeNodes( list, err := PrioritizeNodes(
test.pod, nodeNameToInfo, priorityConfigs, test.pod, nodeNameToInfo, algorithm.EmptyMetadataProducer, priorityConfigs,
algorithm.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{}) algorithm.FakeNodeLister(test.nodes), []algorithm.SchedulerExtender{})
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)

View File

@ -384,6 +384,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
algo := NewGenericScheduler( algo := NewGenericScheduler(
scache, scache,
predicateMap, predicateMap,
algorithm.EmptyMetadataProducer,
[]algorithm.PriorityConfig{}, []algorithm.PriorityConfig{},
[]algorithm.SchedulerExtender{}) []algorithm.SchedulerExtender{})
bindingChan := make(chan *api.Binding, 1) bindingChan := make(chan *api.Binding, 1)