From 3fb6912d08194da623bd7680947b31c44359ef5a Mon Sep 17 00:00:00 2001 From: Mohamed Mehany <7327188+mohamed-mehany@users.noreply.github.com> Date: Mon, 30 Jul 2018 04:59:26 +0200 Subject: [PATCH] add topologyValue map to reduce search space --- .../algorithm/predicates/metadata.go | 32 +++++- .../algorithm/predicates/metadata_test.go | 4 + .../algorithm/predicates/predicates.go | 107 ++++++++++++------ 3 files changed, 108 insertions(+), 35 deletions(-) diff --git a/pkg/scheduler/algorithm/predicates/metadata.go b/pkg/scheduler/algorithm/predicates/metadata.go index 2b276ede61..80db0e65e4 100644 --- a/pkg/scheduler/algorithm/predicates/metadata.go +++ b/pkg/scheduler/algorithm/predicates/metadata.go @@ -54,6 +54,9 @@ type predicateMetadata struct { podPorts []*v1.ContainerPort //key is a pod full name with the anti-affinity rules. matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm + // A map of antiffinity terms' topology ke values to the pods' names + // that can potentially match the affinity rules of the pod + topologyValueToAntiAffinityPods map[string][]string // A map of node name to a list of Pods on the node that can potentially match // the affinity rules of the "pod". nodeNameToMatchingAffinityPods map[string][]*v1.Pod @@ -113,7 +116,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf if pod == nil { return nil } - matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap) + matchingTerms, topologyValues, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap) if err != nil { return nil } @@ -130,6 +133,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf matchingAntiAffinityTerms: matchingTerms, nodeNameToMatchingAffinityPods: affinityPods, nodeNameToMatchingAntiAffinityPods: antiAffinityPods, + topologyValueToAntiAffinityPods: topologyValues, } for predicateName, precomputeFunc := range predicateMetadataProducers { glog.V(10).Infof("Precompute: %v", predicateName) @@ -145,6 +149,21 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error { if deletedPodFullName == schedutil.GetPodFullName(meta.pod) { return fmt.Errorf("deletedPod and meta.pod must not be the same") } + + // Delete pod from matching topology values map + for _, term := range meta.matchingAntiAffinityTerms[deletedPodFullName] { + if topologyValue, ok := term.node.Labels[term.term.TopologyKey]; ok { + for index, podName := range meta.topologyValueToAntiAffinityPods[topologyValue] { + if podName == deletedPodFullName { + podsList := meta.topologyValueToAntiAffinityPods[topologyValue] + meta.topologyValueToAntiAffinityPods[topologyValue] = append(podsList[:index], + podsList[index+1:]...) + break + } + } + } + } + // Delete any anti-affinity rule from the deletedPod. delete(meta.matchingAntiAffinityTerms, deletedPodFullName) // Delete pod from the matching affinity or anti-affinity pods if exists. @@ -203,7 +222,7 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache return fmt.Errorf("invalid node in nodeInfo") } // Add matching anti-affinity terms of the addedPod to the map. - podMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node()) + podMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node()) if err != nil { return err } @@ -215,6 +234,10 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache } else { meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms } + + for topologyValue, pods := range podTopologyValuesToMatchingPods { + meta.topologyValueToAntiAffinityPods[topologyValue] = append(meta.topologyValueToAntiAffinityPods[topologyValue], pods...) + } } // Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed. affinity := meta.pod.Spec.Affinity @@ -280,10 +303,15 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata { for k, v := range meta.nodeNameToMatchingAntiAffinityPods { newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...) } + newPredMeta.topologyValueToAntiAffinityPods = make(map[string][]string) + for k, v := range meta.topologyValueToAntiAffinityPods { + newPredMeta.topologyValueToAntiAffinityPods[k] = append([]string(nil), v...) + } newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil), meta.serviceAffinityMatchingPodServices...) newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil), meta.serviceAffinityMatchingPodList...) + return (algorithm.PredicateMetadata)(newPredMeta) } diff --git a/pkg/scheduler/algorithm/predicates/metadata_test.go b/pkg/scheduler/algorithm/predicates/metadata_test.go index fde740f745..84e7cc55c3 100644 --- a/pkg/scheduler/algorithm/predicates/metadata_test.go +++ b/pkg/scheduler/algorithm/predicates/metadata_test.go @@ -475,6 +475,10 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) { }, }, }, + topologyValueToAntiAffinityPods: map[string][]string{ + "machine1": {"p1", "p2"}, + "machine2": {"p3"}, + }, nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{ "nodeA": { &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1}, diff --git a/pkg/scheduler/algorithm/predicates/predicates.go b/pkg/scheduler/algorithm/predicates/predicates.go index 17d8f0591d..477bd0753b 100644 --- a/pkg/scheduler/algorithm/predicates/predicates.go +++ b/pkg/scheduler/algorithm/predicates/predicates.go @@ -1246,7 +1246,7 @@ func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po return terms } -func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, error) { +func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) { allNodeNames := make([]string, 0, len(nodeInfoMap)) for name := range nodeInfoMap { allNodeNames = append(allNodeNames, name) @@ -1254,14 +1254,25 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler var lock sync.Mutex var firstError error - result := make(map[string][]matchingPodAntiAffinityTerm) - appendResult := func(toAppend map[string][]matchingPodAntiAffinityTerm) { + podsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm) + topologyValuesToMatchingPods := make(map[string][]string) + + appendPodsMatchingAntiAffinityTerms := func(toAppend map[string][]matchingPodAntiAffinityTerm) { lock.Lock() defer lock.Unlock() for uid, terms := range toAppend { - result[uid] = append(result[uid], terms...) + podsToMatchingAntiAffinityTerms[uid] = append(podsToMatchingAntiAffinityTerms[uid], terms...) } } + + appendTopologyValuesMatchingPods := func(toAppend map[string][]string) { + lock.Lock() + defer lock.Unlock() + for topologyValue, pods := range toAppend { + topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...) + } + } + catchError := func(err error) { lock.Lock() defer lock.Unlock() @@ -1277,7 +1288,9 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler catchError(fmt.Errorf("node not found")) return } - nodeResult := make(map[string][]matchingPodAntiAffinityTerm) + nodePodsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm) + nodeTopologyValuesToMatchingPods := make(map[string][]string) + for _, existingPod := range nodeInfo.PodsWithAffinity() { affinity := existingPod.Spec.Affinity if affinity == nil { @@ -1292,40 +1305,55 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler } if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) { existingPodFullName := schedutil.GetPodFullName(existingPod) - nodeResult[existingPodFullName] = append( - nodeResult[existingPodFullName], + nodePodsToMatchingAntiAffinityTerms[existingPodFullName] = append( + nodePodsToMatchingAntiAffinityTerms[existingPodFullName], matchingPodAntiAffinityTerm{term: &term, node: node}) + + if topologyValue, ok := node.Labels[term.TopologyKey]; ok { + nodeTopologyValuesToMatchingPods[topologyValue] = append(nodeTopologyValuesToMatchingPods[topologyValue], existingPodFullName) + } } } } - if len(nodeResult) > 0 { - appendResult(nodeResult) + if len(nodePodsToMatchingAntiAffinityTerms) > 0 { + appendPodsMatchingAntiAffinityTerms(nodePodsToMatchingAntiAffinityTerms) + } + if len(nodeTopologyValuesToMatchingPods) > 0 { + appendTopologyValuesMatchingPods(nodeTopologyValuesToMatchingPods) } } workqueue.Parallelize(16, len(allNodeNames), processNode) - return result, firstError + return podsToMatchingAntiAffinityTerms, topologyValuesToMatchingPods, firstError } -func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, error) { - var result []matchingPodAntiAffinityTerm +func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, map[string][]string, error) { + var podMatchingTerms []matchingPodAntiAffinityTerm + topologyValuesToMatchingPods := make(map[string][]string) + affinity := existingPod.Spec.Affinity if affinity != nil && affinity.PodAntiAffinity != nil { for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) { namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term) selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector) if err != nil { - return nil, err + return nil, nil, err } if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) { - result = append(result, matchingPodAntiAffinityTerm{term: &term, node: node}) + podMatchingTerms = append(podMatchingTerms, matchingPodAntiAffinityTerm{term: &term, node: node}) + existingPodFullName := schedutil.GetPodFullName(existingPod) + if topologyValue, ok := node.Labels[term.TopologyKey]; ok { + topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], existingPodFullName) + } } } } - return result, nil + return podMatchingTerms, topologyValuesToMatchingPods, nil } -func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, error) { +func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) { result := make(map[string][]matchingPodAntiAffinityTerm) + topologyValuesToMatchingPods := make(map[string][]string) + for _, existingPod := range allPods { affinity := existingPod.Spec.Affinity if affinity != nil && affinity.PodAntiAffinity != nil { @@ -1335,19 +1363,22 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [ glog.Errorf("Node not found, %v", existingPod.Spec.NodeName) continue } - return nil, err + return nil, nil, err } - existingPodMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode) + existingPodMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode) if err != nil { - return nil, err + return nil, nil, err } if len(existingPodMatchingTerms) > 0 { existingPodFullName := schedutil.GetPodFullName(existingPod) result[existingPodFullName] = existingPodMatchingTerms } + for topologyValue, pods := range podTopologyValuesToMatchingPods { + topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...) + } } } - return result, nil + return result, topologyValuesToMatchingPods, nil } // Checks if scheduling the pod onto this node would break any anti-affinity @@ -1358,8 +1389,11 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil") } var matchingTerms map[string][]matchingPodAntiAffinityTerm + var topologyValuesToMatchingPods map[string][]string + if predicateMeta, ok := meta.(*predicateMetadata); ok { matchingTerms = predicateMeta.matchingAntiAffinityTerms + topologyValuesToMatchingPods = predicateMeta.topologyValueToAntiAffinityPods } else { // Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not // present in nodeInfo. Pods on other nodes pass the filter. @@ -1369,24 +1403,31 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta glog.Error(errMessage) return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) } - if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil { + if matchingTerms, topologyValuesToMatchingPods, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil { errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err) glog.Error(errMessage) return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) } } - for _, terms := range matchingTerms { - for i := range terms { - term := &terms[i] - if len(term.term.TopologyKey) == 0 { - errMessage := fmt.Sprintf("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity") - glog.Error(errMessage) - return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) - } - if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) { - glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v", - podName(pod), node.Name, term.term) - return ErrExistingPodsAntiAffinityRulesNotMatch, nil + + // Iterate over topology values, to get matching pods and get their matching terms to check for same topolgy key + // currently ignored if predicateMetadata is not precomputed + for _, topologyValue := range node.Labels { + potentialPods := topologyValuesToMatchingPods[topologyValue] + for _, matchingPod := range potentialPods { + podTerms := matchingTerms[matchingPod] + for i := range podTerms { + term := &podTerms[i] + if len(term.term.TopologyKey) == 0 { + errMessage := fmt.Sprintf("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity") + glog.Error(errMessage) + return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage) + } + if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) { + glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v", + podName(pod), node.Name, term.term) + return ErrExistingPodsAntiAffinityRulesNotMatch, nil + } } } }