Merge pull request #62211 from bsalamat/affinity_performance

Automatic merge from submit-queue (batch tested with PRs 62467, 62482, 62211). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Improve performance of affinity/anti-affinity predicate by 20x in large clusters

**What this PR does / why we need it**:
Improves performance of affinity/anti-affinity predicate by over 20x in large clusters. Performance improvement is smaller in small clusters, but it is still very significant and is about 4x. Also, before this PR, performance of the predicate was dropping quadratically with increasing size of nodes and pods. As the results shows, the slow down is now linear in larger clusters.

Affinity/anti-affinity predicate was checking all pods of the cluster for each node in the cluster to determine feasibility of affinit/anti-affinity terms of the pod being scheduled. This optimization first finds all the pods in a cluster that match the affinity/anti-affinity terms of the pod being scheduled once and stores the metadata. It then only checks the topology of the matching pods for each node in the cluster. 
 This results in major reduction of the search space per node and improves performance significantly. 

Below results are obtained by running scheduler benchmarks:
```
make test-integration WHAT=./test/integration/scheduler_perf KUBE_TEST_ARGS="-run=xxx -bench=.*BenchmarkSchedulingAntiAffinity"
```
```
AntiAffinity Topology: Hostname
before: BenchmarkSchedulingAntiAffinity/500Nodes/250Pods-12         	     	  37031638 ns/op
after:  BenchmarkSchedulingAntiAffinity/500Nodes/250Pods-12         	     	  10373222 ns/op

before: BenchmarkSchedulingAntiAffinity/500Nodes/5000Pods-12        	     	 134205302 ns/op
after:  BenchmarkSchedulingAntiAffinity/500Nodes/5000Pods-12        	     	  12000580 ns/op

befor: BenchmarkSchedulingAntiAffinity/1000Nodes/10000Pods-12         	     	 498439953 ns/op
after: BenchmarkSchedulingAntiAffinity/1000Nodes/10000Pods-12         	     	  24692552 ns/op


AntiAffinity Topology: Region
before: BenchmarkSchedulingAntiAffinity/500Nodes/250Pods-12         	     	  60003672 ns/op
after:  BenchmarkSchedulingAntiAffinity/500Nodes/250Pods-12         	     	  13346400 ns/op

before: BenchmarkSchedulingAntiAffinity/1000Nodes/10000Pods-12         	     	 600085491 ns/op
after: BenchmarkSchedulingAntiAffinity/1000Nodes/10000Pods-12         	     	  27783333 ns/op
```

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

ref/ #56032 #47318 #25319

**Release note**:

```release-note
improve performance of affinity/anti-affinity predicate of default scheduler significantly.
```

/sig scheduling
pull/8/head
Kubernetes Submit Queue 2018-04-13 07:25:21 -07:00 committed by GitHub
commit 3cdf5eecd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 475 additions and 58 deletions

View File

