Improve performance of affinity/anti-affinity predicate

pull/8/head
Bobby (Babak) Salamat 2018-04-04 17:49:59 -07:00
parent 9c40f5b5a6
commit 418c7502f0
4 changed files with 464 additions and 53 deletions

View File

@ -20,14 +20,17 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/schedulercache" "k8s.io/kubernetes/pkg/scheduler/schedulercache"
schedutil "k8s.io/kubernetes/pkg/scheduler/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"github.com/golang/glog"
) )
// PredicateMetadataFactory defines a factory of predicate metadata. // PredicateMetadataFactory defines a factory of predicate metadata.
@ -50,7 +53,13 @@ type predicateMetadata struct {
podRequest *schedulercache.Resource podRequest *schedulercache.Resource
podPorts []*v1.ContainerPort podPorts []*v1.ContainerPort
//key is a pod full name with the anti-affinity rules. //key is a pod full name with the anti-affinity rules.
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
// A map of node name to a list of Pods on the node that can potentially match
// the affinity rules of the "pod".
matchingAffinityPods map[string][]*v1.Pod
// A map of node name to a list of Pods on the node that can potentially match
// the anti-affinity rules of the "pod".
matchingAntiAffinityPods map[string][]*v1.Pod
serviceAffinityInUse bool serviceAffinityInUse bool
serviceAffinityMatchingPodList []*v1.Pod serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service serviceAffinityMatchingPodServices []*v1.Service
@ -108,12 +117,19 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
if err != nil { if err != nil {
return nil return nil
} }
affinityPods, antiAffinityPods, err := getPodsMatchingAffinity(pod, nodeNameToInfoMap)
if err != nil {
glog.Errorf("[predicate meta data generation] error finding pods that match affinity terms")
return nil
}
predicateMetadata := &predicateMetadata{ predicateMetadata := &predicateMetadata{
pod: pod, pod: pod,
podBestEffort: isPodBestEffort(pod), podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod), podRequest: GetResourceRequest(pod),
podPorts: schedutil.GetContainerPorts(pod), podPorts: schedutil.GetContainerPorts(pod),
matchingAntiAffinityTerms: matchingTerms, matchingAntiAffinityTerms: matchingTerms,
matchingAffinityPods: affinityPods,
matchingAntiAffinityPods: antiAffinityPods,
} }
for predicateName, precomputeFunc := range predicateMetadataProducers { for predicateName, precomputeFunc := range predicateMetadataProducers {
glog.V(10).Infof("Precompute: %v", predicateName) glog.V(10).Infof("Precompute: %v", predicateName)
@ -131,6 +147,27 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
} }
// Delete any anti-affinity rule from the deletedPod. // Delete any anti-affinity rule from the deletedPod.
delete(meta.matchingAntiAffinityTerms, deletedPodFullName) delete(meta.matchingAntiAffinityTerms, deletedPodFullName)
// Delete pod from the matching affinity or anti-affinity pods if exists.
affinity := meta.pod.Spec.Affinity
podNodeName := deletedPod.Spec.NodeName
if affinity != nil && len(podNodeName) > 0 {
if affinity.PodAffinity != nil {
for i, p := range meta.matchingAffinityPods[podNodeName] {
if p == deletedPod {
meta.matchingAffinityPods[podNodeName] = append(meta.matchingAffinityPods[podNodeName][:i], meta.matchingAffinityPods[podNodeName][i+1:]...)
break
}
}
}
if affinity.PodAntiAffinity != nil {
for i, p := range meta.matchingAntiAffinityPods[podNodeName] {
if p == deletedPod {
meta.matchingAntiAffinityPods[podNodeName] = append(meta.matchingAntiAffinityPods[podNodeName][:i], meta.matchingAntiAffinityPods[podNodeName][i+1:]...)
break
}
}
}
}
// All pods in the serviceAffinityMatchingPodList are in the same namespace. // All pods in the serviceAffinityMatchingPodList are in the same namespace.
// So, if the namespace of the first one is not the same as the namespace of the // So, if the namespace of the first one is not the same as the namespace of the
// deletedPod, we don't need to check the list, as deletedPod isn't in the list. // deletedPod, we don't need to check the list, as deletedPod isn't in the list.
@ -173,6 +210,35 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms
} }
} }
// Add the pod to matchingAffinityPods and matchingAntiAffinityPods if needed.
affinity := meta.pod.Spec.Affinity
podNodeName := addedPod.Spec.NodeName
if affinity != nil && len(podNodeName) > 0 {
if targetPodMatchesAffinityOfPod(meta.pod, addedPod) {
found := false
for _, p := range meta.matchingAffinityPods[podNodeName] {
if p == addedPod {
found = true
break
}
}
if !found {
meta.matchingAffinityPods[podNodeName] = append(meta.matchingAffinityPods[podNodeName], addedPod)
}
}
if targetPodMatchesAntiAffinityOfPod(meta.pod, addedPod) {
found := false
for _, p := range meta.matchingAntiAffinityPods[podNodeName] {
if p == addedPod {
found = true
break
}
}
if !found {
meta.matchingAntiAffinityPods[podNodeName] = append(meta.matchingAntiAffinityPods[podNodeName], addedPod)
}
}
}
// If addedPod is in the same namespace as the meta.pod, update the list // If addedPod is in the same namespace as the meta.pod, update the list
// of matching pods if applicable. // of matching pods if applicable.
if meta.serviceAffinityInUse && addedPod.Namespace == meta.pod.Namespace { if meta.serviceAffinityInUse && addedPod.Namespace == meta.pod.Namespace {
@ -200,9 +266,162 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
for k, v := range meta.matchingAntiAffinityTerms { for k, v := range meta.matchingAntiAffinityTerms {
newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...) newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...)
} }
newPredMeta.matchingAffinityPods = make(map[string][]*v1.Pod)
for k, v := range meta.matchingAffinityPods {
newPredMeta.matchingAffinityPods[k] = append([]*v1.Pod(nil), v...)
}
newPredMeta.matchingAntiAffinityPods = make(map[string][]*v1.Pod)
for k, v := range meta.matchingAntiAffinityPods {
newPredMeta.matchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
}
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
meta.serviceAffinityMatchingPodServices...) meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
meta.serviceAffinityMatchingPodList...) meta.serviceAffinityMatchingPodList...)
return (algorithm.PredicateMetadata)(newPredMeta) return (algorithm.PredicateMetadata)(newPredMeta)
} }
// affinityTermProperties holds the values derived from a single pod affinity
// (or anti-affinity) term that are needed to test whether a pod matches it.
type affinityTermProperties struct {
// namespaces is the set of namespaces computed for the term by
// priorityutil.GetNamespacesFromPodAffinityTerm.
namespaces sets.String
// selector is the term's LabelSelector converted to a labels.Selector.
selector labels.Selector
}
// getAffinityTermProperties receives a Pod and affinity terms and returns, for
// each term in order, the namespaces of the term and its label selector.
//
// It returns a nil slice when there are no terms. If the label selector of any
// term cannot be converted, it returns a nil slice and the conversion error.
func getAffinityTermProperties(pod *v1.Pod, terms []v1.PodAffinityTerm) ([]*affinityTermProperties, error) {
	if len(terms) == 0 {
		return nil, nil
	}
	properties := make([]*affinityTermProperties, 0, len(terms))
	// Index into the slice instead of ranging by value so the pointer handed to
	// the helper refers to the element itself and not to a per-iteration copy.
	for i := range terms {
		term := &terms[i]
		namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, term)
		selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
		if err != nil {
			return nil, err
		}
		properties = append(properties, &affinityTermProperties{
			namespaces: namespaces,
			selector:   selector,
		})
	}
	return properties, nil
}
// podMatchesAffinityTermProperties reports whether the given pod matches the
// namespaces and selector of every entry in properties. An empty or nil
// properties list never matches: the function returns false in that case.
func podMatchesAffinityTermProperties(pod *v1.Pod, properties []*affinityTermProperties) bool {
	// Start from "matched" only when there is at least one property to check,
	// and stop at the first property the pod fails to satisfy.
	matched := len(properties) > 0
	for i := 0; matched && i < len(properties); i++ {
		current := properties[i]
		matched = priorityutil.PodMatchesTermsNamespaceAndSelector(pod, current.namespaces, current.selector)
	}
	return matched
}
// getPodsMatchingAffinity finds existing Pods that match the affinity and
// anti-affinity terms of the given "pod". It ignores topology. The returned
// pods are checked later by the affinity predicate, so that the predicate does
// not need to look at all the pods in the cluster.
//
// Both results map a node name to the pods on that node that match every
// affinity term (first map) or every anti-affinity term (second map) of "pod".
// Nodes without matching pods have no entry. When "pod" has neither pod
// affinity nor pod anti-affinity, all results are nil. On error both maps are
// nil, so callers never see partially computed results.
//
// NOTE(review): a pod is selected only if it matches all terms of the
// respective kind (see podMatchesAffinityTermProperties); confirm that this is
// the intended pre-filter for anti-affinity as well.
func getPodsMatchingAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (affinityPods map[string][]*v1.Pod, antiAffinityPods map[string][]*v1.Pod, err error) {
	affinity := pod.Spec.Affinity
	if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
		return nil, nil, nil
	}

	// Term properties are the same for every node, so compute them once before
	// fanning out.
	affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
	if err != nil {
		return nil, nil, err
	}
	antiAffinityProperties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
	if err != nil {
		return nil, nil, err
	}

	allNodeNames := make([]string, 0, len(nodeInfoMap))
	for name := range nodeInfoMap {
		allNodeNames = append(allNodeNames, name)
	}

	// lock guards firstError and both result maps, which are written from the
	// worker goroutines started by workqueue.Parallelize.
	var lock sync.Mutex
	var firstError error
	affinityPods = make(map[string][]*v1.Pod)
	antiAffinityPods = make(map[string][]*v1.Pod)

	processNode := func(i int) {
		nodeInfo := nodeInfoMap[allNodeNames[i]]
		node := nodeInfo.Node()
		if node == nil {
			lock.Lock()
			defer lock.Unlock()
			if firstError == nil {
				firstError = fmt.Errorf("nodeInfo.Node is nil")
			}
			return
		}
		// The slices are grown on demand rather than pre-allocated to the full
		// pod count of the node.
		var affPods, antiAffPods []*v1.Pod
		for _, existingPod := range nodeInfo.Pods() {
			// Check affinity properties.
			if podMatchesAffinityTermProperties(existingPod, affinityProperties) {
				affPods = append(affPods, existingPod)
			}
			// Check anti-affinity properties.
			if podMatchesAffinityTermProperties(existingPod, antiAffinityProperties) {
				antiAffPods = append(antiAffPods, existingPod)
			}
		}
		if len(affPods) == 0 && len(antiAffPods) == 0 {
			return
		}
		lock.Lock()
		defer lock.Unlock()
		if len(affPods) > 0 {
			affinityPods[node.Name] = affPods
		}
		if len(antiAffPods) > 0 {
			antiAffinityPods[node.Name] = antiAffPods
		}
	}
	workqueue.Parallelize(16, len(allNodeNames), processNode)

	if firstError != nil {
		return nil, nil, firstError
	}
	return affinityPods, antiAffinityPods, nil
}
// targetPodMatchesAffinityOfPod returns true if "targetPod" matches the
// namespaces and label selectors of all the pod affinity terms of "pod".
// It returns false when "pod" has no pod affinity, when it has no affinity
// terms, or when the terms cannot be converted (the error is logged).
// Similar to getPodsMatchingAffinity, this function does not check topology.
// So, whether the targetPod actually matches or not needs further checks for a
// specific node.
func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool {
	affinity := pod.Spec.Affinity
	if affinity == nil || affinity.PodAffinity == nil {
		return false
	}
	affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
	if err != nil {
		// Include the underlying error so the log explains why the terms were rejected.
		glog.Errorf("error in getting affinity properties of Pod %v: %v", pod.Name, err)
		return false
	}
	return podMatchesAffinityTermProperties(targetPod, affinityProperties)
}
// targetPodMatchesAntiAffinityOfPod returns true if "targetPod" matches the
// namespaces and label selectors of all the pod anti-affinity terms of "pod".
// It returns false when "pod" has no pod anti-affinity, when it has no
// anti-affinity terms, or when the terms cannot be converted (the error is
// logged). Similar to getPodsMatchingAffinity, this function does not check
// topology. So, whether the targetPod actually matches or not needs further
// checks for a specific node.
//
// NOTE(review): matching is "all terms", not "any term"; confirm that a pod
// matching only some anti-affinity terms is meant to be excluded here.
func targetPodMatchesAntiAffinityOfPod(pod, targetPod *v1.Pod) bool {
	affinity := pod.Spec.Affinity
	if affinity == nil || affinity.PodAntiAffinity == nil {
		return false
	}
	properties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
	if err != nil {
		// Include the underlying error so the log explains why the terms were rejected.
		glog.Errorf("error in getting anti-affinity properties of Pod %v: %v", pod.Name, err)
		return false
	}
	return podMatchesAffinityTermProperties(targetPod, properties)
}

