Merge pull request #67555 from wgliang/opt/improve-performance

Automatic merge from submit-queue (batch tested with PRs 67555, 68196). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Do not split nodes into batches when searching for feasible nodes; evaluate them all at once

**What this PR does / why we need it**:
Stop splitting the node list into batches when searching for feasible nodes: run the predicate checks over all nodes in a single pass and cancel the search as soon as enough feasible nodes have been found.
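
Roughly, the new control flow looks like the sketch below. This is a minimal, self-contained illustration rather than the scheduler code itself: `nodeFits`, the worker count, and the node counts are made up, and it assumes the `ParallelizeUntil` helper added to client-go's `util/workqueue` package by this PR.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	const (
		allNodes       = 1000 // total nodes in the cluster (illustrative)
		numNodesToFind = 100  // feasible nodes wanted before stopping (illustrative)
	)

	filtered := make([]int, numNodesToFind)
	var filteredLen int32
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	checkNode := func(i int) {
		if !nodeFits(i) {
			return
		}
		length := atomic.AddInt32(&filteredLen, 1)
		if length > numNodesToFind {
			// Enough nodes found: stop handing out new pieces and undo the overshoot.
			cancel()
			atomic.AddInt32(&filteredLen, -1)
			return
		}
		filtered[length-1] = i
	}

	// One pass over all nodes; early termination happens via context cancellation.
	workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)

	fmt.Printf("kept %d of %d nodes\n", atomic.LoadInt32(&filteredLen), allNodes)
}

// nodeFits is a stand-in for the scheduler's predicate checks.
func nodeFits(i int) bool { return i%2 == 0 }
```

Previously, the code called `workqueue.Parallelize` once per batch of `numNodesToFind` nodes and checked between batches whether enough nodes had been found; now a single `ParallelizeUntil` call covers all nodes.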

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:
@bsalamat 

This is a follow-up to PR #66733.

https://github.com/kubernetes/kubernetes/pull/66733#discussion_r205932531

**Release note**:

```release-note
The scheduler no longer splits nodes into batches when searching for feasible nodes; it evaluates them in a single pass and stops once enough feasible nodes have been found.
```
Kubernetes Submit Queue 2018-09-04 11:41:34 -07:00 committed by GitHub
commit a0b457d0e5
2 changed files with 40 additions and 20 deletions


@@ -17,6 +17,7 @@ limitations under the License.
 package core
 import (
+	"context"
 	"fmt"
 	"math"
 	"sort"
@@ -373,16 +374,19 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 		// Create filtered list with enough space to avoid growing it
 		// and allow assigning.
-		filtered = make([]*v1.Node, 2*numNodesToFind)
+		filtered = make([]*v1.Node, numNodesToFind)
 		errs := errors.MessageCountMap{}
-		var predicateResultLock sync.Mutex
-		var filteredLen int32
+		var (
+			predicateResultLock sync.Mutex
+			filteredLen         int32
+			equivClass          *equivalence.Class
+		)
+		ctx, cancel := context.WithCancel(context.Background())
 		// We can use the same metadata producer for all nodes.
 		meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
-		var equivClass *equivalence.Class
 		if g.equivalenceCache != nil {
 			// getEquivalenceClassInfo will return immediately if no equivalence pod found
 			equivClass = equivalence.NewClass(pod)
@@ -412,25 +416,24 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 				return
 			}
 			if fits {
-				filtered[atomic.AddInt32(&filteredLen, 1)-1] = g.cachedNodeInfoMap[nodeName].Node()
+				length := atomic.AddInt32(&filteredLen, 1)
+				if length > numNodesToFind {
+					cancel()
+					atomic.AddInt32(&filteredLen, -1)
+				} else {
+					filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
+				}
 			} else {
 				predicateResultLock.Lock()
 				failedPredicateMap[nodeName] = failedPredicates
 				predicateResultLock.Unlock()
 			}
 		}
-		numNodesProcessed := int32(0)
-		for numNodesProcessed < allNodes {
-			numNodesToProcess := allNodes - numNodesProcessed
-			if numNodesToProcess > numNodesToFind {
-				numNodesToProcess = numNodesToFind
-			}
-			workqueue.Parallelize(16, int(numNodesToProcess), checkNode)
-			if filteredLen >= numNodesToFind {
-				break
-			}
-			numNodesProcessed += numNodesToProcess
-		}
+		// Stops searching for more nodes once the configured number of feasible nodes
+		// are found.
+		workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
 		filtered = filtered[:filteredLen]
 		if len(errs) > 0 {
 			return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
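
A note on the accounting in the hunk above: `filtered` is now allocated with exactly `numNodesToFind` slots (it used to be `2*numNodesToFind`), so a worker whose atomic increment overshoots the budget cancels the context and rolls the counter back; without the rollback, the final `filtered = filtered[:filteredLen]` could slice past the backing array. Below is a minimal, hypothetical sketch of that invariant (names and numbers are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const budget = 3              // stands in for numNodesToFind
	out := make([]string, budget) // stands in for the filtered slice
	var n int32                   // stands in for filteredLen

	keep := func(name string) {
		length := atomic.AddInt32(&n, 1)
		if length > budget {
			// Overshot the budget: roll back so n never ends up beyond len(out).
			atomic.AddInt32(&n, -1)
			return
		}
		out[length-1] = name
	}

	var wg sync.WaitGroup
	for _, name := range []string{"node-a", "node-b", "node-c", "node-d", "node-e"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			keep(name)
		}(name)
	}
	wg.Wait()

	// n <= budget is guaranteed, so this slice expression cannot panic.
	fmt.Println(out[:n])
}
```

Because each successful increment yields a unique index, the success path needs no extra locking; only the error and failed-predicate maps still take `predicateResultLock`.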


@@ -17,6 +17,7 @@ limitations under the License.
 package workqueue
 import (
+	"context"
 	"sync"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -24,9 +25,20 @@ import (
 type DoWorkPieceFunc func(piece int)
-// Parallelize is a very simple framework that allow for parallelizing
+// Parallelize is a very simple framework that allows for parallelizing
 // N independent pieces of work.
 func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
+	ParallelizeUntil(nil, workers, pieces, doWorkPiece)
+}
+
+// ParallelizeUntil is a framework that allows for parallelizing N
+// independent pieces of work until done or the context is canceled.
+func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
+	var stop <-chan struct{}
+	if ctx != nil {
+		stop = ctx.Done()
+	}
 	toProcess := make(chan int, pieces)
 	for i := 0; i < pieces; i++ {
 		toProcess <- i
@@ -44,7 +56,12 @@ func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
 			defer utilruntime.HandleCrash()
 			defer wg.Done()
 			for piece := range toProcess {
-				doWorkPiece(piece)
+				select {
+				case <-stop:
+					return
+				default:
+					doWorkPiece(piece)
+				}
 			}
 		}()
 	}
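
For reference, here is a small usage sketch of the new `ParallelizeUntil` helper. It is illustrative only: it assumes the file above is client-go's `util/workqueue` package, and the worker count, piece count, and deadline are made up.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Cancel the work after 50ms; pieces handed out after that are skipped.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	var done int32
	workqueue.ParallelizeUntil(ctx, 4, 10000, func(piece int) {
		time.Sleep(time.Millisecond) // simulate per-piece work
		atomic.AddInt32(&done, 1)
	})

	fmt.Printf("processed %d of 10000 pieces before the context expired\n", atomic.LoadInt32(&done))
}
```

Passing a nil context, as the updated `Parallelize` does, leaves `stop` nil; receiving from a nil channel blocks forever, so the `select` always falls through to the `default` branch and the original behavior is preserved.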