Combining scheduler priority functions using weighted averages

pull/6/head
Abhishek Gupta 2014-11-25 18:10:25 -08:00
parent 7f374030c1
commit 13831856c9
10 changed files with 136 additions and 127 deletions

View File

@ -27,7 +27,7 @@ import (
type genericScheduler struct {
predicates []FitPredicate
prioritizers []PriorityFunction
prioritizers []PriorityConfig
pods PodLister
random *rand.Rand
randomLock sync.Mutex
@ -62,7 +62,7 @@ func (g *genericScheduler) selectHost(priorityList HostPriorityList) (string, er
if len(priorityList) == 0 {
return "", fmt.Errorf("empty priorityList")
}
sort.Sort(priorityList)
sort.Sort(sort.Reverse(priorityList))
hosts := getMinHosts(priorityList)
g.randomLock.Lock()
@ -97,19 +97,21 @@ func findNodesThatFit(pod api.Pod, podLister PodLister, predicates []FitPredicat
return api.MinionList{Items: filtered}, nil
}
func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityFunction, minionLister MinionLister) (HostPriorityList, error) {
func prioritizeNodes(pod api.Pod, podLister PodLister, priorities []PriorityConfig, minionLister MinionLister) (HostPriorityList, error) {
result := HostPriorityList{}
combinedScores := map[string]int{}
for _, priority := range priorities {
prioritizedList, err := priority(pod, podLister, minionLister)
if err != nil {
return HostPriorityList{}, err
}
if len(priorities) == 1 {
return prioritizedList, nil
}
for _, hostEntry := range prioritizedList {
combinedScores[hostEntry.host] += hostEntry.score
weight := priority.Weight
// skip the priority function if the weight is specified as 0
if weight > 0 {
priorityFunc := priority.Function
prioritizedList, err := priorityFunc(pod, podLister, minionLister)
if err != nil {
return HostPriorityList{}, err
}
for _, hostEntry := range prioritizedList {
combinedScores[hostEntry.host] += hostEntry.score * weight
}
}
}
for host, score := range combinedScores {
@ -148,7 +150,7 @@ func EqualPriority(pod api.Pod, podLister PodLister, minionLister MinionLister)
return result, nil
}
func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityFunction, pods PodLister, random *rand.Rand) Scheduler {
func NewGenericScheduler(predicates []FitPredicate, prioritizers []PriorityConfig, pods PodLister, random *rand.Rand) Scheduler {
return &genericScheduler{
predicates: predicates,
prioritizers: prioritizers,

View File

@ -18,6 +18,7 @@ package scheduler
import (
"fmt"
"math"
"math/rand"
"strconv"
"testing"
@ -59,6 +60,29 @@ func numericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister
return result, nil
}
func reverseNumericPriority(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error) {
var maxScore float64
minScore := math.MaxFloat64
reverseResult := []HostPriority{}
result, err := numericPriority(pod, podLister, minionLister)
if err != nil {
return nil, err
}
for _, hostPriority := range result {
maxScore = math.Max(maxScore, float64(hostPriority.score))
minScore = math.Min(minScore, float64(hostPriority.score))
}
for _, hostPriority := range result {
reverseResult = append(reverseResult, HostPriority{
host: hostPriority.host,
score: int(maxScore + minScore - float64(hostPriority.score)),
})
}
return reverseResult, nil
}
func makeMinionList(nodeNames []string) api.MinionList {
result := api.MinionList{
Items: make([]api.Minion, len(nodeNames)),
@ -81,28 +105,28 @@ func TestSelectHost(t *testing.T) {
{host: "machine1.1", score: 1},
{host: "machine2.1", score: 2},
},
possibleHosts: util.NewStringSet("machine1.1"),
possibleHosts: util.NewStringSet("machine2.1"),
expectsErr: false,
},
// equal scores
{
list: []HostPriority{
{host: "machine1.1", score: 1},
{host: "machine1.2", score: 1},
{host: "machine1.3", score: 1},
{host: "machine1.2", score: 2},
{host: "machine1.3", score: 2},
{host: "machine2.1", score: 2},
},
possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"),
possibleHosts: util.NewStringSet("machine1.2", "machine1.3", "machine2.1"),
expectsErr: false,
},
// out of order scores
{
list: []HostPriority{
{host: "machine1.1", score: 1},
{host: "machine1.2", score: 1},
{host: "machine1.1", score: 3},
{host: "machine1.2", score: 3},
{host: "machine2.1", score: 2},
{host: "machine3.1", score: 3},
{host: "machine1.3", score: 1},
{host: "machine3.1", score: 1},
{host: "machine1.3", score: 3},
},
possibleHosts: util.NewStringSet("machine1.1", "machine1.2", "machine1.3"),
expectsErr: false,
@ -137,8 +161,9 @@ func TestSelectHost(t *testing.T) {
func TestGenericScheduler(t *testing.T) {
tests := []struct {
name string
predicates []FitPredicate
prioritizers []PriorityFunction
prioritizers []PriorityConfig
nodes []string
pod api.Pod
expectedHost string
@ -146,43 +171,57 @@ func TestGenericScheduler(t *testing.T) {
}{
{
predicates: []FitPredicate{falsePredicate},
prioritizers: []PriorityFunction{EqualPriority},
prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}},
nodes: []string{"machine1", "machine2"},
expectsErr: true,
name: "test 1",
},
{
predicates: []FitPredicate{truePredicate},
prioritizers: []PriorityFunction{EqualPriority},
prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}},
nodes: []string{"machine1", "machine2"},
// Random choice between both, the rand seeded above with zero, chooses "machine2"
expectedHost: "machine2",
// Random choice between both, the rand seeded above with zero, chooses "machine1"
expectedHost: "machine1",
name: "test 2",
},
{
// Fits on a machine where the pod ID matches the machine name
predicates: []FitPredicate{matchesPredicate},
prioritizers: []PriorityFunction{EqualPriority},
prioritizers: []PriorityConfig{{Function: EqualPriority, Weight: 1}},
nodes: []string{"machine1", "machine2"},
pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "machine2"}},
expectedHost: "machine2",
name: "test 3",
},
{
predicates: []FitPredicate{truePredicate},
prioritizers: []PriorityFunction{numericPriority},
prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
expectedHost: "1",
expectedHost: "3",
name: "test 4",
},
{
predicates: []FitPredicate{matchesPredicate},
prioritizers: []PriorityFunction{numericPriority},
prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
expectedHost: "2",
name: "test 5",
},
{
predicates: []FitPredicate{truePredicate},
prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}, {Function: reverseNumericPriority, Weight: 2}},
nodes: []string{"3", "2", "1"},
pod: api.Pod{ObjectMeta: api.ObjectMeta{Name: "2"}},
expectedHost: "1",
name: "test 6",
},
{
predicates: []FitPredicate{truePredicate, falsePredicate},
prioritizers: []PriorityFunction{numericPriority},
prioritizers: []PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3", "2", "1"},
expectsErr: true,
name: "test 7",
},
}
@ -199,7 +238,7 @@ func TestGenericScheduler(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
if test.expectedHost != machine {
t.Errorf("Expected: %s, Saw: %s", test.expectedHost, machine)
t.Errorf("Failed : %s, Expected: %s, Saw: %s", test.name, test.expectedHost, machine)
}
}
}

View File

@ -22,11 +22,13 @@ import (
"github.com/golang/glog"
)
func calculatePercentage(requested, capacity int) int {
// the unused capacity is calculated on a scale of 0-10
// 0 being the lowest priority and 10 being the highest
func calculateScore(requested, capacity int) int {
if capacity == 0 {
return 0
}
return (requested * 100) / capacity
return ((capacity - requested) * 10) / capacity
}
// Calculate the occupancy on a node. 'node' has information about the resources on the node.
@ -41,13 +43,13 @@ func calculateOccupancy(node api.Minion, pods []api.Pod) HostPriority {
}
}
percentageCPU := calculatePercentage(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0))
percentageMemory := calculatePercentage(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0))
glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Percentage:(%d\\%m, %d\\%)", totalCPU, totalMemory, percentageCPU, percentageMemory)
cpuScore := calculateScore(totalCPU, resources.GetIntegerResource(node.Spec.Capacity, resources.CPU, 0))
memoryScore := calculateScore(totalMemory, resources.GetIntegerResource(node.Spec.Capacity, resources.Memory, 0))
glog.V(4).Infof("Least Requested Priority, AbsoluteRequested: (%d, %d) Score:(%d, %d)", totalCPU, totalMemory, cpuScore, memoryScore)
return HostPriority{
host: node.Name,
score: int((percentageCPU + percentageMemory) / 2),
score: int((cpuScore + memoryScore) / 2),
}
}

