refactor scheduler factory to use plugin architecture style like credentialprovider and cloudprovider for configuring priority functions and fit predicates

pull/6/head
Mike Danese 2014-12-09 20:25:45 -08:00
parent 7c66ebe2e0
commit 4850bdbe63
9 changed files with 329 additions and 102 deletions

View File

@ -169,7 +169,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
// Scheduler
schedulerConfigFactory := factory.NewConfigFactory(cl)
schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
glog.Fatal("Couldn't create scheduler config: %v", err)
}

View File

@ -38,6 +38,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
"github.com/golang/glog"
@ -105,7 +106,7 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p
func RunScheduler(cl *client.Client) {
// Scheduler
schedulerConfigFactory := factory.NewConfigFactory(cl)
schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {
glog.Fatal("Couldn't create scheduler config: %v", err)
}

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
"github.com/golang/glog"
)
@ -61,7 +62,7 @@ func main() {
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
configFactory := factory.NewConfigFactory(kubeClient)
config, err := configFactory.Create(nil, nil)
config, err := configFactory.Create()
if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err)
}

View File

@ -0,0 +1,52 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This is the default algorithm provider for the scheduler.
package defaults
import (
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)
func init() {
factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())
}
func defaultPredicates() util.StringSet {
return util.NewStringSet(
// Fit is defined based on the absence of port conflicts.
factory.RegisterFitPredicate("PodFitsPorts", algorithm.PodFitsPorts),
// Fit is determined by resource availability
factory.RegisterFitPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister)),
// Fit is determined by non-conflicting disk volumes
factory.RegisterFitPredicate("NoDiskConflict", algorithm.NoDiskConflict),
// Fit is determined by node selector query
factory.RegisterFitPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister)),
)
}
func defaultPriorities() util.StringSet {
return util.NewStringSet(
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1),
// spreads pods by minimizing the number of pods on the same minion with the same labels.
factory.RegisterPriorityFunction("SpreadingPriority", algorithm.CalculateSpreadPriority, 1),
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0),
)
}

View File

@ -0,0 +1,22 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This package is used to register algorithm provider plugins.
package algorithmprovider
import (
_ "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
)

View File

@ -0,0 +1,65 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package algorithmprovider
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)
var (
algorithmProviderNames = []string{
factory.DefaultProvider,
}
)
func TestDefaultConfigExists(t *testing.T) {
p, err := factory.GetAlgorithmProvider(factory.DefaultProvider)
if err != nil {
t.Errorf("error retrivieving default provider: %v", err)
}
if p == nil {
t.Error("algorithm provider config should not be nil")
}
if len(p.FitPredicateKeys) == 0 {
t.Error("default algorithm provider shouldn't have 0 fit predicates")
}
}
func TestAlgorithmProviders(t *testing.T) {
for _, pn := range algorithmProviderNames {
p, err := factory.GetAlgorithmProvider(pn)
if err != nil {
t.Errorf("error retrivieving '%s' provider: %v", pn, err)
break
}
if len(p.PriorityFunctionKeys) == 0 {
t.Error("%s algorithm provider shouldn't have 0 priority functions", pn)
}
for _, pf := range p.PriorityFunctionKeys.List() {
if !factory.IsPriorityFunctionRegistered(pf) {
t.Errorf("priority function %s is not registerd but is used in the %s algorithm provider", pf, pn)
}
}
for _, fp := range p.FitPredicateKeys.List() {
if !factory.IsFitPredicateRegistered(fp) {
t.Errorf("fit predicate %s is not registerd but is used in the %s algorithm provider", fp, pn)
}
}
}
}

View File