View File

@ -88,6 +88,13 @@ func (s sortableServices) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
var _ = sort.Interface(&sortableServices{}) var _ = sort.Interface(&sortableServices{})
// sortNodePodMap sorts, in place, every pod list stored in the given map of
// node name to pods, using the ordering defined by sortablePods. It is used to
// bring two such maps into a comparable form before a deep-equality check.
func sortNodePodMap(np map[string][]*v1.Pod) {
	for nodeName := range np {
		sort.Sort(sortablePods(np[nodeName]))
	}
}
// predicateMetadataEquivalent returns true if the two metadata are equivalent. // predicateMetadataEquivalent returns true if the two metadata are equivalent.
// Note: this function does not compare podRequest. // Note: this function does not compare podRequest.
func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error { func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
@ -111,6 +118,16 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) { if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) {
return fmt.Errorf("matchingAntiAffinityTerms are not euqal") return fmt.Errorf("matchingAntiAffinityTerms are not euqal")
} }
sortNodePodMap(meta1.matchingAffinityPods)
sortNodePodMap(meta2.matchingAffinityPods)
if !reflect.DeepEqual(meta1.matchingAffinityPods, meta2.matchingAffinityPods) {
return fmt.Errorf("matchingAffinityPods are not euqal")
}
sortNodePodMap(meta1.matchingAntiAffinityPods)
sortNodePodMap(meta2.matchingAntiAffinityPods)
if !reflect.DeepEqual(meta1.matchingAntiAffinityPods, meta2.matchingAntiAffinityPods) {
return fmt.Errorf("matchingAntiAffinityPods are not euqal")
}
if meta1.serviceAffinityInUse { if meta1.serviceAffinityInUse {
sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList) sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList)
sort.Sort(sortablePods1) sort.Sort(sortablePods1)
@ -189,6 +206,34 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
}, },
}, },
} }
affinityComplex := &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"bar", "buzz"},
},
},
},
TopologyKey: "region",
},
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"bar", "security", "test"},
},
},
},
TopologyKey: "zone",
},
},
}
tests := []struct { tests := []struct {
description string description string
@ -312,6 +357,41 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}}, {ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
}, },
}, },
{
description: "metadata matching pod affinity and anti-affinity are updated correctly after adding and removing a pod",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeC",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityFooBar,
PodAffinity: affinityComplex,
},
},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{
NodeName: "nodeA",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityComplex,
},
},
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
} }
for _, test := range tests { for _, test := range tests {
@ -360,6 +440,7 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
// on the idea that shallow-copy should produce an object that is deep-equal to the original // on the idea that shallow-copy should produce an object that is deep-equal to the original
// object. // object.
func TestPredicateMetadata_ShallowCopy(t *testing.T) { func TestPredicateMetadata_ShallowCopy(t *testing.T) {
selector1 := map[string]string{"foo": "bar"}
source := predicateMetadata{ source := predicateMetadata{
pod: &v1.Pod{ pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -392,6 +473,45 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
}, },
}, },
}, },
matchingAffinityPods: map[string][]*v1.Pod{
"nodeA": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
},
"nodeC": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeC",
},
},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeC"},
},
},
},
matchingAntiAffinityPods: map[string][]*v1.Pod{
"nodeN": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeN"},
},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeM",
},
},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p3"},
Spec: v1.PodSpec{
NodeName: "nodeM",
},
},
},
"nodeM": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeM"},
},
},
},
serviceAffinityInUse: true, serviceAffinityInUse: true,
serviceAffinityMatchingPodList: []*v1.Pod{ serviceAffinityMatchingPodList: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}, {ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},