View File

@ -74,12 +74,12 @@ func TestLeastRequested(t *testing.T) {
}{
{
nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "nothing scheduled",
},
{
nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "no resources requested",
pods: []api.Pod{
{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}},
@ -90,8 +90,8 @@ func TestLeastRequested(t *testing.T) {
},
{
nodes: []api.Minion{makeMinion("machine1", 4000, 10000), makeMinion("machine2", 4000, 10000)},
expectedList: []HostPriority{{"machine1", 37 /* int(75% / 2) */}, {"machine2", 62 /* int( 75% + 50% / 2) */}},
test: "no resources requested",
expectedList: []HostPriority{{"machine1", 6 /* int(200%-75% / 2) */}, {"machine2", 3 /* int( 200%-125% / 2) */}},
test: "resources requested",
pods: []api.Pod{
{Spec: cpuOnly, Status: api.PodStatus{Host: "machine1"}},
{Spec: cpuAndMemory, Status: api.PodStatus{Host: "machine2"}},

View File

@ -39,7 +39,7 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
}
var maxCount int
var fScore float32
var fScore float32 = 10.0
counts := map[string]int{}
if len(pods) > 0 {
for _, pod := range pods {
@ -59,10 +59,11 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
}
result := []HostPriority{}
//score int
//score int - scale of 0-10
// 0 being the lowest priority and 10 being the highest
for _, minion := range minions.Items {
if maxCount > 0 {
fScore = 100 * (float32(counts[minion.Name]) / float32(maxCount))
fScore = 10 * (float32(maxCount-counts[minion.Name]) / float32(maxCount))
}
result = append(result, HostPriority{host: minion.Name, score: int(fScore)})
}
@ -70,5 +71,5 @@ func CalculateSpreadPriority(pod api.Pod, podLister PodLister, minionLister Mini
}
func NewSpreadingScheduler(podLister PodLister, minionLister MinionLister, predicates []FitPredicate, random *rand.Rand) Scheduler {
return NewGenericScheduler(predicates, []PriorityFunction{CalculateSpreadPriority}, podLister, random)
return NewGenericScheduler(predicates, []PriorityConfig{{Function: CalculateSpreadPriority, Weight: 1}}, podLister, random)
}

View File

@ -47,21 +47,21 @@ func TestSpreadPriority(t *testing.T) {
}{
{
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "nothing scheduled",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: machine1Status}},
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "no labels",
},
{
pod: api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
pods: []api.Pod{{Status: machine1Status, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 10}},
test: "different labels",
},
{
@ -71,7 +71,7 @@ func TestSpreadPriority(t *testing.T) {
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 100}},
expectedList: []HostPriority{{"machine1", 10}, {"machine2", 0}},
test: "one label match",
},
{
@ -82,7 +82,7 @@ func TestSpreadPriority(t *testing.T) {
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 100}, {"machine2", 100}},
expectedList: []HostPriority{{"machine1", 0}, {"machine2", 0}},
test: "two label matches on different machines",
},
{
@ -94,7 +94,7 @@ func TestSpreadPriority(t *testing.T) {
{Status: machine2Status, ObjectMeta: api.ObjectMeta{Labels: labels1}},
},
nodes: []string{"machine1", "machine2"},
expectedList: []HostPriority{{"machine1", 50}, {"machine2", 100}},
expectedList: []HostPriority{{"machine1", 5}, {"machine2", 0}},
test: "three label matches",
},
}

