Eclass Task 1: clean up old equiv class code

Co-authored-by: Harry Zhang <resouer@gmail.com>
Co-authored-by: Wang Qingcan <wangqingcan@baidu.com>
pull/564/head
wangqingcan 2018-09-29 19:30:47 +08:00 committed by Zhang, Lei
parent e2be7c91d9
commit 1081e919e3
20 changed files with 27 additions and 2056 deletions

View File

@ -187,7 +187,6 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
@ -356,7 +355,6 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Confi
PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: s.ComponentConfig.DisablePreemption,
PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore,
BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds,

View File

@ -124,12 +124,6 @@ const (
// Add priority to pods. Priority affects scheduling and preemption of pods.
PodPriority utilfeature.Feature = "PodPriority"
// owner: @resouer
// alpha: v1.8
//
// Enable equivalence class cache for scheduler.
EnableEquivalenceClassCache utilfeature.Feature = "EnableEquivalenceClassCache"
// owner: @k82cn
// beta: v1.12
//
@ -418,7 +412,6 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
DebugContainers: {Default: false, PreRelease: utilfeature.Alpha},
PodShareProcessNamespace: {Default: true, PreRelease: utilfeature.Beta},
PodPriority: {Default: true, PreRelease: utilfeature.Beta},
EnableEquivalenceClassCache: {Default: false, PreRelease: utilfeature.Alpha},
TaintNodesByCondition: {Default: true, PreRelease: utilfeature.Beta},
MountPropagation: {Default: true, PreRelease: utilfeature.GA},
QOSReserved: {Default: false, PreRelease: utilfeature.Alpha},

View File

@ -36,8 +36,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/factory"
)
const enableEquivalenceCache = true
func TestCompatibility_v1_Scheduler(t *testing.T) {
// Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases
schedulerFiles := map[string]struct {
@ -987,7 +985,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: enableEquivalenceCache,
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
}).CreateFromConfig(policy); err != nil {

View File

@ -49,13 +49,6 @@ func init() {
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
// IMPORTANT NOTES for predicate developers:
// We are using cached predicate results for pods belonging to the same equivalence class.
// So when implementing a new predicate, you are expected to check whether the result
// of your predicate function can be affected by related API object changes (ADD/DELETE/UPDATE).
// If yes, you are expected to invalidate the cached predicate results on those changes.
// For example:
// https://github.com/kubernetes/kubernetes/blob/36a218e/plugin/pkg/scheduler/factory/factory.go#L422
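For reference, the invalidation this note asks for would look roughly like the
sketch below. The predicate name and handler are hypothetical; the real wiring
lived in factory.go (linked above).

// Hypothetical sketch: a predicate whose result depends on Service objects
// must drop its cached results whenever a Service changes.
func onServiceUpdate(ecache *equivalence.Cache) {
    ecache.InvalidatePredicates(sets.NewString("MyServiceDependentPred"))
}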
// Registers predicates and priorities that are not enabled by default, but user can pick when creating their
// own set of priorities/predicates.

View File

@ -1,49 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["eqivalence.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/core/equivalence",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/hash:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["eqivalence_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,443 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package equivalence defines Pod equivalence classes and the equivalence class
// cache.
package equivalence
import (
"fmt"
"hash/fnv"
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
// nodeMap stores a *NodeCache for each node.
type nodeMap map[string]*NodeCache
// Cache is a thread-safe map that saves and reuses the output of predicate
// functions, using node name as the key to access cached results.
//
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
type Cache struct {
// NOTE(harry): Theoretically sync.Map has better performance on machines with 8+ CPUs,
// but in practice lock contention on this first-level cache is rare.
mu sync.RWMutex
nodeToCache nodeMap
predicateIDMap map[string]int
}
// NewCache creates an empty equiv class cache.
func NewCache(predicates []string) *Cache {
predicateIDMap := make(map[string]int, len(predicates))
for id, predicate := range predicates {
predicateIDMap[predicate] = id
}
return &Cache{
nodeToCache: make(nodeMap),
predicateIDMap: predicateIDMap,
}
}
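A minimal usage sketch of the API being removed (the node name is invented for
illustration): one Cache is built from the scheduler's predicate ordering, and
per-node caches are then addressed by node name.

// Sketch: construct the two-level cache, then fetch a per-node cache.
ecache := equivalence.NewCache(predicates.Ordering())
nodeCache, found := ecache.GetNodeCache("node-1") // created on first access
_, _ = nodeCache, found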
// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
// get or update the cached results. An appropriate Invalidate* function should
// be called when some predicate results are no longer valid.
//
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
//
// NodeCache objects are thread safe within the context of NodeCache.
type NodeCache struct {
mu sync.RWMutex
cache predicateMap
// generation is current generation of node cache, incremented on node
// invalidation.
generation uint64
// snapshotGeneration saves snapshot of generation of node cache.
snapshotGeneration uint64
// predicateGenerations stores generation numbers for predicates, incremented on
// predicate invalidation. Created on first update. Use 0 if it does not
// exist.
predicateGenerations []uint64
// snapshotPredicateGenerations saves snapshot of generation numbers for predicates.
snapshotPredicateGenerations []uint64
}
// newNodeCache returns an empty NodeCache.
func newNodeCache(n int) *NodeCache {
return &NodeCache{
cache: make(predicateMap, n),
predicateGenerations: make([]uint64, n),
snapshotPredicateGenerations: make([]uint64, n),
}
}
// Snapshot snapshots the current generations of the cache.
// NOTE: Since we snapshot the generations of all node caches before using them and
// these operations are serialized, we can store the snapshot as a member of each
// node cache itself.
func (c *Cache) Snapshot() {
c.mu.RLock()
defer c.mu.RUnlock()
for _, n := range c.nodeToCache {
n.mu.Lock()
// snapshot predicate generations
copy(n.snapshotPredicateGenerations, n.predicateGenerations)
// snapshot node generation
n.snapshotGeneration = n.generation
n.mu.Unlock()
}
return
}
// GetNodeCache returns the existing NodeCache for the given node if present. Otherwise,
// it creates the NodeCache and returns it.
// The boolean flag is true if the value was loaded, false if created.
func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
c.mu.Lock()
defer c.mu.Unlock()
if nodeCache, exists = c.nodeToCache[name]; !exists {
nodeCache = newNodeCache(len(c.predicateIDMap))
c.nodeToCache[name] = nodeCache
}
return
}
// LoadNodeCache returns the existing NodeCache for the given node, or nil if not
// present.
func (c *Cache) LoadNodeCache(node string) *NodeCache {
c.mu.RLock()
defer c.mu.RUnlock()
return c.nodeToCache[node]
}
func (c *Cache) predicateKeysToIDs(predicateKeys sets.String) []int {
predicateIDs := make([]int, 0, len(predicateKeys))
for predicateKey := range predicateKeys {
if id, ok := c.predicateIDMap[predicateKey]; ok {
predicateIDs = append(predicateIDs, id)
} else {
klog.Errorf("predicate key %q not found", predicateKey)
}
}
return predicateIDs
}
// InvalidatePredicates clears all cached results for the given predicates.
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.RLock()
defer c.mu.RUnlock()
predicateIDs := c.predicateKeysToIDs(predicateKeys)
for _, n := range c.nodeToCache {
n.invalidatePreds(predicateIDs)
}
klog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
}
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.RLock()
defer c.mu.RUnlock()
predicateIDs := c.predicateKeysToIDs(predicateKeys)
if n, ok := c.nodeToCache[nodeName]; ok {
n.invalidatePreds(predicateIDs)
}
klog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
}
// InvalidateAllPredicatesOnNode clears all cached results for one node.
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
c.mu.RLock()
defer c.mu.RUnlock()
if node, ok := c.nodeToCache[nodeName]; ok {
node.invalidate()
}
klog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
// InvalidateCachedPredicateItem for the pod add case.
// TODO: This does not belong with the equivalence cache implementation.
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume the scheduler can make sure a newly bound pod
// will not break the existing inter-pod affinity, so we do not need to
// invalidate MatchInterPodAffinity when a pod is added.
//
// But when a pod is deleted, existing inter-pod affinity may become invalid
// (e.g. this pod was preferred by someone else, or vice versa).
//
// NOTE: the assumptions above will not hold once we implement features like
// RequiredDuringSchedulingRequiredDuringExecution.
// NoDiskConflict: the newly scheduled pod fits with the existing pods on this node,
// so it will also fit the equivalence class of the existing pods.
// GeneralPredicates: always affected by adding a new pod.
invalidPredicates := sets.NewString(predicates.GeneralPred)
// MaxPDVolumeCountPredicate: we check the volumes of the pod to make the decision.
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
invalidPredicates.Insert(
predicates.MaxEBSVolumeCountPred,
predicates.MaxGCEPDVolumeCountPred,
predicates.MaxAzureDiskVolumeCountPred)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
} else {
// We do not consider CSI volumes here because CSI
// volumes can not be used inline.
if vol.AWSElasticBlockStore != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
}
if vol.GCEPersistentDisk != nil {
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
}
if vol.AzureDisk != nil {
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
}
}
}
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
}
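A sketch of the assumed call site, with boundPod standing in for the just-bound
pod: the Config.Ecache field removed later in this diff used this method to
optimistically invalidate cache items after a successful bind.

// Sketch (assumed call site): after boundPod is successfully bound to a node,
// drop the cached results the new pod may have invalidated there.
ecache.InvalidateCachedPredicateItemForPodAdd(boundPod, boundPod.Spec.NodeName)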
// Class represents a set of pods which are equivalent from the perspective of
// the scheduler. i.e. the scheduler would make the same decision for any pod
// from the same class.
type Class struct {
// Equivalence hash
hash uint64
}
// NewClass returns the equivalence class for a given Pod. The returned Class
// objects will be equal for two Pods in the same class. nil values should not
// be considered equal to each other.
//
// NOTE: Make sure to compare types of Class and not *Class.
// TODO(misterikkit): Return error instead of nil *Class.
func NewClass(pod *v1.Pod) *Class {
equivalencePod := getEquivalencePod(pod)
if equivalencePod != nil {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return &Class{
hash: uint64(hash.Sum32()),
}
}
return nil
}
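A sketch of the intended semantics (podA and podB are placeholders for two pods
created from the same template, differing only in name):

c1, c2 := equivalence.NewClass(podA), equivalence.NewClass(podB)
// Per the note above, compare Class values rather than pointers.
if c1 != nil && c2 != nil && *c1 == *c2 {
    // podA and podB are equivalent for scheduling purposes
}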
// predicateMap stores resultMaps with predicate ID as the key.
type predicateMap []resultMap
// resultMap stores PredicateResult with pod equivalence hash as the key.
type resultMap map[uint64]predicateResult
// predicateResult stores the output of a FitPredicate.
type predicateResult struct {
Fit bool
FailReasons []algorithm.PredicateFailureReason
}
// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
// run and its results cached for the next call.
//
// NOTE: RunPredicate will not update the equivalence cache if the generation does not match the live version.
func (n *NodeCache) RunPredicate(
pred algorithm.FitPredicate,
predicateKey string,
predicateID int,
pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulernodeinfo.NodeInfo,
equivClass *Class,
) (bool, []algorithm.PredicateFailureReason, error) {
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
}
result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, predicateID, equivClass.hash)
if ok {
return result.Fit, result.FailReasons, nil
}
fit, reasons, err := pred(pod, meta, nodeInfo)
if err != nil {
return fit, reasons, err
}
n.updateResult(pod.GetName(), predicateKey, predicateID, fit, reasons, equivClass.hash, nodeInfo)
return fit, reasons, nil
}
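A usage sketch (pred, pod, meta, nodeInfo, and equivClass assumed in scope): the
first call misses, runs the predicate, and caches the result; a second call with
the same equivalence class and unchanged generations is served from the cache.

fit, reasons, err := nodeCache.RunPredicate(
    pred, "GeneralPredicates", 0, pod, meta, nodeInfo, equivClass)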
// updateResult updates the cached result of a predicate.
func (n *NodeCache) updateResult(
podName, predicateKey string,
predicateID int,
fit bool,
reasons []algorithm.PredicateFailureReason,
equivalenceHash uint64,
nodeInfo *schedulernodeinfo.NodeInfo,
) {
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc()
return
}
predicateItem := predicateResult{
Fit: fit,
FailReasons: reasons,
}
n.mu.Lock()
defer n.mu.Unlock()
if (n.snapshotGeneration != n.generation) || (n.snapshotPredicateGenerations[predicateID] != n.predicateGenerations[predicateID]) {
// The generation of the node or predicate has been updated since we last took
// a snapshot, which indicates that we received an invalidation request
// during this time. The cache may be stale, so skip the update.
metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
return
}
// If cached predicate map already exists, just update the predicate by key
if predicates := n.cache[predicateID]; predicates != nil {
// maps in Go are references, so there is no need to write the map back
predicates[equivalenceHash] = predicateItem
} else {
n.cache[predicateID] =
resultMap{
equivalenceHash: predicateItem,
}
}
n.predicateGenerations[predicateID]++
klog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
nodeInfo.Node().Name, predicateKey, podName, predicateItem)
}
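The generation check above is what makes the snapshot-then-write flow safe. A
sketch of the race it guards against:

ecache.Snapshot()                              // snapshot all generations
ecache.InvalidateAllPredicatesOnNode("node-1") // generation++ on node-1
// Any subsequent updateResult on node-1 sees snapshotGeneration != generation
// and discards the write, since its result may predate the invalidation.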
// lookupResult returns cached predicate results and a bool saying whether a
// cache entry was found.
func (n *NodeCache) lookupResult(
podName, nodeName, predicateKey string,
predicateID int,
equivalenceHash uint64,
) (value predicateResult, ok bool) {
n.mu.RLock()
defer n.mu.RUnlock()
value, ok = n.cache[predicateID][equivalenceHash]
if ok {
metrics.EquivalenceCacheHits.Inc()
} else {
metrics.EquivalenceCacheMisses.Inc()
}
return value, ok
}
// invalidatePreds deletes cached predicates by given IDs.
func (n *NodeCache) invalidatePreds(predicateIDs []int) {
n.mu.Lock()
defer n.mu.Unlock()
for _, predicateID := range predicateIDs {
n.cache[predicateID] = nil
n.predicateGenerations[predicateID]++
}
}
// invalidate invalidates node cache.
func (n *NodeCache) invalidate() {
n.mu.Lock()
defer n.mu.Unlock()
n.cache = make(predicateMap, len(n.cache))
n.generation++
}
// equivalencePod is the set of pod attributes which must match for two pods to
// be considered equivalent for scheduling purposes. For correctness, this must
// include any Pod field which is used by a FitPredicate.
//
// NOTE: For equivalence hash to be formally correct, lists and maps in the
// equivalencePod should be normalized (e.g. by sorting them). However, the vast
// majority of equivalent pod classes are expected to be created from a single
// pod template, so they will all have the same ordering.
type equivalencePod struct {
Namespace *string
Labels map[string]string
Affinity *v1.Affinity
Containers []v1.Container // See note about ordering
InitContainers []v1.Container // See note about ordering
NodeName *string
NodeSelector map[string]string
Tolerations []v1.Toleration
Volumes []v1.Volume // See note about ordering
}
// getEquivalencePod returns a normalized representation of a pod so that two
// "equivalent" pods will hash to the same value.
func getEquivalencePod(pod *v1.Pod) *equivalencePod {
ep := &equivalencePod{
Namespace: &pod.Namespace,
Labels: pod.Labels,
Affinity: pod.Spec.Affinity,
Containers: pod.Spec.Containers,
InitContainers: pod.Spec.InitContainers,
NodeName: &pod.Spec.NodeName,
NodeSelector: pod.Spec.NodeSelector,
Tolerations: pod.Spec.Tolerations,
Volumes: pod.Spec.Volumes,
}
// DeepHashObject considers nil and empty slices to be different. Normalize them.
if len(ep.Containers) == 0 {
ep.Containers = nil
}
if len(ep.InitContainers) == 0 {
ep.InitContainers = nil
}
if len(ep.Tolerations) == 0 {
ep.Tolerations = nil
}
if len(ep.Volumes) == 0 {
ep.Volumes = nil
}
// Normalize empty maps also.
if len(ep.Labels) == 0 {
ep.Labels = nil
}
if len(ep.NodeSelector) == 0 {
ep.NodeSelector = nil
}
// TODO(misterikkit): Also normalize nested maps and slices.
return ep
}
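A small sketch of why the normalization matters: DeepHashObject hashes nil and
empty slices differently, so un-normalized pods from the same template could
land in different equivalence classes.

h1, h2 := fnv.New32a(), fnv.New32a()
hashutil.DeepHashObject(h1, []v1.Volume(nil))
hashutil.DeepHashObject(h2, []v1.Volume{})
// h1.Sum32() != h2.Sum32()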

View File

@ -1,803 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package equivalence
import (
"errors"
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// makeBasicPod returns a Pod object with many of the fields populated.
func makeBasicPod(name string) *v1.Pod {
isController := true
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "test-ns",
Labels: map[string]string{"app": "web", "env": "prod"},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "ReplicationController",
Name: "rc",
UID: "123",
Controller: &isController,
},
},
},
Spec: v1.PodSpec{
Affinity: &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "failure-domain.beta.kubernetes.io/zone",
Operator: "Exists",
},
},
},
},
},
},
PodAffinity: &v1.PodAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "db"}},
TopologyKey: "kubernetes.io/hostname",
},
},
},
PodAntiAffinity: &v1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "web"}},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
InitContainers: []v1.Container{
{
Name: "init-pause",
Image: "gcr.io/google_containers/pause",
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": resource.MustParse("1"),
"mem": resource.MustParse("100Mi"),
},
},
},
},
Containers: []v1.Container{
{
Name: "pause",
Image: "gcr.io/google_containers/pause",
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": resource.MustParse("1"),
"mem": resource.MustParse("100Mi"),
},
},
VolumeMounts: []v1.VolumeMount{
{
Name: "nfs",
MountPath: "/srv/data",
},
},
},
},
NodeSelector: map[string]string{"node-type": "awesome"},
Tolerations: []v1.Toleration{
{
Effect: "NoSchedule",
Key: "experimental",
Operator: "Exists",
},
},
Volumes: []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "someEBSVol1",
},
},
},
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "someEBSVol2",
},
},
},
{
Name: "nfs",
VolumeSource: v1.VolumeSource{
NFS: &v1.NFSVolumeSource{
Server: "nfs.corp.example.com",
},
},
},
},
},
}
}
type predicateItemType struct {
fit bool
reasons []algorithm.PredicateFailureReason
}
// mockPredicate provides an algorithm.FitPredicate with pre-set return values.
type mockPredicate struct {
fit bool
reasons []algorithm.PredicateFailureReason
err error
callCount int
}
func (p *mockPredicate) predicate(*v1.Pod, algorithm.PredicateMetadata, *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
p.callCount++
return p.fit, p.reasons, p.err
}
func TestRunPredicate(t *testing.T) {
tests := []struct {
name string
pred mockPredicate
expectFit, expectCacheHit, expectCacheWrite bool
expectedReasons []algorithm.PredicateFailureReason
expectedError string
}{
{
name: "pod fits/cache hit",
pred: mockPredicate{},
expectFit: true,
expectCacheHit: true,
expectCacheWrite: false,
},
{
name: "pod fits/cache miss",
pred: mockPredicate{fit: true},
expectFit: true,
expectCacheHit: false,
expectCacheWrite: true,
},
{
name: "pod doesn't fit/cache miss",
pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}},
expectFit: false,
expectCacheHit: false,
expectCacheWrite: true,
expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate},
},
{
name: "pod doesn't fit/cache hit",
pred: mockPredicate{},
expectFit: false,
expectCacheHit: true,
expectCacheWrite: false,
expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate},
},
{
name: "predicate error",
pred: mockPredicate{err: errors.New("This is expected")},
expectFit: false,
expectCacheHit: false,
expectCacheWrite: false,
expectedError: "This is expected",
},
}
predicatesOrdering := []string{"testPredicate"}
predicateID := 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := schedulernodeinfo.NewNodeInfo()
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
node.SetNode(testNode)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
// Initialize and populate equivalence class cache.
ecache := NewCache(predicatesOrdering)
ecache.Snapshot()
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
equivClass := NewClass(pod)
if test.expectCacheHit {
nodeCache.updateResult(pod.Name, "testPredicate", predicateID, test.expectFit, test.expectedReasons, equivClass.hash, node)
}
fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", predicateID, pod, meta, node, equivClass)
if err != nil {
if err.Error() != test.expectedError {
t.Errorf("Expected error %v but got %v", test.expectedError, err)
}
} else if len(test.expectedError) > 0 {
t.Errorf("Expected error %v but got nil", test.expectedError)
}
if fit && !test.expectFit {
t.Errorf("pod should not fit")
}
if !fit && test.expectFit {
t.Errorf("pod should fit")
}
if len(reasons) != len(test.expectedReasons) {
t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons)
} else {
for i, reason := range reasons {
if reason != test.expectedReasons[i] {
t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons)
break
}
}
}
if test.expectCacheHit && test.pred.callCount != 0 {
t.Errorf("Predicate should not be called")
}
if !test.expectCacheHit && test.pred.callCount == 0 {
t.Errorf("Predicate should be called")
}
_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", predicateID, equivClass.hash)
if !ok && test.expectCacheWrite {
t.Errorf("Cache write should happen")
}
if !test.expectCacheHit && test.expectCacheWrite && !ok {
t.Errorf("Cache write should happen")
}
if !test.expectCacheHit && !test.expectCacheWrite && ok {
t.Errorf("Cache write should not happen")
}
})
}
}
func TestUpdateResult(t *testing.T) {
predicatesOrdering := []string{"GeneralPredicates"}
tests := []struct {
name string
pod string
predicateKey string
predicateID int
nodeName string
fit bool
reasons []algorithm.PredicateFailureReason
equivalenceHash uint64
expectPredicateMap bool
expectCacheItem predicateResult
}{
{
name: "test 1",
pod: "testPod",
predicateKey: "GeneralPredicates",
predicateID: 0,
nodeName: "node1",
fit: true,
equivalenceHash: 123,
expectPredicateMap: false,
expectCacheItem: predicateResult{
Fit: true,
},
},
{
name: "test 2",
pod: "testPod",
predicateKey: "GeneralPredicates",
predicateID: 0,
nodeName: "node2",
fit: false,
equivalenceHash: 123,
expectPredicateMap: true,
expectCacheItem: predicateResult{
Fit: false,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := schedulernodeinfo.NewNodeInfo()
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
node.SetNode(testNode)
// Initialize and populate equivalence class cache.
ecache := NewCache(predicatesOrdering)
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
if test.expectPredicateMap {
predicateItem := predicateResult{
Fit: true,
}
nodeCache.cache[test.predicateID] =
resultMap{
test.equivalenceHash: predicateItem,
}
}
nodeCache.updateResult(
test.pod,
test.predicateKey,
test.predicateID,
test.fit,
test.reasons,
test.equivalenceHash,
node,
)
cachedMapItem := nodeCache.cache[test.predicateID]
if cachedMapItem == nil {
t.Errorf("can't find expected cache item: %v", test.expectCacheItem)
} else {
if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) {
t.Errorf("expected cached item: %v, but got: %v",
test.expectCacheItem, cachedMapItem[test.equivalenceHash])
}
}
})
}
}
// slicesEqual wraps reflect.DeepEqual, but returns true when comparing nil and empty slice.
func slicesEqual(a, b []algorithm.PredicateFailureReason) bool {
if len(a) == 0 && len(b) == 0 {
return true
}
return reflect.DeepEqual(a, b)
}
func TestLookupResult(t *testing.T) {
predicatesOrdering := []string{"GeneralPredicates"}
tests := []struct {
name string
podName string
nodeName string
predicateKey string
predicateID int
equivalenceHashForUpdatePredicate uint64
equivalenceHashForCalPredicate uint64
cachedItem predicateItemType
expectedPredicateKeyMiss bool
expectedEquivalenceHashMiss bool
expectedPredicateItem predicateItemType
}{
{
name: "test 1",
podName: "testPod",
nodeName: "node1",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 123,
predicateKey: "GeneralPredicates",
predicateID: 0,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
expectedPredicateKeyMiss: true,
expectedPredicateItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{},
},
},
{
name: "test 2",
podName: "testPod",
nodeName: "node2",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 123,
predicateKey: "GeneralPredicates",
predicateID: 0,
cachedItem: predicateItemType{
fit: true,
},
expectedPredicateKeyMiss: false,
expectedPredicateItem: predicateItemType{
fit: true,
reasons: []algorithm.PredicateFailureReason{},
},
},
{
name: "test 3",
podName: "testPod",
nodeName: "node3",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 123,
predicateKey: "GeneralPredicates",
predicateID: 0,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
expectedPredicateKeyMiss: false,
expectedPredicateItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
},
{
name: "test 4",
podName: "testPod",
nodeName: "node4",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 456,
predicateKey: "GeneralPredicates",
predicateID: 0,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
expectedPredicateKeyMiss: false,
expectedEquivalenceHashMiss: true,
expectedPredicateItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
// Initialize and populate equivalence class cache.
ecache := NewCache(predicatesOrdering)
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
node := schedulernodeinfo.NewNodeInfo()
node.SetNode(testNode)
// set cached item to equivalence cache
nodeCache.updateResult(
test.podName,
test.predicateKey,
test.predicateID,
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
node,
)
// if a cache miss is expected, invalidate the cached item
if test.expectedPredicateKeyMiss {
predicateKeys := sets.NewString()
predicateKeys.Insert(test.predicateKey)
ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys)
}
// calculate predicate with equivalence cache
result, ok := nodeCache.lookupResult(test.podName,
test.nodeName,
test.predicateKey,
test.predicateID,
test.equivalenceHashForCalPredicate,
)
fit, reasons := result.Fit, result.FailReasons
// the returned ok flag should match expectedPredicateKeyMiss or expectedEquivalenceHashMiss
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
if ok && test.expectedEquivalenceHashMiss {
t.Errorf("Failed: %s, expected (equivalence hash) cache miss", test.name)
}
if !ok && !test.expectedEquivalenceHashMiss {
t.Errorf("Failed: %s, expected (equivalence hash) cache hit", test.name)
}
} else {
if ok && test.expectedPredicateKeyMiss {
t.Errorf("Failed: %s, expected (predicate key) cache miss", test.name)
}
if !ok && !test.expectedPredicateKeyMiss {
t.Errorf("Failed: %s, expected (predicate key) cache hit", test.name)
}
}
// returned predicate result should match expected predicate item
if fit != test.expectedPredicateItem.fit {
t.Errorf("Failed: %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit)
}
if !slicesEqual(reasons, test.expectedPredicateItem.reasons) {
t.Errorf("Failed: %s, expected reasons: %v, but got: %v",
test.name, test.expectedPredicateItem.reasons, reasons)
}
})
}
}
func TestGetEquivalenceHash(t *testing.T) {
pod1 := makeBasicPod("pod1")
pod2 := makeBasicPod("pod2")
pod3 := makeBasicPod("pod3")
pod3.Spec.Volumes = []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "someEBSVol111",
},
},
},
}
pod4 := makeBasicPod("pod4")
pod4.Spec.Volumes = []v1.Volume{
{
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "someEBSVol222",
},
},
},
}
pod5 := makeBasicPod("pod5")
pod5.Spec.Volumes = []v1.Volume{}
pod6 := makeBasicPod("pod6")
pod6.Spec.Volumes = nil
pod7 := makeBasicPod("pod7")
pod7.Spec.NodeSelector = nil
pod8 := makeBasicPod("pod8")
pod8.Spec.NodeSelector = make(map[string]string)
type podInfo struct {
pod *v1.Pod
hashIsValid bool
}
tests := []struct {
name string
podInfoList []podInfo
isEquivalent bool
}{
{
name: "pods with everything the same except name",
podInfoList: []podInfo{
{pod: pod1, hashIsValid: true},
{pod: pod2, hashIsValid: true},
},
isEquivalent: true,
},
{
name: "pods that only differ in their PVC volume sources",
podInfoList: []podInfo{
{pod: pod3, hashIsValid: true},
{pod: pod4, hashIsValid: true},
},
isEquivalent: false,
},
{
name: "pods that have no volumes, but one uses nil and one uses an empty slice",
podInfoList: []podInfo{
{pod: pod5, hashIsValid: true},
{pod: pod6, hashIsValid: true},
},
isEquivalent: true,
},
{
name: "pods that have no NodeSelector, but one uses nil and one uses an empty map",
podInfoList: []podInfo{
{pod: pod7, hashIsValid: true},
{pod: pod8, hashIsValid: true},
},
isEquivalent: true,
},
}
var (
targetPodInfo podInfo
targetHash uint64
)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
for i, podInfo := range test.podInfoList {
testPod := podInfo.pod
eclassInfo := NewClass(testPod)
if eclassInfo == nil && podInfo.hashIsValid {
t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
}
if eclassInfo != nil {
// NOTE(harry): the first element will be used as target so
// this logic can't verify more than two inequivalent pods
if i == 0 {
targetHash = eclassInfo.hash
targetPodInfo = podInfo
} else {
if targetHash != eclassInfo.hash {
if test.isEquivalent {
t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
}
}
}
}
}
})
}
}
func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
testPredicate := "GeneralPredicates"
testPredicateID := 0
predicatesOrdering := []string{testPredicate}
// tests is used to initialize all nodes
tests := []struct {
name string
podName string
nodeName string
equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType
}{
{
name: "hash predicate 123 not fits host ports",
podName: "testPod",
nodeName: "node1",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{
predicates.ErrPodNotFitsHostPorts,
},
},
},
{
name: "hash predicate 456 not fits host ports",
podName: "testPod",
nodeName: "node2",
equivalenceHashForUpdatePredicate: 456,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{
predicates.ErrPodNotFitsHostPorts,
},
},
},
{
name: "hash predicate 123 fits",
podName: "testPod",
nodeName: "node3",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: true,
},
},
}
ecache := NewCache(predicatesOrdering)
for _, test := range tests {
node := schedulernodeinfo.NewNodeInfo()
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
node.SetNode(testNode)
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
// set cached item to equivalence cache
nodeCache.updateResult(
test.podName,
testPredicate,
testPredicateID,
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
node,
)
}
// invalidate cached predicate for all nodes
ecache.InvalidatePredicates(sets.NewString(testPredicate))
// there should be no cached predicate any more
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist {
if cache := nodeCache.cache[testPredicateID]; cache != nil {
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
testPredicate, test.nodeName)
}
}
})
}
}
func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
testPredicate := "GeneralPredicates"
testPredicateID := 0
predicatesOrdering := []string{testPredicate}
// tests is used to initialize all nodes
tests := []struct {
name string
podName string
nodeName string
equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType
}{
{
name: "hash predicate 123 not fits host ports",
podName: "testPod",
nodeName: "node1",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
},
{
name: "hash predicate 456 not fits host ports",
podName: "testPod",
nodeName: "node2",
equivalenceHashForUpdatePredicate: 456,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
},
{
name: "hash predicate 123 fits host ports",
podName: "testPod",
nodeName: "node3",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: true,
},
},
}
ecache := NewCache(predicatesOrdering)
for _, test := range tests {
node := schedulernodeinfo.NewNodeInfo()
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
node.SetNode(testNode)
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
// set cached item to equivalence cache
nodeCache.updateResult(
test.podName,
testPredicate,
testPredicateID,
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
node,
)
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
oldNodeCache, _ := ecache.GetNodeCache(test.nodeName)
oldGeneration := oldNodeCache.generation
// invalidate cached predicate for all nodes
ecache.InvalidateAllPredicatesOnNode(test.nodeName)
if n, _ := ecache.GetNodeCache(test.nodeName); oldGeneration == n.generation {
t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
}
})
}
}
func BenchmarkEquivalenceHash(b *testing.B) {
pod := makeBasicPod("test")
for i := 0; i < b.N; i++ {
getEquivalencePod(pod)
}
}

View File

@ -510,7 +510,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
queue := internalqueue.NewSchedulingQueue(nil)
scheduler := NewGenericScheduler(
cache,
nil,
queue,
test.predicates,
algorithm.EmptyPredicateMetadataProducer,

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
@ -116,7 +115,6 @@ type ScheduleAlgorithm interface {
type genericScheduler struct {
cache schedulerinternalcache.Cache
equivalenceCache *equivalence.Cache
schedulingQueue internalqueue.SchedulingQueue
predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.PriorityMetadataProducer
@ -134,21 +132,9 @@ type genericScheduler struct {
percentageOfNodesToScore int32
}
// snapshot snapshots equivalence cache and node infos for all fit and priority
// snapshot snapshots scheduler cache and node infos for all fit and priority
// functions.
func (g *genericScheduler) snapshot() error {
// IMPORTANT NOTE: We must snapshot equivalence cache before snapshotting
// scheduler cache, otherwise stale data may be written into equivalence
// cache, e.g.
// 1. snapshot cache
// 2. event arrives, updating cache and invalidating predicates or whole node cache
// 3. snapshot ecache
// 4. evaluate predicates
// 5. stale result will be written to ecache
if g.equivalenceCache != nil {
g.equivalenceCache.Snapshot()
}
// Used for all fit and priority funcs.
return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
}
@ -420,7 +406,6 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
var (
predicateResultLock sync.Mutex
filteredLen int32
equivClass *equivalence.Class
)
ctx, cancel := context.WithCancel(context.Background())
@ -428,26 +413,15 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
if g.equivalenceCache != nil {
// getEquivalenceClassInfo will return immediately if no equivalence pod found
equivClass = equivalence.NewClass(pod)
}
checkNode := func(i int) {
var nodeCache *equivalence.NodeCache
nodeName := g.cache.NodeTree().Next()
if g.equivalenceCache != nil {
nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
}
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
nodeCache,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
equivClass,
)
if err != nil {
predicateResultLock.Lock()
@ -556,15 +530,10 @@ func podFitsOnNode(
meta algorithm.PredicateMetadata,
info *schedulernodeinfo.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
nodeCache *equivalence.NodeCache,
queue internalqueue.SchedulingQueue,
alwaysCheckAllPredicates bool,
equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
var (
eCacheAvailable bool
failedPredicates []algorithm.PredicateFailureReason
)
var failedPredicates []algorithm.PredicateFailureReason
podsAdded := false
// We run predicates twice in some cases. If the node has greater or equal priority
@ -593,11 +562,7 @@ func podFitsOnNode(
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
// Bypass eCache if node has any nominated pods.
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
for predicateID, predicateKey := range predicates.Ordering() {
for _, predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []algorithm.PredicateFailureReason
@ -605,11 +570,7 @@ func podFitsOnNode(
)
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable {
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
} else {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
}
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
@ -1038,7 +999,7 @@ func selectVictimsOnNode(
// that we should check is if the "pod" is failing to schedule due to pod affinity
// failure.
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
@ -1052,7 +1013,7 @@ func selectVictimsOnNode(
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
reprievePod := func(p *v1.Pod) bool {
addPod(p)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false)
if !fits {
removePod(p)
victims = append(victims, p)
@ -1168,7 +1129,6 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
cache schedulerinternalcache.Cache,
eCache *equivalence.Cache,
podQueue internalqueue.SchedulingQueue,
predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer,
@ -1185,7 +1145,6 @@ func NewGenericScheduler(
) ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
equivalenceCache: eCache,
schedulingQueue: podQueue,
predicates: predicates,
predicateMetaProducer: predicateMetaProducer,

View File

@ -22,7 +22,6 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -39,7 +38,6 @@ import (
algorithmpriorities "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -471,7 +469,6 @@ func TestGenericScheduler(t *testing.T) {
scheduler := NewGenericScheduler(
cache,
nil,
internalqueue.NewSchedulingQueue(nil),
test.predicates,
algorithm.EmptyPredicateMetadataProducer,
@ -508,7 +505,6 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod
s := NewGenericScheduler(
cache,
nil,
internalqueue.NewSchedulingQueue(nil),
predicates,
algorithm.EmptyPredicateMetadataProducer,
@ -1435,7 +1431,6 @@ func TestPreempt(t *testing.T) {
}
scheduler := NewGenericScheduler(
cache,
nil,
internalqueue.NewSchedulingQueue(nil),
map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources},
algorithm.EmptyPredicateMetadataProducer,
@ -1488,189 +1483,3 @@ func TestPreempt(t *testing.T) {
})
}
}
// syncingMockCache delegates method calls to an actual Cache,
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
type syncingMockCache struct {
schedulerinternalcache.Cache
cycleStart, cacheInvalidated chan struct{}
once sync.Once
}
// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it
// synchronizes with the test.
//
// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use
// this point to signal to the test that a scheduling cycle has started.
func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error {
err := c.Cache.UpdateNodeNameToInfoMap(infoMap)
c.once.Do(func() {
c.cycleStart <- struct{}{}
<-c.cacheInvalidated
})
return err
}
// TestCacheInvalidationRace tests that equivalence cache invalidation is correctly
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
// occurs after schedulernodeinfo is snapshotted and before the equivalence cache lock is acquired.
func TestCacheInvalidationRace(t *testing.T) {
// Create a predicate that returns false the first time and true on subsequent calls.
podWillFit := false
var callCount int
testPredicate := func(pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
callCount++
if !podWillFit {
podWillFit = true
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
return true, nil, nil
}
// Set up the mock cache.
cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop)
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
cache.AddNode(testNode)
mockCache := &syncingMockCache{
Cache: cache,
cycleStart: make(chan struct{}),
cacheInvalidated: make(chan struct{}),
}
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
eCache := equivalence.NewCache(algorithmpredicates.Ordering())
eCache.GetNodeCache(testNode.Name)
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
// the equivalence cache would be updated.
go func() {
<-mockCache.cycleStart
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
Spec: v1.PodSpec{NodeName: "machine1"}}
if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod to cache: %v", err)
}
eCache.InvalidateAllPredicatesOnNode("machine1")
mockCache.cacheInvalidated <- struct{}{}
}()
// Set up the scheduler.
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
pdbLister := schedulertesting.FakePDBLister{}
scheduler := NewGenericScheduler(
mockCache,
eCache,
internalqueue.NewSchedulingQueue(nil),
ps,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, pvcLister, pdbLister,
true, false,
schedulerapi.DefaultPercentageOfNodesToScore)
// First scheduling attempt should fail.
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
machine, err := scheduler.Schedule(pod, nodeLister)
if machine != "" || err == nil {
t.Error("First scheduling attempt did not fail")
}
// Second scheduling attempt should succeed because cache was invalidated.
_, err = scheduler.Schedule(pod, nodeLister)
if err != nil {
t.Errorf("Second scheduling attempt failed: %v", err)
}
if callCount != 2 {
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
}
}
// TestCacheInvalidationRace2 tests that cache invalidation is correctly handled
// when an invalidation event happens while a predicate is running.
func TestCacheInvalidationRace2(t *testing.T) {
// Create a predicate that returns false the first time and true on subsequent calls.
var (
podWillFit = false
callCount int
cycleStart = make(chan struct{})
cacheInvalidated = make(chan struct{})
once sync.Once
)
testPredicate := func(pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
callCount++
once.Do(func() {
cycleStart <- struct{}{}
<-cacheInvalidated
})
if !podWillFit {
podWillFit = true
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
return true, nil, nil
}
// Set up the mock cache.
cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop)
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
cache.AddNode(testNode)
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
eCache := equivalence.NewCache(algorithmpredicates.Ordering())
eCache.GetNodeCache(testNode.Name)
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
// the equivalence cache would be updated.
go func() {
<-cycleStart
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
Spec: v1.PodSpec{NodeName: "machine1"}}
if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod to cache: %v", err)
}
eCache.InvalidateAllPredicatesOnNode("machine1")
cacheInvalidated <- struct{}{}
}()
// Set up the scheduler.
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
pdbLister := schedulertesting.FakePDBLister{}
scheduler := NewGenericScheduler(
cache,
eCache,
internalqueue.NewSchedulingQueue(nil),
ps,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
emptyPluginSet,
nil, nil, pvcLister, pdbLister, true, false,
schedulerapi.DefaultPercentageOfNodesToScore)
// First scheduling attempt should fail.
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
machine, err := scheduler.Schedule(pod, nodeLister)
if machine != "" || err == nil {
t.Error("First scheduling attempt did not fail")
}
// Second scheduling attempt should succeed because cache was invalidated.
_, err = scheduler.Schedule(pod, nodeLister)
if err != nil {
t.Errorf("Second scheduling attempt failed: %v", err)
}
if callCount != 2 {
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
}
}

View File

@ -51,7 +51,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
@ -59,7 +58,6 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/api/validation"
"k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
@ -99,9 +97,7 @@ type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used to optimistically invalidate affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm core.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
@ -225,12 +221,6 @@ type configFactory struct {
// HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100.
hardPodAffinitySymmetricWeight int32
// Equivalence class cache
equivalencePodCache *equivalence.Cache
// Enable equivalence class cache
enableEquivalenceClassCache bool
// Handles volume binding decisions
volumeBinder *volumebinder.VolumeBinder
@ -259,7 +249,6 @@ type ConfigFactoryArgs struct {
PdbInformer policyinformers.PodDisruptionBudgetInformer
StorageClassInformer storageinformers.StorageClassInformer
HardPodAffinitySymmetricWeight int32
EnableEquivalenceClassCache bool
DisablePreemption bool
PercentageOfNodesToScore int32
BindTimeoutSeconds int64
@ -297,7 +286,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
StopEverything: stopEverything,
schedulerName: args.SchedulerName,
hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: args.EnableEquivalenceClassCache,
disablePreemption: args.DisablePreemption,
percentageOfNodesToScore: args.PercentageOfNodesToScore,
}
@ -365,14 +353,11 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
},
)
// Add and delete of PVs will affect equivalence cache items
// related to persistent volumes
args.PvInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
AddFunc: c.onPvAdd,
UpdateFunc: c.onPvUpdate,
DeleteFunc: c.onPvDelete,
},
)
@ -381,13 +366,10 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
cache.ResourceEventHandlerFuncs{
AddFunc: c.onPvcAdd,
UpdateFunc: c.onPvcUpdate,
DeleteFunc: c.onPvcDelete,
},
)
// This is for ServiceAffinity: the predicate result is affected when the selector of
// the service is updated. Also, if a new service is added, the equivalence cache will
// become invalid since existing pods may be "captured" by this service, which changes
// the predicate result.
args.ServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onServiceAdd,
@ -396,9 +378,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
},
)
// The existing equivalence cache should not be affected by add/delete of RC/Deployment etc.;
// invalidation only makes sense when a pod is scheduled or deleted
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Setup volume binder
c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
@ -406,7 +385,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
args.StorageClassInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.onStorageClassAdd,
DeleteFunc: c.onStorageClassDelete,
},
)
}
@ -486,14 +464,6 @@ func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool {
}
func (c *configFactory) onPvAdd(obj interface{}) {
if c.enableEquivalenceClassCache {
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolume: %v", obj)
return
}
c.invalidatePredicatesForPv(pv)
}
// Pods created when there are no PVs available will be stuck in the
// unschedulable queue. But unbound PVs created for static provisioning and
// delay binding storage class are skipped in PV controller dynamic
@ -504,19 +474,6 @@ func (c *configFactory) onPvAdd(obj interface{}) {
}
func (c *configFactory) onPvUpdate(old, new interface{}) {
if c.enableEquivalenceClassCache {
newPV, ok := new.(*v1.PersistentVolume)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolume: %v", new)
return
}
oldPV, ok := old.(*v1.PersistentVolume)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolume: %v", old)
return
}
c.invalidatePredicatesForPvUpdate(oldPV, newPV)
}
// Scheduler.bindVolumesWorker may fail to update assumed pod volume
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then the scheduler will add the pod back to the unschedulable queue. We
@ -524,96 +481,12 @@ func (c *configFactory) onPvUpdate(old, new interface{}) {
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
invalidPredicates := sets.NewString()
// CheckVolumeBinding predicate calls SchedulerVolumeBinder.FindPodVolumes
// which will cache PVs in PodBindingCache. When a PV is updated, we should
// invalidate the cache; otherwise PVAssumeCache.Assume will fail with an
// out-of-sync error.
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
for k, v := range newPV.Labels {
// If PV update modifies the zone/region labels.
if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) {
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
break
}
}
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
}
// isZoneRegionLabel checks if the given label key is a zone or region label.
func isZoneRegionLabel(k string) bool {
return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion
}
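For reference, the zone/region check can be exercised in isolation. A self-contained sketch follows, using the literal values that kubeletapis.LabelZoneFailureDomain and kubeletapis.LabelZoneRegion carried at the time; the constants and the zoneLabelsChanged helper are illustrative stand-ins, not the packaged code:

package main

import "fmt"

// Literal values of the kubeletapis zone/region label keys of this era
// (assumed here for illustration).
const (
	labelZoneFailureDomain = "failure-domain.beta.kubernetes.io/zone"
	labelZoneRegion        = "failure-domain.beta.kubernetes.io/region"
)

func isZoneRegionLabel(k string) bool {
	return k == labelZoneFailureDomain || k == labelZoneRegion
}

// zoneLabelsChanged reports whether any zone/region label differs between two
// label maps -- the check the PV update path performs before invalidating
// NoVolumeZoneConflict. Plain string comparison suffices; the original used
// reflect.DeepEqual on string values, which is equivalent here.
func zoneLabelsChanged(oldLabels, newLabels map[string]string) bool {
	for k, v := range newLabels {
		if isZoneRegionLabel(k) && oldLabels[k] != v {
			return true
		}
	}
	return false
}

func main() {
	old := map[string]string{labelZoneFailureDomain: "us-east-1a"}
	updated := map[string]string{labelZoneFailureDomain: "us-east-1b"}
	fmt.Println(zoneLabelsChanged(old, updated)) // true -> zone changed
}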
func (c *configFactory) onPvDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
var pv *v1.PersistentVolume
switch t := obj.(type) {
case *v1.PersistentVolume:
pv = t
case cache.DeletedFinalStateUnknown:
var ok bool
pv, ok = t.Obj.(*v1.PersistentVolume)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolume: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.PersistentVolume: %v", t)
return
}
c.invalidatePredicatesForPv(pv)
}
}
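The type switch above is the standard client-go delete-handler pattern: a delete notification may deliver either the object itself or a cache.DeletedFinalStateUnknown tombstone when the final state of the object was missed by the watch. A minimal sketch of that unwrapping, assuming only client-go and the core v1 API (the helper name is hypothetical):

package eventhandlers

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog"
)

// pvFromDeleteEvent unwraps an informer delete event into a
// *v1.PersistentVolume, handling the tombstone case.
func pvFromDeleteEvent(obj interface{}) *v1.PersistentVolume {
	switch t := obj.(type) {
	case *v1.PersistentVolume:
		return t
	case cache.DeletedFinalStateUnknown:
		if pv, ok := t.Obj.(*v1.PersistentVolume); ok {
			return pv
		}
		klog.Errorf("tombstone contained unexpected object: %v", t.Obj)
	default:
		klog.Errorf("cannot convert to *v1.PersistentVolume: %v", t)
	}
	return nil
}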
func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
// You could have a PVC that points to a PV, but the PV object doesn't exist.
// So when the PV object gets added, we can recount.
invalidPredicates := sets.NewString()
// PV types which impact MaxPDVolumeCountPredicate
if pv.Spec.AWSElasticBlockStore != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
}
if pv.Spec.GCEPersistentDisk != nil {
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
}
if pv.Spec.AzureDisk != nil {
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
}
if pv.Spec.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
// If the PV contains a zone-related label, it may impact the cached NoVolumeZoneConflict result
for k := range pv.Labels {
if isZoneRegionLabel(k) {
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
break
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
}
func (c *configFactory) onPvcAdd(obj interface{}) {
if c.enableEquivalenceClassCache {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", obj)
return
}
c.invalidatePredicatesForPvc(pvc)
}
c.podQueue.MoveAllToActiveQueue()
}
@ -621,83 +494,9 @@ func (c *configFactory) onPvcUpdate(old, new interface{}) {
if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
return
}
if c.enableEquivalenceClassCache {
newPVC, ok := new.(*v1.PersistentVolumeClaim)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", new)
return
}
oldPVC, ok := old.(*v1.PersistentVolumeClaim)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", old)
return
}
c.invalidatePredicatesForPvcUpdate(oldPVC, newPVC)
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onPvcDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
var pvc *v1.PersistentVolumeClaim
switch t := obj.(type) {
case *v1.PersistentVolumeClaim:
pvc = t
case cache.DeletedFinalStateUnknown:
var ok bool
pvc, ok = t.Obj.(*v1.PersistentVolumeClaim)
if !ok {
klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t)
return
}
c.invalidatePredicatesForPvc(pvc)
}
}
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
// We need to do this here because the ecache uses the PVC UID as part of the pod's equivalence hash
// The bound volume type may change
invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
// The bound volume's label may change
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
}
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
invalidPredicates := sets.NewString()
if old.Spec.VolumeName != new.Spec.VolumeName {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// PVC volume binding has changed
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
// The bound volume type may change
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
}
}
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
}
func (c *configFactory) onStorageClassAdd(obj interface{}) {
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
@ -709,71 +508,20 @@ func (c *configFactory) onStorageClassAdd(obj interface{}) {
// If PVCs have a specified StorageClass name, creating StorageClass objects
// with late binding will cause predicates to pass, so we need to move pods
// to the active queue.
// We don't need to invalidate cached results because results will not be
// cached for pods that have unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
c.podQueue.MoveAllToActiveQueue()
}
}
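To make the condition above concrete, here is a hedged sketch of a StorageClass object that would trigger the queue move; the object name and provisioner are hypothetical:

package main

import (
	"fmt"

	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	mode := storagev1.VolumeBindingWaitForFirstConsumer
	sc := &storagev1.StorageClass{
		ObjectMeta:        metav1.ObjectMeta{Name: "local-storage"}, // hypothetical name
		Provisioner:       "kubernetes.io/no-provisioner",
		VolumeBindingMode: &mode,
	}
	// The onStorageClassAdd check: a non-nil mode equal to WaitForFirstConsumer.
	fmt.Println(sc.VolumeBindingMode != nil &&
		*sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer) // true
}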
func (c *configFactory) onStorageClassDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
var sc *storagev1.StorageClass
switch t := obj.(type) {
case *storagev1.StorageClass:
sc = t
case cache.DeletedFinalStateUnknown:
var ok bool
sc, ok = t.Obj.(*storagev1.StorageClass)
if !ok {
klog.Errorf("cannot convert to *storagev1.StorageClass: %v", t.Obj)
return
}
default:
klog.Errorf("cannot convert to *storagev1.StorageClass: %v", t)
return
}
c.invalidatePredicatesForStorageClass(sc)
}
}
func (c *configFactory) invalidatePredicatesForStorageClass(sc *storagev1.StorageClass) {
invalidPredicates := sets.NewString()
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
// Delete can cause predicates to fail
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
}
}
c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
}
func (c *configFactory) onServiceAdd(obj interface{}) {
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
if c.enableEquivalenceClassCache {
// TODO(resouer) We may need to invalidate this for a specified group of pods only
oldService := oldObj.(*v1.Service)
newService := newObj.(*v1.Service)
if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
}
}
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) onServiceDelete(obj interface{}) {
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
}
c.podQueue.MoveAllToActiveQueue()
}
@ -812,9 +560,6 @@ func (c *configFactory) addPodToCache(obj interface{}) {
}
c.podQueue.AssignedPodAdded(pod)
// NOTE: Updating the equivalence cache for addPodToCache is handled
// optimistically in pkg/scheduler/scheduler.go#assume().
}
func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
@ -829,16 +574,10 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
return
}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
klog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
c.podQueue.AssignedPodUpdated(newPod)
}
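The removed NOTE comments in this file all describe one ordering invariant: write the authoritative scheduler cache first, then invalidate derived equivalence-cache entries, so a predicate result recomputed after the invalidation can only be based on fresh state. A toy, self-contained sketch of that ordering; the cache types are stand-ins, not the real interfaces:

package main

import "fmt"

// Hypothetical stand-ins: an authoritative store and a cache derived from it.
type schedCache map[string]string // pod -> node (authoritative)
type equivCache map[string]bool   // pod -> cached predicate result (derived)

// updateOrdered follows the invariant: authoritative write first, then
// invalidation, so any recomputation triggered after the invalidation
// observes the updated state rather than a stale snapshot.
func updateOrdered(sc schedCache, ec equivCache, pod, node string) {
	sc[pod] = node  // 1. authoritative write
	delete(ec, pod) // 2. then invalidate the derived result
}

func main() {
	sc := schedCache{"p1": "node-a"}
	ec := equivCache{"p1": true}
	updateOrdered(sc, ec, "p1", "node-b")
	fmt.Println(sc, ec) // map[p1:node-b] map[]
}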
@ -883,27 +622,6 @@ func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
}
}
func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
if c.enableEquivalenceClassCache {
// if the pod does not have a bound node, updating the equivalence cache is meaningless;
// if the pod's bound node has changed, that case is handled by pod add & delete.
if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
// MatchInterPodAffinity needs to be reconsidered for this node,
// as well as all nodes in the same failure domain.
c.equivalencePodCache.InvalidatePredicates(
matchInterPodAffinitySet)
}
// if the requested container resources changed, invalidate the GeneralPredicates of this node
if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
predicates.GetResourceRequest(oldPod)) {
c.equivalencePodCache.InvalidatePredicatesOnNode(
newPod.Spec.NodeName, generalPredicatesSets)
}
}
}
}
func (c *configFactory) deletePodFromCache(obj interface{}) {
var pod *v1.Pod
switch t := obj.(type) {
@ -920,40 +638,14 @@ func (c *configFactory) deletePodFromCache(obj interface{}) {
klog.Errorf("cannot convert to *v1.Pod: %v", t)
return
}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := c.schedulerCache.RemovePod(pod); err != nil {
klog.Errorf("scheduler cache RemovePod failed: %v", err)
}
c.invalidateCachedPredicatesOnDeletePod(pod)
c.podQueue.MoveAllToActiveQueue()
}
func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
if c.enableEquivalenceClassCache {
// part of this case is the same as pod add.
c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
// MatchInterPodAffinity needs to be reconsidered for this node,
// as well as all nodes in the same failure domain.
// TODO(resouer) can we just do this for nodes in the same failure domain
c.equivalencePodCache.InvalidatePredicates(
matchInterPodAffinitySet)
// if this pod has these PVs, the cached disk conflict results will become invalid.
for _, volume := range pod.Spec.Volumes {
if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
volume.RBD != nil || volume.ISCSI != nil {
c.equivalencePodCache.InvalidatePredicatesOnNode(
pod.Spec.NodeName, noDiskConflictSet)
}
}
}
}
func (c *configFactory) addNodeToCache(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
@ -961,19 +653,11 @@ func (c *configFactory) addNodeToCache(obj interface{}) {
return
}
// NOTE: Because the scheduler uses the equivalence cache for nodes, we need
// to create it before adding the node into the scheduler cache.
if c.enableEquivalenceClassCache {
// GetNodeCache() will lazily create NodeCache for given node if it does not exist.
c.equivalencePodCache.GetNodeCache(node.GetName())
}
if err := c.schedulerCache.AddNode(node); err != nil {
klog.Errorf("scheduler cache AddNode failed: %v", err)
}
c.podQueue.MoveAllToActiveQueue()
// NOTE: adding a new node does not affect existing predicate results in the equivalence cache
}
func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
@ -988,16 +672,10 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
return
}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
klog.Errorf("scheduler cache UpdateNode failed: %v", err)
}
c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
// Only activate unschedulable pods if the node became more schedulable.
// We skip the node property comparison when there are no unschedulable pods in the queue
// to save processing cycles. We still trigger a move to active queue to cover the case
@ -1008,75 +686,6 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
}
}
func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
if c.enableEquivalenceClassCache {
// Begin updating the equivalence cache based on the node update
// TODO(resouer): think about lazily initializing this set
invalidPredicates := sets.NewString()
if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
}
if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
for k, v := range oldNode.GetLabels() {
// any label can be a topology key of a pod, so we have to invalidate in all cases
if v != newNode.GetLabels()[k] {
invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
}
// NoVolumeZoneConflict will only be affected by zone-related label changes
if isZoneRegionLabel(k) {
if v != newNode.GetLabels()[k] {
invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
}
}
}
}
oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
if oldErr != nil {
klog.Errorf("Failed to get taints from old node annotation for equivalence cache")
}
newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
if newErr != nil {
klog.Errorf("Failed to get taints from new node annotation for equivalence cache")
}
if !reflect.DeepEqual(oldTaints, newTaints) ||
!reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
}
if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
for _, cond := range oldNode.Status.Conditions {
oldConditions[cond.Type] = cond.Status
}
for _, cond := range newNode.Status.Conditions {
newConditions[cond.Type] = cond.Status
}
if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
}
if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
}
if oldConditions[v1.NodePIDPressure] != newConditions[v1.NodePIDPressure] {
invalidPredicates.Insert(predicates.CheckNodePIDPressurePred)
}
if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
invalidPredicates.Insert(predicates.CheckNodeConditionPred)
}
}
if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
invalidPredicates.Insert(predicates.CheckNodeConditionPred)
}
c.equivalencePodCache.InvalidatePredicatesOnNode(newNode.GetName(), invalidPredicates)
}
}
func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
if nodeSpecUnschedulableChanged(newNode, oldNode) {
return true
@ -1140,17 +749,10 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
klog.Errorf("cannot convert to *v1.Node: %v", t)
return
}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := c.schedulerCache.RemoveNode(node); err != nil {
klog.Errorf("scheduler cache RemoveNode failed: %v", err)
}
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateAllPredicatesOnNode(node.GetName())
}
}
// Create creates a scheduler with the default algorithm provider.
@ -1288,15 +890,8 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
// TODO(bsalamat): the default registrar should be able to process config files.
c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache)
// Init equivalence class cache
if c.enableEquivalenceClassCache {
c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
klog.Info("Created equivalence class cache")
}
algo := core.NewGenericScheduler(
c.schedulerCache,
c.equivalencePodCache,
c.podQueue,
predicateFuncs,
predicateMetaProducer,
@ -1315,7 +910,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
podBackoff := util.CreateDefaultPodBackoff()
return &Config{
SchedulerCache: c.schedulerCache,
Ecache: c.equivalencePodCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodeLister{c.nodeLister},
Algorithm: algo,
@ -1514,16 +1108,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
_, err := c.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil && errors.IsNotFound(err) {
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
c.schedulerCache.RemoveNode(&node)
// invalidate cached predicates for the node
if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateAllPredicatesOnNode(nodeName)
}
}
}
} else {

View File

@ -45,7 +45,6 @@ import (
)
const (
enableEquivalenceCache = true
disablePodPreemption = false
bindTimeoutSeconds = 600
)
@ -532,7 +531,6 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
hardPodAffinitySymmetricWeight,
enableEquivalenceCache,
disablePodPreemption,
schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds,

View File

@ -131,22 +131,6 @@ var (
Help: "Total preemption attempts in the cluster till now",
})
equivalenceCacheLookups = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "equiv_cache_lookups_total",
Help: "Total number of equivalence cache lookups, by whether or not a cache entry was found",
}, []string{"result"})
EquivalenceCacheHits = equivalenceCacheLookups.With(prometheus.Labels{"result": "hit"})
EquivalenceCacheMisses = equivalenceCacheLookups.With(prometheus.Labels{"result": "miss"})
EquivalenceCacheWrites = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "equiv_cache_writes",
Help: "Total number of equivalence cache writes, by result",
}, []string{"result"})
metricsList = []prometheus.Collector{
scheduleAttempts,
SchedulingLatency,
@ -158,8 +142,6 @@ var (
SchedulingAlgorithmPremptionEvaluationDuration,
PreemptionVictims,
PreemptionAttempts,
equivalenceCacheLookups,
EquivalenceCacheWrites,
}
)
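The metrics removed above follow the usual Prometheus counter-vector pattern: one vector partitioned by a result label, with pre-bound child counters, as EquivalenceCacheHits and EquivalenceCacheMisses were. A generic sketch of the pattern; the subsystem and metric names here are illustrative, not the removed ones:

package main

import "github.com/prometheus/client_golang/prometheus"

var (
	// A counter vector partitioned by a "result" label.
	cacheLookups = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: "scheduler",
			Name:      "example_cache_lookups_total", // illustrative name
			Help:      "Total number of cache lookups, by result.",
		}, []string{"result"})

	// Pre-bound children, mirroring how the hit/miss counters were built.
	cacheHits   = cacheLookups.With(prometheus.Labels{"result": "hit"})
	cacheMisses = cacheLookups.With(prometheus.Labels{"result": "miss"})
)

func main() {
	prometheus.MustRegister(cacheLookups) // register the vector, not the children
	cacheHits.Inc()
	cacheMisses.Add(2)
}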

View File

@ -28,7 +28,6 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
appsinformers "k8s.io/client-go/informers/apps/v1"
@ -38,7 +37,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -70,7 +68,6 @@ func (sched *Scheduler) Cache() schedulerinternalcache.Cache {
type schedulerOptions struct {
schedulerName string
hardPodAffinitySymmetricWeight int32
enableEquivalenceClassCache bool
disablePreemption bool
percentageOfNodesToScore int32
bindTimeoutSeconds int64
@ -93,13 +90,6 @@ func WithHardPodAffinitySymmetricWeight(hardPodAffinitySymmetricWeight int32) Op
}
}
// WithEquivalenceClassCacheEnabled sets enableEquivalenceClassCache for Scheduler; the default value is false
func WithEquivalenceClassCacheEnabled(enableEquivalenceClassCache bool) Option {
return func(o *schedulerOptions) {
o.enableEquivalenceClassCache = enableEquivalenceClassCache
}
}
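The removed helper follows the functional-options pattern used throughout this file: each With* function returns a closure that mutates schedulerOptions before the scheduler is constructed. A self-contained sketch of the pattern under simplified, hypothetical names:

package main

import "fmt"

type schedulerOptions struct {
	schedulerName     string
	disablePreemption bool
}

// Option mutates the options struct; the constructor applies each in order
// over the defaults.
type Option func(*schedulerOptions)

func WithPreemptionDisabled(disable bool) Option {
	return func(o *schedulerOptions) { o.disablePreemption = disable }
}

func New(opts ...Option) schedulerOptions {
	options := schedulerOptions{schedulerName: "default-scheduler"} // defaults
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	fmt.Printf("%+v\n", New(WithPreemptionDisabled(true)))
}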
// WithPreemptionDisabled sets disablePreemption for Scheduler; the default value is false
func WithPreemptionDisabled(disablePreemption bool) Option {
return func(o *schedulerOptions) {
@ -124,7 +114,6 @@ func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option {
var defaultSchedulerOptions = schedulerOptions{
schedulerName: v1.DefaultSchedulerName,
hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
enableEquivalenceClassCache: false,
disablePreemption: false,
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
bindTimeoutSeconds: BindTimeoutSeconds,
@ -167,7 +156,6 @@ func New(client clientset.Interface,
PdbInformer: pdbInformer,
StorageClassInformer: storageClassInformer,
HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: options.enableEquivalenceClassCache,
DisablePreemption: options.disablePreemption,
PercentageOfNodesToScore: options.percentageOfNodesToScore,
BindTimeoutSeconds: options.bindTimeoutSeconds,
@ -370,12 +358,6 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo
sched.recordSchedulingFailure(assumed, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err))
}
// Invalidate ecache because assumed volumes could have affected the cached
// PVs for other pods
if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
sched.config.Ecache.InvalidatePredicates(invalidPredicates)
}
}
return
}
@ -416,11 +398,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// If the binding fails, the scheduler will release the resources allocated to the
// assumed pod immediately.
assumed.Spec.NodeName = host
// NOTE: Updates must be written to scheduler cache before invalidating
// equivalence cache, because we could snapshot equivalence cache after the
// invalidation and then snapshot the cache itself. If the cache is
// snapshotted before updates are written, we would update equivalence
// cache with stale information which is based on snapshot of old cache.
if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
klog.Errorf("scheduler cache AssumePod failed: %v", err)
@ -438,12 +416,6 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}
// Optimistically assume that the binding will succeed, so we need to invalidate the affected
// predicates in the equivalence cache.
// If the binding fails, these invalidated items will not break anything.
if sched.config.Ecache != nil {
sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
}
return nil
}
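For context, assume() implements optimistic scheduling: the pod is recorded in the scheduler cache as if already placed, the (slow) API bind happens asynchronously, and a failed bind releases the assumption. A hedged sketch of that flow with a minimal stand-in cache interface; assumeAndBind and podCache are hypothetical, not the real code:

package scheduling

import v1 "k8s.io/api/core/v1"

// podCache is a minimal stand-in for the scheduler cache methods the
// assume/bind flow needs; the real interface is larger.
type podCache interface {
	AssumePod(pod *v1.Pod) error
	ForgetPod(pod *v1.Pod) error
}

// assumeAndBind records the pod as placed so the scheduler can move on to the
// next pod, binds asynchronously, and rolls the assumption back on failure.
func assumeAndBind(pod *v1.Pod, host string, cache podCache, bind func(*v1.Pod) error) error {
	assumed := pod.DeepCopy()
	assumed.Spec.NodeName = host
	if err := cache.AssumePod(assumed); err != nil {
		return err
	}
	go func() {
		if err := bind(assumed); err != nil {
			// Binding failed: release the optimistically reserved resources.
			_ = cache.ForgetPod(assumed)
		}
	}()
	return nil
}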

View File

@ -639,7 +639,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern
algo := core.NewGenericScheduler(
scache,
nil,
nil,
predicateMap,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},
@ -692,7 +691,6 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
algo := core.NewGenericScheduler(
scache,
nil,
nil,
predicateMap,
algorithm.EmptyPredicateMetadataProducer,
[]algorithm.PriorityConfig{},

View File

@ -112,7 +112,6 @@ func setupScheduler(
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: false,
DisablePreemption: false,
PercentageOfNodesToScore: 100,
})

View File

@ -106,7 +106,7 @@ func TestReservePlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "reserve-plugin", nil),
false, nil, testPluginSet, false, true, time.Second)
false, nil, testPluginSet, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@ -157,7 +157,7 @@ func TestPrebindPlugin(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "prebind-plugin", nil),
false, nil, testPluginSet, false, true, time.Second)
false, nil, testPluginSet, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
@ -239,7 +239,7 @@ func TestContextCleanup(t *testing.T) {
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "plugin-context-cleanup", nil),
false, nil, testPluginSet, false, true, time.Second)
false, nil, testPluginSet, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet

View File

@ -33,8 +33,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
@ -46,7 +44,6 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@ -91,7 +88,6 @@ func createConfiguratorWithPodInformer(
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: 600,
@ -149,7 +145,7 @@ func initTestScheduler(
) *TestContext {
// Pod preemption is enabled by the default scheduler configuration, but preemption only happens when the PodPriority
// feature gate is enabled at the same time.
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, true, time.Second)
return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, time.Second)
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -161,16 +157,8 @@ func initTestSchedulerWithOptions(
policy *schedulerapi.Policy,
pluginSet plugins.PluginSet,
disablePreemption bool,
disableEquivalenceCache bool,
resyncPeriod time.Duration,
) *TestContext {
if !disableEquivalenceCache {
defer utilfeaturetesting.SetFeatureGateDuringTest(
t,
utilfeature.DefaultFeatureGate,
features.EnableEquivalenceClassCache, true)()
}
// 1. Create scheduler
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod)
@ -264,7 +252,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext {
// configuration but with pod preemption disabled.
func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
return initTestSchedulerWithOptions(
t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, true, time.Second)
t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, time.Second)
}
// cleanupTest deletes the scheduler and the test namespace. It should be called

View File

@ -99,7 +99,7 @@ type testPVC struct {
func TestVolumeBinding(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)()
config := setupCluster(t, "volume-scheduling-", 2, 0, 0, true)
config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
defer config.teardown()
cases := map[string]struct {
@ -270,7 +270,7 @@ func TestVolumeBinding(t *testing.T) {
func TestVolumeBindingRescheduling(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)()
config := setupCluster(t, "volume-scheduling-", 2, 0, 0, true)
config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
defer config.teardown()
storageClassName := "local-storage"
@ -414,7 +414,7 @@ func TestVolumeBindingDynamicStressSlow(t *testing.T) {
func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)()
config := setupCluster(t, "volume-binding-stress-", 1, schedulerResyncPeriod, provisionDelaySeconds, true)
config := setupCluster(t, "volume-binding-stress-", 1, schedulerResyncPeriod, provisionDelaySeconds)
defer config.teardown()
// Set max volume limit to the number of PVCs the test will create
@ -504,8 +504,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration,
func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)()
// TODO: disable equivalence cache until kubernetes/kubernetes#67680 is fixed
config := setupCluster(t, "volume-pod-affinity-", numNodes, 0, 0, true)
config := setupCluster(t, "volume-pod-affinity-", numNodes, 0, 0)
defer config.teardown()
pods := []*v1.Pod{}
@ -632,7 +631,7 @@ func TestVolumeBindingWithAffinity(t *testing.T) {
func TestPVAffinityConflict(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)()
config := setupCluster(t, "volume-scheduling-", 3, 0, 0, true)
config := setupCluster(t, "volume-scheduling-", 3, 0, 0)
defer config.teardown()
pv := makePV("local-pv", classImmediate, "", "", node1)
@ -693,7 +692,7 @@ func TestPVAffinityConflict(t *testing.T) {
func TestVolumeProvision(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)()
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)()
config := setupCluster(t, "volume-scheduling", 1, 0, 0, true)
config := setupCluster(t, "volume-scheduling", 1, 0, 0)
defer config.teardown()
cases := map[string]struct {
@ -889,9 +888,8 @@ func TestRescheduleProvisioning(t *testing.T) {
}
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig {
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, disableEquivalenceCache, resyncPeriod)
func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, resyncPeriod)
clientset := context.clientSet
ns := context.ns.Name

View File

@ -21,14 +21,12 @@ import (
"net/http/httptest"
"k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
"k8s.io/kubernetes/pkg/scheduler/factory"
@ -94,6 +92,7 @@ func createSchedulerConfigurator(
informerFactory informers.SharedInformerFactory,
stopCh <-chan struct{},
) factory.Configurator {
return factory.NewConfigFactory(&factory.ConfigFactoryArgs{
SchedulerName: v1.DefaultSchedulerName,
Client: clientSet,
@ -108,7 +107,6 @@ func createSchedulerConfigurator(
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
StopCh: stopCh,