diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index 2b276ede61..163ad90ba2 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -38,6 +38,12 @@ type PredicateMetadataFactory struct { podLister algorithm.PodLister } +// AntiAffinityTerm's topology key value used in predicate metadata +type topologyPair struct { + key string + value string +} + // Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file // due to the way declarations are processed in predicate declaration unit tests. type matchingPodAntiAffinityTerm struct { @@ -45,6 +51,17 @@ type matchingPodAntiAffinityTerm struct { node *v1.Node } +type podSet map[*v1.Pod]struct{} + +type topologyPairSet map[topologyPair]struct{} + +// topologyPairsMaps keeps topologyPairToAntiAffinityPods and antiAffinityPodToTopologyPairs in sync +// as they are the inverse of each others. +type topologyPairsMaps struct { + topologyPairToPods map[topologyPair]podSet + podToTopologyPairs map[string]topologyPairSet +} + // NOTE: When new fields are added/removed or logic is changed, please make sure that // RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes. type predicateMetadata struct { @@ -52,8 +69,8 @@ type predicateMetadata struct { podBestEffort bool podRequest *schedulercache.Resource podPorts []*v1.ContainerPort - //key is a pod full name with the anti-affinity rules. - matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm + + topologyPairsAntiAffinityPodsMap *topologyPairsMaps // A map of node name to a list of Pods on the node that can potentially match // the affinity rules of the "pod". nodeNameToMatchingAffinityPods map[string][]*v1.Pod @@ -113,7 +130,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf if pod == nil { return nil } - matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap) + topologyPairsMaps, err := getMatchingTopologyPairs(pod, nodeNameToInfoMap) if err != nil { return nil } @@ -127,9 +144,9 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf podBestEffort: isPodBestEffort(pod), podRequest: GetResourceRequest(pod), podPorts: schedutil.GetContainerPorts(pod), - matchingAntiAffinityTerms: matchingTerms, nodeNameToMatchingAffinityPods: affinityPods, nodeNameToMatchingAntiAffinityPods: antiAffinityPods, + topologyPairsAntiAffinityPodsMap: topologyPairsMaps, } for predicateName, precomputeFunc := range predicateMetadataProducers { glog.V(10).Infof("Precompute: %v", predicateName) @@ -138,6 +155,43 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf return predicateMetadata } +// returns a pointer to a new topologyPairsMaps +func newTopologyPairsMaps() *topologyPairsMaps { + return &topologyPairsMaps{topologyPairToPods: make(map[topologyPair]podSet), + podToTopologyPairs: make(map[string]topologyPairSet)} +} + +func (topologyPairsMaps *topologyPairsMaps) addTopologyPair(pair topologyPair, pod *v1.Pod) { + podFullName := schedutil.GetPodFullName(pod) + if topologyPairsMaps.topologyPairToPods[pair] == nil { + topologyPairsMaps.topologyPairToPods[pair] = make(map[*v1.Pod]struct{}) + } + topologyPairsMaps.topologyPairToPods[pair][pod] = struct{}{} + if topologyPairsMaps.podToTopologyPairs[podFullName] == nil { + topologyPairsMaps.podToTopologyPairs[podFullName] = make(map[topologyPair]struct{}) + } + topologyPairsMaps.podToTopologyPairs[podFullName][pair] = struct{}{} +} + +func (topologyPairsMaps *topologyPairsMaps) removePod(deletedPod *v1.Pod) { + deletedPodFullName := schedutil.GetPodFullName(deletedPod) + for pair := range topologyPairsMaps.podToTopologyPairs[deletedPodFullName] { + delete(topologyPairsMaps.topologyPairToPods[pair], deletedPod) + if len(topologyPairsMaps.topologyPairToPods[pair]) == 0 { + delete(topologyPairsMaps.topologyPairToPods, pair) + } + } + delete(topologyPairsMaps.podToTopologyPairs, deletedPodFullName) +} + +func (topologyPairsMaps *topologyPairsMaps) appendMaps(toAppend *topologyPairsMaps) { + for pair := range toAppend.topologyPairToPods { + for pod := range toAppend.topologyPairToPods[pair] { + topologyPairsMaps.addTopologyPair(pair, pod) + } + } +} + // RemovePod changes predicateMetadata assuming that the given `deletedPod` is // deleted from the system. func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error { @@ -145,8 +199,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error { if deletedPodFullName == schedutil.GetPodFullName(meta.pod) { return fmt.Errorf("deletedPod and meta.pod must not be the same") } - // Delete any anti-affinity rule from the deletedPod. - delete(meta.matchingAntiAffinityTerms, deletedPodFullName) + meta.topologyPairsAntiAffinityPodsMap.removePod(deletedPod) // Delete pod from the matching affinity or anti-affinity pods if exists. affinity := meta.pod.Spec.Affinity podNodeName := deletedPod.Spec.NodeName @@ -203,18 +256,12 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache return fmt.Errorf("invalid node in nodeInfo") } // Add matching anti-affinity terms of the addedPod to the map. - podMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node()) + topologyPairsMaps, err := getMatchingTopologyPairsOfExistingPod(meta.pod, addedPod, nodeInfo.Node()) if err != nil { return err } - if len(podMatchingTerms) > 0 { - existingTerms, found := meta.matchingAntiAffinityTerms[addedPodFullName] - if found { - meta.matchingAntiAffinityTerms[addedPodFullName] = append(existingTerms, - podMatchingTerms...) - } else { - meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms - } + if len(topologyPairsMaps.podToTopologyPairs) > 0 { + meta.topologyPairsAntiAffinityPodsMap.appendMaps(topologyPairsMaps) } // Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed. affinity := meta.pod.Spec.Affinity @@ -261,17 +308,14 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache // its maps and slices, but it does not copy the contents of pointer values. func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { newPredMeta := &predicateMetadata{ - pod: meta.pod, - podBestEffort: meta.podBestEffort, - podRequest: meta.podRequest, - serviceAffinityInUse: meta.serviceAffinityInUse, - ignoredExtendedResources: meta.ignoredExtendedResources, + pod: meta.pod, + podBestEffort: meta.podBestEffort, + podRequest: meta.podRequest, + serviceAffinityInUse: meta.serviceAffinityInUse, + ignoredExtendedResources: meta.ignoredExtendedResources, + topologyPairsAntiAffinityPodsMap: meta.topologyPairsAntiAffinityPodsMap, } newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...) - newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{} - for k, v := range meta.matchingAntiAffinityTerms { - newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...) - } newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod) for k, v := range meta.nodeNameToMatchingAffinityPods { newPredMeta.nodeNameToMatchingAffinityPods[k] = append([]*v1.Pod(nil), v...) @@ -280,6 +324,8 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { for k, v := range meta.nodeNameToMatchingAntiAffinityPods { newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...) } + newPredMeta.topologyPairsAntiAffinityPodsMap = newTopologyPairsMaps() + newPredMeta.topologyPairsAntiAffinityPodsMap.appendMaps(meta.topologyPairsAntiAffinityPodsMap) newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), meta.serviceAffinityMatchingPodServices...) newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index fde740f745..04da8ed5c0 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -28,42 +28,6 @@ import ( schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) -// sortableAntiAffinityTerms lets us to sort anti-affinity terms. -type sortableAntiAffinityTerms []matchingPodAntiAffinityTerm - -// Less establishes some ordering between two matchingPodAntiAffinityTerms for -// sorting. -func (s sortableAntiAffinityTerms) Less(i, j int) bool { - t1, t2 := s[i], s[j] - if t1.node.Name != t2.node.Name { - return t1.node.Name < t2.node.Name - } - if len(t1.term.Namespaces) != len(t2.term.Namespaces) { - return len(t1.term.Namespaces) < len(t2.term.Namespaces) - } - if t1.term.TopologyKey != t2.term.TopologyKey { - return t1.term.TopologyKey < t2.term.TopologyKey - } - if len(t1.term.LabelSelector.MatchLabels) != len(t2.term.LabelSelector.MatchLabels) { - return len(t1.term.LabelSelector.MatchLabels) < len(t2.term.LabelSelector.MatchLabels) - } - return false -} -func (s sortableAntiAffinityTerms) Len() int { return len(s) } -func (s sortableAntiAffinityTerms) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -var _ = sort.Interface(sortableAntiAffinityTerms{}) - -func sortAntiAffinityTerms(terms map[string][]matchingPodAntiAffinityTerm) { - for k, v := range terms { - sortableTerms := sortableAntiAffinityTerms(v) - sort.Sort(sortableTerms) - terms[k] = sortableTerms - } -} - // sortablePods lets us to sort pods. type sortablePods []*v1.Pod @@ -113,11 +77,6 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error { for !reflect.DeepEqual(meta1.podPorts, meta2.podPorts) { return fmt.Errorf("podPorts are not equal") } - sortAntiAffinityTerms(meta1.matchingAntiAffinityTerms) - sortAntiAffinityTerms(meta2.matchingAntiAffinityTerms) - if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) { - return fmt.Errorf("matchingAntiAffinityTerms are not euqal") - } sortNodePodMap(meta1.nodeNameToMatchingAffinityPods) sortNodePodMap(meta2.nodeNameToMatchingAffinityPods) if !reflect.DeepEqual(meta1.nodeNameToMatchingAffinityPods, meta2.nodeNameToMatchingAffinityPods) { @@ -128,6 +87,14 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error { if !reflect.DeepEqual(meta1.nodeNameToMatchingAntiAffinityPods, meta2.nodeNameToMatchingAntiAffinityPods) { return fmt.Errorf("nodeNameToMatchingAntiAffinityPods are not euqal") } + if !reflect.DeepEqual(meta1.topologyPairsAntiAffinityPodsMap.podToTopologyPairs, + meta2.topologyPairsAntiAffinityPodsMap.podToTopologyPairs) { + return fmt.Errorf("topologyPairsAntiAffinityPodsMap.podToTopologyPairs are not equal") + } + if !reflect.DeepEqual(meta1.topologyPairsAntiAffinityPodsMap.topologyPairToPods, + meta2.topologyPairsAntiAffinityPodsMap.topologyPairToPods) { + return fmt.Errorf("topologyPairsAntiAffinityPodsMap.topologyPairToPods are not equal") + } if meta1.serviceAffinityInUse { sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList) sort.Sort(sortablePods1) @@ -465,13 +432,25 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) { HostIP: "1.2.3.4", }, }, - matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{ - "term1": { - { - term: &v1.PodAffinityTerm{TopologyKey: "node"}, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, - }, + topologyPairsAntiAffinityPodsMap: &topologyPairsMaps{ + topologyPairToPods: map[topologyPair]podSet{ + {key: "name", value: "machine1"}: { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeC"}, + }: struct{}{}, + }, + {key: "name", value: "machine2"}: { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeA"}, + }: struct{}{}, + }, + }, + podToTopologyPairs: map[string]topologyPairSet{ + "p2_": { + topologyPair{key: "name", value: "machine1"}: struct{}{}, + }, + "p1_": { + topologyPair{key: "name", value: "machine2"}: struct{}{}, }, }, }, diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index 17d8f0591d..c919282c83 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -1246,7 +1246,7 @@ func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po return terms } -func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, error) { +func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (*topologyPairsMaps, error) { allNodeNames := make([]string, 0, len(nodeInfoMap)) for name := range nodeInfoMap { allNodeNames = append(allNodeNames, name) @@ -1254,13 +1254,13 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler var lock sync.Mutex var firstError error - result := make(map[string][]matchingPodAntiAffinityTerm) - appendResult := func(toAppend map[string][]matchingPodAntiAffinityTerm) { + + topologyMaps := newTopologyPairsMaps() + + appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) { lock.Lock() defer lock.Unlock() - for uid, terms := range toAppend { - result[uid] = append(result[uid], terms...) - } + topologyMaps.appendMaps(toAppend) } catchError := func(err error) { lock.Lock() @@ -1277,7 +1277,7 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler catchError(fmt.Errorf("node not found")) return } - nodeResult := make(map[string][]matchingPodAntiAffinityTerm) + nodeTopologyMaps := newTopologyPairsMaps() for _, existingPod := range nodeInfo.PodsWithAffinity() { affinity := existingPod.Spec.Affinity if affinity == nil { @@ -1291,23 +1291,23 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler return } if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { - existingPodFullName := schedutil.GetPodFullName(existingPod) - nodeResult[existingPodFullName] = append( - nodeResult[existingPodFullName], - matchingPodAntiAffinityTerm{term: &term, node: node}) + if topologyValue, ok := node.Labels[term.TopologyKey]; ok { + pair := topologyPair{key: term.TopologyKey, value: topologyValue} + nodeTopologyMaps.addTopologyPair(pair, existingPod) + } } } - } - if len(nodeResult) > 0 { - appendResult(nodeResult) + if len(nodeTopologyMaps.podToTopologyPairs) > 0 { + appendTopologyPairsMaps(nodeTopologyMaps) + } } } workqueue.Parallelize(16, len(allNodeNames), processNode) - return result, firstError + return topologyMaps, firstError } -func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, error) { - var result []matchingPodAntiAffinityTerm +func getMatchingTopologyPairsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (*topologyPairsMaps, error) { + topologyMaps := newTopologyPairsMaps() affinity := existingPod.Spec.Affinity if affinity != nil && affinity.PodAntiAffinity != nil { for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { @@ -1317,15 +1317,18 @@ func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.P return nil, err } if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) { - result = append(result, matchingPodAntiAffinityTerm{term: &term, node: node}) + if topologyValue, ok := node.Labels[term.TopologyKey]; ok { + pair := topologyPair{key: term.TopologyKey, value: topologyValue} + topologyMaps.addTopologyPair(pair, existingPod) + } } } } - return result, nil + return topologyMaps, nil } +func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairs(pod *v1.Pod, allPods []*v1.Pod) (*topologyPairsMaps, error) { + topologyMaps := newTopologyPairsMaps() -func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, error) { - result := make(map[string][]matchingPodAntiAffinityTerm) for _, existingPod := range allPods { affinity := existingPod.Spec.Affinity if affinity != nil && affinity.PodAntiAffinity != nil { @@ -1337,17 +1340,14 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [ } return nil, err } - existingPodMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode) + existingPodsTopologyMaps, err := getMatchingTopologyPairsOfExistingPod(pod, existingPod, existingPodNode) if err != nil { return nil, err } - if len(existingPodMatchingTerms) > 0 { - existingPodFullName := schedutil.GetPodFullName(existingPod) - result[existingPodFullName] = existingPodMatchingTerms - } + topologyMaps.appendMaps(existingPodsTopologyMaps) } } - return result, nil + return topologyMaps, nil } // Checks if scheduling the pod onto this node would break any anti-affinity @@ -1357,9 +1357,9 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta if node == nil { return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil") } - var matchingTerms map[string][]matchingPodAntiAffinityTerm + var topologyMaps *topologyPairsMaps if predicateMeta, ok := meta.(*predicateMetadata); ok { - matchingTerms = predicateMeta.matchingAntiAffinityTerms + topologyMaps = predicateMeta.topologyPairsAntiAffinityPodsMap } else { // Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not // present in nodeInfo. Pods on other nodes pass the filter. @@ -1369,25 +1369,19 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta glog.Error(errMessage) return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) } - if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil { + if topologyMaps, err = c.getMatchingAntiAffinityTopologyPairs(pod, filteredPods); err != nil { errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err) glog.Error(errMessage) return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) } } - for _, terms := range matchingTerms { - for i := range terms { - term := &terms[i] - if len(term.term.TopologyKey) == 0 { - errMessage := fmt.Sprintf("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity") - glog.Error(errMessage) - return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) - } - if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) { - glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v", - podName(pod), node.Name, term.term) - return ErrExistingPodsAntiAffinityRulesNotMatch, nil - } + + // Iterate over topology pairs to get any of the pods being affected by + // the scheduled pod anti-affinity rules + for topologyKey, topologyValue := range node.Labels { + if topologyMaps.topologyPairToPods[topologyPair{key: topologyKey, value: topologyValue}] != nil { + glog.V(10).Infof("Cannot schedule pod %+v onto node %v", podName(pod), node.Name) + return ErrExistingPodsAntiAffinityRulesNotMatch, nil } } if glog.V(10) {