View File

@ -47,3 +47,8 @@ func (h HostPriorityList) Swap(i, j int) {
}
type PriorityFunction func(pod api.Pod, podLister PodLister, minionLister MinionLister) (HostPriorityList, error)
type PriorityConfig struct {
Function PriorityFunction
Weight int
}

View File

@ -1,44 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/golang/glog"
)
func CreateOnMinion1(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.V(2).Infof("custom predicate minion1 --> node: %s", node)
if node == "10.245.2.2" {
glog.V(2).Infof("custom predicate minion1 matched")
return true, nil
} else {
glog.V(2).Infof("custom predicate minion1 did not match")
return false, nil
}
}
func CreateOnMinion2(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
glog.V(2).Infof("custom predicate minion2 --> node: %s", node)
if node == "10.245.2.3" {
glog.V(2).Infof("custom predicate minion2 matched")
return true, nil
} else {
glog.V(2).Infof("custom predicate minion2 did not match")
return false, nil
}
}

View File

@ -49,9 +49,9 @@ type configFactory struct {
// map of strings to predicate functions to be used
// to filter the minions for scheduling pods
PredicateMap map[string]algorithm.FitPredicate
// map of strings to priority functions to be used
// map of strings to priority config to be used
// to prioritize the filtered minions for scheduling pods
PriorityMap map[string]algorithm.PriorityFunction
PriorityMap map[string]algorithm.PriorityConfig
}
// NewConfigFactory initializes the factory.
@ -62,13 +62,10 @@ func NewConfigFactory(client *client.Client) *configFactory {
PodLister: &storeToPodLister{cache.NewStore()},
MinionLister: &storeToMinionLister{cache.NewStore()},
PredicateMap: make(map[string]algorithm.FitPredicate),
PriorityMap: make(map[string]algorithm.PriorityFunction),
PriorityMap: make(map[string]algorithm.PriorityConfig),
}
// add default predicates
factory.addDefaultPredicates()
// add default predicates
factory.addDefaultPriorities()
return factory
@ -89,7 +86,7 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch
glog.V(2).Infof("Custom priority list not provided, using default priorities")
priorityKeys = []string{"LeastRequestedPriority"}
}
priorityFuncs, err := factory.getPriorityFunctions(priorityKeys)
priorityConfigs, err := factory.getPriorityConfigs(priorityKeys)
if err != nil {
return nil, err
}
@ -112,7 +109,7 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityFuncs, factory.PodLister, r)
algo := algorithm.NewGenericScheduler(predicateFuncs, priorityConfigs, factory.PodLister, r)
podBackoff := podBackoff{
perPodBackoff: map[string]*backoffEntry{},
@ -133,11 +130,10 @@ func (factory *configFactory) Create(predicateKeys, priorityKeys []string) (*sch
}
func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.FitPredicate, error) {
var function algorithm.FitPredicate
predicates := []algorithm.FitPredicate{}
for _, key := range keys {
function = factory.PredicateMap[key]
if function == nil {
function, ok := factory.PredicateMap[key]
if !ok {
return nil, fmt.Errorf("Invalid predicate key %s specified - no corresponding function found", key)
}
predicates = append(predicates, function)
@ -145,17 +141,16 @@ func (factory *configFactory) getPredicateFunctions(keys []string) ([]algorithm.
return predicates, nil
}
func (factory *configFactory) getPriorityFunctions(keys []string) ([]algorithm.PriorityFunction, error) {
var function algorithm.PriorityFunction
priorities := []algorithm.PriorityFunction{}
func (factory *configFactory) getPriorityConfigs(keys []string) ([]algorithm.PriorityConfig, error) {
configs := []algorithm.PriorityConfig{}
for _, key := range keys {
function = factory.PriorityMap[key]
if function == nil {
config, ok := factory.PriorityMap[key]
if !ok {
return nil, fmt.Errorf("Invalid priority key %s specified - no corresponding function found", key)
}
priorities = append(priorities, function)
configs = append(configs, config)
}
return priorities, nil
return configs, nil
}
func (factory *configFactory) addDefaultPredicates() {
@ -170,13 +165,22 @@ func (factory *configFactory) AddPredicate(key string, function algorithm.FitPre
}
func (factory *configFactory) addDefaultPriorities() {
factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority)
factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority)
factory.AddPriority("EqualPriority", algorithm.EqualPriority)
factory.AddPriority("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1)
factory.AddPriority("SpreadingPriority", algorithm.CalculateSpreadPriority, 1)
factory.AddPriority("EqualPriority", algorithm.EqualPriority, 0)
}
func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction) {
factory.PriorityMap[key] = function
func (factory *configFactory) AddPriority(key string, function algorithm.PriorityFunction, weight int) {
factory.PriorityMap[key] = algorithm.PriorityConfig{Function: function, Weight: weight}
}
func (factory *configFactory) SetWeight(key string, weight int) {
config, ok := factory.PriorityMap[key]
if !ok {
glog.Errorf("Invalid priority key %s specified - no corresponding function found", key)
return
}
config.Weight = weight
}
type listWatch struct {
@ -209,7 +213,7 @@ func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
func (factory *configFactory) createUnassignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: labels.Set{"Status.Host": ""}.AsSelector(),
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
resource: "pods",
}
}
@ -227,7 +231,7 @@ func parseSelectorOrDie(s string) labels.Selector {
func (factory *configFactory) createAssignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie("Status.Host!="),
fieldSelector: parseSelectorOrDie("DesiredState.Host!="),
resource: "pods",
}
}

View File

@ -59,12 +59,12 @@ func TestCreateLists(t *testing.T) {
},
// Assigned pod
{
location: "/api/" + testapi.Version() + "/pods?fields=Status.Host!%3D",
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host!%3D",
factory: factory.createAssignedPodLW,
},
// Unassigned pod
{
location: "/api/" + testapi.Version() + "/pods?fields=Status.Host%3D",
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D",
factory: factory.createUnassignedPodLW,
},
}
@ -108,21 +108,21 @@ func TestCreateWatches(t *testing.T) {
// Assigned pod watches
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=",
factory: factory.createAssignedPodLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host!%3D&resourceVersion=42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
factory: factory.createAssignedPodLW,
},
// Unassigned pod watches
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=",
factory: factory.createUnassignedPodLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=Status.Host%3D&resourceVersion=42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
factory: factory.createUnassignedPodLW,
},
}