From 6c63dcfffebb9a8bcc5e1cee748ad16d7ed7e293 Mon Sep 17 00:00:00 2001 From: Guoliang Wang Date: Sat, 18 Aug 2018 09:05:58 +0800 Subject: [PATCH] Not split nodes when searching for nodes but doing it all at once --- pkg/scheduler/core/generic_scheduler.go | 39 ++++++++++--------- .../client-go/util/workqueue/parallelizer.go | 21 +++++++++- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 45fe3a9dab..1ab9e576a3 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -17,6 +17,7 @@ limitations under the License. package core import ( + "context" "fmt" "math" "sort" @@ -373,16 +374,19 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v // Create filtered list with enough space to avoid growing it // and allow assigning. - filtered = make([]*v1.Node, 2*numNodesToFind) + filtered = make([]*v1.Node, numNodesToFind) errs := errors.MessageCountMap{} - var predicateResultLock sync.Mutex - var filteredLen int32 + var ( + predicateResultLock sync.Mutex + filteredLen int32 + equivClass *equivalence.Class + ) + + ctx, cancel := context.WithCancel(context.Background()) // We can use the same metadata producer for all nodes. meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) - var equivClass *equivalence.Class - if g.equivalenceCache != nil { // getEquivalenceClassInfo will return immediately if no equivalence pod found equivClass = equivalence.NewClass(pod) @@ -412,25 +416,24 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v return } if fits { - filtered[atomic.AddInt32(&filteredLen, 1)-1] = g.cachedNodeInfoMap[nodeName].Node() + length := atomic.AddInt32(&filteredLen, 1) + if length > numNodesToFind { + cancel() + atomic.AddInt32(&filteredLen, -1) + } else { + filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node() + } } else { predicateResultLock.Lock() failedPredicateMap[nodeName] = failedPredicates predicateResultLock.Unlock() } } - numNodesProcessed := int32(0) - for numNodesProcessed < allNodes { - numNodesToProcess := allNodes - numNodesProcessed - if numNodesToProcess > numNodesToFind { - numNodesToProcess = numNodesToFind - } - workqueue.Parallelize(16, int(numNodesToProcess), checkNode) - if filteredLen >= numNodesToFind { - break - } - numNodesProcessed += numNodesToProcess - } + + // Stops searching for more nodes once the configured number of feasible nodes + // are found. + workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode) + filtered = filtered[:filteredLen] if len(errs) > 0 { return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs) diff --git a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go index be668c4233..526bd244e6 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go +++ b/staging/src/k8s.io/client-go/util/workqueue/parallelizer.go @@ -17,6 +17,7 @@ limitations under the License. package workqueue import ( + "context" "sync" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -24,9 +25,20 @@ import ( type DoWorkPieceFunc func(piece int) -// Parallelize is a very simple framework that allow for parallelizing +// Parallelize is a very simple framework that allows for parallelizing // N independent pieces of work. func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) { + ParallelizeUntil(nil, workers, pieces, doWorkPiece) +} + +// ParallelizeUntil is a framework that allows for parallelizing N +// independent pieces of work until done or the context is canceled. +func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) { + var stop <-chan struct{} + if ctx != nil { + stop = ctx.Done() + } + toProcess := make(chan int, pieces) for i := 0; i < pieces; i++ { toProcess <- i @@ -44,7 +56,12 @@ func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) { defer utilruntime.HandleCrash() defer wg.Done() for piece := range toProcess { - doWorkPiece(piece) + select { + case <-stop: + return + default: + doWorkPiece(piece) + } } }() }