add topologyValue map to reduce search space

pull/8/head
Mohamed Mehany 2018-07-30 04:59:26 +02:00 committed by Ahmad Diaa
parent 6b41352679
commit 3fb6912d08
3 changed files with 108 additions and 35 deletions

View File

@ -54,6 +54,9 @@ type predicateMetadata struct {
podPorts []*v1.ContainerPort
//key is a pod full name with the anti-affinity rules.
matchingAntiAffinityTerms map[string][]matchingPodAntiAffinityTerm
// A map of anti-affinity terms' topology key values to the names of the pods
// that can potentially match the affinity rules of the pod.
topologyValueToAntiAffinityPods map[string][]string
// A map of node name to a list of Pods on the node that can potentially match
// the affinity rules of the "pod".
nodeNameToMatchingAffinityPods map[string][]*v1.Pod
@ -113,7 +116,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
if pod == nil {
return nil
}
matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap)
matchingTerms, topologyValues, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap)
if err != nil {
return nil
}
@ -130,6 +133,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
matchingAntiAffinityTerms: matchingTerms,
nodeNameToMatchingAffinityPods: affinityPods,
nodeNameToMatchingAntiAffinityPods: antiAffinityPods,
topologyValueToAntiAffinityPods: topologyValues,
}
for predicateName, precomputeFunc := range predicateMetadataProducers {
glog.V(10).Infof("Precompute: %v", predicateName)
@ -145,6 +149,21 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
return fmt.Errorf("deletedPod and meta.pod must not be the same")
}
// Delete pod from matching topology values map
for _, term := range meta.matchingAntiAffinityTerms[deletedPodFullName] {
if topologyValue, ok := term.node.Labels[term.term.TopologyKey]; ok {
for index, podName := range meta.topologyValueToAntiAffinityPods[topologyValue] {
if podName == deletedPodFullName {
podsList := meta.topologyValueToAntiAffinityPods[topologyValue]
meta.topologyValueToAntiAffinityPods[topologyValue] = append(podsList[:index],
podsList[index+1:]...)
break
}
}
}
}
// Delete any anti-affinity rule from the deletedPod.
delete(meta.matchingAntiAffinityTerms, deletedPodFullName)
// Delete pod from the matching affinity or anti-affinity pods if exists.
@ -203,7 +222,7 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
return fmt.Errorf("invalid node in nodeInfo")
}
// Add matching anti-affinity terms of the addedPod to the map.
podMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node())
podMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(meta.pod, addedPod, nodeInfo.Node())
if err != nil {
return err
}
@ -215,6 +234,10 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
} else {
meta.matchingAntiAffinityTerms[addedPodFullName] = podMatchingTerms
}
for topologyValue, pods := range podTopologyValuesToMatchingPods {
meta.topologyValueToAntiAffinityPods[topologyValue] = append(meta.topologyValueToAntiAffinityPods[topologyValue], pods...)
}
}
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
affinity := meta.pod.Spec.Affinity
@ -280,10 +303,15 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
for k, v := range meta.nodeNameToMatchingAntiAffinityPods {
newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
}
newPredMeta.topologyValueToAntiAffinityPods = make(map[string][]string)
for k, v := range meta.topologyValueToAntiAffinityPods {
newPredMeta.topologyValueToAntiAffinityPods[k] = append([]string(nil), v...)
}
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
meta.serviceAffinityMatchingPodServices...)
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
meta.serviceAffinityMatchingPodList...)
return (algorithm.PredicateMetadata)(newPredMeta)
}

View File

