From f6659e454344775784d30ae15dd6995a62bb7012 Mon Sep 17 00:00:00 2001 From: Ahmad Diaa Date: Tue, 7 Aug 2018 03:17:21 +0200 Subject: [PATCH] further enhancements removing matchingTerms from metadata --- .../algorithm/predicates/metadata.go | 77 ++++++------ .../algorithm/predicates/metadata_test.go | 31 ++--- .../algorithm/predicates/predicates.go | 111 ++++++++---------- 3 files changed, 102 insertions(+), 117 deletions(-) diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index 80db0e65e4..ea822ac843 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -38,6 +38,12 @@ type PredicateMetadataFactory struct { podLister algorithm.PodLister } +// AntiAffinityTerm's topology key value used in predicate metadata +type topologyPair struct { + key string + value string +} + // Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file // due to the way declarations are processed in predicate declaration unit tests. type matchingPodAntiAffinityTerm struct { @@ -52,11 +58,11 @@ type predicateMetadata struct { podBestEffort bool podRequest *schedulercache.Resource podPorts []*v1.ContainerPort - //key is a pod full name with the anti-affinity rules. - matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm - // A map of antiffinity terms' topology ke values to the pods' names + // A map of anti-affinity terms' topology pairs to the pods // that can potentially match the affinity rules of the pod - topologyValueToAntiAffinityPods map[string][]string + topologyPairToAntiAffinityPods map[topologyPair][]*v1.Pod + // Reverse map for topologyPairToAntiAffinityPods to reduce deletion time + antiAffinityPodToTopologyPairs map[string][]topologyPair // A map of node name to a list of Pods on the node that can potentially match // the affinity rules of the "pod".
nodeNameToMatchingAffinityPods map[string][]*v1.Pod @@ -116,7 +122,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf if pod == nil { return nil } - matchingTerms, topologyValues, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap) + podToTopolgyPair, topologyPairToPods, err := getMatchingTopologyPairs(pod, nodeNameToInfoMap) if err != nil { return nil } @@ -130,10 +136,10 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf podBestEffort: isPodBestEffort(pod), podRequest: GetResourceRequest(pod), podPorts: schedutil.GetContainerPorts(pod), - matchingAntiAffinityTerms: matchingTerms, nodeNameToMatchingAffinityPods: affinityPods, nodeNameToMatchingAntiAffinityPods: antiAffinityPods, - topologyValueToAntiAffinityPods: topologyValues, + topologyPairToAntiAffinityPods: topologyPairToPods, + antiAffinityPodToTopologyPairs: podToTopolgyPair, } for predicateName, precomputeFunc := range predicateMetadataProducers { glog.V(10).Infof("Precompute: %v", predicateName) @@ -149,23 +155,22 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error { if deletedPodFullName == schedutil.GetPodFullName(meta.pod) { return fmt.Errorf("deletedPod and meta.pod must not be the same") } - - // Delete pod from matching topology values map - for _, term := range meta.matchingAntiAffinityTerms[deletedPodFullName] { - if topologyValue, ok := term.node.Labels[term.term.TopologyKey]; ok { - for index, podName := range meta.topologyValueToAntiAffinityPods[topologyValue] { - if podName == deletedPodFullName { - podsList := meta.topologyValueToAntiAffinityPods[topologyValue] - meta.topologyValueToAntiAffinityPods[topologyValue] = append(podsList[:index], - podsList[index+1:]...) 
- break + // Delete pod from matching topology pairs map + for _, pair := range meta.antiAffinityPodToTopologyPairs[deletedPodFullName] { + for index, pod := range meta.topologyPairToAntiAffinityPods[pair] { + if schedutil.GetPodFullName(pod) == deletedPodFullName { + podsList := meta.topologyPairToAntiAffinityPods[pair] + podsList[index] = podsList[len(podsList)-1] + if len(podsList) <= 1 { + delete(meta.topologyPairToAntiAffinityPods, pair) + } else { + meta.topologyPairToAntiAffinityPods[pair] = podsList[:len(podsList)-1] } + break } } } - - // Delete any anti-affinity rule from the deletedPod. - delete(meta.matchingAntiAffinityTerms, deletedPodFullName) + delete(meta.antiAffinityPodToTopologyPairs, deletedPodFullName) // Delete pod from the matching affinity or anti-affinity pods if exists. affinity := meta.pod.Spec.Affinity podNodeName := deletedPod.Spec.NodeName @@ -222,21 +227,16 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache return fmt.Errorf("invalid node in nodeInfo") } // Add matching anti-affinity terms of the addedPod to the map. - podMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node()) + matchingPodToTopologyPairs, podTopologyPairToMatchingPods, err := getMatchingTopologyPairsOfExistingPod(meta.pod, addedPod, nodeInfo.Node()) if err != nil { return err } - if len(podMatchingTerms) > 0 { - existingTerms, found := meta.matchingAntiAffinityTerms[addedPodFullName] - if found { - meta.matchingAntiAffinityTerms[addedPodFullName] = append(existingTerms, - podMatchingTerms...) - } else { - meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms + if len(matchingPodToTopologyPairs) > 0 { + for pair, pods := range podTopologyPairToMatchingPods { + meta.topologyPairToAntiAffinityPods[pair] = append(meta.topologyPairToAntiAffinityPods[pair], pods...) 
} - - for topologyValue, pods := range podTopologyValuesToMatchingPods { - meta.topologyValueToAntiAffinityPods[topologyValue] = append(meta.topologyValueToAntiAffinityPods[topologyValue], pods...) + for pod, pairs := range matchingPodToTopologyPairs { + meta.antiAffinityPodToTopologyPairs[pod] = append(meta.antiAffinityPodToTopologyPairs[pod], pairs...) } } // Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed. @@ -291,10 +291,6 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { ignoredExtendedResources: meta.ignoredExtendedResources, } newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...) - newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{} - for k, v := range meta.matchingAntiAffinityTerms { - newPredMeta.matchingAntiAffinityTerms[k] = append([]matchingPodAntiAffinityTerm(nil), v...) - } newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod) for k, v := range meta.nodeNameToMatchingAffinityPods { newPredMeta.nodeNameToMatchingAffinityPods[k] = append([]*v1.Pod(nil), v...) @@ -303,15 +299,18 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { for k, v := range meta.nodeNameToMatchingAntiAffinityPods { newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...) } - newPredMeta.topologyValueToAntiAffinityPods = make(map[string][]string) - for k, v := range meta.topologyValueToAntiAffinityPods { - newPredMeta.topologyValueToAntiAffinityPods[k] = append([]string(nil), v...) + newPredMeta.topologyPairToAntiAffinityPods = make(map[topologyPair][]*v1.Pod) + for k, v := range meta.topologyPairToAntiAffinityPods { + newPredMeta.topologyPairToAntiAffinityPods[k] = append([]*v1.Pod(nil), v...) 
+ } + newPredMeta.antiAffinityPodToTopologyPairs = make(map[string][]topologyPair) + for k, v := range meta.antiAffinityPodToTopologyPairs { + newPredMeta.antiAffinityPodToTopologyPairs[k] = append([]topologyPair(nil), v...) } newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), meta.serviceAffinityMatchingPodServices...) newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), meta.serviceAffinityMatchingPodList...) - return (algorithm.PredicateMetadata)(newPredMeta) } diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index 84e7cc55c3..287a94d10c 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -113,11 +113,6 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error { for !reflect.DeepEqual(meta1.podPorts, meta2.podPorts) { return fmt.Errorf("podPorts are not equal") } - sortAntiAffinityTerms(meta1.matchingAntiAffinityTerms) - sortAntiAffinityTerms(meta2.matchingAntiAffinityTerms) - if !reflect.DeepEqual(meta1.matchingAntiAffinityTerms, meta2.matchingAntiAffinityTerms) { - return fmt.Errorf("matchingAntiAffinityTerms are not euqal") - } sortNodePodMap(meta1.nodeNameToMatchingAffinityPods) sortNodePodMap(meta2.nodeNameToMatchingAffinityPods) if !reflect.DeepEqual(meta1.nodeNameToMatchingAffinityPods, meta2.nodeNameToMatchingAffinityPods) { @@ -465,19 +460,25 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) { HostIP: "1.2.3.4", }, }, - matchingAntiAffinityTerms: map[string][]matchingPodAntiAffinityTerm{ - "term1": { - { - term: &v1.PodAffinityTerm{TopologyKey: "node"}, - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Name: "machine1"}, - }, + topologyPairToAntiAffinityPods: map[topologyPair][]*v1.Pod{ + {key: "name", value: "machine1"}: { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeC"}, + }, + }, + {key: 
"name", value: "machine2"}: { + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, + Spec: v1.PodSpec{NodeName: "nodeA"}, }, }, }, - topologyValueToAntiAffinityPods: map[string][]string{ - "machine1": {"p1", "p2"}, - "machine2": {"p3"}, + antiAffinityPodToTopologyPairs: map[string][]topologyPair{ + "p2": { + topologyPair{key: "name", value: "machine1"}, + }, + "p1": { + topologyPair{key: "name", value: "machine2"}, + }, }, nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{ "nodeA": { diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index 477bd0753b..b280c71e11 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -1246,7 +1246,7 @@ func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po return terms } -func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) { +func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) { allNodeNames := make([]string, 0, len(nodeInfoMap)) for name := range nodeInfoMap { allNodeNames = append(allNodeNames, name) @@ -1254,25 +1254,25 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler var lock sync.Mutex var firstError error - podsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm) - topologyValuesToMatchingPods := make(map[string][]string) - appendPodsMatchingAntiAffinityTerms := func(toAppend map[string][]matchingPodAntiAffinityTerm) { + topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod) + matchingPodToTopologyPair := make(map[string][]topologyPair) + + appendTopologyPairToMatchingPods := func(toAppend map[topologyPair][]*v1.Pod) { lock.Lock() defer lock.Unlock() - for uid, terms := 
range toAppend { - podsToMatchingAntiAffinityTerms[uid] = append(podsToMatchingAntiAffinityTerms[uid], terms...) + for pair, pods := range toAppend { + topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], pods...) } } - appendTopologyValuesMatchingPods := func(toAppend map[string][]string) { + appendMatchingPodToTopologyPair := func(toAppend map[string][]topologyPair) { lock.Lock() defer lock.Unlock() - for topologyValue, pods := range toAppend { - topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...) + for pod, pairs := range toAppend { + matchingPodToTopologyPair[pod] = append(matchingPodToTopologyPair[pod], pairs...) } } - catchError := func(err error) { lock.Lock() defer lock.Unlock() @@ -1288,9 +1288,8 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler catchError(fmt.Errorf("node not found")) return } - nodePodsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm) - nodeTopologyValuesToMatchingPods := make(map[string][]string) - + nodeTopologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod) + nodeMatchingPodToTopologyPairs := make(map[string][]topologyPair) for _, existingPod := range nodeInfo.PodsWithAffinity() { affinity := existingPod.Spec.Affinity if affinity == nil { @@ -1304,31 +1303,27 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler return } if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { - existingPodFullName := schedutil.GetPodFullName(existingPod) - nodePodsToMatchingAntiAffinityTerms[existingPodFullName] = append( - nodePodsToMatchingAntiAffinityTerms[existingPodFullName], - matchingPodAntiAffinityTerm{term: &term, node: node}) - if topologyValue, ok := node.Labels[term.TopologyKey]; ok { - nodeTopologyValuesToMatchingPods[topologyValue] = append(nodeTopologyValuesToMatchingPods[topologyValue], existingPodFullName) + pair := topologyPair{key: 
term.TopologyKey, value: topologyValue} + nodeTopologyPairToMatchingPods[pair] = append(nodeTopologyPairToMatchingPods[pair], existingPod) + existingPodFullName := schedutil.GetPodFullName(existingPod) + nodeMatchingPodToTopologyPairs[existingPodFullName] = append(nodeMatchingPodToTopologyPairs[existingPodFullName], pair) } } } } - if len(nodePodsToMatchingAntiAffinityTerms) > 0 { - appendPodsMatchingAntiAffinityTerms(nodePodsToMatchingAntiAffinityTerms) - } - if len(nodeTopologyValuesToMatchingPods) > 0 { - appendTopologyValuesMatchingPods(nodeTopologyValuesToMatchingPods) + if len(nodeTopologyPairToMatchingPods) > 0 { + appendTopologyPairToMatchingPods(nodeTopologyPairToMatchingPods) + appendMatchingPodToTopologyPair(nodeMatchingPodToTopologyPairs) } } workqueue.Parallelize(16, len(allNodeNames), processNode) - return podsToMatchingAntiAffinityTerms, topologyValuesToMatchingPods, firstError + return matchingPodToTopologyPair, topologyPairToMatchingPods, firstError } -func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, map[string][]string, error) { - var podMatchingTerms []matchingPodAntiAffinityTerm - topologyValuesToMatchingPods := make(map[string][]string) +func getMatchingTopologyPairsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) { + topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod) + matchingPodToTopologyPairs := make(map[string][]topologyPair) affinity := existingPod.Spec.Affinity if affinity != nil && affinity.PodAntiAffinity != nil { @@ -1339,21 +1334,21 @@ func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.P return nil, nil, err } if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) { - podMatchingTerms = append(podMatchingTerms, matchingPodAntiAffinityTerm{term: &term, node: node}) - existingPodFullName := 
schedutil.GetPodFullName(existingPod) if topologyValue, ok := node.Labels[term.TopologyKey]; ok { - topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], existingPodFullName) + pair := topologyPair{key: term.TopologyKey, value: topologyValue} + topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], existingPod) + existingPodFullName := schedutil.GetPodFullName(existingPod) + matchingPodToTopologyPairs[existingPodFullName] = append(matchingPodToTopologyPairs[existingPodFullName], pair) } } } } - return podMatchingTerms, topologyValuesToMatchingPods, nil + return matchingPodToTopologyPairs, topologyPairToMatchingPods, nil } +func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairs(pod *v1.Pod, allPods []*v1.Pod) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) { -func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) { - result := make(map[string][]matchingPodAntiAffinityTerm) - topologyValuesToMatchingPods := make(map[string][]string) - + topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod) + matchingPodToTopologyPairs := make(map[string][]topologyPair) for _, existingPod := range allPods { affinity := existingPod.Spec.Affinity if affinity != nil && affinity.PodAntiAffinity != nil { @@ -1365,20 +1360,19 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [ } return nil, nil, err } - existingPodMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode) + existingPodTopologyTerms, podTopologyPairToMatchingPods, err := getMatchingTopologyPairsOfExistingPod(pod, existingPod, existingPodNode) if err != nil { return nil, nil, err } - if len(existingPodMatchingTerms) > 0 { - existingPodFullName := schedutil.GetPodFullName(existingPod) - result[existingPodFullName] = 
existingPodMatchingTerms + for pair, pods := range podTopologyPairToMatchingPods { + topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], pods...) } - for topologyValue, pods := range podTopologyValuesToMatchingPods { - topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...) + for pod, pairs := range existingPodTopologyTerms { + matchingPodToTopologyPairs[pod] = append(matchingPodToTopologyPairs[pod], pairs...) } } } - return result, topologyValuesToMatchingPods, nil + return matchingPodToTopologyPairs, topologyPairToMatchingPods, nil } // Checks if scheduling the pod onto this node would break any anti-affinity @@ -1388,12 +1382,10 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta if node == nil { return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil") } - var matchingTerms map[string][]matchingPodAntiAffinityTerm - var topologyValuesToMatchingPods map[string][]string + var topologyPairToMatchingPods map[topologyPair][]*v1.Pod if predicateMeta, ok := meta.(*predicateMetadata); ok { - matchingTerms = predicateMeta.matchingAntiAffinityTerms - topologyValuesToMatchingPods = predicateMeta.topologyValueToAntiAffinityPods + topologyPairToMatchingPods = predicateMeta.topologyPairToAntiAffinityPods } else { // Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not // present in nodeInfo. Pods on other nodes pass the filter. 
@@ -1403,29 +1395,22 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta glog.Error(errMessage) return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) } - if matchingTerms, topologyValuesToMatchingPods, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil { + if _, topologyPairToMatchingPods, err = c.getMatchingAntiAffinityTopologyPairs(pod, filteredPods); err != nil { errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err) glog.Error(errMessage) return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) } } - // Iterate over topology values, to get matching pods and get their matching terms to check for same topolgy key - // currently ignored if predicateMetadata is not precomputed - for _, topologyValue := range node.Labels { - potentialPods := topologyValuesToMatchingPods[topologyValue] - for _, matchingPod := range potentialPods { - podTerms := matchingTerms[matchingPod] - for i := range podTerms { - term := &podTerms[i] - if len(term.term.TopologyKey) == 0 { - errMessage := fmt.Sprintf("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity") - glog.Error(errMessage) - return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) - } - if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) { + // Iterate over topology pairs to get any of the pods being affected by + // the scheduled pod anti-affinity rules + for topologyKey, topologyValue := range node.Labels { + if violatedPods, ok := topologyPairToMatchingPods[topologyPair{key: topologyKey, value: topologyValue}]; ok { + affinity := violatedPods[0].Spec.Affinity + for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { + if term.TopologyKey == topologyKey { glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v", - podName(pod), node.Name, term.term) +
podName(pod), node.Name, term) return ErrExistingPodsAntiAffinityRulesNotMatch, nil } }