Merge pull request #28769 from wojtek-t/optimize_priorities

Automatic merge from submit-queue

Optimize priorities in scheduler

Ref #28590

It's probably easier to review this commit by commit, since the changes are largely independent of each other.

@davidopp - FYI
k8s-merge-robot 2016-07-11 07:49:23 -07:00 committed by GitHub
commit 9b74e24fa3
11 changed files with 99 additions and 113 deletions


@ -470,7 +470,7 @@ func GetTaintsFromNodeAnnotations(annotations map[string]string) ([]Taint, error
}
// TolerationToleratesTaint checks if the toleration tolerates the taint.
func TolerationToleratesTaint(toleration Toleration, taint Taint) bool {
func TolerationToleratesTaint(toleration *Toleration, taint *Taint) bool {
if len(toleration.Effect) != 0 && toleration.Effect != taint.Effect {
return false
}
@ -490,10 +490,10 @@ func TolerationToleratesTaint(toleration Toleration, taint Taint) bool {
}
// TaintToleratedByTolerations checks if taint is tolerated by any of the tolerations.
func TaintToleratedByTolerations(taint Taint, tolerations []Toleration) bool {
func TaintToleratedByTolerations(taint *Taint, tolerations []Toleration) bool {
tolerated := false
for _, toleration := range tolerations {
if TolerationToleratesTaint(toleration, taint) {
for i := range tolerations {
if TolerationToleratesTaint(&tolerations[i], taint) {
tolerated = true
break
}

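The change above swaps value parameters and `for _, x := range` loops for pointer parameters and index-based loops, so each toleration and taint is no longer copied on every call and iteration. A minimal, self-contained sketch of the same idiom (the Item type and helper names are illustrative, not taken from this PR):

package main

import "fmt"

// Item stands in for a struct that is costly to copy, such as api.Toleration.
type Item struct {
	Key    string
	Value  string
	Effect string
}

// matches is the analogue of TolerationToleratesTaint: it takes a pointer so
// the caller does not copy the struct.
func matches(item *Item, effect string) bool {
	return item.Effect == "" || item.Effect == effect
}

// anyMatches iterates by index and takes addresses, avoiding the per-element
// copy that `for _, item := range items` would make.
func anyMatches(items []Item, effect string) bool {
	for i := range items {
		if matches(&items[i], effect) {
			return true
		}
	}
	return false
}

func main() {
	items := []Item{{Effect: "NoSchedule"}, {Effect: "PreferNoSchedule"}}
	fmt.Println(anyMatches(items, "PreferNoSchedule")) // true
}

For small structs each copy is cheap, but these loops sit on paths the scheduler runs for every pod against every node, which is where the savings add up.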

@ -65,10 +65,10 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
return node.(*api.Node), nil
}
// podMetadata defines a type, that is an expected type that is passed
// as metadata for predicate functions
// podMetadata is a type that is passed as metadata for predicate functions
type predicateMetadata struct {
podBestEffort bool
podRequest *resourceRequest
}
func PredicateMetadata(pod *api.Pod) interface{} {
@ -78,6 +78,7 @@ func PredicateMetadata(pod *api.Pod) interface{} {
}
return &predicateMetadata{
podBestEffort: isPodBestEffort(pod),
podRequest: getResourceRequest(pod),
}
}
@ -405,7 +406,7 @@ type resourceRequest struct {
nvidiaGPU int64
}
func getResourceRequest(pod *api.Pod) resourceRequest {
func getResourceRequest(pod *api.Pod) *resourceRequest {
result := resourceRequest{}
for _, container := range pod.Spec.Containers {
requests := container.Resources.Requests
@ -423,7 +424,7 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
result.milliCPU = cpu
}
}
return result
return &result
}
func CheckPodsExceedingFreeResources(pods []*api.Pod, allocatable api.ResourceList) (fitting []*api.Pod, notFittingCPU, notFittingMemory, notFittingNvidiaGPU []*api.Pod) {
@ -471,17 +472,25 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
if node == nil {
return false, fmt.Errorf("node not found")
}
allocatable := node.Status.Allocatable
allowedPodNumber := allocatable.Pods().Value()
if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber {
allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
return false,
newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), allowedPodNumber)
newInsufficientResourceError(podCountResourceName, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber))
}
var podRequest *resourceRequest
predicateMeta, ok := meta.(*predicateMetadata)
if ok {
podRequest = predicateMeta.podRequest
} else {
// We couldn't parse metadata - fallback to computing it.
podRequest = getResourceRequest(pod)
}
podRequest := getResourceRequest(pod)
if podRequest.milliCPU == 0 && podRequest.memory == 0 && podRequest.nvidiaGPU == 0 {
return true, nil
}
allocatable := node.Status.Allocatable
totalMilliCPU := allocatable.Cpu().MilliValue()
totalMemory := allocatable.Memory().Value()
totalNvidiaGPU := allocatable.NvidiaGPU().Value()
@ -498,8 +507,12 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
return false,
newInsufficientResourceError(nvidiaGpuResourceName, podRequest.nvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, totalNvidiaGPU)
}
glog.V(10).Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
}
return true, nil
}
@ -758,8 +771,10 @@ func getUsedPorts(pods ...*api.Pod) map[int]bool {
// TODO: Aggregate it at the NodeInfo level.
ports := make(map[int]bool)
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
for _, podPort := range container.Ports {
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]
for k := range container.Ports {
podPort := &container.Ports[k]
// "0" is explicitly ignored in PodFitsHostPorts,
// which is the only function that uses this value.
if podPort.HostPort != 0 {
@ -999,19 +1014,11 @@ func (checker *PodAffinityChecker) NodeMatchPodAffinityAntiAffinity(pod *api.Pod
return true
}
type TolerationMatch struct {
info NodeInfo
}
func NewTolerationMatchPredicate(info NodeInfo) algorithm.FitPredicate {
tolerationMatch := &TolerationMatch{
info: info,
}
return tolerationMatch.PodToleratesNodeTaints
}
func (t *TolerationMatch) PodToleratesNodeTaints(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
func PodToleratesNodeTaints(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found")
}
taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations)
if err != nil {
@ -1040,7 +1047,8 @@ func tolerationsToleratesTaints(tolerations []api.Toleration, taints []api.Taint
return false
}
for _, taint := range taints {
for i := range taints {
taint := &taints[i]
// skip taints that have effect PreferNoSchedule, since it is for priorities
if taint.Effect == api.TaintEffectPreferNoSchedule {
continue

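The predicateMetadata change is a compute-once pattern: PredicateMetadata derives the pod's resource request a single time, the result travels through the `meta interface{}` parameter, and PodFitsResources type-asserts it, falling back to getResourceRequest only when the assertion fails. A simplified standalone sketch of that flow (the pod, metadata, and fitsCPU names below are stand-ins, not the real scheduler types):

package main

import "fmt"

type pod struct{ milliCPU int64 }

type metadata struct{ request int64 }

// buildMetadata is the analogue of PredicateMetadata: compute the
// expensive-to-derive data once per pod.
func buildMetadata(p *pod) interface{} {
	return &metadata{request: p.milliCPU}
}

// fitsCPU is the analogue of PodFitsResources: use the precomputed metadata
// when available, otherwise fall back to recomputing it.
func fitsCPU(p *pod, meta interface{}, freeMilliCPU int64) bool {
	var request int64
	if m, ok := meta.(*metadata); ok {
		request = m.request
	} else {
		// Metadata missing or of an unexpected type - recompute.
		request = p.milliCPU
	}
	return request <= freeMilliCPU
}

func main() {
	p := &pod{milliCPU: 500}
	meta := buildMetadata(p) // computed once per pod
	for _, free := range []int64{200, 800} {
		fmt.Println(fitsCPU(p, meta, free)) // false, then true
	}
}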

@ -2724,10 +2724,9 @@ func TestPodToleratesTaints(t *testing.T) {
}
for _, test := range podTolerateTaintsTests {
tolerationMatch := TolerationMatch{FakeNodeInfo(test.node)}
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&test.node)
fits, err := tolerationMatch.PodToleratesNodeTaints(test.pod, PredicateMetadata(test.pod), nodeInfo)
fits, err := PodToleratesNodeTaints(test.pod, PredicateMetadata(test.pod), nodeInfo)
if fits == false && !reflect.DeepEqual(err, ErrTaintsTolerationsNotMatch) {
t.Errorf("%s, unexpected error: %v", test.test, err)
}


@ -25,31 +25,20 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
type NodeAffinity struct {
nodeLister algorithm.NodeLister
}
func NewNodeAffinityPriority(nodeLister algorithm.NodeLister) algorithm.PriorityFunction {
nodeAffinity := &NodeAffinity{
nodeLister: nodeLister,
}
return nodeAffinity.CalculateNodeAffinityPriority
}
// CalculateNodeAffinityPriority prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node matches a preferredSchedulingTerm,
// it gets an addition of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies, and the greater the weights of the satisfied terms, the higher the
// score the node gets.
func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
var maxCount int
counts := map[string]int{}
func CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
nodes, err := nodeLister.List()
if err != nil {
return nil, err
}
var maxCount float64
counts := make(map[string]float64, len(nodes.Items))
affinity, err := api.GetAffinityFromPodAnnotations(pod.Annotations)
if err != nil {
return nil, err
@ -72,7 +61,7 @@ func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInf
for _, node := range nodes.Items {
if nodeSelector.Matches(labels.Set(node.Labels)) {
counts[node.Name] += int(preferredSchedulingTerm.Weight)
counts[node.Name] += float64(preferredSchedulingTerm.Weight)
}
if counts[node.Name] > maxCount {
@ -82,15 +71,20 @@ func (s *NodeAffinity) CalculateNodeAffinityPriority(pod *api.Pod, nodeNameToInf
}
}
result := []schedulerapi.HostPriority{}
result := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
for i := range nodes.Items {
node := &nodes.Items[i]
fScore := float64(0)
if maxCount > 0 {
fScore = 10 * (float64(counts[node.Name]) / float64(maxCount))
fScore := 10 * (counts[node.Name] / maxCount)
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
}
} else {
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: 0})
}
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
glog.V(10).Infof("%v -> %v: NodeAffinityPriority, Score: (%d)", pod.Name, node.Name, int(fScore))
}
return result, nil
}

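Several of these hunks (PodFitsResources, CalculateNodeAffinityPriority, ComputeTaintTolerationPriority) replace glog.V(10).Infof(...) with a guarded `if glog.V(10) { glog.Infof(...) }`. The point is that glog.V(10) returns a boolean-like Verbose value, so the format arguments are only built when verbosity is at least 10; the unguarded form evaluates them before the level check. A rough sketch of the idiom, with a made-up expensiveSummary helper standing in for the formatting work:

package main

import "github.com/golang/glog"

// expensiveSummary stands in for argument construction that is costly,
// e.g. formatting a large Pod or Node object.
func expensiveSummary() string {
	return "lots of formatting work"
}

func logVerbose() {
	// glog.V(10) returns a Verbose (boolean) value, so the guarded form only
	// calls expensiveSummary() when the process runs with -v=10 or higher.
	if glog.V(10) {
		glog.Infof("details: %s", expensiveSummary())
	}

	// The unguarded form below would evaluate expensiveSummary() every time,
	// even when the message is ultimately discarded:
	// glog.V(10).Infof("details: %s", expensiveSummary())
}

func main() {
	logVerbose()
	glog.Flush()
}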

@ -156,8 +156,7 @@ func TestNodeAffinityPriority(t *testing.T) {
}
for _, test := range tests {
nodeAffinity := NodeAffinity{nodeLister: algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})}
list, err := nodeAffinity.CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
list, err := CalculateNodeAffinityPriority(test.pod, schedulercache.CreateNodeNameToInfoMap(nil), algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))
if err != nil {
t.Errorf("unexpected error: %v", err)
}


@ -24,22 +24,10 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// NodeTaints hold the node lister
type TaintToleration struct {
nodeLister algorithm.NodeLister
}
// NewTaintTolerationPriority
func NewTaintTolerationPriority(nodeLister algorithm.NodeLister) algorithm.PriorityFunction {
taintToleration := &TaintToleration{
nodeLister: nodeLister,
}
return taintToleration.ComputeTaintTolerationPriority
}
// CountIntolerableTaintsPreferNoSchedule gives the count of intolerable taints of a pod with effect PreferNoSchedule
func countIntolerableTaintsPreferNoSchedule(taints []api.Taint, tolerations []api.Toleration) (intolerableTaints int) {
for _, taint := range taints {
func countIntolerableTaintsPreferNoSchedule(taints []api.Taint, tolerations []api.Toleration) (intolerableTaints float64) {
for i := range taints {
taint := &taints[i]
// check only on taints that have effect PreferNoSchedule
if taint.Effect != api.TaintEffectPreferNoSchedule {
continue
@ -54,27 +42,27 @@ func countIntolerableTaintsPreferNoSchedule(taints []api.Taint, tolerations []ap
// getAllTolerationEffectPreferNoSchedule gets the list of all Toleration with Effect PreferNoSchedule
func getAllTolerationPreferNoSchedule(tolerations []api.Toleration) (tolerationList []api.Toleration) {
for _, toleration := range tolerations {
for i := range tolerations {
toleration := &tolerations[i]
if len(toleration.Effect) == 0 || toleration.Effect == api.TaintEffectPreferNoSchedule {
tolerationList = append(tolerationList, toleration)
tolerationList = append(tolerationList, *toleration)
}
}
return
}
// ComputeTaintTolerationPriority prepares the priority list for all the nodes based on the number of intolerable taints on the node
func (s *TaintToleration) ComputeTaintTolerationPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
// counts hold the count of intolerable taints of a pod for a given node
counts := make(map[string]int)
// the max value of counts
var maxCount int
func ComputeTaintTolerationPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister algorithm.NodeLister) (schedulerapi.HostPriorityList, error) {
nodes, err := nodeLister.List()
if err != nil {
return nil, err
}
// the max value of counts
var maxCount float64
// counts hold the count of intolerable taints of a pod for a given node
counts := make(map[string]float64, len(nodes.Items))
tolerations, err := api.GetTolerationsFromPodAnnotations(pod.Annotations)
if err != nil {
return nil, err
@ -99,14 +87,19 @@ func (s *TaintToleration) ComputeTaintTolerationPriority(pod *api.Pod, nodeNameT
// The maximum priority value to give to a node
// Priority values range from 0 - maxPriority
const maxPriority = 10
const maxPriority = float64(10)
result := make(schedulerapi.HostPriorityList, 0, len(nodes.Items))
for _, node := range nodes.Items {
fScore := float64(maxPriority)
for i := range nodes.Items {
node := &nodes.Items[i]
fScore := maxPriority
if maxCount > 0 {
fScore = (1.0 - float64(counts[node.Name])/float64(maxCount)) * 10
fScore = (1.0 - counts[node.Name]/maxCount) * 10
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.Infof("%v -> %v: Taint Toleration Priority, Score: (%d)", pod.Name, node.Name, int(fScore))
}
glog.V(10).Infof("%v -> %v: Taint Toleration Priority, Score: (%d)", pod.Name, node.Name, int(fScore))
result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
}

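The scoring above normalizes intolerable-taint counts into the 0-10 priority range: with maxCount the largest count seen, each node scores `(1 - count/maxCount) * 10`, so the worst node gets 0 and taint-free nodes get 10. A tiny worked sketch of that formula (the normalize helper is illustrative, not scheduler code):

package main

import "fmt"

// normalize maps intolerable-taint counts to 0-10 scores: the node with the
// most intolerable taints gets 0, nodes with none get 10.
func normalize(counts map[string]float64, maxCount float64) map[string]int {
	const maxPriority = float64(10)
	scores := make(map[string]int, len(counts))
	for name, count := range counts {
		fScore := maxPriority
		if maxCount > 0 {
			fScore = (1.0 - count/maxCount) * 10
		}
		scores[name] = int(fScore)
	}
	return scores
}

func main() {
	counts := map[string]float64{"a": 0, "b": 1, "c": 2}
	fmt.Println(normalize(counts, 2)) // map[a:10 b:5 c:0]
}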

@ -212,8 +212,7 @@ func TestTaintAndToleration(t *testing.T) {
}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap([]*api.Pod{{}})
taintToleration := TaintToleration{nodeLister: algorithm.FakeNodeLister(api.NodeList{Items: test.nodes})}
list, err := taintToleration.ComputeTaintTolerationPriority(
list, err := ComputeTaintTolerationPriority(
test.pod,
nodeNameToInfo,
algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}))


@ -147,12 +147,7 @@ func defaultPredicates() sets.String {
factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),
// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicateFactory(
"PodToleratesNodeTaints",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewTolerationMatchPredicate(args.NodeInfo)
},
),
factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),
// Fit is determined by node memory pressure condition.
factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),
@ -175,23 +170,7 @@ func defaultPriorities() sets.String {
Weight: 1,
},
),
factory.RegisterPriorityConfigFactory(
"NodeAffinityPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewNodeAffinityPriority(args.NodeLister)
},
Weight: 1,
},
),
factory.RegisterPriorityConfigFactory(
"TaintTolerationPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewTaintTolerationPriority(args.NodeLister)
},
Weight: 1,
},
),
factory.RegisterPriorityFunction("NodeAffinityPriority", priorities.CalculateNodeAffinityPriority, 1),
factory.RegisterPriorityFunction("TaintTolerationPriority", priorities.ComputeTaintTolerationPriority, 1),
)
}

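With the predicate and priority logic turned into plain package-level functions, the factory closures that used to capture a node lister are no longer needed, and registration collapses to the RegisterFitPredicate / RegisterPriorityFunction calls above. A loose, self-contained sketch of the stateless-registration idea (the registry and registerPriorityFunction below are illustrative, not the scheduler factory API):

package main

import "fmt"

type priorityFunction func(nodeNames []string) map[string]int

// registry maps priority names to plain functions. Registering a stateless
// function directly replaces the earlier pattern of wrapping state in a
// struct and registering a factory that builds the closure.
var registry = map[string]priorityFunction{}

func registerPriorityFunction(name string, fn priorityFunction) string {
	registry[name] = fn
	return name
}

func equalPriority(nodeNames []string) map[string]int {
	scores := make(map[string]int, len(nodeNames))
	for _, n := range nodeNames {
		scores[n] = 1
	}
	return scores
}

func main() {
	registerPriorityFunction("EqualPriority", equalPriority)
	fmt.Println(registry["EqualPriority"]([]string{"node-1", "node-2"}))
}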

@ -433,7 +433,8 @@ func (f *ConfigFactory) responsibleForPod(pod *api.Pod) bool {
func getNodeConditionPredicate() cache.NodeConditionPredicate {
return func(node *api.Node) bool {
for _, cond := range node.Status.Conditions {
for i := range node.Status.Conditions {
cond := &node.Status.Conditions[i]
// We consider the node for scheduling only when it's:
// - NodeReady condition status is ConditionTrue,
// - NodeOutOfDisk condition status is ConditionFalse,


@ -232,7 +232,7 @@ func PrioritizeNodes(
nodeLister algorithm.NodeLister,
extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
result := schedulerapi.HostPriorityList{}
result := make(schedulerapi.HostPriorityList, 0, len(nodeNameToInfo))
// If no priority configs are provided, then the EqualPriority function is applied
// This is required to generate the priority list in the required format
@ -243,7 +243,7 @@ func PrioritizeNodes(
var (
mu = sync.Mutex{}
wg = sync.WaitGroup{}
combinedScores = map[string]int{}
combinedScores = make(map[string]int, len(nodeNameToInfo))
errs []error
)

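The two `make` calls above give result and combinedScores their final capacity from len(nodeNameToInfo) up front, instead of growing an empty slice and map while scores are collected. A small sketch of that preallocation pattern (hostPriority and prioritize are simplified stand-ins for the scheduler types):

package main

import "fmt"

type hostPriority struct {
	Host  string
	Score int
}

// prioritize mirrors the allocation pattern in PrioritizeNodes: both the
// result list and the per-host score map are sized up front from the number
// of nodes, so the slice never reallocates and the map never rehashes while
// scores are accumulated.
func prioritize(nodeNames []string) []hostPriority {
	result := make([]hostPriority, 0, len(nodeNames))
	combinedScores := make(map[string]int, len(nodeNames))
	for _, name := range nodeNames {
		combinedScores[name] += 1 // placeholder for summing per-priority scores
		result = append(result, hostPriority{Host: name, Score: combinedScores[name]})
	}
	return result
}

func main() {
	fmt.Println(prioritize([]string{"node-1", "node-2", "node-3"}))
}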

@ -39,6 +39,9 @@ type NodeInfo struct {
requestedResource *Resource
pods []*api.Pod
nonzeroRequest *Resource
// We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
// explicitly as int, to avoid conversions and improve performance.
allowedPodNumber int
// Whenever NodeInfo changes, generation is bumped.
// This is used to avoid cloning it if the object didn't change.
@ -59,6 +62,7 @@ func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
ni := &NodeInfo{
requestedResource: &Resource{},
nonzeroRequest: &Resource{},
allowedPodNumber: 0,
generation: 0,
}
for _, pod := range pods {
@ -83,6 +87,13 @@ func (n *NodeInfo) Pods() []*api.Pod {
return n.pods
}
func (n *NodeInfo) AllowedPodNumber() int {
if n == nil {
return 0
}
return n.allowedPodNumber
}
// RequestedResource returns aggregated resource request of pods on this node.
func (n *NodeInfo) RequestedResource() Resource {
if n == nil {
@ -105,6 +116,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
node: n.node,
requestedResource: &(*n.requestedResource),
nonzeroRequest: &(*n.nonzeroRequest),
allowedPodNumber: n.allowedPodNumber,
pods: pods,
generation: n.generation,
}
@ -181,6 +193,7 @@ func calculateResource(pod *api.Pod) (cpu int64, mem int64, nvidia_gpu int64, no
// Sets the overall node information.
func (n *NodeInfo) SetNode(node *api.Node) error {
n.node = node
n.allowedPodNumber = int(node.Status.Allocatable.Pods().Value())
n.generation++
return nil
}
@ -192,6 +205,7 @@ func (n *NodeInfo) RemoveNode(node *api.Node) error {
// and thus can potentially be observed later, even though they happened before
// node removal. This is handled correctly in cache.go file.
n.node = nil
n.allowedPodNumber = 0
n.generation++
return nil
}
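NodeInfo now caches allowedPodNumber as a plain int whenever SetNode runs, so the hot path in PodFitsResources reads a field instead of re-deriving the value from Node.Status.Allocatable on every check. A stripped-down sketch of that caching pattern (the node and nodeInfo types below are simplified stand-ins for the schedulercache types):

package main

import "fmt"

type node struct {
	name        string
	allocatable map[string]string // stand-in for api.ResourceList
}

// parsePods stands in for Allocatable.Pods().Value(): a conversion that is
// cheap once, but wasteful if repeated for every scheduling decision.
func parsePods(allocatable map[string]string) int {
	var n int
	fmt.Sscanf(allocatable["pods"], "%d", &n)
	return n
}

type nodeInfo struct {
	node             *node
	allowedPodNumber int // cached on SetNode, read on the hot path
	generation       int64
}

func (ni *nodeInfo) SetNode(n *node) {
	ni.node = n
	ni.allowedPodNumber = parsePods(n.allocatable) // convert once
	ni.generation++
}

func (ni *nodeInfo) AllowedPodNumber() int {
	if ni == nil {
		return 0
	}
	return ni.allowedPodNumber
}

func main() {
	ni := &nodeInfo{}
	ni.SetNode(&node{name: "node-1", allocatable: map[string]string{"pods": "110"}})
	fmt.Println(ni.AllowedPodNumber()) // 110
}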