Scheduler changes for extensibility

pull/6/head
Abhishek Gupta 2014-11-19 08:16:25 -08:00
parent 4845e524af
commit 6b712cc700
8 changed files with 194 additions and 49 deletions

View File

@ -168,8 +168,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
handler.delegate = m.Handler
// Scheduler
schedulerConfigFactory := &factory.ConfigFactory{cl}
schedulerConfig := schedulerConfigFactory.Create()
schedulerConfigFactory := factory.NewConfigFactory(cl)
schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
if err != nil {
glog.Fatal("Couldn't create scheduler config: %v", err)
}
scheduler.New(schedulerConfig).Run()
endpoints := service.NewEndpointController(cl)

View File

@ -38,10 +38,15 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str
if err != nil {
return "", err
}
if len(minions.Items) == 0 {
return "", fmt.Errorf("no minions available to schedule pods")
}
filteredNodes, err := findNodesThatFit(pod, g.pods, g.predicates, minions)
if err != nil {
return "", err
}
priorityList, err := g.prioritizer(pod, g.pods, FakeMinionLister(filteredNodes))
if err != nil {
return "", err
@ -49,6 +54,7 @@ func (g *genericScheduler) Schedule(pod api.Pod, minionLister MinionLister) (str
if len(priorityList) == 0 {
return "", fmt.Errorf("failed to find a fit for pod: %v", pod)
}
return g.selectHost(priorityList)
}

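The Schedule flow is now filter-then-rank: the minion list is filtered through every registered predicate via findNodesThatFit, and only the survivors are handed to the prioritizer. A minimal sketch of the per-node filter step, using the FitPredicate signature this commit uses throughout (nodeFits is a hypothetical helper name, not the actual findNodesThatFit implementation):

// FitPredicate matches the signature shared by PodFitsPorts, NoDiskConflict,
// and the custom predicates added in this commit.
type FitPredicate func(pod api.Pod, existingPods []api.Pod, node string) (bool, error)

// nodeFits reports whether a node passes every predicate; the first predicate
// error aborts the check.
func nodeFits(pod api.Pod, node string, predicates []FitPredicate, existingPods []api.Pod) (bool, error) {
	for _, predicate := range predicates {
		fit, err := predicate(pod, existingPods, node)
		if err != nil {
			return false, err
		}
		if !fit {
			return false, nil
		}
	}
	return true, nil
}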
View File

@ -74,6 +74,7 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
manifest := &(pod.Spec)
glog.Errorf("custom predicate NoDiskConflict --> node: %s", node)
for ix := range manifest.Volumes {
for podIx := range existingPods {
if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) {
@ -104,6 +105,7 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
// PodFitsResources calculates fit based on requested, rather than used resources
func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.Errorf("custom predicate PodFitsResources --> node: %s", node)
podRequest := getResourceRequest(&pod)
if podRequest.milliCPU == 0 && podRequest.memory == 0 {
// no resources requested always fits.
@ -152,6 +154,7 @@ type NodeSelector struct {
func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
if len(pod.Spec.NodeSelector) == 0 {
glog.Errorf("custom predicate PodSelectorMatches --> node: %s", node)
return true, nil
}
selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
@ -163,6 +166,7 @@ func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, n
}
func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.Errorf("custom predicate PodFitsPorts --> node: %s", node)
existingPorts := getUsedPorts(existingPods...)
wantPorts := getUsedPorts(pod)
for wport := range wantPorts {

View File

@ -104,8 +104,11 @@ func RunApiServer(cl *client.Client, etcdClient tools.EtcdClient, addr string, p
// RunScheduler starts up a scheduler in its own goroutine
func RunScheduler(cl *client.Client) {
// Scheduler
schedulerConfigFactory := &factory.ConfigFactory{cl}
schedulerConfig := schedulerConfigFactory.Create()
schedulerConfigFactory := factory.NewConfigFactory(cl)
schedulerConfig, err := schedulerConfigFactory.Create(nil, nil)
if err != nil {
glog.Fatal("Couldn't create scheduler config: %v", err)
}
scheduler.New(schedulerConfig).Run()
}

View File

@ -60,8 +60,14 @@ func main() {
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
configFactory := &factory.ConfigFactory{Client: kubeClient}
config := configFactory.Create()
configFactory := factory.NewConfigFactory(kubeClient)
configFactory.AddPredicate("CreateOnMinion1", scheduler.CreateOnMinion1)
configFactory.AddPredicate("CreateOnMinion2", scheduler.CreateOnMinion2)
config, err := configFactory.Create([]string{"CreateOnMinion2"}, nil)
if err != nil {
glog.Fatalf("Failed to create scheduler configuration: %v", err)
}
s := scheduler.New(config)
s.Run()

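Note that Create treats these key lists as the complete selection rather than additions to the defaults: the call above schedules with CreateOnMinion2 as the only predicate. Combining built-in and custom predicates means listing every desired key explicitly, e.g. (a usage sketch; the first two keys are the ones registered by addDefaultPredicates):

// Sketch: mix default predicates with a custom one. Every key must already be
// registered, either by the factory defaults or by AddPredicate above.
config, err := configFactory.Create(
	[]string{"PodFitsPorts", "NoDiskConflict", "CreateOnMinion2"},
	nil, // nil falls back to the default LeastRequestedPriority
)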
View File

@ -0,0 +1,44 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)
func CreateOnMinion1(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.V(2).Infof("custom predicate minion1 --> node: %s", node)
if node == "10.245.2.2" {
glog.V(2).Infof("custom predicate minion1 matched")
return true, nil
} else {
glog.V(2).Infof("custom predicate minion1 did not match")
return false, nil
}
}
func CreateOnMinion2(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.V(2).Infof("custom predicate minion2 --> node: %s", node)
if node == "10.245.2.3" {
glog.V(2).Infof("custom predicate minion2 matched")
return true, nil
} else {
glog.V(2).Infof("custom predicate minion2 did not match")
return false, nil
}
}

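Pinning pods to hard-coded minion IPs is enough to demonstrate the plugin hook, but a parameterized predicate built as a closure would avoid writing one function per address, mirroring how NewResourceFitPredicate and NewSelectorMatchPredicate bind extra state. A sketch (CreateOnNodes is a hypothetical name, not part of this commit):

// CreateOnNodes returns a predicate that admits only the named nodes.
func CreateOnNodes(allowed ...string) func(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
	return func(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
		for _, name := range allowed {
			if node == name {
				return true, nil
			}
		}
		return false, nil
	}
}

// Registration would then read:
//   configFactory.AddPredicate("CreateOnMinions", scheduler.CreateOnNodes("10.245.2.2", "10.245.2.3"))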
View File

@ -37,49 +37,82 @@ import (
"github.com/golang/glog"
)
// ConfigFactory knows how to fill out a scheduler config with its support functions.
type ConfigFactory struct {
// configFactory knows how to fill out a scheduler config with its support functions.
type configFactory struct {
Client *client.Client
// queue for pods that need scheduling
PodQueue *cache.FIFO
// a means to list all scheduled pods
PodLister *storeToPodLister
// a means to list all minions
MinionLister *storeToMinionLister
// map of strings to predicate functions to be used
// to filter the minions for scheduling pods
PredicateMap map[string]algorithm.FitPredicate
// map of strings to priority functions to be used
// to prioritize the filtered minions for scheduling pods
PriorityMap map[string]algorithm.PriorityFunction
}
// NewConfigFactory initializes the factory.
func NewConfigFactory(client *client.Client) *configFactory {
// initialize the factory struct
factory := &configFactory{Client: client,
PodQueue: cache.NewFIFO(),
PodLister: &storeToPodLister{cache.NewStore()},
MinionLister: &storeToMinionLister{cache.NewStore()},
PredicateMap: make(map[string]algorithm.FitPredicate),
PriorityMap: make(map[string]algorithm.PriorityFunction),
}
// add default predicates
factory.addDefaultPredicates()
// add default priorities
factory.addDefaultPriorities()
return factory
}
// Create creates a scheduler and all support functions.
func (factory *ConfigFactory) Create() *scheduler.Config {
func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*scheduler.Config, error) {
if predicateKeys == nil {
glog.V(2).Infof("Custom predicates list not provided, using default predicates")
predicateKeys = []string{"PodFitsPorts", "PodFitsResources", "NoDiskConflict", "MatchNodeSelector"}
}
predicateFuncs, err := factory.getPredicateFunctions(predicateKeys)
if err != nil {
return nil, err
}
if priorityKeys == nil {
glog.V(2).Infof("Custom priority list not provided, using default priorities")
priorityKeys = []string{"LeastRequestedPriority"}
}
priorityFuncs, err := factory.getPriorityFunctions(priorityKeys)
if err != nil {
return nil, err
}
// Watch and queue pods that need scheduling.
podQueue := cache.NewFIFO()
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run()
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, factory.PodQueue).Run()
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
podCache := cache.NewStore()
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run()
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, factory.PodLister.Store).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
minionCache := cache.NewStore()
if false {
// Disable this code until minions support watches.
cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run()
cache.NewReflector(factory.createMinionLW(), &api.Minion{}, factory.MinionLister.Store).Run()
} else {
cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
cache.NewPoller(factory.pollMinions, 10*time.Second, factory.MinionLister.Store).Run()
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
minionLister := &storeToMinionLister{minionCache}
algo := algorithm.NewGenericScheduler(
[]algorithm.FitPredicate{
// Fit is defined based on the absence of port conflicts.
algorithm.PodFitsPorts,
// Fit is determined by resource availability
algorithm.NewResourceFitPredicate(minionLister),
// Fit is determined by non-conflicting disk volumes
algorithm.NoDiskConflict,
// Fit is determined by node selector query
algorithm.NewSelectorMatchPredicate(minionLister),
},
// Prioritize nodes by least requested utilization.
algorithm.LeastRequestedPriority,
&storeToPodLister{podCache}, r)
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs[0], factory.PodLister, r)
podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{},
@ -87,19 +120,64 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
}
return &scheduler.Config{
MinionLister: minionLister,
MinionLister: factory.MinionLister,
Algorithm: algo,
Binder: &binder{factory.Client},
NextPod: func() *api.Pod {
pod := podQueue.Pop().(*api.Pod)
glog.V(2).Infof("About to try and schedule pod %v\n"+
"\tknown minions: %v\n"+
"\tknown scheduled pods: %v\n",
pod.Name, minionCache.ContainedIDs(), podCache.ContainedIDs())
pod := factory.PodQueue.Pop().(*api.Pod)
glog.V(2).Infof("glog.v2 --> About to try and schedule pod %v", pod.Name)
glog.Errorf("glog.error --> About to try and schedule pod %v", pod.Name)
return pod
},
Error: factory.makeDefaultErrorFunc(&podBackoff, podQueue),
Error: factory.makeDefaultErrorFunc(&podBackoff, factory.PodQueue),
}, nil
}
func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) {
var function algorithm.FitPredicate
predicates := []algorithm.FitPredicate{}
for _, key := range keys {
glog.Errorf("Adding predicate function for key: %s", key)
function = factory.PredicateMap[key]
if function == nil {
return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
}
predicates = append(predicates, function)
}
return predicates, nil
}
func (factory *configFactory) getPriorityFunctions(keys []string) ([]algorithm.PriorityFunction, error) {
var function algorithm.PriorityFunction
priorities := []algorithm.PriorityFunction{}
for _, key := range keys {
function = factory.PriorityMap[key]
if function == nil {
return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key)
}
priorities = append(priorities, function)
}
return priorities, nil
}
func (factory *configFactory) addDefaultPredicates() {
factory.AddPredicate("PodFitsPorts", algorithm.PodFitsPorts)
factory.AddPredicate("PodFitsResources", algorithm.NewResourceFitPredicate(factory.MinionLister))
factory.AddPredicate("NoDiskConflict", algorithm.NoDiskConflict)
factory.AddPredicate("MatchNodeSelector", algorithm.NewSelectorMatchPredicate(factory.MinionLister))
}
func (factory *configFactory) AddPredicate(key string, function algorithm.FitPredicate) {
factory.PredicateMap[key] = function
}
func (factory *configFactory) addDefaultPriorities() {
factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority)
factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority)
}
func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) {
factory.PriorityMap[key] = function
}
type listWatch struct {
@ -129,7 +207,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
// createUnassignedPodLW returns a listWatch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodLW() *listWatch {
func (factory *configFactory) createUnassignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
@ -147,7 +225,7 @@ func parseSelectorOrDie(s string) labels.Selector {
// createAssignedPodLW returns a listWatch that finds all pods that are
// already scheduled.
func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
func (factory *configFactory) createAssignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie("DesiredState.Host!="),
@ -156,7 +234,7 @@ func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
}
// createMinionLW returns a listWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *listWatch {
func (factory *configFactory) createMinionLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie(""),
@ -165,7 +243,7 @@ func (factory *ConfigFactory) createMinionLW() *listWatch {
}
// pollMinions lists all minions and returns an enumerator for cache.Poller.
func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
func (factory *configFactory) pollMinions() (cache.Enumerator, error) {
list := &api.MinionList{}
err := factory.Client.Get().Path("minions").Do().Into(list)
if err != nil {
@ -174,7 +252,7 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
return &minionEnumerator{list}, nil
}
func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
func (factory *configFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
return func(pod *api.Pod, err error) {
glog.Errorf("Error scheduling %v: %v; retrying", pod.Name, err)
backoff.gc()

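The factory now doubles as a registry: defaults are registered under well-known string keys at construction time, and Create resolves whichever keys the caller passes. Since addDefaultPriorities registers both LeastRequestedPriority and SpreadingPriority, swapping the ranking function is just a different key (a usage sketch; note that Create currently wires only priorityFuncs[0] into the generic scheduler, so only the first priority key takes effect):

// Sketch: keep the default predicates (nil) but rank nodes with the
// pre-registered spreading priority instead of LeastRequestedPriority.
cf := factory.NewConfigFactory(client)
config, err := cf.Create(nil, []string{"SpreadingPriority"})
if err != nil {
	glog.Fatalf("Couldn't create scheduler config: %v", err)
}
scheduler.New(config).Run()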
View File

@ -42,12 +42,12 @@ func TestCreate(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
factory := ConfigFactory{client}
factory.Create()
factory := NewConfigFactory(client)
if _, err := factory.Create(nil, nil); err != nil {
t.Errorf("Failed to create scheduler config: %v", err)
}
}
func TestCreateLists(t *testing.T) {
factory := ConfigFactory{nil}
factory := NewConfigFactory(nil)
table := []struct {
location string
factory func() *listWatch
@ -85,7 +85,7 @@ func TestCreateLists(t *testing.T) {
}
func TestCreateWatches(t *testing.T) {
factory := ConfigFactory{nil}
factory := NewConfigFactory(nil)
table := []struct {
rv string
location string
@ -136,6 +136,7 @@ func TestCreateWatches(t *testing.T) {
server := httptest.NewServer(&handler)
defer server.Close()
factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.factory().Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
@ -167,7 +168,7 @@ func TestPollMinions(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
cf := ConfigFactory{client}
cf := NewConfigFactory(client)
ce, err := cf.pollMinions()
if err != nil {
@ -194,7 +195,7 @@ func TestDefaultErrorFunc(t *testing.T) {
mux.Handle("/api/"+testapi.Version()+"/pods/foo", &handler)
server := httptest.NewServer(mux)
defer server.Close()
factory := ConfigFactory{client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})}
factory := NewConfigFactory(client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}))
queue := cache.NewFIFO()
podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{},