Merge pull request #56926 from wgliang/master

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Add a scheduler optimization option: short-circuit all predicates if one predicate fails

Signed-off-by: Wang Guoliang <iamwgliang@gmail.com>

**What this PR does / why we need it**:
Short-circuit predicate evaluation: stop checking the remaining predicates for a node once one predicate fails.

This PR adds a switch to control the behavior. Some scenarios do not need to know every cause of a scheduling failure and can get a large performance improvement by stopping at the first failed predicate; users who need the full set of failure reasons, and can accept the current performance, can keep the existing logic. The switch is exposed to the user.
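
To make the trade-off concrete, here is a minimal, self-contained Go sketch of the idea; the names and types are hypothetical, not the scheduler's actual API (the real change lands in `podFitsOnNode`, shown in the diff below):

```
// Illustrative only: hypothetical types standing in for the scheduler's
// predicate machinery.
package main

import "fmt"

// fitPredicate reports whether a pod fits a node, plus failure reasons.
type fitPredicate func(node string) (bool, []string)

// checkNode evaluates predicates in order. With alwaysCheckAllPredicates
// set to false it stops at the first failing predicate; with true it keeps
// going and collects every failure reason.
func checkNode(node string, preds []fitPredicate, alwaysCheckAllPredicates bool) (bool, []string) {
	var failures []string
	for _, p := range preds {
		if ok, reasons := p(node); !ok {
			failures = append(failures, reasons...)
			if !alwaysCheckAllPredicates {
				break // short circuit: skip the remaining predicates
			}
		}
	}
	return len(failures) == 0, failures
}

func main() {
	preds := []fitPredicate{
		func(string) (bool, []string) { return false, []string{"insufficient cpu"} },
		func(string) (bool, []string) { return false, []string{"port conflict"} },
	}
	fit, reasons := checkNode("node-1", preds, false)
	fmt.Println(fit, reasons) // false [insufficient cpu]: only the first reason is collected
}
```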

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:

Fixes #56889, fixes #48186

**Special notes for your reviewer**:
@davidopp

**Release note**:

```
Add an AlwaysCheckAllPredicates option to the scheduler policy. When it is false (the default), the scheduler short-circuits predicate evaluation after the first failed predicate, which can greatly improve scheduling performance.
```
Kubernetes Submit Queue 2018-01-14 04:53:05 -08:00 committed by GitHub
commit 5911f87dad
9 changed files with 98 additions and 48 deletions

View File

@@ -26,5 +26,6 @@
 "nodeCacheCapable": false
 }
 ],
-"hardPodAffinitySymmetricWeight" : 10
+"hardPodAffinitySymmetricWeight" : 10,
+"alwaysCheckAllPredicates" : false
 }
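
This hunk (and the similar one in the second example file below) adds the field to the sample policy files. For orientation, a complete minimal policy enabling the new behavior might look like the sketch below; the kind/apiVersion header and the predicate and priority names are illustrative and not part of this diff, only `alwaysCheckAllPredicates` is new:

```
{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    {"name": "PodFitsResources"},
    {"name": "PodFitsHostPorts"}
  ],
  "priorities": [
    {"name": "EqualPriority", "weight": 1}
  ],
  "hardPodAffinitySymmetricWeight": 10,
  "alwaysCheckAllPredicates": true
}
```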

View File

@@ -15,5 +15,6 @@
 {"name" : "ServiceSpreadingPriority", "weight" : 1},
 {"name" : "EqualPriority", "weight" : 1}
 ],
-"hardPodAffinitySymmetricWeight" : 10
+"hardPodAffinitySymmetricWeight" : 10,
+"alwaysCheckAllPredicates" : false
 }

View File

@@ -47,6 +47,12 @@ type Policy struct {
 // corresponding to every RequiredDuringScheduling affinity rule.
 // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 1-100.
 HardPodAffinitySymmetricWeight int32
+// When AlwaysCheckAllPredicates is set to true, scheduler checks all
+// the configured predicates even after one or more of them fails.
+// When the flag is set to false, scheduler skips checking the rest
+// of the predicates after it finds one predicate that failed.
+AlwaysCheckAllPredicates bool
 }
 type PredicatePolicy struct {

View File

@@ -39,6 +39,12 @@ type Policy struct {
 // corresponding to every RequiredDuringScheduling affinity rule.
 // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 1-100.
 HardPodAffinitySymmetricWeight int `json:"hardPodAffinitySymmetricWeight"`
+// When AlwaysCheckAllPredicates is set to true, scheduler checks all
+// the configured predicates even after one or more of them fails.
+// When the flag is set to false, scheduler skips checking the rest
+// of the predicates after it finds one predicate that failed.
+AlwaysCheckAllPredicates bool `json:"alwaysCheckAllPredicates"`
 }
 type PredicatePolicy struct {
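
Because the versioned field above carries a json tag, the new option round-trips directly through a policy file. A minimal, self-contained sketch, assuming a struct that mirrors only the two fields shown in this hunk (the real schedulerapi.Policy type has many more):

```
package main

import (
	"encoding/json"
	"fmt"
)

// policy mirrors just the two fields from the hunk above, for illustration.
type policy struct {
	HardPodAffinitySymmetricWeight int  `json:"hardPodAffinitySymmetricWeight"`
	AlwaysCheckAllPredicates       bool `json:"alwaysCheckAllPredicates"`
}

func main() {
	raw := []byte(`{"hardPodAffinitySymmetricWeight": 10, "alwaysCheckAllPredicates": true}`)
	var p policy
	if err := json.Unmarshal(raw, &p); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
	// Output: {HardPodAffinitySymmetricWeight:10 AlwaysCheckAllPredicates:true}
}
```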

View File

@@ -317,7 +317,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 }
 queue := NewSchedulingQueue()
 scheduler := NewGenericScheduler(
-cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
+cache, nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
 podIgnored := &v1.Pod{}
 machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 if test.expectsErr {

View File

@@ -90,16 +90,17 @@ func (f *FitError) Error() string {
 }
 type genericScheduler struct {
 cache schedulercache.Cache
 equivalenceCache *EquivalenceCache
 schedulingQueue SchedulingQueue
 predicates map[string]algorithm.FitPredicate
 priorityMetaProducer algorithm.MetadataProducer
 predicateMetaProducer algorithm.PredicateMetadataProducer
 prioritizers []algorithm.PriorityConfig
 extenders []algorithm.SchedulerExtender
 lastNodeIndexLock sync.Mutex
 lastNodeIndex uint64
+alwaysCheckAllPredicates bool
 cachedNodeInfoMap map[string]*schedulercache.NodeInfo
 volumeBinder *volumebinder.VolumeBinder
@@ -133,7 +134,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 trace.Step("Computing predicates")
 startPredicateEvalTime := time.Now()
-filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue)
+filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
 if err != nil {
 return "", err
 }
@@ -295,6 +296,7 @@ func findNodesThatFit(
 metadataProducer algorithm.PredicateMetadataProducer,
 ecache *EquivalenceCache,
 schedulingQueue SchedulingQueue,
+alwaysCheckAllPredicates bool,
 ) ([]*v1.Node, FailedPredicateMap, error) {
 var filtered []*v1.Node
 failedPredicateMap := FailedPredicateMap{}
@@ -313,7 +315,7 @@
 meta := metadataProducer(pod, nodeNameToInfo)
 checkNode := func(i int) {
 nodeName := nodes[i].Name
-fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue)
+fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache, schedulingQueue, alwaysCheckAllPredicates)
 if err != nil {
 predicateResultLock.Lock()
 errs[err.Error()]++
@@ -402,6 +404,7 @@ func podFitsOnNode(
 predicateFuncs map[string]algorithm.FitPredicate,
 ecache *EquivalenceCache,
 queue SchedulingQueue,
+alwaysCheckAllPredicates bool,
 ) (bool, []algorithm.PredicateFailureReason, error) {
 var (
 equivalenceHash uint64
@@ -457,8 +460,6 @@ func podFitsOnNode(
 fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
 }
-// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
-// checking other predicates?
 if !eCacheAvailable || invalid {
 // we need to execute predicate functions since equivalence cache does not work
 fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
@@ -479,6 +480,11 @@ func podFitsOnNode(
 if !fit {
 // eCache is available and valid, and predicates result is unfit, record the fail reasons
 failedPredicates = append(failedPredicates, reasons...)
+// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
+if !alwaysCheckAllPredicates {
+glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate evaluation is short circuited and there are chances of other predicates failing as well.")
+break
+}
 }
 }
 }
@@ -917,7 +923,7 @@ func selectVictimsOnNode(
 // that we should check is if the "pod" is failing to schedule due to pod affinity
 // failure.
 // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
-if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue); !fits {
+if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false); !fits {
 if err != nil {
 glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 }
@@ -931,7 +937,7 @@
 violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
 reprievePod := func(p *v1.Pod) bool {
 addPod(p)
-fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue)
+fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false)
 if !fits {
 removePod(p)
 victims = append(victims, p)
@@ -1045,18 +1051,20 @@ func NewGenericScheduler(
 priorityMetaProducer algorithm.MetadataProducer,
 extenders []algorithm.SchedulerExtender,
 volumeBinder *volumebinder.VolumeBinder,
-pvcLister corelisters.PersistentVolumeClaimLister) algorithm.ScheduleAlgorithm {
+pvcLister corelisters.PersistentVolumeClaimLister,
+alwaysCheckAllPredicates bool) algorithm.ScheduleAlgorithm {
 return &genericScheduler{
 cache: cache,
 equivalenceCache: eCache,
 schedulingQueue: podQueue,
 predicates: predicates,
 predicateMetaProducer: predicateMetaProducer,
 prioritizers: prioritizers,
 priorityMetaProducer: priorityMetaProducer,
 extenders: extenders,
 cachedNodeInfoMap: make(map[string]*schedulercache.NodeInfo),
 volumeBinder: volumeBinder,
 pvcLister: pvcLister,
+alwaysCheckAllPredicates: alwaysCheckAllPredicates,
 }
 }
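
Note that both preemption call sites in selectVictimsOnNode above pass false for alwaysCheckAllPredicates: they discard the failure reasons (`fits, _, err` and `fits, _, _`) and only act on the fit verdict, so short-circuiting is always the cheaper choice during preemption.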

View File

@@ -187,16 +187,17 @@ func TestSelectHost(t *testing.T) {
 func TestGenericScheduler(t *testing.T) {
 predicates.SetPredicatesOrdering(order)
 tests := []struct {
 name string
 predicates map[string]algorithm.FitPredicate
 prioritizers []algorithm.PriorityConfig
+alwaysCheckAllPredicates bool
 nodes []string
 pvcs []*v1.PersistentVolumeClaim
 pod *v1.Pod
 pods []*v1.Pod
 expectedHosts sets.String
 expectsErr bool
 wErr error
 }{
 {
 predicates: map[string]algorithm.FitPredicate{"false": falsePredicate},
@@ -377,6 +378,22 @@ func TestGenericScheduler(t *testing.T) {
 expectsErr: true,
 wErr: fmt.Errorf("persistentvolumeclaim \"existingPVC\" is being deleted"),
 },
+{
+// alwaysCheckAllPredicates is true
+predicates: map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate, "false": falsePredicate},
+prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
+alwaysCheckAllPredicates: true,
+nodes: []string{"1"},
+pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
+name: "test alwaysCheckAllPredicates is true",
+wErr: &FitError{
+Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "2"}},
+NumAllNodes: 1,
+FailedPredicates: FailedPredicateMap{
+"1": []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate, algorithmpredicates.ErrFakePredicate},
+},
+},
+},
 }
 for _, test := range tests {
 cache := schedulercache.New(time.Duration(0), wait.NeverStop)
@@ -393,7 +410,7 @@ func TestGenericScheduler(t *testing.T) {
 pvcLister := schedulertesting.FakePersistentVolumeClaimLister(pvcs)
 scheduler := NewGenericScheduler(
-cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister)
+cache, nil, NewSchedulingQueue(), test.predicates, algorithm.EmptyPredicateMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}, nil, pvcLister, test.alwaysCheckAllPredicates)
 machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
 if !reflect.DeepEqual(err, test.wErr) {
@@ -414,7 +431,7 @@ func TestFindFitAllError(t *testing.T) {
 "2": schedulercache.NewNodeInfo(),
 "1": schedulercache.NewNodeInfo(),
 }
-_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
+_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
 if err != nil {
 t.Errorf("unexpected error: %v", err)
@@ -449,7 +466,7 @@ func TestFindFitSomeError(t *testing.T) {
 nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
 }
-_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil)
+_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyPredicateMetadataProducer, nil, nil, false)
 if err != nil {
 t.Errorf("unexpected error: %v", err)
 }
@@ -1276,7 +1293,7 @@ func TestPreempt(t *testing.T) {
 extenders = append(extenders, extender)
 }
 scheduler := NewGenericScheduler(
-cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{})
+cache, nil, NewSchedulingQueue(), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{{Function: numericPriority, Weight: 1}}, algorithm.EmptyMetadataProducer, extenders, nil, schedulertesting.FakePersistentVolumeClaimLister{}, false)
 // Call Preempt and check the expected results.
 node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
 if err != nil {

View File

@@ -130,6 +130,9 @@ type configFactory struct {
 // Handles volume binding decisions
 volumeBinder *volumebinder.VolumeBinder
+// always check all predicates even if the middle of one predicate fails.
+alwaysCheckAllPredicates bool
 }
// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
@@ -880,6 +883,12 @@ func (f *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
 if policy.HardPodAffinitySymmetricWeight != 0 {
 f.hardPodAffinitySymmetricWeight = policy.HardPodAffinitySymmetricWeight
 }
+// When AlwaysCheckAllPredicates is set to true, scheduler checks all the configured
+// predicates even after one or more of them fails.
+if policy.AlwaysCheckAllPredicates {
+f.alwaysCheckAllPredicates = policy.AlwaysCheckAllPredicates
+}
 return f.CreateFromKeys(predicateKeys, priorityKeys, extenders)
 }
@@ -933,7 +942,7 @@ func (f *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 glog.Info("Created equivalence class cache")
 }
-algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, f.pVCLister)
+algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, f.podQueue, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders, f.volumeBinder, f.pVCLister, f.alwaysCheckAllPredicates)
 podBackoff := util.CreateDefaultPodBackoff()
 return &scheduler.Config{

View File

@@ -533,7 +533,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
 algorithm.EmptyMetadataProducer,
 []algorithm.SchedulerExtender{},
 nil,
-schedulertesting.FakePersistentVolumeClaimLister{})
+schedulertesting.FakePersistentVolumeClaimLister{},
+false)
 bindingChan := make(chan *v1.Binding, 1)
 errChan := make(chan error, 1)
 configurator := &FakeConfigurator{
@@ -577,7 +578,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 algorithm.EmptyMetadataProducer,
 []algorithm.SchedulerExtender{},
 nil,
-schedulertesting.FakePersistentVolumeClaimLister{})
+schedulertesting.FakePersistentVolumeClaimLister{},
+false)
 bindingChan := make(chan *v1.Binding, 2)
 configurator := &FakeConfigurator{
 Config: &Config{