Use runtime.NumCPU() instead of a fixed value for parallel scheduler threads.

pull/564/head
Bobby (Babak) Salamat 2019-02-11 13:37:30 -08:00
parent f7c4389b79
commit d0ebeefbc4
3 changed files with 9 additions and 6 deletions
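
A minimal standalone sketch of the pattern this commit applies across the three files below: sizing the ParallelizeUntil worker pool from runtime.NumCPU() instead of a hard-coded 16. ParallelizeUntil fans the piece indices out over the requested number of workers and waits for them to finish, so the only argument that needs to change is the worker count. This sketch assumes ParallelizeUntil from k8s.io/client-go/util/workqueue (the package the scheduler imports); the items slice and doWork body are illustrative only, not scheduler code.

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	items := make([]int, 100)
	var processed int64

	// doWork handles one piece; ParallelizeUntil calls it with each index
	// from 0 to len(items)-1, possibly from several goroutines at once.
	doWork := func(i int) {
		_ = items[i]
		atomic.AddInt64(&processed, 1)
	}

	// One worker per logical CPU instead of a fixed fan-out of 16.
	workqueue.ParallelizeUntil(context.TODO(), runtime.NumCPU(), len(items), doWork)

	fmt.Printf("processed %d items with up to %d workers\n",
		atomic.LoadInt64(&processed), runtime.NumCPU())
}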

@@ -19,6 +19,7 @@ package predicates
 import (
 "context"
 "fmt"
+"runtime"
 "sync"
 
 "k8s.io/klog"
@@ -415,7 +416,7 @@ func getTPMapMatchingExistingAntiAffinity(pod *v1.Pod, nodeInfoMap map[string]*s
 appendTopologyPairsMaps(existingPodTopologyMaps)
 }
 }
-workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
+workqueue.ParallelizeUntil(context.TODO(), runtime.NumCPU(), len(allNodeNames), processNode)
 return topologyMaps, firstError
 }
@@ -503,7 +504,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
 appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
 }
 }
-workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
+workqueue.ParallelizeUntil(context.TODO(), runtime.NumCPU(), len(allNodeNames), processNode)
 return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError
 }

@@ -18,6 +18,7 @@ package priorities
 import (
 "context"
+"runtime"
 "sync"
 
 "k8s.io/api/core/v1"
@@ -211,7 +212,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
 }
 }
 }
-workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
+workqueue.ParallelizeUntil(context.TODO(), runtime.NumCPU(), len(allNodeNames), processNode)
 if pm.firstError != nil {
 return nil, pm.firstError
 }

@@ -20,6 +20,7 @@ import (
 "context"
 "fmt"
 "math"
+"runtime"
 "sort"
 "strings"
 "sync"
@@ -487,7 +488,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 // Stops searching for more nodes once the configured number of feasible nodes
 // are found.
-workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
+workqueue.ParallelizeUntil(ctx, runtime.NumCPU(), int(allNodes), checkNode)
 filtered = filtered[:filteredLen]
 if len(errs) > 0 {
@@ -693,7 +694,7 @@ func PrioritizeNodes(
 }
 }
-workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
+workqueue.ParallelizeUntil(context.TODO(), runtime.NumCPU(), len(nodes), func(index int) {
 nodeInfo := nodeNameToInfo[nodes[index].Name]
 for i := range priorityConfigs {
 if priorityConfigs[i].Function != nil {
@@ -941,7 +942,7 @@ func selectNodesForPreemption(pod *v1.Pod,
 resultLock.Unlock()
 }
 }
-workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
+workqueue.ParallelizeUntil(context.TODO(), runtime.NumCPU(), len(potentialNodes), checkNode)
 return nodeToVictims, nil
 }