@ -20,14 +20,17 @@ import (
"fmt"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/pkg/scheduler/schedulercache"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"github.com/golang/glog"
)
// PredicateMetadataFactory defines a factory of predicate metadata.
@ -50,7 +53,13 @@ type predicateMetadata struct {
podRequest *schedulercache.Resource
podPorts []*v1.ContainerPort
//key is a pod full name with the anti-affinity rules.
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
// A map of node name to a list of Pods on the node that can potentially match
// the affinity rules of the "pod".
nodeNameToMatchingAffinityPods map[string][]*v1.Pod
// A map of node name to a list of Pods on the node that can potentially match
// the anti-affinity rules of the "pod".
nodeNameToMatchingAntiAffinityPods map[string][]*v1.Pod
serviceAffinityInUse bool
serviceAffinityMatchingPodList []*v1.Pod
serviceAffinityMatchingPodServices []*v1.Service
@ -108,12 +117,19 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
if err != nil {
return nil
}
affinityPods, antiAffinityPods, err := getPodsMatchingAffinity(pod, nodeNameToInfoMap)
if err != nil {
glog.Errorf("[predicate meta data generation] error finding pods that match affinity terms")
return nil
}
predicateMetadata := &predicateMetadata{
pod: pod,
podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod),
podPorts: schedutil.GetContainerPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
pod: pod,
podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod),
podPorts: schedutil.GetContainerPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
nodeNameToMatchingAffinityPods: affinityPods,
nodeNameToMatchingAntiAffinityPods: antiAffinityPods,
}
for predicateName, precomputeFunc := range predicateMetadataProducers {
glog.V(10).Infof("Precompute: %v", predicateName)
@ -131,6 +147,33 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
}
// Delete any anti-affinity rule from the deletedPod.
delete(meta.matchingAntiAffinityTerms, deletedPodFullName)
// Delete pod from the matching affinity or anti-affinity pods if exists.
affinity := meta.pod.Spec.Affinity
podNodeName := deletedPod.Spec.NodeName
if affinity != nil && len(podNodeName) > 0 {
if affinity.PodAffinity != nil {
for i, p := range meta.nodeNameToMatchingAffinityPods[podNodeName] {
if p == deletedPod {
s := meta.nodeNameToMatchingAffinityPods[podNodeName]
s[i] = s[len(s)-1]
s = s[:len(s)-1]
meta.nodeNameToMatchingAffinityPods[podNodeName] = s
break
}
}
}
if affinity.PodAntiAffinity != nil {
for i, p := range meta.nodeNameToMatchingAntiAffinityPods[podNodeName] {
if p == deletedPod {
s := meta.nodeNameToMatchingAntiAffinityPods[podNodeName]
s[i] = s[len(s)-1]
s = s[:len(s)-1]
meta.nodeNameToMatchingAntiAffinityPods[podNodeName] = s
break
}
}
}
}
// All pods in the serviceAffinityMatchingPodList are in the same namespace.
// So, if the namespace of the first one is not the same as the namespace of the
// deletedPod, we don't need to check the list, as deletedPod isn't in the list.
@ -173,6 +216,35 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms
}
}
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
affinity := meta.pod.Spec.Affinity
podNodeName := addedPod.Spec.NodeName
if affinity != nil && len(podNodeName) > 0 {
if targetPodMatchesAffinityOfPod(meta.pod, addedPod) {
found := false
for _, p := range meta.nodeNameToMatchingAffinityPods[podNodeName] {
if p == addedPod {
found = true
break
}
}
if !found {
meta.nodeNameToMatchingAffinityPods[podNodeName] = append(meta.nodeNameToMatchingAffinityPods[podNodeName], addedPod)
}
}
if targetPodMatchesAntiAffinityOfPod(meta.pod, addedPod) {
found := false
for _, p := range meta.nodeNameToMatchingAntiAffinityPods[podNodeName] {
if p == addedPod {
found = true
break
}
}
if !found {
meta.nodeNameToMatchingAntiAffinityPods[podNodeName] = append(meta.nodeNameToMatchingAntiAffinityPods[podNodeName], addedPod)
}
}
}
// If addedPod is in the same namespace as the meta.pod, update the list
// of matching pods if applicable.
if meta.serviceAffinityInUse && addedPod.Namespace == meta.pod.Namespace {
@ -200,9 +272,162 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
for k, v := range meta.matchingAntiAffinityTerms {
newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...)
}
newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod)
for k, v := range meta.nodeNameToMatchingAffinityPods {
newPredMeta.nodeNameToMatchingAffinityPods[k] = append([]*v1.Pod(nil), v...)
}
newPredMeta.nodeNameToMatchingAntiAffinityPods = make(map[string][]*v1.Pod)
for k, v := range meta.nodeNameToMatchingAntiAffinityPods {
newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
}
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
meta.serviceAffinityMatchingPodList...)
return (algorithm.PredicateMetadata)(newPredMeta)
}
type affinityTermProperties struct {
namespaces sets.String
selector labels.Selector
}
// getAffinityTermProperties receives a Pod and affinity terms and returns the namespaces and
// selectors of the terms.
func getAffinityTermProperties(pod *v1.Pod, terms []v1.PodAffinityTerm) (properties []*affinityTermProperties, err error) {
if terms == nil {
return properties, nil
}
for _, term := range terms {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
}
properties = append(properties, &affinityTermProperties{namespaces: namespaces, selector: selector})
}
return properties, nil
}
// podMatchesAffinityTermProperties return true IFF the given pod matches all the given properties.
func podMatchesAffinityTermProperties(pod *v1.Pod, properties []*affinityTermProperties) bool {
if len(properties) == 0 {
return false
}
for _, property := range properties {
if !priorityutil.PodMatchesTermsNamespaceAndSelector(pod, property.namespaces, property.selector) {
return false
}
}
return true
}
// getPodsMatchingAffinity finds existing Pods that match affinity terms of the given "pod".
// It ignores topology. It returns a set of Pods that are checked later by the affinity
// predicate. With this set of pods available, the affinity predicate does not
// need to check all the pods in the cluster.
func getPodsMatchingAffinity(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (affinityPods map[string][]*v1.Pod, antiAffinityPods map[string][]*v1.Pod, err error) {
allNodeNames := make([]string, 0, len(nodeInfoMap))
affinity := pod.Spec.Affinity
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return nil, nil, nil
}
for name := range nodeInfoMap {
allNodeNames = append(allNodeNames, name)
}
var lock sync.Mutex
var firstError error
affinityPods = make(map[string][]*v1.Pod)
antiAffinityPods = make(map[string][]*v1.Pod)
appendResult := func(nodeName string, affPods, antiAffPods []*v1.Pod) {
lock.Lock()
defer lock.Unlock()
if len(affPods) > 0 {
affinityPods[nodeName] = affPods
}
if len(antiAffPods) > 0 {
antiAffinityPods[nodeName] = antiAffPods
}
}
catchError := func(err error) {
lock.Lock()
defer lock.Unlock()
if firstError == nil {
firstError = err
}
}
affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
if err != nil {
return nil, nil, err
}
antiAffinityProperties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
if err != nil {
return nil, nil, err
}
processNode := func(i int) {
nodeInfo := nodeInfoMap[allNodeNames[i]]
node := nodeInfo.Node()
if node == nil {
catchError(fmt.Errorf("nodeInfo.Node is nil"))
return
}
affPods := make([]*v1.Pod, 0, len(nodeInfo.Pods()))
antiAffPods := make([]*v1.Pod, 0, len(nodeInfo.Pods()))
for _, existingPod := range nodeInfo.Pods() {
// Check affinity properties.
if podMatchesAffinityTermProperties(existingPod, affinityProperties) {
affPods = append(affPods, existingPod)
}
// Check anti-affinity properties.
if podMatchesAffinityTermProperties(existingPod, antiAffinityProperties) {
antiAffPods = append(antiAffPods, existingPod)
}
}
if len(antiAffPods) > 0 || len(affPods) > 0 {
appendResult(node.Name, affPods, antiAffPods)
}
}
workqueue.Parallelize(16, len(allNodeNames), processNode)
return affinityPods, antiAffinityPods, firstError
}
// podMatchesAffinity returns true if "targetPod" matches any affinity rule of
// "pod". Similar to getPodsMatchingAffinity, this function does not check topology.
// So, whether the targetPod actually matches or not needs further checks for a specific
// node.
func targetPodMatchesAffinityOfPod(pod, targetPod *v1.Pod) bool {
affinity := pod.Spec.Affinity
if affinity == nil || affinity.PodAffinity == nil {
return false
}
affinityProperties, err := getAffinityTermProperties(pod, GetPodAffinityTerms(affinity.PodAffinity))
if err != nil {
glog.Errorf("error in getting affinity properties of Pod %v", pod.Name)
return false
}
return podMatchesAffinityTermProperties(targetPod, affinityProperties)
}
// targetPodMatchesAntiAffinityOfPod returns true if "targetPod" matches any anti-affinity
// rule of "pod". Similar to getPodsMatchingAffinity, this function does not check topology.
// So, whether the targetPod actually matches or not needs further checks for a specific
// node.
func targetPodMatchesAntiAffinityOfPod(pod, targetPod *v1.Pod) bool {
affinity := pod.Spec.Affinity
if affinity == nil || affinity.PodAntiAffinity == nil {
return false
}
properties, err := getAffinityTermProperties(pod, GetPodAntiAffinityTerms(affinity.PodAntiAffinity))
if err != nil {
glog.Errorf("error in getting anti-affinity properties of Pod %v", pod.Name)
return false
}
return podMatchesAffinityTermProperties(targetPod, properties)
}

