mirror of https://github.com/k3s-io/k3s
refactor function CalculateAntiAffinityPriority by using map/reduce pattern
@ -44,11 +44,12 @@ func NewPriorityMetadataFactory(serviceLister algorithm.ServiceLister, controlle
// priorityMetadata is a type that is passed as metadata for priority functions
type priorityMetadata struct {
nonZeroRequest *schedulercache.Resource
podTolerations []v1.Toleration
affinity *v1.Affinity
podSelectors []labels.Selector
controllerRef *metav1.OwnerReference
nonZeroRequest *schedulercache.Resource
podTolerations []v1.Toleration
affinity *v1.Affinity
podSelectors []labels.Selector
controllerRef *metav1.OwnerReference
podFirstServiceSelector labels.Selector
// PriorityMetadata is a MetadataProducer. Node info can be nil.
@ -57,30 +58,40 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo
if pod == nil {
return nil
tolerationsPreferNoSchedule := getAllTolerationPreferNoSchedule(pod.Spec.Tolerations)
podSelectors := getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister)
return &priorityMetadata{
nonZeroRequest: getNonZeroRequests(pod),
podTolerations: tolerationsPreferNoSchedule,
affinity: pod.Spec.Affinity,
podSelectors: podSelectors,
controllerRef: priorityutil.GetControllerRef(pod),
nonZeroRequest: getNonZeroRequests(pod),
podTolerations: getAllTolerationPreferNoSchedule(pod.Spec.Tolerations),
affinity: pod.Spec.Affinity,
podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
controllerRef: priorityutil.GetControllerRef(pod),
podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
// getFirstServiceSelector returns one selector of services the given pod.
func getFirstServiceSelector(pod *v1.Pod, sl algorithm.ServiceLister) (firstServiceSelector labels.Selector) {
if services, err := sl.GetPodServices(pod); err == nil && len(services) > 0 {
return labels.SelectorFromSet(services[0].Spec.Selector)
return nil
// getSelectors returns selectors of services, RCs and RSs matching the given pod.
func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.ControllerLister, rsl algorithm.ReplicaSetLister, ssl algorithm.StatefulSetLister) []labels.Selector {
var selectors []labels.Selector
if services, err := sl.GetPodServices(pod); err == nil {
for _, service := range services {
selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector))
if rcs, err := cl.GetPodControllers(pod); err == nil {
for _, rc := range rcs {
selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector))
if rss, err := rsl.GetPodReplicaSets(pod); err == nil {
for _, rs := range rss {
if selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector); err == nil {
@ -88,6 +99,7 @@ func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.Controll
if sss, err := ssl.GetPodStatefulSets(pod); err == nil {
for _, ss := range sss {
if selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector); err == nil {
@ -95,5 +107,6 @@ func getSelectors(pod *v1.Pod, sl algorithm.ServiceLister, cl algorithm.Controll
return selectors
@ -177,13 +177,13 @@ type ServiceAntiAffinity struct {
label string
func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, label string) algorithm.PriorityFunction {
func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, label string) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
antiAffinity := &ServiceAntiAffinity{
podLister: podLister,
serviceLister: serviceLister,
label: label,
return antiAffinity.CalculateAntiAffinityPriority
return antiAffinity.CalculateAntiAffinityPriorityMap, antiAffinity.CalculateAntiAffinityPriorityReduce
// Classifies nodes into ones with labels and without labels.
@ -201,52 +201,79 @@ func (s *ServiceAntiAffinity) getNodeClassificationByLabels(nodes []*v1.Node) (m
return labeledNodes, nonLabeledNodes
// CalculateAntiAffinityPriority spreads pods by minimizing the number of pods belonging to the same service
// on machines with the same value for a particular label.
// The label to be considered is provided to the struct (ServiceAntiAffinity).
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
var nsServicePods []*v1.Pod
if services, err := s.serviceLister.GetPodServices(pod); err == nil && len(services) > 0 {
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
pods, err := s.podLister.List(selector)
if err != nil {
return nil, err
// consider only the pods that belong to the same namespace
for _, nsPod := range pods {
if nsPod.Namespace == pod.Namespace {
nsServicePods = append(nsServicePods, nsPod)
// filteredPod get pods based on namespace and selector
func filteredPod(namespace string, selector labels.Selector, nodeInfo *schedulercache.NodeInfo) (pods []*v1.Pod) {
if nodeInfo.Pods() == nil || len(nodeInfo.Pods()) == 0 || selector == nil {
return []*v1.Pod{}
for _, pod := range nodeInfo.Pods() {
if namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
// separate out the nodes that have the label from the ones that don't
labeledNodes, nonLabeledNodes := s.getNodeClassificationByLabels(nodes)
// CalculateAntiAffinityPriorityMap spreads pods by minimizing the number of pods belonging to the same service
// on given machine
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
var firstServiceSelector labels.Selector
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
priorityMeta, ok := meta.(*priorityMetadata)
if ok {
firstServiceSelector = priorityMeta.podFirstServiceSelector
} else {
firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
//pods matched namespace,selector on current node
matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)
return schedulerapi.HostPriority{
Host: node.Name,
Score: int(len(matchedPodsOfNode)),
}, nil
// CalculateAntiAffinityPriorityReduce computes each node score with the same value for a particular label.
// The label to be considered is provided to the struct (ServiceAntiAffinity).
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
var numServicePods int
var label string
podCounts := map[string]int{}
for _, pod := range nsServicePods {
label, exists := labeledNodes[pod.Spec.NodeName]
if !exists {
labelNodesStatus := map[string]string{}
maxPriorityFloat64 := float64(schedulerapi.MaxPriority)
for _, hostPriority := range result {
numServicePods += hostPriority.Score
if !labels.Set(nodeNameToInfo[hostPriority.Host].Node().Labels).Has(s.label) {
label = labels.Set(nodeNameToInfo[hostPriority.Host].Node().Labels).Get(s.label)
labelNodesStatus[hostPriority.Host] = label
podCounts[label] += hostPriority.Score
numServicePods := len(nsServicePods)
result := []schedulerapi.HostPriority{}
//score int - scale of 0-maxPriority
// 0 being the lowest priority and maxPriority being the highest
for node := range labeledNodes {
// initializing to the default/max node score of maxPriority
fScore := float64(schedulerapi.MaxPriority)
if numServicePods > 0 {
fScore = float64(schedulerapi.MaxPriority) * (float64(numServicePods-podCounts[labeledNodes[node]]) / float64(numServicePods))
for i, hostPriority := range result {
label, ok := labelNodesStatus[hostPriority.Host]
if !ok {
result[i].Host = hostPriority.Host
result[i].Score = int(0)
result = append(result, schedulerapi.HostPriority{Host: node, Score: int(fScore)})
// initializing to the default/max node score of maxPriority
fScore := maxPriorityFloat64
if numServicePods > 0 {
fScore = maxPriorityFloat64 * (float64(numServicePods-podCounts[label]) / float64(numServicePods))
result[i].Host = hostPriority.Host
result[i].Score = int(fScore)
// add the open nodes with a score of 0
for _, node := range nonLabeledNodes {
result = append(result, schedulerapi.HostPriority{Host: node, Score: 0})
return result, nil
return nil
@ -757,19 +757,33 @@ func TestZoneSpreadPriority(t *testing.T) {
test: "service pod on non-zoned node",
// these local variables just make sure controllerLister\replicaSetLister\statefulSetLister not nil
// when construct mataDataProducer
sss := []*apps.StatefulSet{{Spec: apps.StatefulSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
rcs := []*v1.ReplicationController{{Spec: v1.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}
rss := []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nil)
for i, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, makeLabeledNodeList(test.nodes))
zoneSpread := ServiceAntiAffinity{podLister: schedulertesting.FakePodLister(test.pods), serviceLister: schedulertesting.FakeServiceLister(test.services), label: "zone"}
list, err := zoneSpread.CalculateAntiAffinityPriority(test.pod, nodeNameToInfo, makeLabeledNodeList(test.nodes))
mataDataProducer := NewPriorityMetadataFactory(
mataData := mataDataProducer(test.pod, nodeNameToInfo)
ttp := priorityFunction(zoneSpread.CalculateAntiAffinityPriorityMap, zoneSpread.CalculateAntiAffinityPriorityReduce, mataData)
list, err := ttp(test.pod, nodeNameToInfo, makeLabeledNodeList(test.nodes))
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %v index : %d", err, i)
// sort the two lists to avoid failures on account of different ordering
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
t.Errorf("test index %d (%s): expected %#v, got %#v", i, test.test, test.expectedList, list)
@ -305,7 +305,7 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
if policy.Argument != nil {
if policy.Argument.ServiceAntiAffinity != nil {
pcf = &PriorityConfigFactory{
Function: func(args PluginFactoryArgs) algorithm.PriorityFunction {
MapReduceFunction: func(args PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewServiceAntiAffinityPriority(
Reference in New Issue