mirror of https://github.com/k3s-io/k3s
Add a scheduler config argument to set the percentage of nodes to score
parent a5045d107e
commit abb70aee98
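This commit threads a new PercentageOfNodesToScore setting from KubeSchedulerConfiguration through the scheduler's config factory into the generic scheduler: once that percentage of the cluster's nodes has been found feasible for a pod, the search for further feasible nodes stops. Defaulting turns an unset (0) value into 50, and generic_scheduler.go enforces a floor of minFeasibleNodesToFind (100) nodes. Below is a minimal standalone sketch of that sizing rule with a few worked values; it mirrors the numFeasibleNodesToFind logic added in this diff but is not part of the commit (the package main wrapper and the example numbers are for illustration only).

    package main

    import "fmt"

    // Same floor the commit introduces in generic_scheduler.go.
    const minFeasibleNodesToFind = 100

    // numFeasibleNodesToFind mirrors the rule added by this commit: score only a
    // percentage of all nodes, but never fewer than the floor, and search every
    // node for small clusters or out-of-range percentages.
    func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
        if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore <= 0 || percentageOfNodesToScore >= 100 {
            return numAllNodes
        }
        numNodes := numAllNodes * percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }
        return numNodes
    }

    func main() {
        fmt.Println(numFeasibleNodesToFind(500, 30)) // 150: stop once 150 feasible nodes are found
        fmt.Println(numFeasibleNodesToFind(200, 30)) // 100: 60 would fall below the floor of 100
        fmt.Println(numFeasibleNodesToFind(80, 30))  // 80: clusters under 100 nodes are searched fully
        fmt.Println(numFeasibleNodesToFind(500, 0))  // 500: 0 means unset; defaulting elsewhere turns it into 50
    }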
@@ -18,7 +18,6 @@ package options
import (
    "fmt"

    "github.com/spf13/pflag"

    "k8s.io/kubernetes/pkg/apis/componentconfig"
@@ -175,6 +175,7 @@ users:
            Burst: 100,
            ContentType: "application/vnd.kubernetes.protobuf",
        },
        PercentageOfNodesToScore: 50,
    },
},
{

@@ -211,6 +212,7 @@ users:
            Burst: 100,
            ContentType: "application/vnd.kubernetes.protobuf",
        },
        PercentageOfNodesToScore: 50,
    },
},
{
@@ -287,23 +287,24 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con
    }

    // Set up the configurator which can create schedulers from configs.
    configurator := factory.NewConfigFactory(
        s.ComponentConfig.SchedulerName,
        s.Client,
        s.InformerFactory.Core().V1().Nodes(),
        s.PodInformer,
        s.InformerFactory.Core().V1().PersistentVolumes(),
        s.InformerFactory.Core().V1().PersistentVolumeClaims(),
        s.InformerFactory.Core().V1().ReplicationControllers(),
        s.InformerFactory.Apps().V1().ReplicaSets(),
        s.InformerFactory.Apps().V1().StatefulSets(),
        s.InformerFactory.Core().V1().Services(),
        s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        storageClassInformer,
        s.ComponentConfig.HardPodAffinitySymmetricWeight,
        utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        s.ComponentConfig.DisablePreemption,
    )
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName: s.ComponentConfig.SchedulerName,
        Client: s.Client,
        NodeInformer: s.InformerFactory.Core().V1().Nodes(),
        PodInformer: s.PodInformer,
        PvInformer: s.InformerFactory.Core().V1().PersistentVolumes(),
        PvcInformer: s.InformerFactory.Core().V1().PersistentVolumeClaims(),
        ReplicationControllerInformer: s.InformerFactory.Core().V1().ReplicationControllers(),
        ReplicaSetInformer: s.InformerFactory.Apps().V1().ReplicaSets(),
        StatefulSetInformer: s.InformerFactory.Apps().V1().StatefulSets(),
        ServiceInformer: s.InformerFactory.Core().V1().Services(),
        PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        StorageClassInformer: storageClassInformer,
        HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
        EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        DisablePreemption: s.ComponentConfig.DisablePreemption,
        PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore,
    })

    source := s.ComponentConfig.AlgorithmSource
    var config *scheduler.Config
@@ -99,6 +99,15 @@ type KubeSchedulerConfiguration struct {

    // DisablePreemption disables the pod preemption feature.
    DisablePreemption bool

    // PercentageOfNodesToScore is the percentage of all nodes that, once found feasible
    // for running a pod, causes the scheduler to stop searching for more feasible nodes in
    // the cluster. This helps improve the scheduler's performance. The scheduler always tries to find
    // at least "minFeasibleNodesToFind" feasible nodes no matter what the value of this flag is.
    // Example: if the cluster size is 500 nodes and the value of this flag is 30,
    // then the scheduler stops finding further feasible nodes once it finds 150 feasible ones.
    // When the value is 0, the default percentage (50%) of the nodes will be scored.
    PercentageOfNodesToScore int32
}

// KubeSchedulerLeaderElectionConfiguration expands LeaderElectionConfiguration
@@ -278,6 +278,11 @@ func SetDefaults_KubeSchedulerConfiguration(obj *KubeSchedulerConfiguration) {
        obj.FailureDomains = kubeletapis.DefaultFailureDomains
    }

    if obj.PercentageOfNodesToScore == 0 {
        // by default, stop finding feasible nodes once the number of feasible nodes is 50% of the cluster.
        obj.PercentageOfNodesToScore = 50
    }

    // Use the default ClientConnectionConfiguration and LeaderElectionConfiguration options
    apimachineryconfigv1alpha1.RecommendedDefaultClientConnectionConfiguration(&obj.ClientConnection)
    apiserverconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&obj.LeaderElection.LeaderElectionConfiguration)
@@ -95,6 +95,16 @@ type KubeSchedulerConfiguration struct {

    // DisablePreemption disables the pod preemption feature.
    DisablePreemption bool `json:"disablePreemption"`

    // PercentageOfNodesToScore specifies what percentage of all nodes should be scored in each
    // scheduling cycle. This helps improve the scheduler's performance. The scheduler always tries to find
    // at least "minFeasibleNodesToFind" feasible nodes no matter what the value of this flag is.
    // When this value is below 100%, the scheduler stops finding feasible nodes for running a pod
    // once it finds that percentage of feasible nodes of the whole cluster size. For example, if the
    // cluster size is 500 nodes and the value of this flag is 30, then the scheduler stops finding
    // feasible nodes once it finds 150 feasible nodes.
    // When the value is 0, the default percentage (50%) of the nodes will be scored.
    PercentageOfNodesToScore int32 `json:"percentageOfNodesToScore"`
}

// KubeSchedulerLeaderElectionConfiguration expands LeaderElectionConfiguration
@@ -849,23 +849,24 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
        client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
        informerFactory := informers.NewSharedInformerFactory(client, 0)

        if _, err := factory.NewConfigFactory(
            "some-scheduler-name",
            client,
            informerFactory.Core().V1().Nodes(),
            informerFactory.Core().V1().Pods(),
            informerFactory.Core().V1().PersistentVolumes(),
            informerFactory.Core().V1().PersistentVolumeClaims(),
            informerFactory.Core().V1().ReplicationControllers(),
            informerFactory.Apps().V1().ReplicaSets(),
            informerFactory.Apps().V1().StatefulSets(),
            informerFactory.Core().V1().Services(),
            informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
            informerFactory.Storage().V1().StorageClasses(),
            v1.DefaultHardPodAffinitySymmetricWeight,
            enableEquivalenceCache,
            false,
        ).CreateFromConfig(policy); err != nil {
        if _, err := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
            SchedulerName: "some-scheduler-name",
            Client: client,
            NodeInformer: informerFactory.Core().V1().Nodes(),
            PodInformer: informerFactory.Core().V1().Pods(),
            PvInformer: informerFactory.Core().V1().PersistentVolumes(),
            PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
            ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
            ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
            StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
            ServiceInformer: informerFactory.Core().V1().Services(),
            PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
            StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
            HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
            EnableEquivalenceClassCache: enableEquivalenceCache,
            DisablePreemption: false,
            PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
        }).CreateFromConfig(policy); err != nil {
            t.Errorf("%s: Error constructing: %v", v, err)
            continue
        }
@@ -36,6 +36,9 @@ const (
    MaxPriority = 10
    // MaxWeight defines the max weight value.
    MaxWeight = MaxInt / MaxPriority
    // DefaultPercentageOfNodesToScore defines the percentage of all nodes that, once found
    // feasible, causes the scheduler to stop looking for more nodes.
    DefaultPercentageOfNodesToScore = 50
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -514,7 +514,8 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
            nil,
            schedulertesting.FakePersistentVolumeClaimLister{},
            false,
            false)
            false,
            schedulerapi.DefaultPercentageOfNodesToScore)
        podIgnored := &v1.Pod{}
        machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
        if test.expectsErr {
@@ -45,6 +45,14 @@ import (
    "k8s.io/kubernetes/pkg/scheduler/volumebinder"
)

const (
    // minFeasibleNodesToFind is the minimum number of nodes that would be scored
    // in each scheduling cycle. This is a semi-arbitrary value to ensure that a
    // certain minimum of nodes are checked for feasibility. This in turn helps
    // ensure a minimum level of spreading.
    minFeasibleNodesToFind = 100
)

// FailedPredicateMap declares a map[string][]algorithm.PredicateFailureReason type.
type FailedPredicateMap map[string][]algorithm.PredicateFailureReason
@@ -99,6 +107,7 @@ type genericScheduler struct {
    volumeBinder *volumebinder.VolumeBinder
    pvcLister corelisters.PersistentVolumeClaimLister
    disablePreemption bool
    percentageOfNodesToScore int32
}

// Schedule tries to schedule the given pod to one of the nodes in the node list.
@@ -336,6 +345,20 @@ func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName s
    return lowerPriorityPods
}

// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) int32 {
    if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore <= 0 ||
        g.percentageOfNodesToScore >= 100 {
        return numAllNodes
    }
    numNodes := numAllNodes * g.percentageOfNodesToScore / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}

// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
@@ -345,9 +368,12 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes)
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        // Create filtered list with enough space to avoid growing it
        // and allow assigning.
        filtered = make([]*v1.Node, len(nodes))
        filtered = make([]*v1.Node, 2*numNodesToFind)
        errs := errors.MessageCountMap{}
        var predicateResultLock sync.Mutex
        var filteredLen int32
@@ -364,7 +390,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v

        checkNode := func(i int) {
            var nodeCache *equivalence.NodeCache
            nodeName := nodes[i].Name
            nodeName := g.cache.NodeTree().Next()
            if g.equivalenceCache != nil {
                nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
            }
@@ -386,14 +412,25 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
                return
            }
            if fits {
                filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
                filtered[atomic.AddInt32(&filteredLen, 1)-1] = g.cachedNodeInfoMap[nodeName].Node()
            } else {
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }
        workqueue.Parallelize(16, len(nodes), checkNode)
        numNodesProcessed := int32(0)
        for numNodesProcessed < allNodes {
            numNodesToProcess := allNodes - numNodesProcessed
            if numNodesToProcess > numNodesToFind {
                numNodesToProcess = numNodesToFind
            }
            workqueue.Parallelize(16, int(numNodesToProcess), checkNode)
            if filteredLen >= numNodesToFind {
                break
            }
            numNodesProcessed += numNodesToProcess
        }
        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
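The batched loop above replaces a single workqueue.Parallelize pass over every node: nodes are now drawn from the scheduler cache's node tree in chunks of at most numNodesToFind, and the loop exits as soon as enough feasible nodes have been collected. The following self-contained sketch models that pattern outside the scheduler; it is not the commit's code — plain goroutines stand in for workqueue.Parallelize, integer indices stand in for nodes, and the function and variable names are made up for illustration.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // findFeasible checks nodes in batches of at most numNodesToFind and stops early
    // once that many feasible nodes have been collected, like the loop in the diff.
    func findFeasible(numAllNodes, numNodesToFind int32, fits func(node int32) bool) []int32 {
        feasible := make([]int32, 2*numNodesToFind) // room for one batch of overshoot
        var feasibleLen int32
        var next int32 // next node index to hand out, mimicking NodeTree().Next()

        checkNode := func() {
            n := atomic.AddInt32(&next, 1) - 1
            if fits(n) {
                feasible[atomic.AddInt32(&feasibleLen, 1)-1] = n
            }
        }

        processed := int32(0)
        for processed < numAllNodes {
            batch := numAllNodes - processed
            if batch > numNodesToFind {
                batch = numNodesToFind
            }
            var wg sync.WaitGroup
            for i := int32(0); i < batch; i++ {
                wg.Add(1)
                go func() { defer wg.Done(); checkNode() }()
            }
            wg.Wait()
            if feasibleLen >= numNodesToFind {
                break // enough feasible nodes; skip the rest of the cluster
            }
            processed += batch
        }
        return feasible[:feasibleLen]
    }

    func main() {
        // 1000 nodes, stop once 150 feasible ones are found; every even index "fits".
        nodes := findFeasible(1000, 150, func(n int32) bool { return n%2 == 0 })
        fmt.Println(len(nodes)) // 150: two batches of 150 nodes are checked, half of each fits
    }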
@@ -1092,6 +1129,7 @@ func NewGenericScheduler(
    pvcLister corelisters.PersistentVolumeClaimLister,
    alwaysCheckAllPredicates bool,
    disablePreemption bool,
    percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
    return &genericScheduler{
        cache: cache,

@@ -1107,5 +1145,6 @@
        pvcLister: pvcLister,
        alwaysCheckAllPredicates: alwaysCheckAllPredicates,
        disablePreemption: disablePreemption,
        percentageOfNodesToScore: percentageOfNodesToScore,
    }
}
@@ -426,7 +426,8 @@ func TestGenericScheduler(t *testing.T) {
            nil,
            pvcLister,
            test.alwaysCheckAllPredicates,
            false)
            false,
            schedulerapi.DefaultPercentageOfNodesToScore)
        machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))

        if !reflect.DeepEqual(err, test.wErr) {

@@ -456,7 +457,8 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
        algorithm.EmptyPredicateMetadataProducer,
        prioritizers,
        algorithm.EmptyPriorityMetadataProducer,
        nil, nil, nil, false, false)
        nil, nil, nil, false, false,
        schedulerapi.DefaultPercentageOfNodesToScore)
    cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
    return s.(*genericScheduler)
@@ -1362,7 +1364,8 @@ func TestPreempt(t *testing.T) {
            nil,
            schedulertesting.FakePersistentVolumeClaimLister{},
            false,
            false)
            false,
            schedulerapi.DefaultPercentageOfNodesToScore)
        // Call Preempt and check the expected results.
        node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
        if err != nil {

@@ -1481,7 +1484,8 @@ func TestCacheInvalidationRace(t *testing.T) {
        algorithm.EmptyPredicateMetadataProducer,
        prioritizers,
        algorithm.EmptyPriorityMetadataProducer,
        nil, nil, pvcLister, true, false)
        nil, nil, pvcLister, true, false,
        schedulerapi.DefaultPercentageOfNodesToScore)

    // First scheduling attempt should fail.
    nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
@@ -136,59 +136,66 @@ type configFactory struct {

    // Disable pod preemption or not.
    disablePreemption bool

    // percentageOfNodesToScore specifies the percentage of all nodes to score in each scheduling cycle.
    percentageOfNodesToScore int32
}

// ConfigFactoryArgs is a set of arguments passed to NewConfigFactory.
type ConfigFactoryArgs struct {
    SchedulerName string
    Client clientset.Interface
    NodeInformer coreinformers.NodeInformer
    PodInformer coreinformers.PodInformer
    PvInformer coreinformers.PersistentVolumeInformer
    PvcInformer coreinformers.PersistentVolumeClaimInformer
    ReplicationControllerInformer coreinformers.ReplicationControllerInformer
    ReplicaSetInformer appsinformers.ReplicaSetInformer
    StatefulSetInformer appsinformers.StatefulSetInformer
    ServiceInformer coreinformers.ServiceInformer
    PdbInformer policyinformers.PodDisruptionBudgetInformer
    StorageClassInformer storageinformers.StorageClassInformer
    HardPodAffinitySymmetricWeight int32
    EnableEquivalenceClassCache bool
    DisablePreemption bool
    PercentageOfNodesToScore int32
}

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual
// privatization of the struct type, we only return the interface.
func NewConfigFactory(
    schedulerName string,
    client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    hardPodAffinitySymmetricWeight int32,
    enableEquivalenceClassCache bool,
    disablePreemption bool,
) scheduler.Configurator {
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
    stopEverything := make(chan struct{})
    schedulerCache := schedulercache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if storageClassInformer != nil {
        storageClassLister = storageClassInformer.Lister()
    if args.StorageClassInformer != nil {
        storageClassLister = args.StorageClassInformer.Lister()
    }

    c := &configFactory{
        client: client,
        client: args.Client,
        podLister: schedulerCache,
        podQueue: core.NewSchedulingQueue(),
        pVLister: pvInformer.Lister(),
        pVCLister: pvcInformer.Lister(),
        serviceLister: serviceInformer.Lister(),
        controllerLister: replicationControllerInformer.Lister(),
        replicaSetLister: replicaSetInformer.Lister(),
        statefulSetLister: statefulSetInformer.Lister(),
        pdbLister: pdbInformer.Lister(),
        pVLister: args.PvInformer.Lister(),
        pVCLister: args.PvcInformer.Lister(),
        serviceLister: args.ServiceInformer.Lister(),
        controllerLister: args.ReplicationControllerInformer.Lister(),
        replicaSetLister: args.ReplicaSetInformer.Lister(),
        statefulSetLister: args.StatefulSetInformer.Lister(),
        pdbLister: args.PdbInformer.Lister(),
        storageClassLister: storageClassLister,
        schedulerCache: schedulerCache,
        StopEverything: stopEverything,
        schedulerName: schedulerName,
        hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
        enableEquivalenceClassCache: enableEquivalenceClassCache,
        disablePreemption: disablePreemption,
        schedulerName: args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        enableEquivalenceClassCache: args.EnableEquivalenceClassCache,
        disablePreemption: args.DisablePreemption,
        percentageOfNodesToScore: args.PercentageOfNodesToScore,
    }

    c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    podInformer.Informer().AddEventHandler(
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
@@ -213,15 +220,15 @@ func NewConfigFactory(
        },
    )
    // unscheduled pod queue
    podInformer.Informer().AddEventHandler(
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return unassignedNonTerminatedPod(t) && responsibleForPod(t, schedulerName)
                    return unassignedNonTerminatedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, schedulerName)
                        return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                    return false
@@ -239,29 +246,29 @@ func NewConfigFactory(
    )
    // ScheduledPodLister is something we provide to plug-in functions that
    // they may need to call.
    c.scheduledPodLister = assignedPodLister{podInformer.Lister()}
    c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}

    nodeInformer.Informer().AddEventHandler(
    args.NodeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
    )
    c.nodeLister = nodeInformer.Lister()
    c.nodeLister = args.NodeInformer.Lister()

    pdbInformer.Informer().AddEventHandler(
    args.PdbInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: c.addPDBToCache,
            UpdateFunc: c.updatePDBInCache,
            DeleteFunc: c.deletePDBFromCache,
        },
    )
    c.pdbLister = pdbInformer.Lister()
    c.pdbLister = args.PdbInformer.Lister()

    // On add and delete of PVs, it will affect equivalence cache items
    // related to persistent volume
    pvInformer.Informer().AddEventHandler(
    args.PvInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc: c.onPvAdd,
@@ -269,38 +276,38 @@ func NewConfigFactory(
            DeleteFunc: c.onPvDelete,
        },
    )
    c.pVLister = pvInformer.Lister()
    c.pVLister = args.PvInformer.Lister()

    // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
    pvcInformer.Informer().AddEventHandler(
    args.PvcInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: c.onPvcAdd,
            UpdateFunc: c.onPvcUpdate,
            DeleteFunc: c.onPvcDelete,
        },
    )
    c.pVCLister = pvcInformer.Lister()
    c.pVCLister = args.PvcInformer.Lister()

    // This is for ServiceAffinity: affected when the selector of the service is updated.
    // Also, if a new service is added, the equivalence cache will also become invalid since
    // existing pods may be "captured" by this service and change this predicate result.
    serviceInformer.Informer().AddEventHandler(
    args.ServiceInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: c.onServiceAdd,
            UpdateFunc: c.onServiceUpdate,
            DeleteFunc: c.onServiceDelete,
        },
    )
    c.serviceLister = serviceInformer.Lister()
    c.serviceLister = args.ServiceInformer.Lister()

    // Existing equivalence cache should not be affected by add/delete RC/Deployment etc.;
    // it only makes sense when a pod is scheduled or deleted.

    if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
        // Setup volume binder
        c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
        c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer)

        storageClassInformer.Informer().AddEventHandler(
        args.StorageClassInformer.Informer().AddEventHandler(
            cache.ResourceEventHandlerFuncs{
                AddFunc: c.onStorageClassAdd,
                DeleteFunc: c.onStorageClassDelete,
@@ -310,9 +317,9 @@ func NewConfigFactory(

    // Setup cache comparer
    comparer := &cacheComparer{
        podLister: podInformer.Lister(),
        nodeLister: nodeInformer.Lister(),
        pdbLister: pdbInformer.Lister(),
        podLister: args.PodInformer.Lister(),
        nodeLister: args.NodeInformer.Lister(),
        pdbLister: args.PdbInformer.Lister(),
        cache: c.schedulerCache,
        podQueue: c.podQueue,
    }
@@ -674,7 +681,7 @@ func (c *configFactory) GetSchedulerName() string {
    return c.schedulerName
}

// GetClient provides a kubernetes client, mostly internal use, but may also be called by mock-tests.
// GetClient provides a kubernetes Client, mostly internal use, but may also be called by mock-tests.
func (c *configFactory) GetClient() clientset.Interface {
    return c.client
}
@@ -1175,6 +1182,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
        c.pVCLister,
        c.alwaysCheckAllPredicates,
        c.disablePreemption,
        c.percentageOfNodesToScore,
    )

    podBackoff := util.CreateDefaultPodBackoff()
@@ -540,7 +540,7 @@ func TestSkipPodUpdate(t *testing.T) {

func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) scheduler.Configurator {
    informerFactory := informers.NewSharedInformerFactory(client, 0)
    return NewConfigFactory(
    return NewConfigFactory(&ConfigFactoryArgs{
        v1.DefaultSchedulerName,
        client,
        informerFactory.Core().V1().Nodes(),

@@ -556,7 +556,8 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh
        hardPodAffinitySymmetricWeight,
        enableEquivalenceCache,
        disablePodPreemption,
    )
        schedulerapi.DefaultPercentageOfNodesToScore,
    })
}

type fakeExtender struct {
@@ -37,6 +37,7 @@ import (
    "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
    "k8s.io/kubernetes/pkg/scheduler/algorithm"
    "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
    "k8s.io/kubernetes/pkg/scheduler/api"
    schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
    "k8s.io/kubernetes/pkg/scheduler/core"
    schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"

@@ -561,7 +562,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
        nil,
        schedulertesting.FakePersistentVolumeClaimLister{},
        false,
        false)
        false,
        api.DefaultPercentageOfNodesToScore)
    bindingChan := make(chan *v1.Binding, 1)
    errChan := make(chan error, 1)
    configurator := &FakeConfigurator{

@@ -610,7 +612,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
        nil,
        schedulertesting.FakePersistentVolumeClaimLister{},
        false,
        false)
        false,
        api.DefaultPercentageOfNodesToScore)
    bindingChan := make(chan *v1.Binding, 2)
    configurator := &FakeConfigurator{
        Config: &Config{
@@ -94,23 +94,24 @@ func setupScheduler(
        return
    }

    schedulerConfigFactory := factory.NewConfigFactory(
        v1.DefaultSchedulerName,
        cs,
        informerFactory.Core().V1().Nodes(),
        informerFactory.Core().V1().Pods(),
        informerFactory.Core().V1().PersistentVolumes(),
        informerFactory.Core().V1().PersistentVolumeClaims(),
        informerFactory.Core().V1().ReplicationControllers(),
        informerFactory.Apps().V1().ReplicaSets(),
        informerFactory.Apps().V1().StatefulSets(),
        informerFactory.Core().V1().Services(),
        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        informerFactory.Storage().V1().StorageClasses(),
        v1.DefaultHardPodAffinitySymmetricWeight,
        true,
        false,
    )
    schedulerConfigFactory := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName: v1.DefaultSchedulerName,
        Client: cs,
        NodeInformer: informerFactory.Core().V1().Nodes(),
        PodInformer: informerFactory.Core().V1().Pods(),
        PvInformer: informerFactory.Core().V1().PersistentVolumes(),
        PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
        ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
        ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
        StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
        ServiceInformer: informerFactory.Core().V1().Services(),
        PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
        HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
        EnableEquivalenceClassCache: true,
        DisablePreemption: false,
        PercentageOfNodesToScore: 100,
    })

    schedulerConfig, err := schedulerConfigFactory.Create()
    if err != nil {
@@ -74,23 +74,24 @@ func createConfiguratorWithPodInformer(
    podInformer coreinformers.PodInformer,
    informerFactory informers.SharedInformerFactory,
) scheduler.Configurator {
    return factory.NewConfigFactory(
        schedulerName,
        clientSet,
        informerFactory.Core().V1().Nodes(),
        podInformer,
        informerFactory.Core().V1().PersistentVolumes(),
        informerFactory.Core().V1().PersistentVolumeClaims(),
        informerFactory.Core().V1().ReplicationControllers(),
        informerFactory.Apps().V1().ReplicaSets(),
        informerFactory.Apps().V1().StatefulSets(),
        informerFactory.Core().V1().Services(),
        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        informerFactory.Storage().V1().StorageClasses(),
        v1.DefaultHardPodAffinitySymmetricWeight,
        utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        false,
    )
    return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName: schedulerName,
        Client: clientSet,
        NodeInformer: informerFactory.Core().V1().Nodes(),
        PodInformer: podInformer,
        PvInformer: informerFactory.Core().V1().PersistentVolumes(),
        PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
        ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
        ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
        StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
        ServiceInformer: informerFactory.Core().V1().Services(),
        PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
        HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
        EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        DisablePreemption: false,
        PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
    })
}

// initTestMasterAndScheduler initializes a test environment and creates a master with default
@@ -30,6 +30,7 @@ import (
    "k8s.io/kubernetes/pkg/api/legacyscheme"
    "k8s.io/kubernetes/pkg/features"
    "k8s.io/kubernetes/pkg/scheduler"
    schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
    "k8s.io/kubernetes/pkg/scheduler/factory"
    "k8s.io/kubernetes/test/integration/framework"
)

@@ -97,21 +98,22 @@ func createSchedulerConfigurator(
    // Enable EnableEquivalenceClassCache for all integration tests.
    utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")

    return factory.NewConfigFactory(
        v1.DefaultSchedulerName,
        clientSet,
        informerFactory.Core().V1().Nodes(),
        informerFactory.Core().V1().Pods(),
        informerFactory.Core().V1().PersistentVolumes(),
        informerFactory.Core().V1().PersistentVolumeClaims(),
        informerFactory.Core().V1().ReplicationControllers(),
        informerFactory.Apps().V1().ReplicaSets(),
        informerFactory.Apps().V1().StatefulSets(),
        informerFactory.Core().V1().Services(),
        informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        informerFactory.Storage().V1().StorageClasses(),
        v1.DefaultHardPodAffinitySymmetricWeight,
        utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        false,
    )
    return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName: v1.DefaultSchedulerName,
        Client: clientSet,
        NodeInformer: informerFactory.Core().V1().Nodes(),
        PodInformer: informerFactory.Core().V1().Pods(),
        PvInformer: informerFactory.Core().V1().PersistentVolumes(),
        PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
        ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
        ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
        StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
        ServiceInformer: informerFactory.Core().V1().Services(),
        PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
        HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
        EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
        DisablePreemption: false,
        PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
    })
}