View File

@ -88,6 +88,13 @@ func (s sortableServices) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
var _ = sort.Interface(&sortableServices{})
func sortNodePodMap(np map[string][]*v1.Pod) {
for _, pl := range np {
sortablePods := sortablePods(pl)
sort.Sort(sortablePods)
}
}
// predicateMetadataEquivalent returns true if the two metadata are equivalent.
// Note: this function does not compare podRequest.
func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
@ -111,6 +118,16 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) {
return fmt.Errorf("matchingAntiAffinityTerms are not euqal")
}
sortNodePodMap(meta1.nodeNameToMatchingAffinityPods)
sortNodePodMap(meta2.nodeNameToMatchingAffinityPods)
if !reflect.DeepEqual(meta1.nodeNameToMatchingAffinityPods, meta2.nodeNameToMatchingAffinityPods) {
return fmt.Errorf("nodeNameToMatchingAffinityPods are not euqal")
}
sortNodePodMap(meta1.nodeNameToMatchingAntiAffinityPods)
sortNodePodMap(meta2.nodeNameToMatchingAntiAffinityPods)
if !reflect.DeepEqual(meta1.nodeNameToMatchingAntiAffinityPods, meta2.nodeNameToMatchingAntiAffinityPods) {
return fmt.Errorf("nodeNameToMatchingAntiAffinityPods are not euqal")
}
if meta1.serviceAffinityInUse {
sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList)
sort.Sort(sortablePods1)
@ -189,6 +206,34 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
},
},
}
affinityComplex := &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"bar", "buzz"},
},
},
},
TopologyKey: "region",
},
{
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "service",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"bar", "security", "test"},
},
},
},
TopologyKey: "zone",
},
},
}
tests := []struct {
description string
@ -312,6 +357,41 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
{
description: "metadata matching pod affinity and anti-affinity are updated correctly after adding and removing a pod",
pendingPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "pending", Labels: selector1},
},
existingPods: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeC",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityFooBar,
PodAffinity: affinityComplex,
},
},
},
},
addedPod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "addedPod", Labels: selector1},
Spec: v1.PodSpec{
NodeName: "nodeA",
Affinity: &v1.Affinity{
PodAntiAffinity: antiAffinityComplex,
},
},
},
services: []*v1.Service{{Spec: v1.ServiceSpec{Selector: selector1}}},
nodes: []*v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "nodeA", Labels: label1}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeB", Labels: label2}},
{ObjectMeta: metav1.ObjectMeta{Name: "nodeC", Labels: label3}},
},
},
}
for _, test := range tests {
@ -360,6 +440,7 @@ func TestPredicateMetadata_AddRemovePod(t *testing.T) {
// on the idea that shallow-copy should produce an object that is deep-equal to the original
// object.
func TestPredicateMetadata_ShallowCopy(t *testing.T) {
selector1 := map[string]string{"foo": "bar"}
source := predicateMetadata{
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -392,6 +473,45 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
},
},
},
nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{
"nodeA": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeA"},
},
},
"nodeC": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeC",
},
},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeC"},
},
},
},
nodeNameToMatchingAntiAffinityPods: map[string][]*v1.Pod{
"nodeN": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeN"},
},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2"},
Spec: v1.PodSpec{
NodeName: "nodeM",
},
},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p3"},
Spec: v1.PodSpec{
NodeName: "nodeM",
},
},
},
"nodeM": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p6", Labels: selector1},
Spec: v1.PodSpec{NodeName: "nodeM"},
},
},
},
serviceAffinityInUse: true,
serviceAffinityMatchingPodList: []*v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}},