View File

@ -1150,7 +1150,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta algorithm
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) { if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return true, nil, nil return true, nil, nil
} }
if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, nodeInfo, affinity); failedPredicates != nil { if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil {
failedPredicates := append([]algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates) failedPredicates := append([]algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
return false, failedPredicates, error return false, failedPredicates, error
} }
@ -1380,60 +1380,129 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
return nil, nil return nil, nil
} }
// anyMatchingPodInTopology reports whether at least one of the given matching
// pods runs on a node that shares the term's topology key value with the node
// in nodeInfo.
//
// matchingPods maps a node name to the pods on that node. An empty topology key
// yields an error. A failure to look up the node of any entry in matchingPods
// is returned as an error as well.
func (c *PodAffinityChecker) anyMatchingPodInTopology(pod *v1.Pod, matchingPods map[string][]*v1.Pod, nodeInfo *schedulercache.NodeInfo, term *v1.PodAffinityTerm) (bool, error) {
	topologyKey := term.TopologyKey
	if topologyKey == "" {
		return false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
	}
	if len(matchingPods) == 0 {
		return false, nil
	}

	candidate := nodeInfo.Node()

	// Fast path: with the hostname topology only pods on the candidate node
	// itself can be in the same domain, so the rest of the cluster is skipped.
	if topologyKey == kubeletapis.LabelHostname {
		podsOnNode, found := matchingPods[candidate.Name]
		if !found || len(podsOnNode) == 0 {
			return false, nil
		}
		// The node is compared with itself on purpose: the helper performs extra
		// checks, such as verifying that the node labels are not nil.
		return priorityutil.NodesHaveSameTopologyKey(candidate, candidate, topologyKey), nil
	}

	// Any other topology key: look at every node that hosts matching pods.
	for name, podsOnNode := range matchingPods {
		otherNode, err := c.info.GetNodeInfo(name)
		if err != nil {
			return false, err
		}
		if len(podsOnNode) == 0 {
			continue
		}
		if priorityutil.NodesHaveSameTopologyKey(candidate, otherNode, topologyKey) {
			return true, nil
		}
	}
	return false, nil
}
// Checks if scheduling the pod onto this node would break any rules of this pod. // Checks if scheduling the pod onto this node would break any rules of this pod.
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo, affinity *v1.Affinity) (algorithm.PredicateFailureReason, error) { func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo,
affinity *v1.Affinity) (algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node() node := nodeInfo.Node()
if node == nil { if node == nil {
return ErrPodAffinityRulesNotMatch, fmt.Errorf("Node is nil") return ErrPodAffinityRulesNotMatch, fmt.Errorf("Node is nil")
} }
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything()) if predicateMeta, ok := meta.(*predicateMetadata); ok {
if err != nil { // Check all affinity terms.
return ErrPodAffinityRulesNotMatch, err matchingPods := predicateMeta.matchingAffinityPods
} for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
termMatches, err := c.anyMatchingPodInTopology(pod, matchingPods, nodeInfo, &term)
// Check all affinity terms.
for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil {
errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
if !termMatches {
// If the requirement matches a pod's own labels are namespace, and there are
// no other such pods, then disregard the requirement. This is necessary to
// not block forever because the first pod of the collection can't be scheduled.
if matchingPodExists {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil { if err != nil {
errMessage := fmt.Sprintf("Cannot parse selector on term %v for pod %v. Details %v", term, podName(pod), err) errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
glog.Errorf(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
if !termMatches {
// This pod may be the first pod in a series of pods that have affinity to
// themselves. In order to not leave such pods in pending state forever, we
// allow the pod to pass the affinity check if no other pod in the cluster
// matches the namespace and selector of its terms and the pod matches its own terms.
if !(len(matchingPods) == 0 && targetPodMatchesAffinityOfPod(pod, pod)) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
}
}
// Check all anti-affinity terms.
matchingPods = predicateMeta.matchingAntiAffinityPods
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, err := c.anyMatchingPodInTopology(pod, matchingPods, nodeInfo, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
} else { // We don't have precomputed metadata. We have to follow a slow path to check affinity rules.
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return ErrPodAffinityRulesNotMatch, err
}
// Check all affinity terms.
for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil {
errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
glog.Error(errMessage) glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage) return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
} }
match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) if !termMatches {
if !match { // If the requirement matches a pod's own labels are namespace, and there are
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v", // no other such pods, then disregard the requirement. This is necessary to
podName(pod), node.Name, term) // not block forever because the first pod of the collection can't be scheduled.
return ErrPodAffinityRulesNotMatch, nil if matchingPodExists {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
errMessage := fmt.Sprintf("Cannot parse selector on term %v for pod %v. Details %v", term, podName(pod), err)
glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector)
if !match {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
}
}
// Check all anti-affinity terms.
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return ErrPodAntiAffinityRulesNotMatch, nil
} }
} }
} }
// Check all anti-affinity terms.
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
if glog.V(10) { if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it. // not logged. There is visible performance gain from it.

View File

@ -2793,7 +2793,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
}, },
}, },
pods: []*v1.Pod{ pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelA}}, {Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: podLabelA}},
}, },
nodes: []v1.Node{ nodes: []v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}}, {ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
@ -3132,7 +3132,8 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
for indexTest, test := range tests { for indexTest, test := range tests {
nodeListInfo := FakeNodeListInfo(test.nodes) nodeListInfo := FakeNodeListInfo(test.nodes)
for indexNode, node := range test.nodes { nodeInfoMap := make(map[string]*schedulercache.NodeInfo)
for i, node := range test.nodes {
var podsOnNode []*v1.Pod var podsOnNode []*v1.Pod
for _, pod := range test.pods { for _, pod := range test.pods {
if pod.Spec.NodeName == node.Name { if pod.Spec.NodeName == node.Name {
@ -3140,21 +3141,23 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
} }
} }
nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
nodeInfo.SetNode(&test.nodes[i])
nodeInfoMap[node.Name] = nodeInfo
}
for indexNode, node := range test.nodes {
testFit := PodAffinityChecker{ testFit := PodAffinityChecker{
info: nodeListInfo, info: nodeListInfo,
podLister: schedulertesting.FakePodLister(test.pods), podLister: schedulertesting.FakePodLister(test.pods),
} }
nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
nodeInfo.SetNode(&node)
nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo}
var meta algorithm.PredicateMetadata var meta algorithm.PredicateMetadata
if !test.nometa { if !test.nometa {
meta = PredicateMetadata(test.pod, nodeInfoMap) meta = PredicateMetadata(test.pod, nodeInfoMap)
} }
fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfo) fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfoMap[node.Name])
if !fits && !reflect.DeepEqual(reasons, test.nodesExpectAffinityFailureReasons[indexNode]) { if !fits && !reflect.DeepEqual(reasons, test.nodesExpectAffinityFailureReasons[indexNode]) {
t.Errorf("index: %d test: %s unexpected failure reasons: %v expect: %v", indexTest, test.test, reasons, test.nodesExpectAffinityFailureReasons[indexNode]) t.Errorf("index: %d test: %s unexpected failure reasons: %v expect: %v", indexTest, test.test, reasons, test.nodesExpectAffinityFailureReasons[indexNode])
} }