@ -37,6 +37,11 @@ import (
"github.com/golang/glog"
)
var (
PodLister = &storeToPodLister{cache.NewStore()}
MinionLister = &storeToNodeLister{cache.NewStore()}
)
// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
Client *client.Client
@ -46,71 +51,66 @@ type ConfigFactory struct {
PodLister *storeToPodLister
// a means to list all minions
MinionLister *storeToNodeLister
// map of strings to predicate functions to be used
// to filter the minions for scheduling pods
PredicateMap map[string]algorithm.FitPredicate
// map of strings to priority config to be used
// to prioritize the filtered minions for scheduling pods
PriorityMap map[string]algorithm.PriorityConfig
}
// NewConfigFactory initializes the factory.
func NewConfigFactory(client *client.Client) *ConfigFactory {
// initialize the factory struct
factory := &ConfigFactory{
return &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(),
PodLister: &storeToPodLister{cache.NewStore()},
MinionLister: &storeToNodeLister{cache.NewStore()},
PredicateMap: make(map[string]algorithm.FitPredicate),
PriorityMap: make(map[string]algorithm.PriorityConfig),
PodLister: PodLister,
MinionLister: MinionLister,
}
factory.addDefaultPredicates()
factory.addDefaultPriorities()
return factory
}
// Create creates a scheduler and all support functions.
func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) {
if predicateKeys == nil {
glog.V(2).Infof("Custom predicates list not provided, using default predicates")
predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"}
}
predicateFuncs, err := factory.getPredicateFunctions(predicateKeys)
// Create creates a scheduler with the default algorithm provider.
func (f *ConfigFactory) Create() (*scheduler.Config, error) {
return f.CreateFromProvider(DefaultProvider)
}
// CreateFromProvider creates a scheduler from the name of a registered algorithm provider.
func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler from algorithm provider '%v'", providerName)
provider, err := GetAlgorithmProvider(providerName)
if err != nil {
return nil, err
}
if priorityKeys == nil {
glog.V(2).Infof("Custom priority list not provided, using default priority: LeastRequestedPriority")
priorityKeys = []string{"LeastRequestedPriority"}
return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys)
}
// CreateFromKeys creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) {
glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)
predicateFuncs, err := getFitPredicateFunctions(predicateKeys)
if err != nil {
return nil, err
}
priorityConfigs, err := factory.getPriorityConfigs(priorityKeys)
priorityConfigs, err := getPriorityFunctionConfigs(priorityKeys)
if err != nil {
return nil, err
}
// Watch and queue pods that need scheduling.
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run()
cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue).Run()
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run()
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
if false {
// Disable this code until minions support watches.
cache.NewReflector(factory.createMinionLW(), &api.Node{}, factory.MinionLister.Store).Run()
cache.NewReflector(f.createMinionLW(), &api.Node{}, f.MinionLister.Store).Run()
} else {
cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run()
cache.NewPoller(f.pollMinions, 10*time.Second, f.MinionLister.Store).Run()
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, factory.PodLister, r)
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, f.PodLister, r)
podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{},
@ -118,79 +118,18 @@ func (factory *ConfigFactory) Create(predicateKeys, priorityKeys []string) (*sch
}
return &scheduler.Config{
MinionLister: factory.MinionLister,
MinionLister: f.MinionLister,
Algorithm: algo,
Binder: &binder{factory.Client},
Binder: &binder{f.Client},
NextPod: func() *api.Pod {
pod := factory.PodQueue.Pop().(*api.Pod)
pod := f.PodQueue.Pop().(*api.Pod)
glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name)
return pod
},
Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue),
Error: f.makeDefaultErrorFunc(&podBackoff, f.PodQueue),
}, nil
}
func (factory *ConfigFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) {
predicates := []algorithm.FitPredicate{}
for _, key := range keys {
function, ok := factory.PredicateMap[key]
if !ok {
return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
}
predicates = append(predicates, function)
}
return predicates, nil
}
func (factory *ConfigFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) {
configs := []algorithm.PriorityConfig{}
for _, key := range keys {
config, ok := factory.PriorityMap[key]
if !ok {
return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key)
}
configs = append(configs, config)
}
return configs, nil
}
func (factory *ConfigFactory) addDefaultPredicates() {
// Fit is defined based on the absence of port conflicts.
factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts)
// Fit is determined by resource availability
factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister))
// Fit is determined by non-conflicting disk volumes
factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict)
// Fit is determined by node selector query
factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister))
}
func (factory *ConfigFactory) AddPredicate(key string, function algorithm.FitPredicate) {
factory.PredicateMap[key] = function
}
func (factory *ConfigFactory) addDefaultPriorities() {
// Prioritize nodes by least requested utilization.
factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1)
// spreads pods by minimizing the number of pods on the same minion with the same labels.
factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1)
// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0)
}
func (factory *ConfigFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) {
factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight}
}
func (factory *ConfigFactory) SetWeight(key string, weight int) {
config, ok := factory.PriorityMap[key]
if !ok {
glog.Errorf("Invalid priority key %s specified - no corresponding function found", key)
return
}
config.Weight = weight
}
type listWatch struct {
client *client.Client
fieldSelector labels.Selector

View File

@ -43,7 +43,7 @@ func TestCreate(t *testing.T) {
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
factory := NewConfigFactory(client)
factory.Create(nil, nil)
factory.Create()
}
func TestCreateLists(t *testing.T) {

View File

@ -0,0 +1,147 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package factory
import (
"fmt"
"sync"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
var (
schedulerFactoryMutex sync.Mutex
// maps that hold registered algorithm types
fitPredicateMap = make(map[string]algorithm.FitPredicate)
priorityFunctionMap = make(map[string]algorithm.PriorityConfig)
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
)
const (
DefaultProvider = "default"
)
type AlgorithmProviderConfig struct {
FitPredicateKeys util.StringSet
PriorityFunctionKeys util.StringSet
}
// RegisterFitPredicate registers a fit predicate with the algorithm registry. Returns the key,
// with which the predicate was registered.
func RegisterFitPredicate(key string, predicate algorithm.FitPredicate) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
fitPredicateMap[key] = predicate
return key
}
// IsFitPredicateRegistered check is useful for testing providers.
func IsFitPredicateRegistered(key string) bool {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
_, ok := fitPredicateMap[key]
return ok
}
// RegisterFitPredicate registers a priority function with the algorithm registry. Returns the key,
// with which the function was registered.
func RegisterPriorityFunction(key string, function algorithm.PriorityFunction, weight int) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
priorityFunctionMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight}
return key
}
// IsPriorityFunctionRegistered check is useful for testing providers.
func IsPriorityFunctionRegistered(key string) bool {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
_, ok := priorityFunctionMap[key]
return ok
}
// SetPriorityFunctionWeight sets the weight of an already registered priority function.
func SetPriorityFunctionWeight(key string, weight int) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
config, ok := priorityFunctionMap[key]
if !ok {
glog.Errorf("Invalid priority key %s specified - no corresponding function found", key)
return
}
config.Weight = weight
}
// RegisterAlgorithmProvider registers a new algorithm provider with the algorithm registry. This should
// be called from the init function in a provider plugin.
func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys util.StringSet) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
algorithmProviderMap[name] = AlgorithmProviderConfig{
FitPredicateKeys: predicateKeys,
PriorityFunctionKeys: priorityKeys,
}
return name
}
// GetAlgorithmProvider should not be used to modify providers. It is publicly visible for testing.
func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
var provider AlgorithmProviderConfig
provider, ok := algorithmProviderMap[name]
if !ok {
return nil, fmt.Errorf("plugin '%v' has not been registered", provider)
}
return &provider, nil
}
func getFitPredicateFunctions(keys util.StringSet) ([]algorithm.FitPredicate, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
predicates := []algorithm.FitPredicate{}
for _, key := range keys.List() {
function, ok := fitPredicateMap[key]
if !ok {
return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
}
predicates = append(predicates, function)
}
return predicates, nil
}
func getPriorityFunctionConfigs(keys util.StringSet) ([]algorithm.PriorityConfig, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
configs := []algorithm.PriorityConfig{}
for _, key := range keys.List() {
config, ok := priorityFunctionMap[key]
if !ok {
return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key)
}
configs = append(configs, config)
}
return configs, nil
}