View File

@ -1150,7 +1150,7 @@ func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta algorithm
if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
return true, nil, nil
}
if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, nodeInfo, affinity); failedPredicates != nil {
if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil {
failedPredicates := append([]algorithm.PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
return false, failedPredicates, error
}
@ -1380,60 +1380,129 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
return nil, nil
}
// anyMatchingPodInTopology checks that any of the given Pods are in the
// topology specified by the affinity term.
func (c *PodAffinityChecker) anyMatchingPodInTopology(pod *v1.Pod, matchingPods map[string][]*v1.Pod, nodeInfo *schedulercache.NodeInfo, term *v1.PodAffinityTerm) (bool, error) {
if len(term.TopologyKey) == 0 {
return false, fmt.Errorf("empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
}
if len(matchingPods) == 0 {
return false, nil
}
// Special case: When the topological domain is node, we can limit our
// search to pods on that node without searching the entire cluster.
if term.TopologyKey == kubeletapis.LabelHostname {
if pods, ok := matchingPods[nodeInfo.Node().Name]; ok {
// It may seem odd that we are comparing a node with itself to see if it
// has the same topology key, but it is necessary to check extra conditions
// that the function performs, such as checking that node labels are not nil.
return len(pods) > 0 && priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), nodeInfo.Node(), term.TopologyKey), nil
}
return false, nil
}
// Topology key is not "Hostname". Checking all matching pods.
for nodeName, pods := range matchingPods {
matchingPodNodeInfo, err := c.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
if len(pods) > 0 && priorityutil.NodesHaveSameTopologyKey(nodeInfo.Node(), matchingPodNodeInfo, term.TopologyKey) {
return true, nil
}
}
return false, nil
}
// Checks if scheduling the pod onto this node would break any rules of this pod.
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo, affinity *v1.Affinity) (algorithm.PredicateFailureReason, error) {
func (c *PodAffinityChecker) satisfiesPodsAffinityAntiAffinity(pod *v1.Pod,
meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo,
affinity *v1.Affinity) (algorithm.PredicateFailureReason, error) {
node := nodeInfo.Node()
if node == nil {
return ErrPodAffinityRulesNotMatch, fmt.Errorf("Node is nil")
}
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return ErrPodAffinityRulesNotMatch, err
}
// Check all affinity terms.
for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil {
errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
if !termMatches {
// If the requirement matches a pod's own labels are namespace, and there are
// no other such pods, then disregard the requirement. This is necessary to
// not block forever because the first pod of the collection can't be scheduled.
if matchingPodExists {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if predicateMeta, ok := meta.(*predicateMetadata); ok {
// Check all affinity terms.
matchingPods := predicateMeta.nodeNameToMatchingAffinityPods
for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
termMatches, err := c.anyMatchingPodInTopology(pod, matchingPods, nodeInfo, &term)
if err != nil {
errMessage := fmt.Sprintf("Cannot parse selector on term %v for pod %v. Details %v", term, podName(pod), err)
errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
glog.Errorf(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
if !termMatches {
// This pod may the first pod in a series that have affinity to themselves. In order
// to not leave such pods in pending state forever, we check that if no other pod
// in the cluster matches the namespace and selector of this pod and the pod matches
// its own terms, then we allow the pod to pass the affinity check.
if !(len(matchingPods) == 0 && targetPodMatchesAffinityOfPod(pod, pod)) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
}
}
// Check all anti-affinity terms.
matchingPods = predicateMeta.nodeNameToMatchingAntiAffinityPods
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, err := c.anyMatchingPodInTopology(pod, matchingPods, nodeInfo, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
} else { // We don't have precomputed metadata. We have to follow a slow path to check affinity rules.
filteredPods, err := c.podLister.FilteredList(nodeInfo.Filter, labels.Everything())
if err != nil {
return ErrPodAffinityRulesNotMatch, err
}
// Check all affinity terms.
for _, term := range GetPodAffinityTerms(affinity.PodAffinity) {
termMatches, matchingPodExists, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil {
errMessage := fmt.Sprintf("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v, err: %v", podName(pod), node.Name, term, err)
glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector)
if !match {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
if !termMatches {
// If the requirement matches a pod's own labels are namespace, and there are
// no other such pods, then disregard the requirement. This is necessary to
// not block forever because the first pod of the collection can't be scheduled.
if matchingPodExists {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(pod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
errMessage := fmt.Sprintf("Cannot parse selector on term %v for pod %v. Details %v", term, podName(pod), err)
glog.Error(errMessage)
return ErrPodAffinityRulesNotMatch, errors.New(errMessage)
}
match := priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector)
if !match {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAffinityTerm %v",
podName(pod), node.Name, term)
return ErrPodAffinityRulesNotMatch, nil
}
}
}
// Check all anti-affinity terms.
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
}
// Check all anti-affinity terms.
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
termMatches, _, err := c.anyPodMatchesPodAffinityTerm(pod, filteredPods, nodeInfo, &term)
if err != nil || termMatches {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v, because of PodAntiAffinityTerm %v, err: %v",
podName(pod), node.Name, term, err)
return ErrPodAntiAffinityRulesNotMatch, nil
}
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.

View File

@ -2793,7 +2793,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
},
},
pods: []*v1.Pod{
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Labels: podLabelA}},
{Spec: v1.PodSpec{NodeName: "machine1"}, ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: podLabelA}},
},
nodes: []v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "machine1", Labels: labelRgChina}},
@ -3132,7 +3132,8 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
for indexTest, test := range tests {
nodeListInfo := FakeNodeListInfo(test.nodes)
for indexNode, node := range test.nodes {
nodeInfoMap := make(map[string]*schedulercache.NodeInfo)
for i, node := range test.nodes {
var podsOnNode []*v1.Pod
for _, pod := range test.pods {
if pod.Spec.NodeName == node.Name {
@ -3140,21 +3141,23 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) {
}
}
nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
nodeInfo.SetNode(&test.nodes[i])
nodeInfoMap[node.Name] = nodeInfo
}
for indexNode, node := range test.nodes {
testFit := PodAffinityChecker{
info: nodeListInfo,
podLister: schedulertesting.FakePodLister(test.pods),
}
nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
nodeInfo.SetNode(&node)
nodeInfoMap := map[string]*schedulercache.NodeInfo{node.Name: nodeInfo}
var meta algorithm.PredicateMetadata
if !test.nometa {
meta = PredicateMetadata(test.pod, nodeInfoMap)
}
fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfo)
fits, reasons, _ := testFit.InterPodAffinityMatches(test.pod, meta, nodeInfoMap[node.Name])
if !fits && !reflect.DeepEqual(reasons, test.nodesExpectAffinityFailureReasons[indexNode]) {
t.Errorf("index: %d test: %s unexpected failure reasons: %v expect: %v", indexTest, test.test, reasons, test.nodesExpectAffinityFailureReasons[indexNode])
}