mirror of https://github.com/k3s-io/k3s
created struct for topologyPairs maps
parent
f6659e4543
commit
0f4c3064fd
|
@ -51,6 +51,13 @@ type matchingPodAntiAffinityTerm struct {
|
||||||
node *v1.Node
|
node *v1.Node
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// topologyPairsMaps keeps topologyPairToAntiAffinityPods and antiAffinityPodToTopologyPairs in sync
|
||||||
|
// as they are the inverse of each others.
|
||||||
|
type topologyPairsMaps struct {
|
||||||
|
topologyPairToPods map[topologyPair][]*v1.Pod
|
||||||
|
podToTopologyPairs map[string][]topologyPair
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: When new fields are added/removed or logic is changed, please make sure that
|
// NOTE: When new fields are added/removed or logic is changed, please make sure that
|
||||||
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
|
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
|
||||||
type predicateMetadata struct {
|
type predicateMetadata struct {
|
||||||
|
@ -58,11 +65,8 @@ type predicateMetadata struct {
|
||||||
podBestEffort bool
|
podBestEffort bool
|
||||||
podRequest *schedulercache.Resource
|
podRequest *schedulercache.Resource
|
||||||
podPorts []*v1.ContainerPort
|
podPorts []*v1.ContainerPort
|
||||||
// A map of antiffinity terms' topology pairs to the pods'
|
|
||||||
// that can potentially match the affinity rules of the pod
|
topologyPairsAntiAffinityPodsMap *topologyPairsMaps
|
||||||
topologyPairToAntiAffinityPods map[topologyPair][]*v1.Pod
|
|
||||||
// Reverse map for topologyPairToAntiAffinityPods to reduce deletion time
|
|
||||||
antiAffinityPodToTopologyPairs map[string][]topologyPair
|
|
||||||
// A map of node name to a list of Pods on the node that can potentially match
|
// A map of node name to a list of Pods on the node that can potentially match
|
||||||
// the affinity rules of the "pod".
|
// the affinity rules of the "pod".
|
||||||
nodeNameToMatchingAffinityPods map[string][]*v1.Pod
|
nodeNameToMatchingAffinityPods map[string][]*v1.Pod
|
||||||
|
@ -122,7 +126,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
|
||||||
if pod == nil {
|
if pod == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
podToTopolgyPair, topologyPairToPods, err := getMatchingTopologyPairs(pod, nodeNameToInfoMap)
|
topologyPairsMaps, err := getMatchingTopologyPairs(pod, nodeNameToInfoMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -138,8 +142,7 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
|
||||||
podPorts: schedutil.GetContainerPorts(pod),
|
podPorts: schedutil.GetContainerPorts(pod),
|
||||||
nodeNameToMatchingAffinityPods: affinityPods,
|
nodeNameToMatchingAffinityPods: affinityPods,
|
||||||
nodeNameToMatchingAntiAffinityPods: antiAffinityPods,
|
nodeNameToMatchingAntiAffinityPods: antiAffinityPods,
|
||||||
topologyPairToAntiAffinityPods: topologyPairToPods,
|
topologyPairsAntiAffinityPodsMap: topologyPairsMaps,
|
||||||
antiAffinityPodToTopologyPairs: podToTopolgyPair,
|
|
||||||
}
|
}
|
||||||
for predicateName, precomputeFunc := range predicateMetadataProducers {
|
for predicateName, precomputeFunc := range predicateMetadataProducers {
|
||||||
glog.V(10).Infof("Precompute: %v", predicateName)
|
glog.V(10).Infof("Precompute: %v", predicateName)
|
||||||
|
@ -148,6 +151,47 @@ func (pfactory *PredicateMetadataFactory) GetMetadata(pod *v1.Pod, nodeNameToInf
|
||||||
return predicateMetadata
|
return predicateMetadata
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (topologyPairsMaps *topologyPairsMaps) AddTopologyPair(pair topologyPair, pod *v1.Pod) {
|
||||||
|
found := false
|
||||||
|
for _, existingPod := range topologyPairsMaps.topologyPairToPods[pair] {
|
||||||
|
if existingPod == pod {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
topologyPairsMaps.topologyPairToPods[pair] = append(topologyPairsMaps.topologyPairToPods[pair], pod)
|
||||||
|
topologyPairsMaps.podToTopologyPairs[schedutil.GetPodFullName(pod)] = append(topologyPairsMaps.podToTopologyPairs[schedutil.GetPodFullName(pod)], pair)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (topologyPairsMaps *topologyPairsMaps) RemovePod(podName string) {
|
||||||
|
for _, pair := range topologyPairsMaps.podToTopologyPairs[podName] {
|
||||||
|
for index, pod := range topologyPairsMaps.topologyPairToPods[pair] {
|
||||||
|
if schedutil.GetPodFullName(pod) == podName {
|
||||||
|
podsList := topologyPairsMaps.topologyPairToPods[pair]
|
||||||
|
podsList[index] = podsList[len(podsList)-1]
|
||||||
|
if len(podsList) <= 1 {
|
||||||
|
delete(topologyPairsMaps.topologyPairToPods, pair)
|
||||||
|
} else {
|
||||||
|
topologyPairsMaps.topologyPairToPods[pair] = podsList[:len(podsList)-1]
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(topologyPairsMaps.podToTopologyPairs, podName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (topologyPairsMaps *topologyPairsMaps) appendMaps(toAppend *topologyPairsMaps) {
|
||||||
|
for pod, pairs := range toAppend.podToTopologyPairs {
|
||||||
|
topologyPairsMaps.podToTopologyPairs[pod] = append(topologyPairsMaps.podToTopologyPairs[pod], pairs...)
|
||||||
|
}
|
||||||
|
for pair, pods := range toAppend.topologyPairToPods {
|
||||||
|
topologyPairsMaps.topologyPairToPods[pair] = append(topologyPairsMaps.topologyPairToPods[pair], pods...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// RemovePod changes predicateMetadata assuming that the given `deletedPod` is
|
// RemovePod changes predicateMetadata assuming that the given `deletedPod` is
|
||||||
// deleted from the system.
|
// deleted from the system.
|
||||||
func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
|
func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
|
||||||
|
@ -155,22 +199,7 @@ func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod) error {
|
||||||
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
|
if deletedPodFullName == schedutil.GetPodFullName(meta.pod) {
|
||||||
return fmt.Errorf("deletedPod and meta.pod must not be the same")
|
return fmt.Errorf("deletedPod and meta.pod must not be the same")
|
||||||
}
|
}
|
||||||
// Delete pod from matching topology pairs map
|
meta.topologyPairsAntiAffinityPodsMap.RemovePod(deletedPodFullName)
|
||||||
for _, pair := range meta.antiAffinityPodToTopologyPairs[deletedPodFullName] {
|
|
||||||
for index, pod := range meta.topologyPairToAntiAffinityPods[pair] {
|
|
||||||
if schedutil.GetPodFullName(pod) == deletedPodFullName {
|
|
||||||
podsList := meta.topologyPairToAntiAffinityPods[pair]
|
|
||||||
podsList[index] = podsList[len(podsList)-1]
|
|
||||||
if len(podsList) <= 1 {
|
|
||||||
delete(meta.topologyPairToAntiAffinityPods, pair)
|
|
||||||
} else {
|
|
||||||
meta.topologyPairToAntiAffinityPods[pair] = podsList[:len(podsList)-1]
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
delete(meta.antiAffinityPodToTopologyPairs, deletedPodFullName)
|
|
||||||
// Delete pod from the matching affinity or anti-affinity pods if exists.
|
// Delete pod from the matching affinity or anti-affinity pods if exists.
|
||||||
affinity := meta.pod.Spec.Affinity
|
affinity := meta.pod.Spec.Affinity
|
||||||
podNodeName := deletedPod.Spec.NodeName
|
podNodeName := deletedPod.Spec.NodeName
|
||||||
|
@ -227,17 +256,12 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
|
||||||
return fmt.Errorf("invalid node in nodeInfo")
|
return fmt.Errorf("invalid node in nodeInfo")
|
||||||
}
|
}
|
||||||
// Add matching anti-affinity terms of the addedPod to the map.
|
// Add matching anti-affinity terms of the addedPod to the map.
|
||||||
matchingPodToTopologyPairs, podTopologyPairToMatchingPods, err := getMatchingTopologyPairsOfExistingPod(meta.pod, addedPod, nodeInfo.Node())
|
topologyPairsMaps, err := getMatchingTopologyPairsOfExistingPod(meta.pod, addedPod, nodeInfo.Node())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(matchingPodToTopologyPairs) > 0 {
|
if len(topologyPairsMaps.podToTopologyPairs) > 0 {
|
||||||
for pair, pods := range podTopologyPairToMatchingPods {
|
meta.topologyPairsAntiAffinityPodsMap.appendMaps(topologyPairsMaps)
|
||||||
meta.topologyPairToAntiAffinityPods[pair] = append(meta.topologyPairToAntiAffinityPods[pair], pods...)
|
|
||||||
}
|
|
||||||
for pod, pairs := range matchingPodToTopologyPairs {
|
|
||||||
meta.antiAffinityPodToTopologyPairs[pod] = append(meta.antiAffinityPodToTopologyPairs[pod], pairs...)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
|
// Add the pod to nodeNameToMatchingAffinityPods and nodeNameToMatchingAntiAffinityPods if needed.
|
||||||
affinity := meta.pod.Spec.Affinity
|
affinity := meta.pod.Spec.Affinity
|
||||||
|
@ -284,11 +308,12 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
|
||||||
// its maps and slices, but it does not copy the contents of pointer values.
|
// its maps and slices, but it does not copy the contents of pointer values.
|
||||||
func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
|
func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
|
||||||
newPredMeta := &predicateMetadata{
|
newPredMeta := &predicateMetadata{
|
||||||
pod: meta.pod,
|
pod: meta.pod,
|
||||||
podBestEffort: meta.podBestEffort,
|
podBestEffort: meta.podBestEffort,
|
||||||
podRequest: meta.podRequest,
|
podRequest: meta.podRequest,
|
||||||
serviceAffinityInUse: meta.serviceAffinityInUse,
|
serviceAffinityInUse: meta.serviceAffinityInUse,
|
||||||
ignoredExtendedResources: meta.ignoredExtendedResources,
|
ignoredExtendedResources: meta.ignoredExtendedResources,
|
||||||
|
topologyPairsAntiAffinityPodsMap: meta.topologyPairsAntiAffinityPodsMap,
|
||||||
}
|
}
|
||||||
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
|
newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
|
||||||
newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod)
|
newPredMeta.nodeNameToMatchingAffinityPods = make(map[string][]*v1.Pod)
|
||||||
|
@ -299,14 +324,9 @@ func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
|
||||||
for k, v := range meta.nodeNameToMatchingAntiAffinityPods {
|
for k, v := range meta.nodeNameToMatchingAntiAffinityPods {
|
||||||
newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
newPredMeta.nodeNameToMatchingAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
||||||
}
|
}
|
||||||
newPredMeta.topologyPairToAntiAffinityPods = make(map[topologyPair][]*v1.Pod)
|
newPredMeta.topologyPairsAntiAffinityPodsMap = &topologyPairsMaps{topologyPairToPods: make(map[topologyPair][]*v1.Pod),
|
||||||
for k, v := range meta.topologyPairToAntiAffinityPods {
|
podToTopologyPairs: make(map[string][]topologyPair)}
|
||||||
newPredMeta.topologyPairToAntiAffinityPods[k] = append([]*v1.Pod(nil), v...)
|
newPredMeta.topologyPairsAntiAffinityPodsMap.appendMaps(meta.topologyPairsAntiAffinityPodsMap)
|
||||||
}
|
|
||||||
newPredMeta.antiAffinityPodToTopologyPairs = make(map[string][]topologyPair)
|
|
||||||
for k, v := range meta.antiAffinityPodToTopologyPairs {
|
|
||||||
newPredMeta.antiAffinityPodToTopologyPairs[k] = append([]topologyPair(nil), v...)
|
|
||||||
}
|
|
||||||
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
|
newPredMeta.serviceAffinityMatchingPodServices = append([]*v1.Service(nil),
|
||||||
meta.serviceAffinityMatchingPodServices...)
|
meta.serviceAffinityMatchingPodServices...)
|
||||||
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
|
newPredMeta.serviceAffinityMatchingPodList = append([]*v1.Pod(nil),
|
||||||
|
|
|
@ -28,39 +28,30 @@ import (
|
||||||
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
|
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
// sortableAntiAffinityTerms lets us to sort anti-affinity terms.
|
// sortableTopologyPairs lets us sort topology pairs
|
||||||
type sortableAntiAffinityTerms []matchingPodAntiAffinityTerm
|
type sortableTopologyPairs []topologyPair
|
||||||
|
|
||||||
// Less establishes some ordering between two matchingPodAntiAffinityTerms for
|
// Less establishes some ordering between two topologyPairs for sorting.
|
||||||
// sorting.
|
func (s sortableTopologyPairs) Less(i, j int) bool {
|
||||||
func (s sortableAntiAffinityTerms) Less(i, j int) bool {
|
|
||||||
t1, t2 := s[i], s[j]
|
t1, t2 := s[i], s[j]
|
||||||
if t1.node.Name != t2.node.Name {
|
return t1.key < t2.key || (t1.key == t2.key && t1.value < t2.value)
|
||||||
return t1.node.Name < t2.node.Name
|
|
||||||
}
|
|
||||||
if len(t1.term.Namespaces) != len(t2.term.Namespaces) {
|
|
||||||
return len(t1.term.Namespaces) < len(t2.term.Namespaces)
|
|
||||||
}
|
|
||||||
if t1.term.TopologyKey != t2.term.TopologyKey {
|
|
||||||
return t1.term.TopologyKey < t2.term.TopologyKey
|
|
||||||
}
|
|
||||||
if len(t1.term.LabelSelector.MatchLabels) != len(t2.term.LabelSelector.MatchLabels) {
|
|
||||||
return len(t1.term.LabelSelector.MatchLabels) < len(t2.term.LabelSelector.MatchLabels)
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
func (s sortableAntiAffinityTerms) Len() int { return len(s) }
|
|
||||||
func (s sortableAntiAffinityTerms) Swap(i, j int) {
|
|
||||||
s[i], s[j] = s[j], s[i]
|
|
||||||
}
|
}
|
||||||
|
func (s sortableTopologyPairs) Len() int { return len(s) }
|
||||||
|
func (s sortableTopologyPairs) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||||
|
|
||||||
var _ = sort.Interface(sortableAntiAffinityTerms{})
|
var _ = sort.Interface(sortableTopologyPairs{})
|
||||||
|
|
||||||
func sortAntiAffinityTerms(terms map[string][]matchingPodAntiAffinityTerm) {
|
func sortTopologyPairs(pairs map[string][]topologyPair) {
|
||||||
for k, v := range terms {
|
for k, v := range pairs {
|
||||||
sortableTerms := sortableAntiAffinityTerms(v)
|
sortableTerms := sortableTopologyPairs(v)
|
||||||
sort.Sort(sortableTerms)
|
sort.Sort(sortableTerms)
|
||||||
terms[k] = sortableTerms
|
pairs[k] = sortableTerms
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func sortTopologyPairPods(np map[topologyPair][]*v1.Pod) {
|
||||||
|
for _, pl := range np {
|
||||||
|
sortablePods := sortablePods(pl)
|
||||||
|
sort.Sort(sortablePods)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,6 +114,18 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
|
||||||
if !reflect.DeepEqual(meta1.nodeNameToMatchingAntiAffinityPods, meta2.nodeNameToMatchingAntiAffinityPods) {
|
if !reflect.DeepEqual(meta1.nodeNameToMatchingAntiAffinityPods, meta2.nodeNameToMatchingAntiAffinityPods) {
|
||||||
return fmt.Errorf("nodeNameToMatchingAntiAffinityPods are not euqal")
|
return fmt.Errorf("nodeNameToMatchingAntiAffinityPods are not euqal")
|
||||||
}
|
}
|
||||||
|
sortTopologyPairs(meta1.topologyPairsAntiAffinityPodsMap.podToTopologyPairs)
|
||||||
|
sortTopologyPairs(meta2.topologyPairsAntiAffinityPodsMap.podToTopologyPairs)
|
||||||
|
if !reflect.DeepEqual(meta1.topologyPairsAntiAffinityPodsMap.podToTopologyPairs,
|
||||||
|
meta2.topologyPairsAntiAffinityPodsMap.podToTopologyPairs) {
|
||||||
|
return fmt.Errorf("topologyPairsAntiAffinityPodsMap.antiAffinityPodToTopologyPairs are not equal")
|
||||||
|
}
|
||||||
|
sortTopologyPairPods(meta1.topologyPairsAntiAffinityPodsMap.topologyPairToPods)
|
||||||
|
sortTopologyPairPods(meta2.topologyPairsAntiAffinityPodsMap.topologyPairToPods)
|
||||||
|
if !reflect.DeepEqual(meta1.topologyPairsAntiAffinityPodsMap.topologyPairToPods,
|
||||||
|
meta2.topologyPairsAntiAffinityPodsMap.topologyPairToPods) {
|
||||||
|
return fmt.Errorf("topologyPairsAntiAffinityPodsMap.topologyPairToAntiAffinityPods are not equal")
|
||||||
|
}
|
||||||
if meta1.serviceAffinityInUse {
|
if meta1.serviceAffinityInUse {
|
||||||
sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList)
|
sortablePods1 := sortablePods(meta1.serviceAffinityMatchingPodList)
|
||||||
sort.Sort(sortablePods1)
|
sort.Sort(sortablePods1)
|
||||||
|
@ -460,24 +463,26 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
|
||||||
HostIP: "1.2.3.4",
|
HostIP: "1.2.3.4",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
topologyPairToAntiAffinityPods: map[topologyPair][]*v1.Pod{
|
topologyPairsAntiAffinityPodsMap: &topologyPairsMaps{
|
||||||
{key: "name", value: "machine1"}: {
|
topologyPairToPods: map[topologyPair][]*v1.Pod{
|
||||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2", Labels: selector1},
|
{key: "name", value: "machine1"}: {
|
||||||
Spec: v1.PodSpec{NodeName: "nodeC"},
|
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p2", Labels: selector1},
|
||||||
|
Spec: v1.PodSpec{NodeName: "nodeC"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{key: "name", value: "machine2"}: {
|
||||||
|
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
|
||||||
|
Spec: v1.PodSpec{NodeName: "nodeA"},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{key: "name", value: "machine2"}: {
|
podToTopologyPairs: map[string][]topologyPair{
|
||||||
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", Labels: selector1},
|
"p2": {
|
||||||
Spec: v1.PodSpec{NodeName: "nodeA"},
|
topologyPair{key: "name", value: "machine1"},
|
||||||
|
},
|
||||||
|
"p1": {
|
||||||
|
topologyPair{key: "name", value: "machine2"},
|
||||||
},
|
},
|
||||||
},
|
|
||||||
},
|
|
||||||
antiAffinityPodToTopologyPairs: map[string][]topologyPair{
|
|
||||||
"p2": {
|
|
||||||
topologyPair{key: "name", value: "machine1"},
|
|
||||||
},
|
|
||||||
"p1": {
|
|
||||||
topologyPair{key: "name", value: "machine2"},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{
|
nodeNameToMatchingAffinityPods: map[string][]*v1.Pod{
|
||||||
|
|
|
@ -1246,7 +1246,7 @@ func GetPodAntiAffinityTerms(podAntiAffinity *v1.PodAntiAffinity) (terms []v1.Po
|
||||||
return terms
|
return terms
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) {
|
func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) (*topologyPairsMaps, error) {
|
||||||
allNodeNames := make([]string, 0, len(nodeInfoMap))
|
allNodeNames := make([]string, 0, len(nodeInfoMap))
|
||||||
for name := range nodeInfoMap {
|
for name := range nodeInfoMap {
|
||||||
allNodeNames = append(allNodeNames, name)
|
allNodeNames = append(allNodeNames, name)
|
||||||
|
@ -1255,23 +1255,13 @@ func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercach
|
||||||
var lock sync.Mutex
|
var lock sync.Mutex
|
||||||
var firstError error
|
var firstError error
|
||||||
|
|
||||||
topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
topologyMaps := &topologyPairsMaps{topologyPairToPods: make(map[topologyPair][]*v1.Pod),
|
||||||
matchingPodToTopologyPair := make(map[string][]topologyPair)
|
podToTopologyPairs: make(map[string][]topologyPair)}
|
||||||
|
|
||||||
appendTopologyPairToMatchingPods := func(toAppend map[topologyPair][]*v1.Pod) {
|
appendTopologyPairsMaps := func(toAppend *topologyPairsMaps) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
for pair, pods := range toAppend {
|
topologyMaps.appendMaps(toAppend)
|
||||||
topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], pods...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
appendMatchingPodToTopologyPair := func(toAppend map[string][]topologyPair) {
|
|
||||||
lock.Lock()
|
|
||||||
defer lock.Unlock()
|
|
||||||
for pod, pairs := range toAppend {
|
|
||||||
matchingPodToTopologyPair[pod] = append(matchingPodToTopologyPair[pod], pairs...)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catchError := func(err error) {
|
catchError := func(err error) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
|
@ -1288,8 +1278,8 @@ func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercach
|
||||||
catchError(fmt.Errorf("node not found"))
|
catchError(fmt.Errorf("node not found"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
nodeTopologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
nodeTopologyMaps := &topologyPairsMaps{topologyPairToPods: make(map[topologyPair][]*v1.Pod),
|
||||||
nodeMatchingPodToTopologyPairs := make(map[string][]topologyPair)
|
podToTopologyPairs: make(map[string][]topologyPair)}
|
||||||
for _, existingPod := range nodeInfo.PodsWithAffinity() {
|
for _, existingPod := range nodeInfo.PodsWithAffinity() {
|
||||||
affinity := existingPod.Spec.Affinity
|
affinity := existingPod.Spec.Affinity
|
||||||
if affinity == nil {
|
if affinity == nil {
|
||||||
|
@ -1305,50 +1295,44 @@ func getMatchingTopologyPairs(pod *v1.Pod, nodeInfoMap map[string]*schedulercach
|
||||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
|
if priorityutil.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
|
||||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||||
nodeTopologyPairToMatchingPods[pair] = append(nodeTopologyPairToMatchingPods[pair], existingPod)
|
nodeTopologyMaps.AddTopologyPair(pair, existingPod)
|
||||||
existingPodFullName := schedutil.GetPodFullName(existingPod)
|
|
||||||
nodeMatchingPodToTopologyPairs[existingPodFullName] = append(nodeMatchingPodToTopologyPairs[existingPodFullName], pair)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if len(nodeTopologyMaps.podToTopologyPairs) > 0 {
|
||||||
if len(nodeTopologyPairToMatchingPods) > 0 {
|
appendTopologyPairsMaps(nodeTopologyMaps)
|
||||||
appendTopologyPairToMatchingPods(nodeTopologyPairToMatchingPods)
|
}
|
||||||
appendMatchingPodToTopologyPair(nodeMatchingPodToTopologyPairs)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
workqueue.Parallelize(16, len(allNodeNames), processNode)
|
workqueue.Parallelize(16, len(allNodeNames), processNode)
|
||||||
return matchingPodToTopologyPair, topologyPairToMatchingPods, firstError
|
return topologyMaps, firstError
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMatchingTopologyPairsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) {
|
func getMatchingTopologyPairsOfExistingPod(newPod *v1.Pod, existingPod *v1.Pod, node *v1.Node) (*topologyPairsMaps, error) {
|
||||||
topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
topologyMaps := &topologyPairsMaps{topologyPairToPods: make(map[topologyPair][]*v1.Pod),
|
||||||
matchingPodToTopologyPairs := make(map[string][]topologyPair)
|
podToTopologyPairs: make(map[string][]topologyPair)}
|
||||||
|
|
||||||
affinity := existingPod.Spec.Affinity
|
affinity := existingPod.Spec.Affinity
|
||||||
if affinity != nil && affinity.PodAntiAffinity != nil {
|
if affinity != nil && affinity.PodAntiAffinity != nil {
|
||||||
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
|
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
|
||||||
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
|
namespaces := priorityutil.GetNamespacesFromPodAffinityTerm(existingPod, &term)
|
||||||
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
|
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
|
if priorityutil.PodMatchesTermsNamespaceAndSelector(newPod, namespaces, selector) {
|
||||||
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
if topologyValue, ok := node.Labels[term.TopologyKey]; ok {
|
||||||
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
pair := topologyPair{key: term.TopologyKey, value: topologyValue}
|
||||||
topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], existingPod)
|
topologyMaps.AddTopologyPair(pair, existingPod)
|
||||||
existingPodFullName := schedutil.GetPodFullName(existingPod)
|
|
||||||
matchingPodToTopologyPairs[existingPodFullName] = append(matchingPodToTopologyPairs[existingPodFullName], pair)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return matchingPodToTopologyPairs, topologyPairToMatchingPods, nil
|
return topologyMaps, nil
|
||||||
}
|
}
|
||||||
func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairs(pod *v1.Pod, allPods []*v1.Pod) (map[string][]topologyPair, map[topologyPair][]*v1.Pod, error) {
|
func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairs(pod *v1.Pod, allPods []*v1.Pod) (*topologyPairsMaps, error) {
|
||||||
|
topologyMaps := &topologyPairsMaps{topologyPairToPods: make(map[topologyPair][]*v1.Pod),
|
||||||
|
podToTopologyPairs: make(map[string][]topologyPair)}
|
||||||
|
|
||||||
topologyPairToMatchingPods := make(map[topologyPair][]*v1.Pod)
|
|
||||||
matchingPodToTopologyPairs := make(map[string][]topologyPair)
|
|
||||||
for _, existingPod := range allPods {
|
for _, existingPod := range allPods {
|
||||||
affinity := existingPod.Spec.Affinity
|
affinity := existingPod.Spec.Affinity
|
||||||
if affinity != nil && affinity.PodAntiAffinity != nil {
|
if affinity != nil && affinity.PodAntiAffinity != nil {
|
||||||
|
@ -1358,21 +1342,16 @@ func (c *PodAffinityChecker) getMatchingAntiAffinityTopologyPairs(pod *v1.Pod, a
|
||||||
glog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
|
glog.Errorf("Node not found, %v", existingPod.Spec.NodeName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
existingPodTopologyTerms, podTopologyPairToMatchingPods, err := getMatchingTopologyPairsOfExistingPod(pod, existingPod, existingPodNode)
|
existingPodsTopologyMaps, err := getMatchingTopologyPairsOfExistingPod(pod, existingPod, existingPodNode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
|
||||||
for pair, pods := range podTopologyPairToMatchingPods {
|
|
||||||
topologyPairToMatchingPods[pair] = append(topologyPairToMatchingPods[pair], pods...)
|
|
||||||
}
|
|
||||||
for pod, pairs := range existingPodTopologyTerms {
|
|
||||||
matchingPodToTopologyPairs[pod] = append(matchingPodToTopologyPairs[pod], pairs...)
|
|
||||||
}
|
}
|
||||||
|
topologyMaps.appendMaps(existingPodsTopologyMaps)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return matchingPodToTopologyPairs, topologyPairToMatchingPods, nil
|
return topologyMaps, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks if scheduling the pod onto this node would break any anti-affinity
|
// Checks if scheduling the pod onto this node would break any anti-affinity
|
||||||
|
@ -1382,10 +1361,9 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
||||||
if node == nil {
|
if node == nil {
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil")
|
return ErrExistingPodsAntiAffinityRulesNotMatch, fmt.Errorf("Node is nil")
|
||||||
}
|
}
|
||||||
var topologyPairToMatchingPods map[topologyPair][]*v1.Pod
|
var topologyMaps *topologyPairsMaps
|
||||||
|
|
||||||
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
if predicateMeta, ok := meta.(*predicateMetadata); ok {
|
||||||
topologyPairToMatchingPods = predicateMeta.topologyPairToAntiAffinityPods
|
topologyMaps = predicateMeta.topologyPairsAntiAffinityPodsMap
|
||||||
} else {
|
} else {
|
||||||
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
|
// Filter out pods whose nodeName is equal to nodeInfo.node.Name, but are not
|
||||||
// present in nodeInfo. Pods on other nodes pass the filter.
|
// present in nodeInfo. Pods on other nodes pass the filter.
|
||||||
|
@ -1395,25 +1373,19 @@ func (c *PodAffinityChecker) satisfiesExistingPodsAntiAffinity(pod *v1.Pod, meta
|
||||||
glog.Error(errMessage)
|
glog.Error(errMessage)
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
||||||
}
|
}
|
||||||
if _, topologyPairToMatchingPods, err = c.getMatchingAntiAffinityTopologyPairs(pod, filteredPods); err != nil {
|
if topologyMaps, err = c.getMatchingAntiAffinityTopologyPairs(pod, filteredPods); err != nil {
|
||||||
errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
|
errMessage := fmt.Sprintf("Failed to get all terms that pod %+v matches, err: %+v", podName(pod), err)
|
||||||
glog.Error(errMessage)
|
glog.Error(errMessage)
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
return ErrExistingPodsAntiAffinityRulesNotMatch, errors.New(errMessage)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate over topology topology pairs to get any of the pods being affected by
|
// Iterate over topology pairs to get any of the pods being affected by
|
||||||
// the scheduled pod anti-affinity rules
|
// the scheduled pod anti-affinity rules
|
||||||
for topologyKey, topologyValue := range node.Labels {
|
for topologyKey, topologyValue := range node.Labels {
|
||||||
if violatedPods, ok := topologyPairToMatchingPods[topologyPair{key: topologyKey, value: topologyValue}]; ok {
|
if _, ok := topologyMaps.topologyPairToPods[topologyPair{key: topologyKey, value: topologyValue}]; ok {
|
||||||
affinity := violatedPods[0].Spec.Affinity
|
glog.V(10).Infof("Cannot schedule pod %+v onto node %v", podName(pod), node.Name)
|
||||||
for _, term := range GetPodAntiAffinityTerms(affinity.PodAntiAffinity) {
|
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
|
||||||
if term.TopologyKey == topologyKey {
|
|
||||||
glog.V(10).Infof("Cannot schedule pod %+v onto node %v,because of PodAntiAffinityTerm %v",
|
|
||||||
podName(pod), node.Name, term)
|
|
||||||
return ErrExistingPodsAntiAffinityRulesNotMatch, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if glog.V(10) {
|
if glog.V(10) {
|
||||||
|
|
Loading…
Reference in New Issue