@ -475,6 +475,10 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
},
},
},
topologyValueToAntiAffinityPods: map[string][]string{
"machine1": {"p1", "p2"},
"machine2": {"p3"},
},
nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{
"nodeA": {
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},

View File

@ -1246,7 +1246,7 @@ func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po
return terms
}
func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, error) {
func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) {
allNodeNames := make([]string, 0, len(nodeInfoMap))
for name := range nodeInfoMap {
allNodeNames = append(allNodeNames, name)
@ -1254,14 +1254,25 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
var lock sync.Mutex
var firstError error
result := make(map[string][]matchingPodAntiAffinityTerm)
appendResult := func(toAppend map[string][]matchingPodAntiAffinityTerm) {
podsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm)
topologyValuesToMatchingPods := make(map[string][]string)
appendPodsMatchingAntiAffinityTerms := func(toAppend map[string][]matchingPodAntiAffinityTerm) {
lock.Lock()
defer lock.Unlock()
for uid, terms := range toAppend {
result[uid] = append(result[uid], terms...)
podsToMatchingAntiAffinityTerms[uid] = append(podsToMatchingAntiAffinityTerms[uid], terms...)
}
}
appendTopologyValuesMatchingPods := func(toAppend map[string][]string) {
lock.Lock()
defer lock.Unlock()
for topologyValue, pods := range toAppend {
topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...)
}
}
catchError := func(err error) {
lock.Lock()
defer lock.Unlock()
@ -1277,7 +1288,9 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
catchError(fmt.Errorf("node not found"))
return
}
nodeResult := make(map[string][]matchingPodAntiAffinityTerm)
nodePodsToMatchingAntiAffinityTerms := make(map[string][]matchingPodAntiAffinityTerm)
nodeTopologyValuesToMatchingPods := make(map[string][]string)
for _, existingPod := range nodeInfo.PodsWithAffinity() {
affinity := existingPod.Spec.Affinity
if affinity == nil {
@ -1292,40 +1305,55 @@ func getMatchingAntiAffinityTerms(pod *v1.Pod, nodeInfoMap map[string]*scheduler
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
existingPodFullName := schedutil.GetPodFullName(existingPod)
nodeResult[existingPodFullName] = append(
nodeResult[existingPodFullName],
nodePodsToMatchingAntiAffinityTerms[existingPodFullName] = append(
nodePodsToMatchingAntiAffinityTerms[existingPodFullName],
matchingPodAntiAffinityTerm{term: &term, node: node})
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
nodeTopologyValuesToMatchingPods[topologyValue] = append(nodeTopologyValuesToMatchingPods[topologyValue], existingPodFullName)
}
}
}
}
if len(nodeResult) > 0 {
appendResult(nodeResult)
if len(nodePodsToMatchingAntiAffinityTerms) > 0 {
appendPodsMatchingAntiAffinityTerms(nodePodsToMatchingAntiAffinityTerms)
}
if len(nodeTopologyValuesToMatchingPods) > 0 {
appendTopologyValuesMatchingPods(nodeTopologyValuesToMatchingPods)
}
}
workqueue.Parallelize(16, len(allNodeNames), processNode)
return result, firstError
return podsToMatchingAntiAffinityTerms, topologyValuesToMatchingPods, firstError
}
// getMatchingAntiAffinityTermsOfExistingPod walks the anti-affinity terms of
// existingPod and collects those whose namespaces and label selector match
// newPod.
//
// It returns:
//   - the matching terms, each paired with the node passed in;
//   - a map from topology value (the value of node.Labels[term.TopologyKey])
//     to existingPod's full name, with one entry appended per matching term
//     whose topology key is present in the node's labels;
//   - an error if a term's label selector cannot be converted, in which case
//     the other results are nil.
//
// Both the earlier two-result form and the three-result form introduced by
// this change are shown below.
func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, error) {
var result []matchingPodAntiAffinityTerm
func getMatchingAntiAffinityTermsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) ([]matchingPodAntiAffinityTerm, map[string][]string, error) {
var podMatchingTerms []matchingPodAntiAffinityTerm
topologyValuesToMatchingPods := make(map[string][]string)
affinity := existingPod.Spec.Affinity
// Pods without anti-affinity rules contribute nothing.
if affinity != nil && affinity.PodAntiAffinity != nil {
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
return nil, err
return nil, nil, err
}
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
// NOTE(review): &term is the address of the range loop variable. On Go
// toolchains older than 1.22 that variable is shared by all iterations, so
// every stored entry would point at the last term visited — confirm the
// toolchain version or copy the term before taking its address.
result = append(result, matchingPodAntiAffinityTerm{term: &term, node: node})
podMatchingTerms = append(podMatchingTerms, matchingPodAntiAffinityTerm{term: &term, node: node})
existingPodFullName := schedutil.GetPodFullName(existingPod)
// Index the pod by topology value only when the node actually carries
// the term's topology key as a label.
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], existingPodFullName)
}
}
}
}
return result, nil
return podMatchingTerms, topologyValuesToMatchingPods, nil
}
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, error) {
func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods []*v1.Pod) (map[string][]matchingPodAntiAffinityTerm, map[string][]string, error) {
result := make(map[string][]matchingPodAntiAffinityTerm)
topologyValuesToMatchingPods := make(map[string][]string)
for _, existingPod := range allPods {
affinity := existingPod.Spec.Affinity
if affinity != nil && affinity.PodAntiAffinity != nil {
@ -1335,19 +1363,22 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTerms(pod *v1.Pod, allPods [
glog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
continue
}
return nil, err
return nil, nil, err
}
existingPodMatchingTerms, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode)
existingPodMatchingTerms, podTopologyValuesToMatchingPods, err := getMatchingAntiAffinityTermsOfExistingPod(pod, existingPod, existingPodNode)
if err != nil {
return nil, err
return nil, nil, err
}
if len(existingPodMatchingTerms) > 0 {
existingPodFullName := schedutil.GetPodFullName(existingPod)
result[existingPodFullName] = existingPodMatchingTerms
}
for topologyValue, pods := range podTopologyValuesToMatchingPods {
topologyValuesToMatchingPods[topologyValue] = append(topologyValuesToMatchingPods[topologyValue], pods...)
}
}
}
return result, nil
return result, topologyValuesToMatchingPods, nil
}
// Checks if scheduling the pod onto this node would break any anti-affinity
@ -1358,8 +1389,11 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil")
}
var matchingTerms map[string][]matchingPodAntiAffinityTerm
var topologyValuesToMatchingPods map[string][]string
if predicateMeta, ok := meta.(*predicateMetadata); ok {
matchingTerms = predicateMeta.matchingAntiAffinityTerms
topologyValuesToMatchingPods = predicateMeta.topologyValueToAntiAffinityPods
} else {
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
// present in nodeInfo. Pods on other nodes pass the filter.
@ -1369,24 +1403,31 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
glog.Error(errMessage)
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
}
if matchingTerms, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil {
if matchingTerms, topologyValuesToMatchingPods, err = c.getMatchingAntiAffinityTerms(pod, filteredPods); err != nil {
errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
glog.Error(errMessage)
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
}
}
for _, terms := range matchingTerms {
for i := range terms {
term := &terms[i]
if len(term.term.TopologyKey) == 0 {
errMessage := fmt.Sprintf("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
glog.Error(errMessage)
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
}
if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
podName(pod), node.Name, term.term)
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
// Iterate over topology values to get the matching pods, then check their matching terms for the same topology key.
// Currently ignored if predicateMetadata is not precomputed.
for _, topologyValue := range node.Labels {
potentialPods := topologyValuesToMatchingPods[topologyValue]
for _, matchingPod := range potentialPods {
podTerms := matchingTerms[matchingPod]
for i := range podTerms {
term := &podTerms[i]
if len(term.term.TopologyKey) == 0 {
errMessage := fmt.Sprintf("Empty topologyKey is not allowed except for PreferredDuringScheduling pod anti-affinity")
glog.Error(errMessage)
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
}
if priorityutil.NodesHaveSameTopologyKey(node, term.node, term.term.TopologyKey) {
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
podName(pod), node.Name, term.term)
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
}
}
}
}