From b8526cd0777af6717d986942ccb6de6d23eca1bb Mon Sep 17 00:00:00 2001
From: Wang Guoliang
Date: Thu, 21 Dec 2017 23:03:46 +0800
Subject: [PATCH] Add scheduler optimization options, short circuit all
 predicates if one predicate fails

---
 ...scheduler-policy-config-with-extender.json |  3 +-
 examples/scheduler-policy-config.json         |  3 +-
 pkg/scheduler/api/types.go                    |  6 ++
 pkg/scheduler/api/v1/types.go                 |  6 ++
 pkg/scheduler/core/extender_test.go           |  2 +-
 pkg/scheduler/core/generic_scheduler.go       | 64 +++++++++++--------
 pkg/scheduler/core/generic_scheduler_test.go  | 45 +++++++++----
 pkg/scheduler/factory/factory.go              | 11 +++-
 pkg/scheduler/scheduler_test.go               |  6 +-
 9 files changed, 98 insertions(+), 48 deletions(-)

diff --git a/examples/scheduler-policy-config-with-extender.json b/examples/scheduler-policy-config-with-extender.json
index 996e6efc82..cd566fb314 100644
--- a/examples/scheduler-policy-config-with-extender.json
+++ b/examples/scheduler-policy-config-with-extender.json
@@ -26,5 +26,6 @@
       "nodeCacheCapable": false
     }
   ],
-"hardPodAffinitySymmetricWeight" : 10
+"hardPodAffinitySymmetricWeight" : 10,
+"alwaysCheckAllPredicates" : false
 }
diff --git a/examples/scheduler-policy-config.json b/examples/scheduler-policy-config.json
index b0fecffab2..048299e5e3 100644
--- a/examples/scheduler-policy-config.json
+++ b/examples/scheduler-policy-config.json
@@ -15,5 +15,6 @@
 {"name" : "ServiceSpreadingPriority", "weight" : 1},
 {"name" : "EqualPriority", "weight" : 1}
 ],
-"hardPodAffinitySymmetricWeight" : 10
+"hardPodAffinitySymmetricWeight" : 10,
+"alwaysCheckAllPredicates" : false
 }
diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go
index 080fc386db..28b095f334 100644
--- a/pkg/scheduler/api/types.go
+++ b/pkg/scheduler/api/types.go
@@ -47,6 +47,12 @@ type Policy struct {
 	// corresponding to every RequiredDuringScheduling affinity rule.
 	// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 1-100.
 	HardPodAffinitySymmetricWeight int32
+
+	// When AlwaysCheckAllPredicates is set to true, scheduler checks all
+	// the configured predicates even after one or more of them fails.
+	// When the flag is set to false, scheduler skips checking the rest
+	// of the predicates after it finds one predicate that failed.
+	AlwaysCheckAllPredicates bool
 }
 
 type PredicatePolicy struct {
diff --git a/pkg/scheduler/api/v1/types.go b/pkg/scheduler/api/v1/types.go
index 3f6684a5f3..14e2f06b1e 100644
--- a/pkg/scheduler/api/v1/types.go
+++ b/pkg/scheduler/api/v1/types.go
@@ -39,6 +39,12 @@ type Policy struct {
 	// corresponding to every RequiredDuringScheduling affinity rule.
 	// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 1-100.
 	HardPodAffinitySymmetricWeight int `json:"hardPodAffinitySymmetricWeight"`
+
+	// When AlwaysCheckAllPredicates is set to true, scheduler checks all
+	// the configured predicates even after one or more of them fails.
+	// When the flag is set to false, scheduler skips checking the rest
+	// of the predicates after it finds one predicate that failed.
+	AlwaysCheckAllPredicates bool `json:"alwaysCheckAllPredicates"`
 }
 
 type PredicatePolicy struct {
diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go
index 23551a2415..09e136d38b 100644
--- a/pkg/scheduler/core/extender_test.go
+++ b/pkg/scheduler/core/extender_test.go
@@ -317,7 +317,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 		}
 		queue := NewSchedulingQueue()
 		scheduler := NewGenericScheduler(
-			cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
+			cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
 		podIgnored := &v1.Pod{}
 		machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 		if test.expectsErr {
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index f147d53495..2009b7af89 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -90,16 +90,17 @@ func (f *FitError) Error() string {
 }
 
 type genericScheduler struct {
-	cache                 schedulercache.Cache
-	equivalenceCache      *EquivalenceCache
-	schedulingQueue       SchedulingQueue
-	predicates            map[string]algorithm.FitPredicate
-	priorityMetaProducer  algorithm.MetadataProducer
-	predicateMetaProducer algorithm.PredicateMetadataProducer
-	prioritizers          []algorithm.PriorityConfig
-	extenders             []algorithm.SchedulerExtender
-	lastNodeIndexLock     sync.Mutex
-	lastNodeIndex         uint64
+	cache                    schedulercache.Cache
+	equivalenceCache         *EquivalenceCache
+	schedulingQueue          SchedulingQueue
+	predicates               map[string]algorithm.FitPredicate
+	priorityMetaProducer     algorithm.MetadataProducer
+	predicateMetaProducer    algorithm.PredicateMetadataProducer
+	prioritizers             []algorithm.PriorityConfig
+	extenders                []algorithm.SchedulerExtender
+	lastNodeIndexLock        sync.Mutex
+	lastNodeIndex            uint64
+	alwaysCheckAllPredicates bool
 
 	cachedNodeInfoMap map[string]*schedulercache.NodeInfo
 	volumeBinder      *volumebinder.VolumeBinder
@@ -133,7 +134,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 
 	trace.Step("Computing predicates")
 	startPredicateEvalTime := time.Now()
-	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue)
+	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
 	if err != nil {
 		return "", err
 	}
@@ -295,6 +296,7 @@ func findNodesThatFit(
 	metadataProducer algorithm.PredicateMetadataProducer,
 	ecache *EquivalenceCache,
 	schedulingQueue SchedulingQueue,
+	alwaysCheckAllPredicates bool,
 ) ([]*v1.Node, FailedPredicateMap, error) {
 	var filtered []*v1.Node
 	failedPredicateMap := FailedPredicateMap{}
@@ -313,7 +315,7 @@ func findNodesThatFit(
 	meta := metadataProducer(pod, nodeNameToInfo)
 	checkNode := func(i int) {
 		nodeName := nodes[i].Name
-		fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue)
+		fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue, alwaysCheckAllPredicates)
 		if err != nil {
 			predicateResultLock.Lock()
 			errs[err.Error()]++
@@ -402,6 +404,7 @@ func podFitsOnNode(
 	predicateFuncs map[string]algorithm.FitPredicate,
 	ecache *EquivalenceCache,
 	queue SchedulingQueue,
+	alwaysCheckAllPredicates bool,
 ) (bool, []algorithm.PredicateFailureReason, error) {
 	var (
 		equivalenceHash uint64
@@ -457,8 +460,6 @@ func podFitsOnNode(
 				fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
 			}
 
-			// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
-			// checking other predicates?
 			if !eCacheAvailable || invalid {
 				// we need to execute predicate functions since equivalence cache does not work
 				fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
@@ -479,6 +480,11 @@ func podFitsOnNode(
 			if !fit {
 				// eCache is available and valid, and predicates result is unfit, record the fail reasons
 				failedPredicates = append(failedPredicates, reasons...)
+				// If alwaysCheckAllPredicates is false, short circuit: one failed predicate already rules the node out, so skip the rest.
+				if !alwaysCheckAllPredicates {
+					glog.V(5).Infoln("alwaysCheckAllPredicates is not set; short circuiting the remaining predicate evaluation, so other predicates may be failing as well")
+					break
+				}
 			}
 		}
 	}
@@ -917,7 +923,7 @@ func selectVictimsOnNode(
 	// that we should check is if the "pod" is failing to schedule due to pod affinity
 	// failure.
 	// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
-	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
+	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false); !fits {
 		if err != nil {
 			glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -931,7 +937,7 @@ func selectVictimsOnNode(
 	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
 	reprievePod := func(p *v1.Pod) bool {
 		addPod(p)
-		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue)
+		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false)
 		if !fits {
 			removePod(p)
 			victims = append(victims, p)
@@ -1045,18 +1051,20 @@ func NewGenericScheduler(
 	priorityMetaProducer algorithm.MetadataProducer,
 	extenders []algorithm.SchedulerExtender,
 	volumeBinder *volumebinder.VolumeBinder,
-	pvcLister corelisters.PersistentVolumeClaimLister) algorithm.ScheduleAlgorithm {
+	pvcLister corelisters.PersistentVolumeClaimLister,
+	alwaysCheckAllPredicates bool) algorithm.ScheduleAlgorithm {
 	return &genericScheduler{
-		cache:                 cache,
-		equivalenceCache:      eCache,
-		schedulingQueue:       podQueue,
-		predicates:            predicates,
-		predicateMetaProducer: predicateMetaProducer,
-		prioritizers:          prioritizers,
-		priorityMetaProducer:  priorityMetaProducer,
-		extenders:             extenders,
-		cachedNodeInfoMap:     make(map[string]*schedulercache.NodeInfo),
-		volumeBinder:          volumeBinder,
-		pvcLister:             pvcLister,
+		cache:                    cache,
+		equivalenceCache:         eCache,
+		schedulingQueue:          podQueue,
+		predicates:               predicates,
+		predicateMetaProducer:    predicateMetaProducer,
+		prioritizers:             prioritizers,
+		priorityMetaProducer:     priorityMetaProducer,
+		extenders:                extenders,
+		cachedNodeInfoMap:        make(map[string]*schedulercache.NodeInfo),
+		volumeBinder:             volumeBinder,
+		pvcLister:                pvcLister,
+		alwaysCheckAllPredicates: alwaysCheckAllPredicates,
 	}
 }
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index cdfc6b20fe..55fede23c4 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -187,16 +187,17 @@ func TestSelectHost(t *testing.T) {
 func TestGenericScheduler(t *testing.T) {
 	predicates.SetPredicatesOrdering(order)
 	tests := []struct {
-		name          string
-		predicates    map[string]algorithm.FitPredicate
-		prioritizers  []algorithm.PriorityConfig
-		nodes         []string
-		pvcs          []*v1.PersistentVolumeClaim
-		pod           *v1.Pod
-		pods          []*v1.Pod
-		expectedHosts sets.String
-		expectsErr    bool
-		wErr          error
+		name                     string
+		predicates               map[string]algorithm.FitPredicate
+		prioritizers             []algorithm.PriorityConfig
+		alwaysCheckAllPredicates bool
+		nodes                    []string
+		pvcs                     []*v1.PersistentVolumeClaim
+		pod                      *v1.Pod
+		pods                     []*v1.Pod
+		expectedHosts            sets.String
+		expectsErr               bool
+		wErr                     error
 	}{
 		{
 			predicates: map[string]algorithm.FitPredicate{"false": falsePredicate},
@@ -377,6 +378,22 @@ func TestGenericScheduler(t *testing.T) {
 			expectsErr: true,
 			wErr:       fmt.Errorf("persistentvolumeclaim \"existingPVC\" is being deleted"),
 		},
+		{
+			// alwaysCheckAllPredicates is true
+			predicates:               map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate, "false": falsePredicate},
+			prioritizers:             []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
+			alwaysCheckAllPredicates: true,
+			nodes:                    []string{"1"},
+			pod:                      &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
+			name:                     "test alwaysCheckAllPredicates is true",
+			wErr: &FitError{
+				Pod:         &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
+				NumAllNodes: 1,
+				FailedPredicates: FailedPredicateMap{
+					"1": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate, algorithmpredicates.ErrFakePredicate},
+				},
+			},
+		},
 	}
 	for _, test := range tests {
 		cache := schedulercache.New(time.Duration(0), wait.NeverStop)
@@ -393,7 +410,7 @@ func TestGenericScheduler(t *testing.T) {
 		pvcLister := schedulertesting.FakePersistentVolumeClaimLister(pvcs)
 
 		scheduler := NewGenericScheduler(
-			cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister)
+			cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister, test.alwaysCheckAllPredicates)
 		machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 
 		if !reflect.DeepEqual(err, test.wErr) {
@@ -414,7 +431,7 @@ func TestFindFitAllError(t *testing.T) {
 		"2": schedulercache.NewNodeInfo(),
 		"1": schedulercache.NewNodeInfo(),
 	}
-	_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
+	_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -449,7 +466,7 @@ func TestFindFitSomeError(t *testing.T) {
 		nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
 	}
 
-	_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
+	_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -1276,7 +1293,7 @@ func TestPreempt(t *testing.T) {
 			extenders = append(extenders, extender)
 		}
 		scheduler := NewGenericScheduler(
-			cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
+			cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
 		// Call Preempt and check the expected results.
 		node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 		if err != nil {
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index 04b7a58591..fca4f46e3d 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -130,6 +130,9 @@ type configFactory struct {
 
 	// Handles volume binding decisions
 	volumeBinder *volumebinder.VolumeBinder
+
+	// When true, all predicates are checked even after one of them has failed.
+	alwaysCheckAllPredicates bool
 }
 
 // NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
@@ -880,6 +883,12 @@ func (f *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
 	if policy.HardPodAffinitySymmetricWeight != 0 {
 		f.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
 	}
+	// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
+	// predicates even after one or more of them fails.
+	if policy.AlwaysCheckAllPredicates {
+		f.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
+	}
+
 	return f.CreateFromKeys(predicateKeys, priorityKeys, extenders)
 }
 
@@ -933,7 +942,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		glog.Info("Created equivalence class cache")
 	}
 
-	algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, f.pVCLister)
+	algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, f.pVCLister, f.alwaysCheckAllPredicates)
 
 	podBackoff := util.CreateDefaultPodBackoff()
 	return &scheduler.Config{
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 73d4abcc28..ed36792156 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -533,7 +533,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
 		algorithm.EmptyMetadataProducer,
 		[]algorithm.SchedulerExtender{},
 		nil,
-		schedulertesting.FakePersistentVolumeClaimLister{})
+		schedulertesting.FakePersistentVolumeClaimLister{},
+		false)
 	bindingChan := make(chan *v1.Binding, 1)
 	errChan := make(chan error, 1)
 	configurator := &FakeConfigurator{
@@ -577,7 +578,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 		algorithm.EmptyMetadataProducer,
 		[]algorithm.SchedulerExtender{},
 		nil,
-		schedulertesting.FakePersistentVolumeClaimLister{})
+		schedulertesting.FakePersistentVolumeClaimLister{},
+		false)
 	bindingChan := make(chan *v1.Binding, 2)
 	configurator := &FakeConfigurator{
 		Config: &Config{
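
---

Reviewer note: the behavioral core of this patch is the early break added to the
predicate loop in podFitsOnNode. The standalone Go sketch below distills just that
control flow. The fitPredicate and predicateResult types, the predicate names, and
the failure reasons are simplified stand-ins invented for illustration, not the
scheduler's real algorithm.FitPredicate machinery; the ordered-name slice plays the
role of predicates.SetPredicatesOrdering in the tests.

package main

import "fmt"

// predicateResult is a stand-in for algorithm.PredicateFailureReason.
type predicateResult string

// fitPredicate is a stand-in for algorithm.FitPredicate: it reports whether
// the pod fits the node and, if not, the failure reasons.
type fitPredicate func() (bool, []predicateResult)

// runPredicates mirrors the loop in podFitsOnNode after this patch: predicates
// run in a fixed order, and when alwaysCheckAllPredicates is false the loop
// breaks at the first failure instead of evaluating the rest.
func runPredicates(order []string, preds map[string]fitPredicate, alwaysCheckAllPredicates bool) (bool, []predicateResult) {
	var failed []predicateResult
	for _, name := range order {
		pred, ok := preds[name]
		if !ok {
			continue
		}
		if fit, reasons := pred(); !fit {
			failed = append(failed, reasons...)
			// The new short circuit: one failure already disqualifies the
			// node, so by default the remaining predicates are skipped.
			if !alwaysCheckAllPredicates {
				break
			}
		}
	}
	return len(failed) == 0, failed
}

func main() {
	order := []string{"PodFitsResources", "PodMatchNodeSelector"}
	preds := map[string]fitPredicate{
		"PodFitsResources":     func() (bool, []predicateResult) { return false, []predicateResult{"insufficient cpu"} },
		"PodMatchNodeSelector": func() (bool, []predicateResult) { return false, []predicateResult{"selector mismatch"} },
	}

	// Default (false): evaluation stops after the first failure.
	_, reasons := runPredicates(order, preds, false)
	fmt.Println(reasons) // [insufficient cpu]

	// Opt-in (true): every predicate still runs; all reasons are reported.
	_, reasons = runPredicates(order, preds, true)
	fmt.Println(reasons) // [insufficient cpu selector mismatch]
}

The trade-off is fewer predicate evaluations per node versus complete failure
reporting: with the default, only the first failure reason per node is collected,
which is why the new TestGenericScheduler case must set alwaysCheckAllPredicates
to observe two ErrFakePredicate reasons for node "1".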
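Configuration note: the new key rides in the scheduler policy file next to
"hardPodAffinitySymmetricWeight", as the two updated example files show, and
CreateFromConfig only copies the value into the factory when it is true, so
omitting the key leaves the short-circuit behavior in effect as the default.
A trimmed policy that opts back into exhaustive checking might look like the
following; the predicate and priority lists are abbreviated from the
repository's examples:

{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
	{"name" : "PodFitsPorts"}
],
"priorities" : [
	{"name" : "EqualPriority", "weight" : 1}
],
"hardPodAffinitySymmetricWeight" : 10,
"alwaysCheckAllPredicates" : true
}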