Adding service affinity predicate

pull/6/head
Abhishek Gupta 2014-12-22 15:55:31 -08:00
parent 9dd7d2a0a1
commit 3f722a3d8e
4 changed files with 251 additions and 4 deletions


@@ -183,7 +183,7 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) FitPre
}
// CheckNodeLabelPresence checks whether a particular label exists on a minion or not, regardless of its value
// Consider the cases where the minions are places in regions/zones/racks and these are identified by labels
// Consider the cases where the minions are placed in regions/zones/racks and these are identified by labels
// In some cases, it is required that only minions that are part of ANY of the defined regions/zones/racks be selected
//
// Alternately, eliminating minions that have a certain label, regardless of value, is also useful
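For context, a minimal sketch (not part of this change; the function and predicate names below are illustrative) of how the label-presence check described above could be registered through the scheduler factory, mirroring the registration pattern used elsewhere in this commit:

// Sketch only: restrict scheduling to minions that carry a "region" label, regardless of its value.
// With presence=false the same predicate would instead exclude minions carrying that label.
func labelPresencePredicates() util.StringSet {
	return util.NewStringSet(
		factory.RegisterFitPredicate("RegionLabelPresence",
			algorithm.NewNodeLabelPredicate(factory.MinionLister, []string{"region"}, true)),
	)
}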
@@ -204,6 +204,82 @@ func (n *NodeLabelChecker) CheckNodeLabelPresence(pod api.Pod, existingPods []ap
return true, nil
}
type ServiceAffinity struct {
	podLister     PodLister
	serviceLister ServiceLister
	nodeInfo      NodeInfo
	labels        []string
}

func NewServiceAffinityPredicate(podLister PodLister, serviceLister ServiceLister, nodeInfo NodeInfo, labels []string) FitPredicate {
	affinity := &ServiceAffinity{
		podLister:     podLister,
		serviceLister: serviceLister,
		nodeInfo:      nodeInfo,
		labels:        labels,
	}
	return affinity.CheckServiceAffinity
}

func (s *ServiceAffinity) CheckServiceAffinity(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
	var affinitySelector labels.Selector

	// check if the pod being scheduled has the affinity labels specified
	affinityLabels := map[string]string{}
	labelsExist := true
	for _, l := range s.labels {
		if labels.Set(pod.Labels).Has(l) {
			affinityLabels[l] = labels.Set(pod.Labels).Get(l)
		} else {
			// the current pod does not specify all the labels, look at the existing service pods
			labelsExist = false
		}
	}

	// if the pod did not specify all the required labels, derive the missing values from a minion
	// that already hosts a pod of the same service
	if !labelsExist {
		service, err := s.serviceLister.GetPodService(pod)
		if err == nil {
			selector := labels.SelectorFromSet(service.Spec.Selector)
			servicePods, err := s.podLister.ListPods(selector)
			if err != nil {
				return false, err
			}
			if len(servicePods) > 0 {
				// consider any service pod and fetch the minion it is hosted on
				otherMinion, err := s.nodeInfo.GetNodeInfo(servicePods[0].Status.Host)
				if err != nil {
					return false, err
				}
				for _, l := range s.labels {
					// if the pod being scheduled has the label value specified, do not override it
					if _, exists := affinityLabels[l]; exists {
						continue
					}
					if labels.Set(otherMinion.Labels).Has(l) {
						affinityLabels[l] = labels.Set(otherMinion.Labels).Get(l)
					}
				}
			}
		}
	}

	// if no affinity labels could be determined (e.g. there are no existing pods in the service), consider all minions
	if len(affinityLabels) == 0 {
		affinitySelector = labels.Everything()
	} else {
		affinitySelector = labels.Set(affinityLabels).AsSelector()
	}

	minion, err := s.nodeInfo.GetNodeInfo(node)
	if err != nil {
		return false, err
	}

	// check if the minion matches the selector
	return affinitySelector.Matches(labels.Set(minion.Labels)), nil
}
func PodFitsPorts(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
	existingPorts := getUsedPorts(existingPods...)
	wantPorts := getUsedPorts(pod)
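For reference, a minimal sketch of how the new predicate is constructed and invoked, reusing the fake listers from the test file below (the function name and the node/pod values are illustrative only):

// Sketch only (same package as the predicate): an existing pod of the service already runs on a
// minion in region "r1", so a new pod of that service fits machine1 ("r1") but not machine2 ("r2").
func exampleServiceAffinity() (bool, error) {
	selector := map[string]string{"foo": "bar"}
	nodes := []api.Node{
		{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: map[string]string{"region": "r1"}}},
		{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: map[string]string{"region": "r2"}}},
	}
	pods := []api.Pod{{Status: api.PodStatus{Host: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}}
	services := []api.Service{{Spec: api.ServiceSpec{Selector: selector}}}

	fit := NewServiceAffinityPredicate(FakePodLister(pods), FakeServiceLister(services), FakeNodeListInfo(nodes), []string{"region"})
	return fit(api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}}, []api.Pod{}, "machine1") // true, nil
}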


@@ -17,6 +17,7 @@ limitations under the License.
package scheduler

import (
	"fmt"
	"reflect"
	"testing"
@@ -31,7 +32,22 @@ func (n FakeNodeInfo) GetNodeInfo(nodeName string) (*api.Node, error) {
	return &node, nil
}
type FakeNodeListInfo []api.Node

func (nodes FakeNodeListInfo) GetNodeInfo(nodeName string) (*api.Node, error) {
	for _, node := range nodes {
		if node.Name == nodeName {
			return &node, nil
		}
	}
	return nil, fmt.Errorf("unable to find node: %s", nodeName)
}

func makeResources(milliCPU int64, memory int64) api.NodeResources {
	return api.NodeResources{
		Capacity: api.ResourceList{
			api.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
@@ -446,3 +462,115 @@ func TestNodeLabelPresence(t *testing.T) {
		}
	}
}
func TestServiceAffinity(t *testing.T) {
	selector := map[string]string{"foo": "bar"}
	labels1 := map[string]string{
		"region": "r1",
		"zone":   "z11",
	}
	labels2 := map[string]string{
		"region": "r1",
		"zone":   "z12",
	}
	labels3 := map[string]string{
		"region": "r2",
		"zone":   "z21",
	}
	labels4 := map[string]string{
		"region": "r2",
		"zone":   "z22",
	}
	node1 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1", Labels: labels1}}
	node2 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine2", Labels: labels2}}
	node3 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine3", Labels: labels3}}
	node4 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine4", Labels: labels4}}
	node5 := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine5", Labels: labels4}}
	tests := []struct {
		pod      api.Pod
		pods     []api.Pod
		services []api.Service
		node     string
		labels   []string
		fits     bool
		test     string
	}{
		{
			node:   "machine1",
			fits:   true,
			labels: []string{"region"},
			test:   "nothing scheduled",
		},
		{
			pod:    api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r1"}}},
			node:   "machine1",
			fits:   true,
			labels: []string{"region"},
			test:   "pod with region label match",
		},
		{
			pod:    api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"region": "r2"}}},
			node:   "machine1",
			fits:   false,
			labels: []string{"region"},
			test:   "pod with region label mismatch",
		},
		{
			pod:      api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
			pods:     []api.Pod{{Status: api.PodStatus{Host: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
			node:     "machine1",
			services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
			fits:     true,
			labels:   []string{"region"},
			test:     "service pod on same minion",
		},
		{
			pod:      api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
			pods:     []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
			node:     "machine1",
			services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
			fits:     true,
			labels:   []string{"region"},
			test:     "service pod on different minion, region match",
		},
		{
			pod:      api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
			pods:     []api.Pod{{Status: api.PodStatus{Host: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
			node:     "machine1",
			services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
			fits:     false,
			labels:   []string{"region"},
			test:     "service pod on different minion, region mismatch",
		},
		{
			pod:      api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
			pods:     []api.Pod{{Status: api.PodStatus{Host: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
			node:     "machine1",
			services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
			fits:     false,
			labels:   []string{"region", "zone"},
			test:     "service pod on different minion, multiple labels, not all match",
		},
		{
			pod:      api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
			pods:     []api.Pod{{Status: api.PodStatus{Host: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
			node:     "machine4",
			services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
			fits:     true,
			labels:   []string{"region", "zone"},
			test:     "service pod on different minion, multiple labels, all match",
		},
	}
	for _, test := range tests {
		nodes := []api.Node{node1, node2, node3, node4, node5}
		serviceAffinity := ServiceAffinity{FakePodLister(test.pods), FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
		fits, err := serviceAffinity.CheckServiceAffinity(test.pod, []api.Pod{}, test.node)
		if err != nil {
			t.Errorf("unexpected error: %v", err)
		}
		if fits != test.fits {
			t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
		}
	}
}


@@ -0,0 +1,46 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This algorithm provider has predicates and priorities related to affinity/anti-affinity for the scheduler.
package affinity

import (
	algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
	"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
)

const Provider string = "AffinityProvider"

func init() {
	factory.RegisterAlgorithmProvider(Provider, defaultPredicates(), defaultPriorities())
}

func defaultPredicates() util.StringSet {
	return util.NewStringSet(
		// fit is determined by whether the minion carries the same values for the specified labels as the pod being scheduled;
		// if the pod does not specify some or all of these labels, the values are taken from a minion already hosting a pod of the same service
		factory.RegisterFitPredicate("ServiceAffinity", algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region"})),
	)
}

func defaultPriorities() util.StringSet {
	return util.NewStringSet(
		// spreads pods belonging to the same service across minions in different zones
		// region and zone can be nested infrastructure topology levels, defined by labels on minions
		factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1),
	)
}
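The provider above pins a service's pods to a single region while spreading them across zones. A natural extension (an assumption, not part of this change; the function and predicate names are hypothetical) would be to require co-location on both region and zone by listing both labels:

// Sketch only: pods of the same service would be confined to one region and one zone.
func regionAndZonePredicates() util.StringSet {
	return util.NewStringSet(
		factory.RegisterFitPredicate("ServiceAffinityRegionZone",
			algorithm.NewServiceAffinityPredicate(factory.PodLister, factory.ServiceLister, factory.MinionLister, []string{"region", "zone"})),
	)
}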


@@ -48,9 +48,6 @@ func defaultPriorities() util.StringSet {
		factory.RegisterPriorityFunction("LeastRequestedPriority", algorithm.LeastRequestedPriority, 1),
		// spreads pods by minimizing the number of pods (belonging to the same service) on the same minion.
		factory.RegisterPriorityFunction("ServiceSpreadingPriority", algorithm.NewServiceSpreadPriority(factory.ServiceLister), 1),
		// spreads pods belonging to the same service across minions in different zones
		// TODO: remove the hardcoding of the "zone" label and move it to a constant
		factory.RegisterPriorityFunction("ZoneSpreadingPriority", algorithm.NewServiceAntiAffinityPriority(factory.ServiceLister, "zone"), 1),
		// EqualPriority is a prioritizer function that gives an equal weight of one to all minions
		factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0),
	)