Merge pull request #66733 from bsalamat/subset_nodes

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Add a feature to the scheduler to score fewer than all nodes in every scheduling cycle

**What this PR does / why we need it**:
Today, the scheduler scores all the nodes in the cluster in every scheduling cycle (every time scheduling a pod is attempted). This feature implements a mechanism in the scheduler that allows scoring fewer than all the nodes in the cluster: the scheduler stops searching for more nodes once the configured number of feasible nodes is found. This can improve the scheduler's performance in large clusters (several hundred nodes and larger).
This PR also adds a new structure to the scheduler's cache, called NodeTree, that allows the scheduler to iterate over nodes across the different zones of a cluster. This is needed to avoid scoring the same set of nodes in every scheduling cycle.
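
As a rough sketch of the mechanism (illustrative only; the real logic is `numFeasibleNodesToFind` in `pkg/scheduler/core/generic_scheduler.go`, included in this diff):

```go
// Sketch of how the cap on feasible nodes is derived from the configured
// percentage. minFeasibleNodesToFind (100 in this PR) is a lower bound so
// that small clusters are always evaluated in full.
func feasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	const minFeasibleNodesToFind = 100
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore <= 0 ||
		percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	numNodes := numAllNodes * percentageOfNodesToScore / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}
```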

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #66627 

**Special notes for your reviewer**:
This is a large PR, but it is broken into a few logical commits. Reviewing commit by commit should be easier.

**Release note**:

```release-note
Add a feature to the scheduler to score fewer than all nodes in every scheduling cycle. This can improve performance of the scheduler in large clusters.
```
Kubernetes Submit Queue 2018-08-17 17:21:32 -07:00 committed by GitHub
commit 8c1bfeb0cf
27 changed files with 907 additions and 154 deletions


@ -18,7 +18,6 @@ package options
import (
"fmt"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/apis/componentconfig"


@ -175,6 +175,7 @@ users:
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
PercentageOfNodesToScore: 50,
},
},
{
@ -211,6 +212,7 @@ users:
Burst: 100,
ContentType: "application/vnd.kubernetes.protobuf",
},
PercentageOfNodesToScore: 50,
},
},
{


@ -287,23 +287,24 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*scheduler.Con
}
// Set up the configurator which can create schedulers from configs.
configurator := factory.NewConfigFactory(
s.ComponentConfig.SchedulerName,
s.Client,
s.InformerFactory.Core().V1().Nodes(),
s.PodInformer,
s.InformerFactory.Core().V1().PersistentVolumes(),
s.InformerFactory.Core().V1().PersistentVolumeClaims(),
s.InformerFactory.Core().V1().ReplicationControllers(),
s.InformerFactory.Apps().V1().ReplicaSets(),
s.InformerFactory.Apps().V1().StatefulSets(),
s.InformerFactory.Core().V1().Services(),
s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
storageClassInformer,
s.ComponentConfig.HardPodAffinitySymmetricWeight,
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
s.ComponentConfig.DisablePreemption,
)
configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: s.ComponentConfig.SchedulerName,
Client: s.Client,
NodeInformer: s.InformerFactory.Core().V1().Nodes(),
PodInformer: s.PodInformer,
PvInformer: s.InformerFactory.Core().V1().PersistentVolumes(),
PvcInformer: s.InformerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: s.InformerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: s.InformerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: s.InformerFactory.Apps().V1().StatefulSets(),
ServiceInformer: s.InformerFactory.Core().V1().Services(),
PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: s.ComponentConfig.DisablePreemption,
PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore,
})
source := s.ComponentConfig.AlgorithmSource
var config *scheduler.Config


@ -99,6 +99,15 @@ type KubeSchedulerConfiguration struct {
// DisablePreemption disables the pod preemption feature.
DisablePreemption bool
// PercentageOfNodesToScore is the percentage of all nodes that, once found feasible
// for running a pod, causes the scheduler to stop searching for more feasible nodes in
// the cluster. This helps improve the scheduler's performance. The scheduler always tries to find
// at least "minFeasibleNodesToFind" feasible nodes no matter what the value of this flag is.
// Example: if the cluster size is 500 nodes and the value of this flag is 30,
// then the scheduler stops looking for further feasible nodes once it finds 150 feasible ones.
// When the value is 0, the default percentage (50%) of the nodes will be scored.
PercentageOfNodesToScore int32
}
// KubeSchedulerLeaderElectionConfiguration expands LeaderElectionConfiguration


@ -278,6 +278,11 @@ func SetDefaults_KubeSchedulerConfiguration(obj *KubeSchedulerConfiguration) {
obj.FailureDomains = kubeletapis.DefaultFailureDomains
}
if obj.PercentageOfNodesToScore == 0 {
// by default, stop finding feasible nodes once the number of feasible nodes is 50% of the cluster.
obj.PercentageOfNodesToScore = 50
}
// Use the default ClientConnectionConfiguration and LeaderElectionConfiguration options
apimachineryconfigv1alpha1.RecommendedDefaultClientConnectionConfiguration(&obj.ClientConnection)
apiserverconfigv1alpha1.RecommendedDefaultLeaderElectionConfiguration(&obj.LeaderElection.LeaderElectionConfiguration)


@ -95,6 +95,16 @@ type KubeSchedulerConfiguration struct {
// DisablePreemption disables the pod preemption feature.
DisablePreemption bool `json:"disablePreemption"`
// PercentageOfNodesToScore specifies what percentage of all nodes should be scored in each
// scheduling cycle. This helps improve the scheduler's performance. The scheduler always tries to find
// at least "minFeasibleNodesToFind" feasible nodes no matter what the value of this flag is.
// When this value is below 100%, the scheduler stops looking for feasible nodes for running a pod
// once it finds that percentage of feasible nodes of the whole cluster size. For example, if the
// cluster size is 500 nodes and the value of this flag is 30, then the scheduler stops finding
// feasible nodes once it finds 150 feasible nodes.
// When the value is 0, the default percentage (50%) of the nodes will be scored.
PercentageOfNodesToScore int32 `json:"percentageOfNodesToScore"`
}
// KubeSchedulerLeaderElectionConfiguration expands LeaderElectionConfiguration


@ -936,6 +936,7 @@ func autoConvert_v1alpha1_KubeSchedulerConfiguration_To_componentconfig_KubeSche
out.MetricsBindAddress = in.MetricsBindAddress
out.FailureDomains = in.FailureDomains
out.DisablePreemption = in.DisablePreemption
out.PercentageOfNodesToScore = in.PercentageOfNodesToScore
return nil
}
@ -963,6 +964,7 @@ func autoConvert_componentconfig_KubeSchedulerConfiguration_To_v1alpha1_KubeSche
out.MetricsBindAddress = in.MetricsBindAddress
out.FailureDomains = in.FailureDomains
out.DisablePreemption = in.DisablePreemption
out.PercentageOfNodesToScore = in.PercentageOfNodesToScore
return nil
}


@ -15,6 +15,7 @@ go_test(
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/testing:go_default_library",


@ -849,23 +849,24 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
informerFactory := informers.NewSharedInformerFactory(client, 0)
if _, err := factory.NewConfigFactory(
"some-scheduler-name",
client,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceCache,
false,
).CreateFromConfig(policy); err != nil {
if _, err := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: "some-scheduler-name",
Client: client,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: enableEquivalenceCache,
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
}).CreateFromConfig(policy); err != nil {
t.Errorf("%s: Error constructing: %v", v, err)
continue
}


@ -36,6 +36,9 @@ const (
MaxPriority = 10
// MaxWeight defines the max weight value.
MaxWeight = MaxInt / MaxPriority
// DefaultPercentageOfNodesToScore defines the default percentage of all nodes that, once found
// feasible, causes the scheduler to stop looking for more nodes.
DefaultPercentageOfNodesToScore = 50
)
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object


@ -6,6 +6,7 @@ go_library(
"cache.go",
"interface.go",
"node_info.go",
"node_tree.go",
"util.go",
],
importpath = "k8s.io/kubernetes/pkg/scheduler/cache",
@ -15,6 +16,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -31,11 +33,13 @@ go_test(
srcs = [
"cache_test.go",
"node_info_test.go",
"node_tree_test.go",
"util_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",


@ -59,6 +59,7 @@ type schedulerCache struct {
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*NodeInfo
nodeTree *NodeTree
pdbs map[string]*policy.PodDisruptionBudget
// A map from image name to its imageState.
imageStates map[string]*imageState
@ -102,6 +103,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
stop: stop,
nodes: make(map[string]*NodeInfo),
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
pdbs: make(map[string]*policy.PodDisruptionBudget),
@ -426,6 +428,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.removeNodeImageStates(n.node)
}
cache.nodeTree.AddNode(node)
cache.addNodeImageStates(node, n)
return n.SetNode(node)
}
@ -442,6 +445,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
cache.removeNodeImageStates(n.node)
}
cache.nodeTree.UpdateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n)
return n.SetNode(newNode)
}
@ -462,6 +466,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
delete(cache.nodes, node.Name)
}
cache.nodeTree.RemoveNode(node)
cache.removeNodeImageStates(node)
return nil
}
@ -598,3 +603,7 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
delete(cache.podStates, key)
return nil
}
func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}
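
For illustration only (not part of this diff), a minimal sketch of how a consumer of the scheduler cache could walk the nodes round-robin across zones through the new accessor; `findNodesThatFit` in `generic_scheduler.go` further below is the real consumer in this PR:

```go
package example // hypothetical helper, shown only to illustrate the accessor

import (
	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

// namesInSchedulingOrder drains one full pass over the cached nodes, pulling
// one node per zone per round from the NodeTree.
func namesInSchedulingOrder(c schedulercache.Cache) []string {
	nt := c.NodeTree()
	names := make([]string, 0, nt.NumNodes)
	for i := 0; i < nt.NumNodes; i++ {
		names = append(names, nt.Next())
	}
	return names
}
```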


@ -1065,6 +1065,9 @@ func TestNodeOperators(t *testing.T) {
if !found {
t.Errorf("Failed to find node %v in schedulercache.", node.Name)
}
if cache.nodeTree.NumNodes != 1 || cache.nodeTree.Next() != node.Name {
t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
}
// Generations are globally unique. We check in our unit tests that they are incremented correctly.
expected.generation = got.generation
@ -1100,12 +1103,21 @@ func TestNodeOperators(t *testing.T) {
if !reflect.DeepEqual(got, expected) {
t.Errorf("Failed to update node in schedulercache:\n got: %+v \nexpected: %+v", got, expected)
}
// Check nodeTree after update
if cache.nodeTree.NumNodes != 1 || cache.nodeTree.Next() != node.Name {
t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
}
// Case 4: the node can not be removed if pods is not empty.
cache.RemoveNode(node)
if _, found := cache.nodes[node.Name]; !found {
t.Errorf("The node %v should not be removed if pods is not empty.", node.Name)
}
// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
// still pods on it.
if cache.nodeTree.NumNodes != 0 || cache.nodeTree.Next() != "" {
t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
}
}
}


@ -125,6 +125,9 @@ type Cache interface {
// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
IsUpToDate(n *NodeInfo) bool
// NodeTree returns a node tree structure
NodeTree() *NodeTree
}
// Snapshot is a snapshot of cache state

pkg/scheduler/cache/node_tree.go (new file, 187 lines)

@ -0,0 +1,187 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilnode "k8s.io/kubernetes/pkg/util/node"
"github.com/golang/glog"
)
// NodeTree is a tree-like data structure that holds node names in each zone. Zone names are
// keys to "NodeTree.tree" and values of "NodeTree.tree" are arrays of node names.
type NodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
exhaustedZones sets.String // set of zones all of whose nodes have been returned by next()
NumNodes int
mu sync.RWMutex
}
// nodeArray is a struct that has nodes that are in a zone.
// We use a slice (as opposed to a set/map) to store the nodes because iterating over the nodes is
// a lot more frequent than searching them by name.
type nodeArray struct {
nodes []string
lastIndex int
}
func (na *nodeArray) next() (nodeName string, exhausted bool) {
if len(na.nodes) == 0 {
glog.Error("The nodeArray is empty. It should have been deleted from NodeTree.")
return "", false
}
if na.lastIndex >= len(na.nodes) {
return "", true
}
nodeName = na.nodes[na.lastIndex]
na.lastIndex++
return nodeName, false
}
func newNodeTree(nodes []*v1.Node) *NodeTree {
nt := &NodeTree{
tree: make(map[string]*nodeArray),
exhaustedZones: sets.NewString(),
}
for _, n := range nodes {
nt.AddNode(n)
}
return nt
}
// AddNode adds a node and its corresponding zone to the tree. If the zone already exists, the node
// is added to the array of nodes in that zone.
func (nt *NodeTree) AddNode(n *v1.Node) {
nt.mu.Lock()
defer nt.mu.Unlock()
nt.addNode(n)
}
func (nt *NodeTree) addNode(n *v1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na.nodes {
if nodeName == n.Name {
glog.Warningf("node %v already exist in the NodeTree", n.Name)
return
}
}
na.nodes = append(na.nodes, n.Name)
} else {
nt.zones = append(nt.zones, zone)
nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
}
glog.V(5).Infof("Added node %v in group %v to NodeTree", n.Name, zone)
nt.NumNodes++
}
// RemoveNode removes a node from the NodeTree.
func (nt *NodeTree) RemoveNode(n *v1.Node) error {
nt.mu.Lock()
defer nt.mu.Unlock()
return nt.removeNode(n)
}
func (nt *NodeTree) removeNode(n *v1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na.nodes {
if nodeName == n.Name {
na.nodes = append(na.nodes[:i], na.nodes[i+1:]...)
if len(na.nodes) == 0 {
nt.removeZone(zone)
}
glog.V(5).Infof("Removed node %v in group %v from NodeTree", n.Name, zone)
nt.NumNodes--
return nil
}
}
}
glog.Errorf("Node %v in group %v was not found", n.Name, zone)
return fmt.Errorf("node %v in group %v was not found", n.Name, zone)
}
// removeZone removes a zone from tree.
// This function must be called while the writer lock is held.
func (nt *NodeTree) removeZone(zone string) {
delete(nt.tree, zone)
for i, z := range nt.zones {
if z == zone {
nt.zones = append(nt.zones[:i], nt.zones[i+1:]...)
}
}
}
// UpdateNode updates a node in the NodeTree.
func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
var oldZone string
if old != nil {
oldZone = utilnode.GetZoneKey(old)
}
newZone := utilnode.GetZoneKey(new)
// If the zone ID of the node has not changed, we don't need to do anything. The name of the node
// cannot be changed in an update.
if oldZone == newZone {
return
}
nt.mu.Lock()
defer nt.mu.Unlock()
nt.removeNode(old) // No error checking. We ignore whether the old node exists or not.
nt.addNode(new)
}
func (nt *NodeTree) resetExhausted() {
for _, na := range nt.tree {
na.lastIndex = 0
}
nt.exhaustedZones = sets.NewString()
}
// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *NodeTree) Next() string {
nt.mu.Lock()
defer nt.mu.Unlock()
if len(nt.zones) == 0 {
return ""
}
for {
if nt.zoneIndex >= len(nt.zones) {
nt.zoneIndex = 0
}
zone := nt.zones[nt.zoneIndex]
nt.zoneIndex++
// We do not check the set of exhausted zones before calling next() on the zone. This ensures
// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
nodeName, exhausted := nt.tree[zone].next()
if exhausted {
nt.exhaustedZones.Insert(zone)
if len(nt.exhaustedZones) == len(nt.zones) { // all zones are exhausted. we should reset.
nt.resetExhausted()
}
} else {
return nodeName
}
}
}

pkg/scheduler/cache/node_tree_test.go (new file, 441 lines)

@ -0,0 +1,441 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
)
var allNodes = []*v1.Node{
// Node 0: a node without any region-zone label
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
},
// Node 1: a node with region label only
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
},
},
},
// Node 2: a node with zone label only
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-2",
Labels: map[string]string{
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 3: a node with proper region and zone labels
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-3",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 4: a node with proper region and zone labels
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-4",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 5: a node with proper region and zone labels in a different zone, same region as above
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-5",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-3",
},
},
},
// Node 6: a node with proper region and zone labels in a new region and zone
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-6",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-2",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 7: a node with proper region and zone labels in a region and zone as node-6
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-7",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-2",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
// Node 8: a node with proper region and zone labels in a region and zone as node-6
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-8",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-2",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
}}
func verifyNodeTree(t *testing.T, nt *NodeTree, expectedTree map[string]*nodeArray) {
expectedNumNodes := int(0)
for _, na := range expectedTree {
expectedNumNodes += len(na.nodes)
}
if nt.NumNodes != expectedNumNodes {
t.Errorf("unexpected NodeTree.numNodes. Expected: %v, Got: %v", expectedNumNodes, nt.NumNodes)
}
if !reflect.DeepEqual(nt.tree, expectedTree) {
t.Errorf("The node tree is not the same as expected. Expected: %v, Got: %v", expectedTree, nt.tree)
}
if len(nt.zones) != len(expectedTree) {
t.Errorf("Number of zones in NodeTree.zones is not expected. Expected: %v, Got: %v", len(expectedTree), len(nt.zones))
}
for _, z := range nt.zones {
if _, ok := expectedTree[z]; !ok {
t.Errorf("zone %v is not expected to exist in NodeTree.zones", z)
}
}
}
func TestNodeTree_AddNode(t *testing.T) {
tests := []struct {
name string
nodesToAdd []*v1.Node
expectedTree map[string]*nodeArray
}{
{
name: "single node no labels",
nodesToAdd: allNodes[:1],
expectedTree: map[string]*nodeArray{"": {[]string{"node-0"}, 0}},
},
{
name: "mix of nodes with and without proper labels",
nodesToAdd: allNodes[:4],
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3"}, 0},
},
},
{
name: "mix of nodes with and without proper labels and some zones with multiple nodes",
nodesToAdd: allNodes[:7],
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3", "node-4"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(nil)
for _, n := range test.nodesToAdd {
nt.AddNode(n)
}
verifyNodeTree(t, nt, test.expectedTree)
})
}
}
func TestNodeTree_RemoveNode(t *testing.T) {
tests := []struct {
name string
existingNodes []*v1.Node
nodesToRemove []*v1.Node
expectedTree map[string]*nodeArray
expectError bool
}{
{
name: "remove a single node with no labels",
existingNodes: allNodes[:7],
nodesToRemove: allNodes[:1],
expectedTree: map[string]*nodeArray{
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3", "node-4"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
{
name: "remove a few nodes including one from a zone with multiple nodes",
existingNodes: allNodes[:7],
nodesToRemove: allNodes[1:4],
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:zone-2": {[]string{"node-4"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
{
name: "remove all nodes",
existingNodes: allNodes[:7],
nodesToRemove: allNodes[:7],
expectedTree: map[string]*nodeArray{},
},
{
name: "remove non-existing node",
existingNodes: nil,
nodesToRemove: allNodes[:5],
expectedTree: map[string]*nodeArray{},
expectError: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(test.existingNodes)
for _, n := range test.nodesToRemove {
err := nt.RemoveNode(n)
if test.expectError == (err == nil) {
t.Errorf("unexpected returned error value: %v", err)
}
}
verifyNodeTree(t, nt, test.expectedTree)
})
}
}
func TestNodeTree_UpdateNode(t *testing.T) {
tests := []struct {
name string
existingNodes []*v1.Node
nodeToUpdate *v1.Node
expectedTree map[string]*nodeArray
}{
{
name: "update a node without label",
existingNodes: allNodes[:7],
nodeToUpdate: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
expectedTree: map[string]*nodeArray{
"region-1:\x00:": {[]string{"node-1"}, 0},
":\x00:zone-2": {[]string{"node-2"}, 0},
"region-1:\x00:zone-2": {[]string{"node-3", "node-4", "node-0"}, 0},
"region-1:\x00:zone-3": {[]string{"node-5"}, 0},
"region-2:\x00:zone-2": {[]string{"node-6"}, 0},
},
},
{
name: "update the only existing node",
existingNodes: allNodes[:1],
nodeToUpdate: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
expectedTree: map[string]*nodeArray{
"region-1:\x00:zone-2": {[]string{"node-0"}, 0},
},
},
{
name: "update non-existing node",
existingNodes: allNodes[:1],
nodeToUpdate: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-new",
Labels: map[string]string{
kubeletapis.LabelZoneRegion: "region-1",
kubeletapis.LabelZoneFailureDomain: "zone-2",
},
},
},
expectedTree: map[string]*nodeArray{
"": {[]string{"node-0"}, 0},
"region-1:\x00:zone-2": {[]string{"node-new"}, 0},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(test.existingNodes)
var oldNode *v1.Node
for _, n := range allNodes {
if n.Name == test.nodeToUpdate.Name {
oldNode = n
break
}
}
if oldNode == nil {
oldNode = &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "nonexisting-node"}}
}
nt.UpdateNode(oldNode, test.nodeToUpdate)
verifyNodeTree(t, nt, test.expectedTree)
})
}
}
func TestNodeTree_Next(t *testing.T) {
tests := []struct {
name string
nodesToAdd []*v1.Node
numRuns int // number of times to run Next()
expectedOutput []string
}{
{
name: "empty tree",
nodesToAdd: nil,
numRuns: 2,
expectedOutput: []string{"", ""},
},
{
name: "should go back to the first node after finishing a round",
nodesToAdd: allNodes[:1],
numRuns: 2,
expectedOutput: []string{"node-0", "node-0"},
},
{
name: "should go back to the first node after going over all nodes",
nodesToAdd: allNodes[:4],
numRuns: 5,
expectedOutput: []string{"node-0", "node-1", "node-2", "node-3", "node-0"},
},
{
name: "should go to all zones before going to the second nodes in the same zone",
nodesToAdd: allNodes[:9],
numRuns: 11,
expectedOutput: []string{"node-0", "node-1", "node-2", "node-3", "node-5", "node-6", "node-4", "node-7", "node-8", "node-0", "node-1"},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(test.nodesToAdd)
var output []string
for i := 0; i < test.numRuns; i++ {
output = append(output, nt.Next())
}
if !reflect.DeepEqual(output, test.expectedOutput) {
t.Errorf("unexpected output. Expected: %v, Got: %v", test.expectedOutput, output)
}
})
}
}
func TestNodeTreeMultiOperations(t *testing.T) {
tests := []struct {
name string
nodesToAdd []*v1.Node
nodesToRemove []*v1.Node
operations []string
expectedOutput []string
}{
{
name: "add and remove all nodes between two Next operations",
nodesToAdd: allNodes[2:9],
nodesToRemove: allNodes[2:9],
operations: []string{"add", "add", "next", "add", "remove", "remove", "remove", "next"},
expectedOutput: []string{"node-2", ""},
},
{
name: "add and remove some nodes between two Next operations",
nodesToAdd: allNodes[2:9],
nodesToRemove: allNodes[2:9],
operations: []string{"add", "add", "next", "add", "remove", "remove", "next"},
expectedOutput: []string{"node-2", "node-4"},
},
{
name: "remove nodes already iterated on and add new nodes",
nodesToAdd: allNodes[2:9],
nodesToRemove: allNodes[2:9],
operations: []string{"add", "add", "next", "next", "add", "remove", "remove", "next"},
expectedOutput: []string{"node-2", "node-3", "node-4"},
},
{
name: "add more nodes to an exhausted zone",
nodesToAdd: append(allNodes[4:9], allNodes[3]),
nodesToRemove: nil,
operations: []string{"add", "add", "add", "add", "add", "next", "next", "next", "next", "add", "next", "next", "next"},
expectedOutput: []string{"node-4", "node-5", "node-6", "node-7", "node-3", "node-8", "node-4"},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nt := newNodeTree(nil)
addIndex := 0
removeIndex := 0
var output []string
for _, op := range test.operations {
switch op {
case "add":
if addIndex >= len(test.nodesToAdd) {
t.Error("more add operations than nodesToAdd")
} else {
nt.AddNode(test.nodesToAdd[addIndex])
addIndex++
}
case "remove":
if removeIndex >= len(test.nodesToRemove) {
t.Error("more remove operations than nodesToRemove")
} else {
nt.RemoveNode(test.nodesToRemove[removeIndex])
removeIndex++
}
case "next":
output = append(output, nt.Next())
default:
t.Errorf("unknow operation: %v", op)
}
}
if !reflect.DeepEqual(output, test.expectedOutput) {
t.Errorf("unexpected output. Expected: %v, Got: %v", test.expectedOutput, output)
}
})
}
}


@ -514,7 +514,8 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
false,
false)
false,
schedulerapi.DefaultPercentageOfNodesToScore)
podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {


@ -45,6 +45,14 @@ import (
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
)
const (
// minFeasibleNodesToFind is the minimum number of nodes that would be scored
// in each scheduling cycle. This is a semi-arbitrary value to ensure that a
// certain minimum of nodes are checked for feasibility. This in turn helps
// ensure a minimum level of spreading.
minFeasibleNodesToFind = 100
)
// FailedPredicateMap declares a map[string][]algorithm.PredicateFailureReason type.
type FailedPredicateMap map[string][]algorithm.PredicateFailureReason
@ -99,6 +107,7 @@ type genericScheduler struct {
volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
disablePreemption bool
percentageOfNodesToScore int32
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
@ -336,6 +345,20 @@ func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName s
return lowerPriorityPods
}
// numFeasibleNodesToFind returns the number of feasible nodes that, once found, causes the
// scheduler to stop searching for more feasible nodes.
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) int32 {
if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore <= 0 ||
g.percentageOfNodesToScore >= 100 {
return numAllNodes
}
numNodes := numAllNodes * g.percentageOfNodesToScore / 100
if numNodes < minFeasibleNodesToFind {
return minFeasibleNodesToFind
}
return numNodes
}
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
@ -345,9 +368,12 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
if len(g.predicates) == 0 {
filtered = nodes
} else {
allNodes := int32(g.cache.NodeTree().NumNodes)
numNodesToFind := g.numFeasibleNodesToFind(allNodes)
// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, len(nodes))
filtered = make([]*v1.Node, 2*numNodesToFind)
errs := errors.MessageCountMap{}
var predicateResultLock sync.Mutex
var filteredLen int32
@ -364,7 +390,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
checkNode := func(i int) {
var nodeCache *equivalence.NodeCache
nodeName := nodes[i].Name
nodeName := g.cache.NodeTree().Next()
if g.equivalenceCache != nil {
nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
}
@ -386,14 +412,25 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
return
}
if fits {
filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
filtered[atomic.AddInt32(&filteredLen, 1)-1] = g.cachedNodeInfoMap[nodeName].Node()
} else {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = failedPredicates
predicateResultLock.Unlock()
}
}
workqueue.Parallelize(16, len(nodes), checkNode)
numNodesProcessed := int32(0)
for numNodesProcessed < allNodes {
numNodesToProcess := allNodes - numNodesProcessed
if numNodesToProcess > numNodesToFind {
numNodesToProcess = numNodesToFind
}
workqueue.Parallelize(16, int(numNodesToProcess), checkNode)
if filteredLen >= numNodesToFind {
break
}
numNodesProcessed += numNodesToProcess
}
filtered = filtered[:filteredLen]
if len(errs) > 0 {
return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
@ -1092,6 +1129,7 @@ func NewGenericScheduler(
pvcLister corelisters.PersistentVolumeClaimLister,
alwaysCheckAllPredicates bool,
disablePreemption bool,
percentageOfNodesToScore int32,
) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
@ -1107,5 +1145,6 @@ func NewGenericScheduler(
pvcLister: pvcLister,
alwaysCheckAllPredicates: alwaysCheckAllPredicates,
disablePreemption: disablePreemption,
percentageOfNodesToScore: percentageOfNodesToScore,
}
}
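
To make the numbers concrete, using the values added in this diff (minFeasibleNodesToFind = 100, percentageOfNodesToScore defaulting to 50): a 5,000-node cluster with the percentage set to 30 gives numFeasibleNodesToFind = 1,500, so each call to workqueue.Parallelize in findNodesThatFit processes at most 1,500 nodes (pulled from NodeTree.Next()) and the loop exits as soon as 1,500 feasible nodes have been collected; a 500-node cluster at 30% stops at 150; a 300-node cluster at 10% is clamped up to the minimum of 100; and any cluster smaller than 100 nodes is always evaluated in full.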


@ -426,7 +426,8 @@ func TestGenericScheduler(t *testing.T) {
nil,
pvcLister,
test.alwaysCheckAllPredicates,
false)
false,
schedulerapi.DefaultPercentageOfNodesToScore)
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if !reflect.DeepEqual(err, test.wErr) {
@ -456,7 +457,8 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, nil, false, false)
nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
return s.(*genericScheduler)
@ -1362,7 +1364,8 @@ func TestPreempt(t *testing.T) {
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
false,
false)
false,
schedulerapi.DefaultPercentageOfNodesToScore)
// Call Preempt and check the expected results.
node, victims, _, err := scheduler.Preempt(test.pod, schedulertesting.FakeNodeLister(makeNodeList(nodeNames)), error(&FitError{Pod: test.pod, FailedPredicates: failedPredMap}))
if err != nil {
@ -1481,7 +1484,8 @@ func TestCacheInvalidationRace(t *testing.T) {
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, pvcLister, true, false)
nil, nil, pvcLister, true, false,
schedulerapi.DefaultPercentageOfNodesToScore)
// First scheduling attempt should fail.
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))


@ -136,59 +136,66 @@ type configFactory struct {
// Disable pod preemption or not.
disablePreemption bool
// percentageOfNodesToScore specifies the percentage of all nodes to score in each scheduling cycle.
percentageOfNodesToScore int32
}
// ConfigFactoryArgs is a set of arguments passed to NewConfigFactory.
type ConfigFactoryArgs struct {
SchedulerName string
Client clientset.Interface
NodeInformer coreinformers.NodeInformer
PodInformer coreinformers.PodInformer
PvInformer coreinformers.PersistentVolumeInformer
PvcInformer coreinformers.PersistentVolumeClaimInformer
ReplicationControllerInformer coreinformers.ReplicationControllerInformer
ReplicaSetInformer appsinformers.ReplicaSetInformer
StatefulSetInformer appsinformers.StatefulSetInformer
ServiceInformer coreinformers.ServiceInformer
PdbInformer policyinformers.PodDisruptionBudgetInformer
StorageClassInformer storageinformers.StorageClassInformer
HardPodAffinitySymmetricWeight int32
EnableEquivalenceClassCache bool
DisablePreemption bool
PercentageOfNodesToScore int32
}
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual
// privatization of the struct type, we only return the interface.
func NewConfigFactory(
schedulerName string,
client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pvInformer coreinformers.PersistentVolumeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
replicationControllerInformer coreinformers.ReplicationControllerInformer,
replicaSetInformer appsinformers.ReplicaSetInformer,
statefulSetInformer appsinformers.StatefulSetInformer,
serviceInformer coreinformers.ServiceInformer,
pdbInformer policyinformers.PodDisruptionBudgetInformer,
storageClassInformer storageinformers.StorageClassInformer,
hardPodAffinitySymmetricWeight int32,
enableEquivalenceClassCache bool,
disablePreemption bool,
) scheduler.Configurator {
func NewConfigFactory(args *ConfigFactoryArgs) scheduler.Configurator {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
// storageClassInformer is only enabled through VolumeScheduling feature gate
var storageClassLister storagelisters.StorageClassLister
if storageClassInformer != nil {
storageClassLister = storageClassInformer.Lister()
if args.StorageClassInformer != nil {
storageClassLister = args.StorageClassInformer.Lister()
}
c := &configFactory{
client: client,
client: args.Client,
podLister: schedulerCache,
podQueue: core.NewSchedulingQueue(),
pVLister: pvInformer.Lister(),
pVCLister: pvcInformer.Lister(),
serviceLister: serviceInformer.Lister(),
controllerLister: replicationControllerInformer.Lister(),
replicaSetLister: replicaSetInformer.Lister(),
statefulSetLister: statefulSetInformer.Lister(),
pdbLister: pdbInformer.Lister(),
pVLister: args.PvInformer.Lister(),
pVCLister: args.PvcInformer.Lister(),
serviceLister: args.ServiceInformer.Lister(),
controllerLister: args.ReplicationControllerInformer.Lister(),
replicaSetLister: args.ReplicaSetInformer.Lister(),
statefulSetLister: args.StatefulSetInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
storageClassLister: storageClassLister,
schedulerCache: schedulerCache,
StopEverything: stopEverything,
schedulerName: schedulerName,
hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: enableEquivalenceClassCache,
disablePreemption: disablePreemption,
schedulerName: args.SchedulerName,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: args.EnableEquivalenceClassCache,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
}
c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
// scheduled pod cache
podInformer.Informer().AddEventHandler(
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
@ -213,15 +220,15 @@ func NewConfigFactory(
},
)
// unscheduled pod queue
podInformer.Informer().AddEventHandler(
args.PodInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return unassignedNonTerminatedPod(t) && responsibleForPod(t, schedulerName)
return unassignedNonTerminatedPod(t) && responsibleForPod(t, args.SchedulerName)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, schedulerName)
return unassignedNonTerminatedPod(pod) && responsibleForPod(pod, args.SchedulerName)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
return false
@ -239,29 +246,29 @@ func NewConfigFactory(
)
// ScheduledPodLister is something we provide to plug-in functions that
// they may need to call.
c.scheduledPodLister = assignedPodLister{podInformer.Lister()}
c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
nodeInformer.Informer().AddEventHandler(
args.NodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
)
c.nodeLister = nodeInformer.Lister()
c.nodeLister = args.NodeInformer.Lister()
pdbInformer.Informer().AddEventHandler(
args.PdbInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addPDBToCache,
UpdateFunc: c.updatePDBInCache,
DeleteFunc: c.deletePDBFromCache,
},
)
c.pdbLister = pdbInformer.Lister()
c.pdbLister = args.PdbInformer.Lister()
// On add and delete of PVs, it will affect equivalence cache items
// related to persistent volume
pvInformer.Informer().AddEventHandler(
args.PvInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
AddFunc: c.onPvAdd,
@ -269,38 +276,38 @@ func NewConfigFactory(
DeleteFunc: c.onPvDelete,
},
)
c.pVLister = pvInformer.Lister()
c.pVLister = args.PvInformer.Lister()
// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
pvcInformer.Informer().AddEventHandler(
args.PvcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onPvcAdd,
UpdateFunc: c.onPvcUpdate,
DeleteFunc: c.onPvcDelete,
},
)
c.pVCLister = pvcInformer.Lister()
c.pVCLister = args.PvcInformer.Lister()
// This is for ServiceAffinity: affected by the selector of the service is updated.
// Also, if new service is added, equivalence cache will also become invalid since
// existing pods may be "captured" by this service and change this predicate result.
serviceInformer.Informer().AddEventHandler(
args.ServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onServiceAdd,
UpdateFunc: c.onServiceUpdate,
DeleteFunc: c.onServiceDelete,
},
)
c.serviceLister = serviceInformer.Lister()
c.serviceLister = args.ServiceInformer.Lister()
// Existing equivalence cache should not be affected by add/delete RC/Deployment etc.;
// it only makes sense when a pod is scheduled or deleted
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer)
storageClassInformer.Informer().AddEventHandler(
args.StorageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onStorageClassAdd,
DeleteFunc: c.onStorageClassDelete,
@ -310,9 +317,9 @@ func NewConfigFactory(
// Setup cache comparer
comparer := &cacheComparer{
podLister: podInformer.Lister(),
nodeLister: nodeInformer.Lister(),
pdbLister: pdbInformer.Lister(),
podLister: args.PodInformer.Lister(),
nodeLister: args.NodeInformer.Lister(),
pdbLister: args.PdbInformer.Lister(),
cache: c.schedulerCache,
podQueue: c.podQueue,
}
@ -674,7 +681,7 @@ func (c *configFactory) GetSchedulerName() string {
return c.schedulerName
}
// GetClient provides a kubernetes client, mostly internal use, but may also be called by mock-tests.
// GetClient provides a kubernetes Client, mostly internal use, but may also be called by mock-tests.
func (c *configFactory) GetClient() clientset.Interface {
return c.client
}
@ -1175,6 +1182,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
c.pVCLister,
c.alwaysCheckAllPredicates,
c.disablePreemption,
c.percentageOfNodesToScore,
)
podBackoff := util.CreateDefaultPodBackoff()


@ -540,7 +540,7 @@ func TestSkipPodUpdate(t *testing.T) {
func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeight int32) scheduler.Configurator {
informerFactory := informers.NewSharedInformerFactory(client, 0)
return NewConfigFactory(
return NewConfigFactory(&ConfigFactoryArgs{
v1.DefaultSchedulerName,
client,
informerFactory.Core().V1().Nodes(),
@ -556,7 +556,8 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh
hardPodAffinitySymmetricWeight,
enableEquivalenceCache,
disablePodPreemption,
)
schedulerapi.DefaultPercentageOfNodesToScore,
})
}
type fakeExtender struct {


@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
@ -561,7 +562,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
false,
false)
false,
api.DefaultPercentageOfNodesToScore)
bindingChan := make(chan *v1.Binding, 1)
errChan := make(chan error, 1)
configurator := &FakeConfigurator{
@ -610,7 +612,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
nil,
schedulertesting.FakePersistentVolumeClaimLister{},
false,
false)
false,
api.DefaultPercentageOfNodesToScore)
bindingChan := make(chan *v1.Binding, 2)
configurator := &FakeConfigurator{
Config: &Config{


@ -106,5 +106,8 @@ func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
return &schedulercache.Snapshot{}
}
// IsUpToDate is a fake mthod for testing
// IsUpToDate is a fake method for testing
func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true }
// NodeTree is a fake method for testing.
func (f *FakeCache) NodeTree() *schedulercache.NodeTree { return nil }


@ -94,23 +94,24 @@ func setupScheduler(
return
}
schedulerConfigFactory := factory.NewConfigFactory(
v1.DefaultSchedulerName,
cs,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
true,
false,
)
schedulerConfigFactory := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: v1.DefaultSchedulerName,
Client: cs,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: true,
DisablePreemption: false,
PercentageOfNodesToScore: 100,
})
schedulerConfig, err := schedulerConfigFactory.Create()
if err != nil {


@ -74,23 +74,24 @@ func createConfiguratorWithPodInformer(
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
) scheduler.Configurator {
return factory.NewConfigFactory(
schedulerName,
clientSet,
informerFactory.Core().V1().Nodes(),
podInformer,
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
false,
)
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: schedulerName,
Client: clientSet,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: podInformer,
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
})
}
// initTestMasterAndScheduler initializes a test environment and creates a master with default


@ -18,6 +18,7 @@ go_library(
"//pkg/cloudprovider/providers/gce/cloud:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",


@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
"k8s.io/kubernetes/test/integration/framework"
)
@ -97,21 +98,22 @@ func createSchedulerConfigurator(
// Enable EnableEquivalenceClassCache for all integration tests.
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")
return factory.NewConfigFactory(
v1.DefaultSchedulerName,
clientSet,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Apps().V1().ReplicaSets(),
informerFactory.Apps().V1().StatefulSets(),
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
v1.DefaultHardPodAffinitySymmetricWeight,
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
false,
)
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: v1.DefaultSchedulerName,
Client: clientSet,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: informerFactory.Core().V1().Pods(),
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
})
}