mirror of https://github.com/k3s-io/k3s
Not split nodes when searching for nodes but doing it all at once
parent
8c1bfeb0cf
commit
6c63dcfffe
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
@ -373,16 +374,19 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
|
|||
|
||||
// Create filtered list with enough space to avoid growing it
|
||||
// and allow assigning.
|
||||
filtered = make([]*v1.Node, 2*numNodesToFind)
|
||||
filtered = make([]*v1.Node, numNodesToFind)
|
||||
errs := errors.MessageCountMap{}
|
||||
var predicateResultLock sync.Mutex
|
||||
var filteredLen int32
|
||||
var (
|
||||
predicateResultLock sync.Mutex
|
||||
filteredLen int32
|
||||
equivClass *equivalence.Class
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// We can use the same metadata producer for all nodes.
|
||||
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
|
||||
|
||||
var equivClass *equivalence.Class
|
||||
|
||||
if g.equivalenceCache != nil {
|
||||
// getEquivalenceClassInfo will return immediately if no equivalence pod found
|
||||
equivClass = equivalence.NewClass(pod)
|
||||
|
@ -412,25 +416,24 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
|
|||
return
|
||||
}
|
||||
if fits {
|
||||
filtered[atomic.AddInt32(&filteredLen, 1)-1] = g.cachedNodeInfoMap[nodeName].Node()
|
||||
length := atomic.AddInt32(&filteredLen, 1)
|
||||
if length > numNodesToFind {
|
||||
cancel()
|
||||
atomic.AddInt32(&filteredLen, -1)
|
||||
} else {
|
||||
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
|
||||
}
|
||||
} else {
|
||||
predicateResultLock.Lock()
|
||||
failedPredicateMap[nodeName] = failedPredicates
|
||||
predicateResultLock.Unlock()
|
||||
}
|
||||
}
|
||||
numNodesProcessed := int32(0)
|
||||
for numNodesProcessed < allNodes {
|
||||
numNodesToProcess := allNodes - numNodesProcessed
|
||||
if numNodesToProcess > numNodesToFind {
|
||||
numNodesToProcess = numNodesToFind
|
||||
}
|
||||
workqueue.Parallelize(16, int(numNodesToProcess), checkNode)
|
||||
if filteredLen >= numNodesToFind {
|
||||
break
|
||||
}
|
||||
numNodesProcessed += numNodesToProcess
|
||||
}
|
||||
|
||||
// Stops searching for more nodes once the configured number of feasible nodes
|
||||
// are found.
|
||||
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
|
||||
|
||||
filtered = filtered[:filteredLen]
|
||||
if len(errs) > 0 {
|
||||
return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package workqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
|
@ -24,9 +25,20 @@ import (
|
|||
|
||||
type DoWorkPieceFunc func(piece int)
|
||||
|
||||
// Parallelize is a very simple framework that allow for parallelizing
|
||||
// Parallelize is a very simple framework that allows for parallelizing
|
||||
// N independent pieces of work.
|
||||
func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
|
||||
ParallelizeUntil(nil, workers, pieces, doWorkPiece)
|
||||
}
|
||||
|
||||
// ParallelizeUntil is a framework that allows for parallelizing N
|
||||
// independent pieces of work until done or the context is canceled.
|
||||
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
|
||||
var stop <-chan struct{}
|
||||
if ctx != nil {
|
||||
stop = ctx.Done()
|
||||
}
|
||||
|
||||
toProcess := make(chan int, pieces)
|
||||
for i := 0; i < pieces; i++ {
|
||||
toProcess <- i
|
||||
|
@ -44,8 +56,13 @@ func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
|
|||
defer utilruntime.HandleCrash()
|
||||
defer wg.Done()
|
||||
for piece := range toProcess {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
doWorkPiece(piece)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
|
Loading…
Reference in New Issue