From 1081e919e35a9a4d842331e0c8ae93217f42afc0 Mon Sep 17 00:00:00 2001 From: wangqingcan Date: Sat, 29 Sep 2018 19:30:47 +0800 Subject: [PATCH] Eclass Task 1: clean up old equiv class code Co-authored-by: Harry Zhang Co-authored-by: Wang Qingcan --- cmd/kube-scheduler/app/server.go | 2 - pkg/features/kube_features.go | 7 - .../defaults/compatibility_test.go | 3 - .../algorithmprovider/defaults/defaults.go | 7 - pkg/scheduler/core/equivalence/BUILD | 49 -- pkg/scheduler/core/equivalence/eqivalence.go | 443 ---------- .../core/equivalence/eqivalence_test.go | 803 ------------------ pkg/scheduler/core/extender_test.go | 1 - pkg/scheduler/core/generic_scheduler.go | 53 +- pkg/scheduler/core/generic_scheduler_test.go | 191 ----- pkg/scheduler/factory/factory.go | 423 +-------- pkg/scheduler/factory/factory_test.go | 6 +- pkg/scheduler/metrics/metrics.go | 18 - pkg/scheduler/scheduler.go | 30 +- pkg/scheduler/scheduler_test.go | 2 - test/integration/daemonset/daemonset_test.go | 1 - test/integration/scheduler/plugin_test.go | 6 +- test/integration/scheduler/util.go | 16 +- .../scheduler/volume_binding_test.go | 18 +- test/integration/util/util.go | 4 +- 20 files changed, 27 insertions(+), 2056 deletions(-) delete mode 100644 pkg/scheduler/core/equivalence/BUILD delete mode 100644 pkg/scheduler/core/equivalence/eqivalence.go delete mode 100644 pkg/scheduler/core/equivalence/eqivalence_test.go diff --git a/cmd/kube-scheduler/app/server.go b/cmd/kube-scheduler/app/server.go index 615f9e8f18..096a6b4cae 100644 --- a/cmd/kube-scheduler/app/server.go +++ b/cmd/kube-scheduler/app/server.go @@ -187,7 +187,6 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error stopCh, scheduler.WithName(cc.ComponentConfig.SchedulerName), scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight), - scheduler.WithEquivalenceClassCacheEnabled(cc.ComponentConfig.EnableContentionProfiling), scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption), scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds)) @@ -356,7 +355,6 @@ func NewSchedulerConfig(s schedulerserverconfig.CompletedConfig) (*factory.Confi PdbInformer: s.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(), StorageClassInformer: storageClassInformer, HardPodAffinitySymmetricWeight: s.ComponentConfig.HardPodAffinitySymmetricWeight, - EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), DisablePreemption: s.ComponentConfig.DisablePreemption, PercentageOfNodesToScore: s.ComponentConfig.PercentageOfNodesToScore, BindTimeoutSeconds: *s.ComponentConfig.BindTimeoutSeconds, diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 34d9e6713c..785b9d4f7c 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -124,12 +124,6 @@ const ( // Add priority to pods. Priority affects scheduling and preemption of pods. PodPriority utilfeature.Feature = "PodPriority" - // owner: @resouer - // alpha: v1.8 - // - // Enable equivalence class cache for scheduler. 
- EnableEquivalenceClassCache utilfeature.Feature = "EnableEquivalenceClassCache" - // owner: @k82cn // beta: v1.12 // @@ -418,7 +412,6 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS DebugContainers: {Default: false, PreRelease: utilfeature.Alpha}, PodShareProcessNamespace: {Default: true, PreRelease: utilfeature.Beta}, PodPriority: {Default: true, PreRelease: utilfeature.Beta}, - EnableEquivalenceClassCache: {Default: false, PreRelease: utilfeature.Alpha}, TaintNodesByCondition: {Default: true, PreRelease: utilfeature.Beta}, MountPropagation: {Default: true, PreRelease: utilfeature.GA}, QOSReserved: {Default: false, PreRelease: utilfeature.Alpha}, diff --git a/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index 9e2262ee30..5d387cb18d 100644 --- a/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -36,8 +36,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/factory" ) -const enableEquivalenceCache = true - func TestCompatibility_v1_Scheduler(t *testing.T) { // Add serialized versions of scheduler config that exercise available options to ensure compatibility between releases schedulerFiles := map[string]struct { @@ -987,7 +985,6 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, - EnableEquivalenceClassCache: enableEquivalenceCache, DisablePreemption: false, PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, }).CreateFromConfig(policy); err != nil { diff --git a/pkg/scheduler/algorithmprovider/defaults/defaults.go b/pkg/scheduler/algorithmprovider/defaults/defaults.go index 74467fe17a..c7b8e90e87 100644 --- a/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -49,13 +49,6 @@ func init() { registerAlgorithmProvider(defaultPredicates(), defaultPriorities()) // IMPORTANT NOTES for predicate developers: - // We are using cached predicate result for pods belonging to the same equivalence class. - // So when implementing a new predicate, you are expected to check whether the result - // of your predicate function can be affected by related API object change (ADD/DELETE/UPDATE). - // If yes, you are expected to invalidate the cached predicate result for related API object change. - // For example: - // https://github.com/kubernetes/kubernetes/blob/36a218e/plugin/pkg/scheduler/factory/factory.go#L422 - // Registers predicates and priorities that are not enabled by default, but user can pick when creating their // own set of priorities/predicates. 
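For context on the contract these deleted notes described: the equivalence cache (removed in full below) memoized predicate results per node, per predicate, per pod equivalence class, and any predicate whose outcome depended on other API objects had to invalidate those entries when such objects changed. A condensed usage sketch of the deleted API, using the names from pkg/scheduler/core/equivalence exactly as removed in this patch; the wrapper function itself is illustrative, not verbatim scheduler code, and assumes nodeInfo.Node() is non-nil:

// checkWithECache runs a fit predicate through the equivalence cache:
// a hit returns the memoized result for this pod's equivalence class,
// a miss runs the predicate and caches its outcome for later pods in
// the same class.
func checkWithECache(
	ecache *equivalence.Cache,
	pred algorithm.FitPredicate,
	predicateKey string,
	predicateID int,
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	nodeInfo *schedulernodeinfo.NodeInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
	equivClass := equivalence.NewClass(pod) // nil if no equivalence class can be derived
	if equivClass == nil {
		// No class to key on; run the predicate directly.
		return pred(pod, meta, nodeInfo)
	}
	nodeCache, _ := ecache.GetNodeCache(nodeInfo.Node().Name)
	return nodeCache.RunPredicate(pred, predicateKey, predicateID, pod, meta, nodeInfo, equivClass)
}

A predicate author whose result depended on, say, PersistentVolume labels was then expected to invalidate on PV events, e.g. ecache.InvalidatePredicatesOnNode(nodeName, sets.NewString(predicates.NoVolumeZoneConflictPred)); forgetting that invalidation is exactly the class of bug the deleted IMPORTANT NOTES warned about.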
diff --git a/pkg/scheduler/core/equivalence/BUILD b/pkg/scheduler/core/equivalence/BUILD deleted file mode 100644 index 7cdad3cb51..0000000000 --- a/pkg/scheduler/core/equivalence/BUILD +++ /dev/null @@ -1,49 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "go_default_library", - srcs = ["eqivalence.go"], - importpath = "k8s.io/kubernetes/pkg/scheduler/core/equivalence", - visibility = ["//visibility:public"], - deps = [ - "//pkg/features:go_default_library", - "//pkg/scheduler/algorithm:go_default_library", - "//pkg/scheduler/algorithm/predicates:go_default_library", - "//pkg/scheduler/metrics:go_default_library", - "//pkg/scheduler/nodeinfo:go_default_library", - "//pkg/util/hash:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", - "//vendor/k8s.io/klog:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["eqivalence_test.go"], - embed = [":go_default_library"], - deps = [ - "//pkg/scheduler/algorithm:go_default_library", - "//pkg/scheduler/algorithm/predicates:go_default_library", - "//pkg/scheduler/nodeinfo:go_default_library", - "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], - visibility = ["//visibility:public"], -) diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go deleted file mode 100644 index 12f2eed72c..0000000000 --- a/pkg/scheduler/core/equivalence/eqivalence.go +++ /dev/null @@ -1,443 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package equivalence defines Pod equivalence classes and the equivalence class -// cache. -package equivalence - -import ( - "fmt" - "hash/fnv" - "sync" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" - utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/klog" - "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/scheduler/algorithm" - "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" - "k8s.io/kubernetes/pkg/scheduler/metrics" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" - hashutil "k8s.io/kubernetes/pkg/util/hash" -) - -// nodeMap stores a *NodeCache for each node. -type nodeMap map[string]*NodeCache - -// Cache is a thread safe map saves and reuses the output of predicate functions, -// it uses node name as key to access those cached results. 
-// -// Internally, results are keyed by predicate name, and "equivalence -// class". (Equivalence class is defined in the `Class` type.) Saved results -// will be reused until an appropriate invalidation function is called. -type Cache struct { - // NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while - // the reality is lock contention in first level cache is rare. - mu sync.RWMutex - nodeToCache nodeMap - predicateIDMap map[string]int -} - -// NewCache create an empty equiv class cache. -func NewCache(predicates []string) *Cache { - predicateIDMap := make(map[string]int, len(predicates)) - for id, predicate := range predicates { - predicateIDMap[predicate] = id - } - return &Cache{ - nodeToCache: make(nodeMap), - predicateIDMap: predicateIDMap, - } -} - -// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to -// get or update the cached results. An appropriate Invalidate* function should -// be called when some predicate results are no longer valid. -// -// Internally, results are keyed by predicate name, and "equivalence -// class". (Equivalence class is defined in the `Class` type.) Saved results -// will be reused until an appropriate invalidation function is called. -// -// NodeCache objects are thread safe within the context of NodeCache, -type NodeCache struct { - mu sync.RWMutex - cache predicateMap - // generation is current generation of node cache, incremented on node - // invalidation. - generation uint64 - // snapshotGeneration saves snapshot of generation of node cache. - snapshotGeneration uint64 - // predicateGenerations stores generation numbers for predicates, incremented on - // predicate invalidation. Created on first update. Use 0 if does not - // exist. - predicateGenerations []uint64 - // snapshotPredicateGenerations saves snapshot of generation numbers for predicates. - snapshotPredicateGenerations []uint64 -} - -// newNodeCache returns an empty NodeCache. -func newNodeCache(n int) *NodeCache { - return &NodeCache{ - cache: make(predicateMap, n), - predicateGenerations: make([]uint64, n), - snapshotPredicateGenerations: make([]uint64, n), - } -} - -// Snapshot snapshots current generations of cache. -// NOTE: We snapshot generations of all node caches before using it and these -// operations are serialized, we can save snapshot as member of node cache -// itself. -func (c *Cache) Snapshot() { - c.mu.RLock() - defer c.mu.RUnlock() - for _, n := range c.nodeToCache { - n.mu.Lock() - // snapshot predicate generations - copy(n.snapshotPredicateGenerations, n.predicateGenerations) - // snapshot node generation - n.snapshotGeneration = n.generation - n.mu.Unlock() - } - return -} - -// GetNodeCache returns the existing NodeCache for given node if present. Otherwise, -// it creates the NodeCache and returns it. -// The boolean flag is true if the value was loaded, false if created. -func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) { - c.mu.Lock() - defer c.mu.Unlock() - if nodeCache, exists = c.nodeToCache[name]; !exists { - nodeCache = newNodeCache(len(c.predicateIDMap)) - c.nodeToCache[name] = nodeCache - } - return -} - -// LoadNodeCache returns the existing NodeCache for given node, nil if not -// present. 
-func (c *Cache) LoadNodeCache(node string) *NodeCache {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	return c.nodeToCache[node]
-}
-
-func (c *Cache) predicateKeysToIDs(predicateKeys sets.String) []int {
-	predicateIDs := make([]int, 0, len(predicateKeys))
-	for predicateKey := range predicateKeys {
-		if id, ok := c.predicateIDMap[predicateKey]; ok {
-			predicateIDs = append(predicateIDs, id)
-		} else {
-			klog.Errorf("predicate key %q not found", predicateKey)
-		}
-	}
-	return predicateIDs
-}
-
-// InvalidatePredicates clears all cached results for the given predicates.
-func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
-	if len(predicateKeys) == 0 {
-		return
-	}
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	predicateIDs := c.predicateKeysToIDs(predicateKeys)
-	for _, n := range c.nodeToCache {
-		n.invalidatePreds(predicateIDs)
-	}
-	klog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
-
-}
-
-// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
-func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
-	if len(predicateKeys) == 0 {
-		return
-	}
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	predicateIDs := c.predicateKeysToIDs(predicateKeys)
-	if n, ok := c.nodeToCache[nodeName]; ok {
-		n.invalidatePreds(predicateIDs)
-	}
-	klog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
-}
-
-// InvalidateAllPredicatesOnNode clears all cached results for one node.
-func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	if node, ok := c.nodeToCache[nodeName]; ok {
-		node.invalidate()
-	}
-	klog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
-}
-
-// InvalidateCachedPredicateItemForPodAdd is a wrapper of
-// InvalidateCachedPredicateItem for pod add case
-// TODO: This does not belong with the equivalence cache implementation.
-func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
-	// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
-	// will not break the existing inter pod affinity. So we does not need to
-	// invalidate MatchInterPodAffinity when pod added.
-	//
-	// But when a pod is deleted, existing inter pod affinity may become invalid.
-	// (e.g. this pod was preferred by some else, or vice versa)
-	//
-	// NOTE: assumptions above will not stand when we implemented features like
-	// RequiredDuringSchedulingRequiredDuringExecution.
-
-	// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
-	// it will also fits to equivalence class of existing pods
-
-	// GeneralPredicates: will always be affected by adding a new pod
-	invalidPredicates := sets.NewString(predicates.GeneralPred)
-
-	// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
-	for _, vol := range pod.Spec.Volumes {
-		if vol.PersistentVolumeClaim != nil {
-			invalidPredicates.Insert(
-				predicates.MaxEBSVolumeCountPred,
-				predicates.MaxGCEPDVolumeCountPred,
-				predicates.MaxAzureDiskVolumeCountPred)
-			if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
-				invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred)
-			}
-		} else {
-			// We do not consider CSI volumes here because CSI
-			// volumes can not be used inline.
- if vol.AWSElasticBlockStore != nil { - invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred) - } - if vol.GCEPersistentDisk != nil { - invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred) - } - if vol.AzureDisk != nil { - invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred) - } - } - } - c.InvalidatePredicatesOnNode(nodeName, invalidPredicates) -} - -// Class represents a set of pods which are equivalent from the perspective of -// the scheduler. i.e. the scheduler would make the same decision for any pod -// from the same class. -type Class struct { - // Equivalence hash - hash uint64 -} - -// NewClass returns the equivalence class for a given Pod. The returned Class -// objects will be equal for two Pods in the same class. nil values should not -// be considered equal to each other. -// -// NOTE: Make sure to compare types of Class and not *Class. -// TODO(misterikkit): Return error instead of nil *Class. -func NewClass(pod *v1.Pod) *Class { - equivalencePod := getEquivalencePod(pod) - if equivalencePod != nil { - hash := fnv.New32a() - hashutil.DeepHashObject(hash, equivalencePod) - return &Class{ - hash: uint64(hash.Sum32()), - } - } - return nil -} - -// predicateMap stores resultMaps with predicate ID as the key. -type predicateMap []resultMap - -// resultMap stores PredicateResult with pod equivalence hash as the key. -type resultMap map[uint64]predicateResult - -// predicateResult stores the output of a FitPredicate. -type predicateResult struct { - Fit bool - FailReasons []algorithm.PredicateFailureReason -} - -// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be -// run and its results cached for the next call. -// -// NOTE: RunPredicate will not update the equivalence cache if generation does not match live version. -func (n *NodeCache) RunPredicate( - pred algorithm.FitPredicate, - predicateKey string, - predicateID int, - pod *v1.Pod, - meta algorithm.PredicateMetadata, - nodeInfo *schedulernodeinfo.NodeInfo, - equivClass *Class, -) (bool, []algorithm.PredicateFailureReason, error) { - if nodeInfo == nil || nodeInfo.Node() == nil { - // This may happen during tests. - return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") - } - - result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, predicateID, equivClass.hash) - if ok { - return result.Fit, result.FailReasons, nil - } - fit, reasons, err := pred(pod, meta, nodeInfo) - if err != nil { - return fit, reasons, err - } - n.updateResult(pod.GetName(), predicateKey, predicateID, fit, reasons, equivClass.hash, nodeInfo) - return fit, reasons, nil -} - -// updateResult updates the cached result of a predicate. -func (n *NodeCache) updateResult( - podName, predicateKey string, - predicateID int, - fit bool, - reasons []algorithm.PredicateFailureReason, - equivalenceHash uint64, - nodeInfo *schedulernodeinfo.NodeInfo, -) { - if nodeInfo == nil || nodeInfo.Node() == nil { - // This may happen during tests. 
- metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc() - return - } - - predicateItem := predicateResult{ - Fit: fit, - FailReasons: reasons, - } - - n.mu.Lock() - defer n.mu.Unlock() - if (n.snapshotGeneration != n.generation) || (n.snapshotPredicateGenerations[predicateID] != n.predicateGenerations[predicateID]) { - // Generation of node or predicate has been updated since we last took - // a snapshot, this indicates that we received a invalidation request - // during this time. Cache may be stale, skip update. - metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc() - return - } - // If cached predicate map already exists, just update the predicate by key - if predicates := n.cache[predicateID]; predicates != nil { - // maps in golang are references, no need to add them back - predicates[equivalenceHash] = predicateItem - } else { - n.cache[predicateID] = - resultMap{ - equivalenceHash: predicateItem, - } - } - n.predicateGenerations[predicateID]++ - - klog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v", - nodeInfo.Node().Name, predicateKey, podName, predicateItem) -} - -// lookupResult returns cached predicate results and a bool saying whether a -// cache entry was found. -func (n *NodeCache) lookupResult( - podName, nodeName, predicateKey string, - predicateID int, - equivalenceHash uint64, -) (value predicateResult, ok bool) { - n.mu.RLock() - defer n.mu.RUnlock() - value, ok = n.cache[predicateID][equivalenceHash] - if ok { - metrics.EquivalenceCacheHits.Inc() - } else { - metrics.EquivalenceCacheMisses.Inc() - } - return value, ok -} - -// invalidatePreds deletes cached predicates by given IDs. -func (n *NodeCache) invalidatePreds(predicateIDs []int) { - n.mu.Lock() - defer n.mu.Unlock() - for _, predicateID := range predicateIDs { - n.cache[predicateID] = nil - n.predicateGenerations[predicateID]++ - } -} - -// invalidate invalidates node cache. -func (n *NodeCache) invalidate() { - n.mu.Lock() - defer n.mu.Unlock() - n.cache = make(predicateMap, len(n.cache)) - n.generation++ -} - -// equivalencePod is the set of pod attributes which must match for two pods to -// be considered equivalent for scheduling purposes. For correctness, this must -// include any Pod field which is used by a FitPredicate. -// -// NOTE: For equivalence hash to be formally correct, lists and maps in the -// equivalencePod should be normalized. (e.g. by sorting them) However, the vast -// majority of equivalent pod classes are expected to be created from a single -// pod template, so they will all have the same ordering. -type equivalencePod struct { - Namespace *string - Labels map[string]string - Affinity *v1.Affinity - Containers []v1.Container // See note about ordering - InitContainers []v1.Container // See note about ordering - NodeName *string - NodeSelector map[string]string - Tolerations []v1.Toleration - Volumes []v1.Volume // See note about ordering -} - -// getEquivalencePod returns a normalized representation of a pod so that two -// "equivalent" pods will hash to the same value. -func getEquivalencePod(pod *v1.Pod) *equivalencePod { - ep := &equivalencePod{ - Namespace: &pod.Namespace, - Labels: pod.Labels, - Affinity: pod.Spec.Affinity, - Containers: pod.Spec.Containers, - InitContainers: pod.Spec.InitContainers, - NodeName: &pod.Spec.NodeName, - NodeSelector: pod.Spec.NodeSelector, - Tolerations: pod.Spec.Tolerations, - Volumes: pod.Spec.Volumes, - } - // DeepHashObject considers nil and empty slices to be different. Normalize them. 
- if len(ep.Containers) == 0 { - ep.Containers = nil - } - if len(ep.InitContainers) == 0 { - ep.InitContainers = nil - } - if len(ep.Tolerations) == 0 { - ep.Tolerations = nil - } - if len(ep.Volumes) == 0 { - ep.Volumes = nil - } - // Normalize empty maps also. - if len(ep.Labels) == 0 { - ep.Labels = nil - } - if len(ep.NodeSelector) == 0 { - ep.NodeSelector = nil - } - // TODO(misterikkit): Also normalize nested maps and slices. - return ep -} diff --git a/pkg/scheduler/core/equivalence/eqivalence_test.go b/pkg/scheduler/core/equivalence/eqivalence_test.go deleted file mode 100644 index 1c5779991c..0000000000 --- a/pkg/scheduler/core/equivalence/eqivalence_test.go +++ /dev/null @@ -1,803 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package equivalence - -import ( - "errors" - "reflect" - "testing" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/kubernetes/pkg/scheduler/algorithm" - "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" -) - -// makeBasicPod returns a Pod object with many of the fields populated. -func makeBasicPod(name string) *v1.Pod { - isController := true - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "test-ns", - Labels: map[string]string{"app": "web", "env": "prod"}, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "v1", - Kind: "ReplicationController", - Name: "rc", - UID: "123", - Controller: &isController, - }, - }, - }, - Spec: v1.PodSpec{ - Affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "failure-domain.beta.kubernetes.io/zone", - Operator: "Exists", - }, - }, - }, - }, - }, - }, - PodAffinity: &v1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "db"}}, - TopologyKey: "kubernetes.io/hostname", - }, - }, - }, - PodAntiAffinity: &v1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "web"}}, - TopologyKey: "kubernetes.io/hostname", - }, - }, - }, - }, - InitContainers: []v1.Container{ - { - Name: "init-pause", - Image: "gcr.io/google_containers/pause", - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - "cpu": resource.MustParse("1"), - "mem": resource.MustParse("100Mi"), - }, - }, - }, - }, - Containers: []v1.Container{ - { - Name: "pause", - Image: "gcr.io/google_containers/pause", - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - "cpu": resource.MustParse("1"), - "mem": resource.MustParse("100Mi"), - }, - }, - VolumeMounts: []v1.VolumeMount{ - { - 
Name: "nfs", - MountPath: "/srv/data", - }, - }, - }, - }, - NodeSelector: map[string]string{"node-type": "awesome"}, - Tolerations: []v1.Toleration{ - { - Effect: "NoSchedule", - Key: "experimental", - Operator: "Exists", - }, - }, - Volumes: []v1.Volume{ - { - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: "someEBSVol1", - }, - }, - }, - { - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: "someEBSVol2", - }, - }, - }, - { - Name: "nfs", - VolumeSource: v1.VolumeSource{ - NFS: &v1.NFSVolumeSource{ - Server: "nfs.corp.example.com", - }, - }, - }, - }, - }, - } -} - -type predicateItemType struct { - fit bool - reasons []algorithm.PredicateFailureReason -} - -// mockPredicate provides an algorithm.FitPredicate with pre-set return values. -type mockPredicate struct { - fit bool - reasons []algorithm.PredicateFailureReason - err error - callCount int -} - -func (p *mockPredicate) predicate(*v1.Pod, algorithm.PredicateMetadata, *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { - p.callCount++ - return p.fit, p.reasons, p.err -} - -func TestRunPredicate(t *testing.T) { - tests := []struct { - name string - pred mockPredicate - expectFit, expectCacheHit, expectCacheWrite bool - expectedReasons []algorithm.PredicateFailureReason - expectedError string - }{ - { - name: "pod fits/cache hit", - pred: mockPredicate{}, - expectFit: true, - expectCacheHit: true, - expectCacheWrite: false, - }, - { - name: "pod fits/cache miss", - pred: mockPredicate{fit: true}, - expectFit: true, - expectCacheHit: false, - expectCacheWrite: true, - }, - { - name: "pod doesn't fit/cache miss", - pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}}, - expectFit: false, - expectCacheHit: false, - expectCacheWrite: true, - expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, - }, - { - name: "pod doesn't fit/cache hit", - pred: mockPredicate{}, - expectFit: false, - expectCacheHit: true, - expectCacheWrite: false, - expectedReasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, - }, - { - name: "predicate error", - pred: mockPredicate{err: errors.New("This is expected")}, - expectFit: false, - expectCacheHit: false, - expectCacheWrite: false, - expectedError: "This is expected", - }, - } - - predicatesOrdering := []string{"testPredicate"} - predicateID := 0 - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - node := schedulernodeinfo.NewNodeInfo() - testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}} - node.SetNode(testNode) - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}} - meta := algorithm.EmptyPredicateMetadataProducer(nil, nil) - - // Initialize and populate equivalence class cache. 
- ecache := NewCache(predicatesOrdering) - ecache.Snapshot() - nodeCache, _ := ecache.GetNodeCache(testNode.Name) - - equivClass := NewClass(pod) - if test.expectCacheHit { - nodeCache.updateResult(pod.Name, "testPredicate", predicateID, test.expectFit, test.expectedReasons, equivClass.hash, node) - } - - fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", predicateID, pod, meta, node, equivClass) - - if err != nil { - if err.Error() != test.expectedError { - t.Errorf("Expected error %v but got %v", test.expectedError, err) - } - } else if len(test.expectedError) > 0 { - t.Errorf("Expected error %v but got nil", test.expectedError) - } - if fit && !test.expectFit { - t.Errorf("pod should not fit") - } - if !fit && test.expectFit { - t.Errorf("pod should fit") - } - if len(reasons) != len(test.expectedReasons) { - t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons) - } else { - for i, reason := range reasons { - if reason != test.expectedReasons[i] { - t.Errorf("Expected failures: %v but got %v", test.expectedReasons, reasons) - break - } - } - } - if test.expectCacheHit && test.pred.callCount != 0 { - t.Errorf("Predicate should not be called") - } - if !test.expectCacheHit && test.pred.callCount == 0 { - t.Errorf("Predicate should be called") - } - _, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", predicateID, equivClass.hash) - if !ok && test.expectCacheWrite { - t.Errorf("Cache write should happen") - } - if !test.expectCacheHit && test.expectCacheWrite && !ok { - t.Errorf("Cache write should happen") - } - if !test.expectCacheHit && !test.expectCacheWrite && ok { - t.Errorf("Cache write should not happen") - } - }) - } -} - -func TestUpdateResult(t *testing.T) { - predicatesOrdering := []string{"GeneralPredicates"} - tests := []struct { - name string - pod string - predicateKey string - predicateID int - nodeName string - fit bool - reasons []algorithm.PredicateFailureReason - equivalenceHash uint64 - expectPredicateMap bool - expectCacheItem predicateResult - }{ - { - name: "test 1", - pod: "testPod", - predicateKey: "GeneralPredicates", - predicateID: 0, - nodeName: "node1", - fit: true, - equivalenceHash: 123, - expectPredicateMap: false, - expectCacheItem: predicateResult{ - Fit: true, - }, - }, - { - name: "test 2", - pod: "testPod", - predicateKey: "GeneralPredicates", - predicateID: 0, - nodeName: "node2", - fit: false, - equivalenceHash: 123, - expectPredicateMap: true, - expectCacheItem: predicateResult{ - Fit: false, - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - node := schedulernodeinfo.NewNodeInfo() - testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}} - node.SetNode(testNode) - - // Initialize and populate equivalence class cache. 
- ecache := NewCache(predicatesOrdering) - nodeCache, _ := ecache.GetNodeCache(testNode.Name) - - if test.expectPredicateMap { - predicateItem := predicateResult{ - Fit: true, - } - nodeCache.cache[test.predicateID] = - resultMap{ - test.equivalenceHash: predicateItem, - } - } - - nodeCache.updateResult( - test.pod, - test.predicateKey, - test.predicateID, - test.fit, - test.reasons, - test.equivalenceHash, - node, - ) - - cachedMapItem := nodeCache.cache[test.predicateID] - if cachedMapItem == nil { - t.Errorf("can't find expected cache item: %v", test.expectCacheItem) - } else { - if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) { - t.Errorf("expected cached item: %v, but got: %v", - test.expectCacheItem, cachedMapItem[test.equivalenceHash]) - } - } - }) - } -} - -// slicesEqual wraps reflect.DeepEqual, but returns true when comparing nil and empty slice. -func slicesEqual(a, b []algorithm.PredicateFailureReason) bool { - if len(a) == 0 && len(b) == 0 { - return true - } - return reflect.DeepEqual(a, b) -} - -func TestLookupResult(t *testing.T) { - predicatesOrdering := []string{"GeneralPredicates"} - tests := []struct { - name string - podName string - nodeName string - predicateKey string - predicateID int - equivalenceHashForUpdatePredicate uint64 - equivalenceHashForCalPredicate uint64 - cachedItem predicateItemType - expectedPredicateKeyMiss bool - expectedEquivalenceHashMiss bool - expectedPredicateItem predicateItemType - }{ - { - name: "test 1", - podName: "testPod", - nodeName: "node1", - equivalenceHashForUpdatePredicate: 123, - equivalenceHashForCalPredicate: 123, - predicateKey: "GeneralPredicates", - predicateID: 0, - cachedItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, - }, - expectedPredicateKeyMiss: true, - expectedPredicateItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{}, - }, - }, - { - name: "test 2", - podName: "testPod", - nodeName: "node2", - equivalenceHashForUpdatePredicate: 123, - equivalenceHashForCalPredicate: 123, - predicateKey: "GeneralPredicates", - predicateID: 0, - cachedItem: predicateItemType{ - fit: true, - }, - expectedPredicateKeyMiss: false, - expectedPredicateItem: predicateItemType{ - fit: true, - reasons: []algorithm.PredicateFailureReason{}, - }, - }, - { - name: "test 3", - podName: "testPod", - nodeName: "node3", - equivalenceHashForUpdatePredicate: 123, - equivalenceHashForCalPredicate: 123, - predicateKey: "GeneralPredicates", - predicateID: 0, - cachedItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, - }, - expectedPredicateKeyMiss: false, - expectedPredicateItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, - }, - }, - { - name: "test 4", - podName: "testPod", - nodeName: "node4", - equivalenceHashForUpdatePredicate: 123, - equivalenceHashForCalPredicate: 456, - predicateKey: "GeneralPredicates", - predicateID: 0, - cachedItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, - }, - expectedPredicateKeyMiss: false, - expectedEquivalenceHashMiss: true, - expectedPredicateItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{}, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testNode := &v1.Node{ObjectMeta: 
metav1.ObjectMeta{Name: test.nodeName}} - - // Initialize and populate equivalence class cache. - ecache := NewCache(predicatesOrdering) - nodeCache, _ := ecache.GetNodeCache(testNode.Name) - - node := schedulernodeinfo.NewNodeInfo() - node.SetNode(testNode) - // set cached item to equivalence cache - nodeCache.updateResult( - test.podName, - test.predicateKey, - test.predicateID, - test.cachedItem.fit, - test.cachedItem.reasons, - test.equivalenceHashForUpdatePredicate, - node, - ) - // if we want to do invalid, invalid the cached item - if test.expectedPredicateKeyMiss { - predicateKeys := sets.NewString() - predicateKeys.Insert(test.predicateKey) - ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys) - } - // calculate predicate with equivalence cache - result, ok := nodeCache.lookupResult(test.podName, - test.nodeName, - test.predicateKey, - test.predicateID, - test.equivalenceHashForCalPredicate, - ) - fit, reasons := result.Fit, result.FailReasons - // returned invalid should match expectedPredicateKeyMiss or expectedEquivalenceHashMiss - if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { - if ok && test.expectedEquivalenceHashMiss { - t.Errorf("Failed: %s, expected (equivalence hash) cache miss", test.name) - } - if !ok && !test.expectedEquivalenceHashMiss { - t.Errorf("Failed: %s, expected (equivalence hash) cache hit", test.name) - } - } else { - if ok && test.expectedPredicateKeyMiss { - t.Errorf("Failed: %s, expected (predicate key) cache miss", test.name) - } - if !ok && !test.expectedPredicateKeyMiss { - t.Errorf("Failed: %s, expected (predicate key) cache hit", test.name) - } - } - // returned predicate result should match expected predicate item - if fit != test.expectedPredicateItem.fit { - t.Errorf("Failed: %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit) - } - if !slicesEqual(reasons, test.expectedPredicateItem.reasons) { - t.Errorf("Failed: %s, expected reasons: %v, but got: %v", - test.name, test.expectedPredicateItem.reasons, reasons) - } - }) - } -} - -func TestGetEquivalenceHash(t *testing.T) { - pod1 := makeBasicPod("pod1") - pod2 := makeBasicPod("pod2") - - pod3 := makeBasicPod("pod3") - pod3.Spec.Volumes = []v1.Volume{ - { - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: "someEBSVol111", - }, - }, - }, - } - - pod4 := makeBasicPod("pod4") - pod4.Spec.Volumes = []v1.Volume{ - { - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: "someEBSVol222", - }, - }, - }, - } - - pod5 := makeBasicPod("pod5") - pod5.Spec.Volumes = []v1.Volume{} - - pod6 := makeBasicPod("pod6") - pod6.Spec.Volumes = nil - - pod7 := makeBasicPod("pod7") - pod7.Spec.NodeSelector = nil - - pod8 := makeBasicPod("pod8") - pod8.Spec.NodeSelector = make(map[string]string) - - type podInfo struct { - pod *v1.Pod - hashIsValid bool - } - - tests := []struct { - name string - podInfoList []podInfo - isEquivalent bool - }{ - { - name: "pods with everything the same except name", - podInfoList: []podInfo{ - {pod: pod1, hashIsValid: true}, - {pod: pod2, hashIsValid: true}, - }, - isEquivalent: true, - }, - { - name: "pods that only differ in their PVC volume sources", - podInfoList: []podInfo{ - {pod: pod3, hashIsValid: true}, - {pod: pod4, hashIsValid: true}, - }, - isEquivalent: false, - }, - { - name: "pods that have no volumes, but one uses nil and one uses an empty slice", - podInfoList: []podInfo{ - {pod: pod5, 
hashIsValid: true}, - {pod: pod6, hashIsValid: true}, - }, - isEquivalent: true, - }, - { - name: "pods that have no NodeSelector, but one uses nil and one uses an empty map", - podInfoList: []podInfo{ - {pod: pod7, hashIsValid: true}, - {pod: pod8, hashIsValid: true}, - }, - isEquivalent: true, - }, - } - - var ( - targetPodInfo podInfo - targetHash uint64 - ) - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - for i, podInfo := range test.podInfoList { - testPod := podInfo.pod - eclassInfo := NewClass(testPod) - if eclassInfo == nil && podInfo.hashIsValid { - t.Errorf("Failed: pod %v is expected to have valid hash", testPod) - } - - if eclassInfo != nil { - // NOTE(harry): the first element will be used as target so - // this logic can't verify more than two inequivalent pods - if i == 0 { - targetHash = eclassInfo.hash - targetPodInfo = podInfo - } else { - if targetHash != eclassInfo.hash { - if test.isEquivalent { - t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod) - } - } - } - } - } - }) - } -} - -func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { - testPredicate := "GeneralPredicates" - testPredicateID := 0 - predicatesOrdering := []string{testPredicate} - // tests is used to initialize all nodes - tests := []struct { - name string - podName string - nodeName string - equivalenceHashForUpdatePredicate uint64 - cachedItem predicateItemType - }{ - { - name: "hash predicate 123 not fits host ports", - podName: "testPod", - nodeName: "node1", - equivalenceHashForUpdatePredicate: 123, - cachedItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{ - predicates.ErrPodNotFitsHostPorts, - }, - }, - }, - { - name: "hash predicate 456 not fits host ports", - podName: "testPod", - nodeName: "node2", - equivalenceHashForUpdatePredicate: 456, - cachedItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{ - predicates.ErrPodNotFitsHostPorts, - }, - }, - }, - { - name: "hash predicate 123 fits", - podName: "testPod", - nodeName: "node3", - equivalenceHashForUpdatePredicate: 123, - cachedItem: predicateItemType{ - fit: true, - }, - }, - } - ecache := NewCache(predicatesOrdering) - - for _, test := range tests { - node := schedulernodeinfo.NewNodeInfo() - testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}} - node.SetNode(testNode) - - nodeCache, _ := ecache.GetNodeCache(testNode.Name) - // set cached item to equivalence cache - nodeCache.updateResult( - test.podName, - testPredicate, - testPredicateID, - test.cachedItem.fit, - test.cachedItem.reasons, - test.equivalenceHashForUpdatePredicate, - node, - ) - } - - // invalidate cached predicate for all nodes - ecache.InvalidatePredicates(sets.NewString(testPredicate)) - - // there should be no cached predicate any more - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist { - if cache := nodeCache.cache[testPredicateID]; cache != nil { - t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated", - testPredicate, test.nodeName) - } - } - }) - } -} - -func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { - testPredicate := "GeneralPredicates" - testPredicateID := 0 - predicatesOrdering := []string{testPredicate} - // tests is used to initialize all nodes - tests := []struct { - name string - podName string - nodeName string - equivalenceHashForUpdatePredicate uint64 - 
cachedItem predicateItemType - }{ - { - name: "hash predicate 123 not fits host ports", - podName: "testPod", - nodeName: "node1", - equivalenceHashForUpdatePredicate: 123, - cachedItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, - }, - }, - { - name: "hash predicate 456 not fits host ports", - podName: "testPod", - nodeName: "node2", - equivalenceHashForUpdatePredicate: 456, - cachedItem: predicateItemType{ - fit: false, - reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, - }, - }, - { - name: "hash predicate 123 fits host ports", - podName: "testPod", - nodeName: "node3", - equivalenceHashForUpdatePredicate: 123, - cachedItem: predicateItemType{ - fit: true, - }, - }, - } - ecache := NewCache(predicatesOrdering) - - for _, test := range tests { - node := schedulernodeinfo.NewNodeInfo() - testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}} - node.SetNode(testNode) - - nodeCache, _ := ecache.GetNodeCache(testNode.Name) - // set cached item to equivalence cache - nodeCache.updateResult( - test.podName, - testPredicate, - testPredicateID, - test.cachedItem.fit, - test.cachedItem.reasons, - test.equivalenceHashForUpdatePredicate, - node, - ) - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - oldNodeCache, _ := ecache.GetNodeCache(test.nodeName) - oldGeneration := oldNodeCache.generation - // invalidate cached predicate for all nodes - ecache.InvalidateAllPredicatesOnNode(test.nodeName) - if n, _ := ecache.GetNodeCache(test.nodeName); oldGeneration == n.generation { - t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName) - } - }) - } -} - -func BenchmarkEquivalenceHash(b *testing.B) { - pod := makeBasicPod("test") - for i := 0; i < b.N; i++ { - getEquivalencePod(pod) - } -} diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 218e2f8250..1d09f7983b 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -510,7 +510,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { queue := internalqueue.NewSchedulingQueue(nil) scheduler := NewGenericScheduler( cache, - nil, queue, test.predicates, algorithm.EmptyPredicateMetadataProducer, diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 32cbe14855..c7bbbe47b8 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" - "k8s.io/kubernetes/pkg/scheduler/core/equivalence" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -116,7 +115,6 @@ type ScheduleAlgorithm interface { type genericScheduler struct { cache schedulerinternalcache.Cache - equivalenceCache *equivalence.Cache schedulingQueue internalqueue.SchedulingQueue predicates map[string]algorithm.FitPredicate priorityMetaProducer algorithm.PriorityMetadataProducer @@ -134,21 +132,9 @@ type genericScheduler struct { percentageOfNodesToScore int32 } -// snapshot snapshots equivalence cache and node infos for all fit and priority +// snapshot snapshots scheduler cache and node infos for all fit and priority // functions. 
func (g *genericScheduler) snapshot() error { - // IMPORTANT NOTE: We must snapshot equivalence cache before snapshotting - // scheduler cache, otherwise stale data may be written into equivalence - // cache, e.g. - // 1. snapshot cache - // 2. event arrives, updating cache and invalidating predicates or whole node cache - // 3. snapshot ecache - // 4. evaluate predicates - // 5. stale result will be written to ecache - if g.equivalenceCache != nil { - g.equivalenceCache.Snapshot() - } - // Used for all fit and priority funcs. return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) } @@ -420,7 +406,6 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v var ( predicateResultLock sync.Mutex filteredLen int32 - equivClass *equivalence.Class ) ctx, cancel := context.WithCancel(context.Background()) @@ -428,26 +413,15 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v // We can use the same metadata producer for all nodes. meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) - if g.equivalenceCache != nil { - // getEquivalenceClassInfo will return immediately if no equivalence pod found - equivClass = equivalence.NewClass(pod) - } - checkNode := func(i int) { - var nodeCache *equivalence.NodeCache nodeName := g.cache.NodeTree().Next() - if g.equivalenceCache != nil { - nodeCache = g.equivalenceCache.LoadNodeCache(nodeName) - } fits, failedPredicates, err := podFitsOnNode( pod, meta, g.cachedNodeInfoMap[nodeName], g.predicates, - nodeCache, g.schedulingQueue, g.alwaysCheckAllPredicates, - equivClass, ) if err != nil { predicateResultLock.Lock() @@ -556,15 +530,10 @@ func podFitsOnNode( meta algorithm.PredicateMetadata, info *schedulernodeinfo.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, - nodeCache *equivalence.NodeCache, queue internalqueue.SchedulingQueue, alwaysCheckAllPredicates bool, - equivClass *equivalence.Class, ) (bool, []algorithm.PredicateFailureReason, error) { - var ( - eCacheAvailable bool - failedPredicates []algorithm.PredicateFailureReason - ) + var failedPredicates []algorithm.PredicateFailureReason podsAdded := false // We run predicates twice in some cases. If the node has greater or equal priority @@ -593,11 +562,7 @@ func podFitsOnNode( } else if !podsAdded || len(failedPredicates) != 0 { break } - // Bypass eCache if node has any nominated pods. - // TODO(bsalamat): consider using eCache and adding proper eCache invalidations - // when pods are nominated or their nominations change. - eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded - for predicateID, predicateKey := range predicates.Ordering() { + for _, predicateKey := range predicates.Ordering() { var ( fit bool reasons []algorithm.PredicateFailureReason @@ -605,11 +570,7 @@ func podFitsOnNode( ) //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric if predicate, exist := predicateFuncs[predicateKey]; exist { - if eCacheAvailable { - fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass) - } else { - fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) - } + fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) if err != nil { return false, []algorithm.PredicateFailureReason{}, err } @@ -1038,7 +999,7 @@ func selectVictimsOnNode( // that we should check is if the "pod" is failing to schedule due to pod affinity // failure. 
// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance. - if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits { + if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits { if err != nil { klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } @@ -1052,7 +1013,7 @@ func selectVictimsOnNode( violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs) reprievePod := func(p *v1.Pod) bool { addPod(p) - fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil) + fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, queue, false) if !fits { removePod(p) victims = append(victims, p) @@ -1168,7 +1129,6 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla // NewGenericScheduler creates a genericScheduler object. func NewGenericScheduler( cache schedulerinternalcache.Cache, - eCache *equivalence.Cache, podQueue internalqueue.SchedulingQueue, predicates map[string]algorithm.FitPredicate, predicateMetaProducer algorithm.PredicateMetadataProducer, @@ -1185,7 +1145,6 @@ func NewGenericScheduler( ) ScheduleAlgorithm { return &genericScheduler{ cache: cache, - equivalenceCache: eCache, schedulingQueue: podQueue, predicates: predicates, predicateMetaProducer: predicateMetaProducer, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index bf6b8dfbbc..fc168e3b13 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -22,7 +22,6 @@ import ( "reflect" "strconv" "strings" - "sync" "testing" "time" @@ -39,7 +38,6 @@ import ( algorithmpriorities "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" - "k8s.io/kubernetes/pkg/scheduler/core/equivalence" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" @@ -471,7 +469,6 @@ func TestGenericScheduler(t *testing.T) { scheduler := NewGenericScheduler( cache, - nil, internalqueue.NewSchedulingQueue(nil), test.predicates, algorithm.EmptyPredicateMetadataProducer, @@ -508,7 +505,6 @@ func makeScheduler(predicates map[string]algorithm.FitPredicate, nodes []*v1.Nod s := NewGenericScheduler( cache, - nil, internalqueue.NewSchedulingQueue(nil), predicates, algorithm.EmptyPredicateMetadataProducer, @@ -1435,7 +1431,6 @@ func TestPreempt(t *testing.T) { } scheduler := NewGenericScheduler( cache, - nil, internalqueue.NewSchedulingQueue(nil), map[string]algorithm.FitPredicate{"matches": algorithmpredicates.PodFitsResources}, algorithm.EmptyPredicateMetadataProducer, @@ -1488,189 +1483,3 @@ func TestPreempt(t *testing.T) { }) } } - -// syncingMockCache delegates method calls to an actual Cache, -// but calls to UpdateNodeNameToInfoMap synchronize with the test. -type syncingMockCache struct { - schedulerinternalcache.Cache - cycleStart, cacheInvalidated chan struct{} - once sync.Once -} - -// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it -// synchronizes with the test. 
-// -// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use -// this point to signal to the test that a scheduling cycle has started. -func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error { - err := c.Cache.UpdateNodeNameToInfoMap(infoMap) - c.once.Do(func() { - c.cycleStart <- struct{}{} - <-c.cacheInvalidated - }) - return err -} - -// TestCacheInvalidationRace tests that equivalence cache invalidation is correctly -// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event -// occurs after schedulernodeinfo is snapshotted and before equivalence cache lock is acquired. -func TestCacheInvalidationRace(t *testing.T) { - // Create a predicate that returns false the first time and true on subsequent calls. - podWillFit := false - var callCount int - testPredicate := func(pod *v1.Pod, - meta algorithm.PredicateMetadata, - nodeInfo *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { - callCount++ - if !podWillFit { - podWillFit = true - return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil - } - return true, nil, nil - } - - // Set up the mock cache. - cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop) - testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}} - cache.AddNode(testNode) - mockCache := &syncingMockCache{ - Cache: cache, - cycleStart: make(chan struct{}), - cacheInvalidated: make(chan struct{}), - } - - ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} - algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) - eCache := equivalence.NewCache(algorithmpredicates.Ordering()) - eCache.GetNodeCache(testNode.Name) - // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before - // the equivalence cache would be updated. - go func() { - <-mockCache.cycleStart - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"}, - Spec: v1.PodSpec{NodeName: "machine1"}} - if err := cache.AddPod(pod); err != nil { - t.Errorf("Could not add pod to cache: %v", err) - } - eCache.InvalidateAllPredicatesOnNode("machine1") - mockCache.cacheInvalidated <- struct{}{} - }() - - // Set up the scheduler. - prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} - pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) - pdbLister := schedulertesting.FakePDBLister{} - scheduler := NewGenericScheduler( - mockCache, - eCache, - internalqueue.NewSchedulingQueue(nil), - ps, - algorithm.EmptyPredicateMetadataProducer, - prioritizers, - algorithm.EmptyPriorityMetadataProducer, - emptyPluginSet, - nil, nil, pvcLister, pdbLister, - true, false, - schedulerapi.DefaultPercentageOfNodesToScore) - - // First scheduling attempt should fail. - nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"})) - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}} - machine, err := scheduler.Schedule(pod, nodeLister) - if machine != "" || err == nil { - t.Error("First scheduling attempt did not fail") - } - - // Second scheduling attempt should succeed because cache was invalidated. - _, err = scheduler.Schedule(pod, nodeLister) - if err != nil { - t.Errorf("Second scheduling attempt failed: %v", err) - } - if callCount != 2 { - t.Errorf("Predicate should have been called twice. 
Was called %d times.", callCount) - } -} - -// TestCacheInvalidationRace2 tests that cache invalidation is correctly handled -// when an invalidation event happens while a predicate is running. -func TestCacheInvalidationRace2(t *testing.T) { - // Create a predicate that returns false the first time and true on subsequent calls. - var ( - podWillFit = false - callCount int - cycleStart = make(chan struct{}) - cacheInvalidated = make(chan struct{}) - once sync.Once - ) - testPredicate := func(pod *v1.Pod, - meta algorithm.PredicateMetadata, - nodeInfo *schedulernodeinfo.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { - callCount++ - once.Do(func() { - cycleStart <- struct{}{} - <-cacheInvalidated - }) - if !podWillFit { - podWillFit = true - return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil - } - return true, nil, nil - } - - // Set up the mock cache. - cache := schedulerinternalcache.New(time.Duration(0), wait.NeverStop) - testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}} - cache.AddNode(testNode) - - ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} - algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) - eCache := equivalence.NewCache(algorithmpredicates.Ordering()) - eCache.GetNodeCache(testNode.Name) - // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before - // the equivalence cache would be updated. - go func() { - <-cycleStart - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"}, - Spec: v1.PodSpec{NodeName: "machine1"}} - if err := cache.AddPod(pod); err != nil { - t.Errorf("Could not add pod to cache: %v", err) - } - eCache.InvalidateAllPredicatesOnNode("machine1") - cacheInvalidated <- struct{}{} - }() - - // Set up the scheduler. - prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} - pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) - pdbLister := schedulertesting.FakePDBLister{} - scheduler := NewGenericScheduler( - cache, - eCache, - internalqueue.NewSchedulingQueue(nil), - ps, - algorithm.EmptyPredicateMetadataProducer, - prioritizers, - algorithm.EmptyPriorityMetadataProducer, - emptyPluginSet, - nil, nil, pvcLister, pdbLister, true, false, - schedulerapi.DefaultPercentageOfNodesToScore) - - // First scheduling attempt should fail. - nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"})) - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}} - machine, err := scheduler.Schedule(pod, nodeLister) - if machine != "" || err == nil { - t.Error("First scheduling attempt did not fail") - } - - // Second scheduling attempt should succeed because cache was invalidated. - _, err = scheduler.Schedule(pod, nodeLister) - if err != nil { - t.Errorf("Second scheduling attempt failed: %v", err) - } - if callCount != 2 { - t.Errorf("Predicate should have been called twice. 
Was called %d times.", callCount) - } -} diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 0338a74ce7..8e3ea04613 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -51,7 +51,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" podutil "k8s.io/kubernetes/pkg/api/v1/pod" - "k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/features" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" "k8s.io/kubernetes/pkg/scheduler/algorithm" @@ -59,7 +58,6 @@ import ( schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/api/validation" "k8s.io/kubernetes/pkg/scheduler/core" - "k8s.io/kubernetes/pkg/scheduler/core/equivalence" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -99,9 +97,7 @@ type Config struct { // It is expected that changes made via SchedulerCache will be observed // by NodeLister and Algorithm. SchedulerCache schedulerinternalcache.Cache - // Ecache is used for optimistically invalid affected cache items after - // successfully binding a pod - Ecache *equivalence.Cache + NodeLister algorithm.NodeLister Algorithm core.ScheduleAlgorithm GetBinder func(pod *v1.Pod) Binder @@ -225,12 +221,6 @@ type configFactory struct { // HardPodAffinitySymmetricWeight represents the weight of implicit PreferredDuringScheduling affinity rule, in the range 0-100. hardPodAffinitySymmetricWeight int32 - // Equivalence class cache - equivalencePodCache *equivalence.Cache - - // Enable equivalence class cache - enableEquivalenceClassCache bool - // Handles volume binding decisions volumeBinder *volumebinder.VolumeBinder @@ -259,7 +249,6 @@ type ConfigFactoryArgs struct { PdbInformer policyinformers.PodDisruptionBudgetInformer StorageClassInformer storageinformers.StorageClassInformer HardPodAffinitySymmetricWeight int32 - EnableEquivalenceClassCache bool DisablePreemption bool PercentageOfNodesToScore int32 BindTimeoutSeconds int64 @@ -297,7 +286,6 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { StopEverything: stopEverything, schedulerName: args.SchedulerName, hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight, - enableEquivalenceClassCache: args.EnableEquivalenceClassCache, disablePreemption: args.DisablePreemption, percentageOfNodesToScore: args.PercentageOfNodesToScore, } @@ -365,14 +353,11 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { }, ) - // On add and delete of PVs, it will affect equivalence cache items - // related to persistent volume args.PvInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ // MaxPDVolumeCountPredicate: since it relies on the counts of PV. AddFunc: c.onPvAdd, UpdateFunc: c.onPvUpdate, - DeleteFunc: c.onPvDelete, }, ) @@ -381,13 +366,10 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { cache.ResourceEventHandlerFuncs{ AddFunc: c.onPvcAdd, UpdateFunc: c.onPvcUpdate, - DeleteFunc: c.onPvcDelete, }, ) // This is for ServiceAffinity: affected by the selector of the service is updated. - // Also, if new service is added, equivalence cache will also become invalid since - // existing pods may be "captured" by this service and change this predicate result. 
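For reference, both race tests deleted above force a deterministic interleaving with the same trick: a sync.Once plus two unbuffered channels, so the test goroutine can inject an event at one precise point inside the first scheduling cycle. A minimal, self-contained sketch of that handshake (all names invented; this is not part of the patch):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		once             sync.Once
		cycleStart       = make(chan struct{})
		cacheInvalidated = make(chan struct{})
		done             = make(chan struct{})
	)

	// The "scheduler": on its first cycle only, it pauses at a known point
	// so the test can interleave an invalidation event.
	go func() {
		defer close(done)
		for i := 0; i < 2; i++ {
			once.Do(func() {
				cycleStart <- struct{}{} // signal: first cycle has started
				<-cacheInvalidated       // block until the event has landed
			})
			fmt.Println("cycle", i, "finished")
		}
	}()

	// The "test": wait for the cycle to start, inject the event, resume it.
	<-cycleStart
	fmt.Println("injecting invalidation mid-cycle")
	cacheInvalidated <- struct{}{}
	<-done
}

Because both channels are unbuffered, each side blocks until the other has observed the event, which is what made the two tests above reproduce the race reliably.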
args.ServiceInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: c.onServiceAdd, @@ -396,17 +378,13 @@ func NewConfigFactory(args *ConfigFactoryArgs) Configurator { }, ) - // Existing equivalence cache should not be affected by add/delete RC/Deployment etc, - // it only make sense when pod is scheduled or deleted - if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { // Setup volume binder c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second) args.StorageClassInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ - AddFunc: c.onStorageClassAdd, - DeleteFunc: c.onStorageClassDelete, + AddFunc: c.onStorageClassAdd, }, ) } @@ -486,14 +464,6 @@ func (c *configFactory) skipPodUpdate(pod *v1.Pod) bool { } func (c *configFactory) onPvAdd(obj interface{}) { - if c.enableEquivalenceClassCache { - pv, ok := obj.(*v1.PersistentVolume) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolume: %v", obj) - return - } - c.invalidatePredicatesForPv(pv) - } // Pods created when there are no PVs available will be stuck in // unschedulable queue. But unbound PVs created for static provisioning and // delay binding storage class are skipped in PV controller dynamic @@ -504,19 +474,6 @@ func (c *configFactory) onPvAdd(obj interface{}) { } func (c *configFactory) onPvUpdate(old, new interface{}) { - if c.enableEquivalenceClassCache { - newPV, ok := new.(*v1.PersistentVolume) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolume: %v", new) - return - } - oldPV, ok := old.(*v1.PersistentVolume) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolume: %v", old) - return - } - c.invalidatePredicatesForPvUpdate(oldPV, newPV) - } // Scheduler.bindVolumesWorker may fail to update assumed pod volume // bindings due to conflicts if PVs are updated by PV controller or other // parties, then scheduler will add pod back to unschedulable queue. We @@ -524,96 +481,12 @@ func (c *configFactory) onPvUpdate(old, new interface{}) { c.podQueue.MoveAllToActiveQueue() } -func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) { - invalidPredicates := sets.NewString() - // CheckVolumeBinding predicate calls SchedulerVolumeBinder.FindPodVolumes - // which will cache PVs in PodBindingCache. When PV got updated, we should - // invalidate cache, otherwise PVAssumeCache.Assume will fail with out of sync - // error. - if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { - invalidPredicates.Insert(predicates.CheckVolumeBindingPred) - } - for k, v := range newPV.Labels { - // If PV update modifies the zone/region labels. - if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) { - invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) - break - } - } - c.equivalencePodCache.InvalidatePredicates(invalidPredicates) -} - // isZoneRegionLabel check if given key of label is zone or region label. 
func isZoneRegionLabel(k string) bool { return k == kubeletapis.LabelZoneFailureDomain || k == kubeletapis.LabelZoneRegion } -func (c *configFactory) onPvDelete(obj interface{}) { - if c.enableEquivalenceClassCache { - var pv *v1.PersistentVolume - switch t := obj.(type) { - case *v1.PersistentVolume: - pv = t - case cache.DeletedFinalStateUnknown: - var ok bool - pv, ok = t.Obj.(*v1.PersistentVolume) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolume: %v", t.Obj) - return - } - default: - klog.Errorf("cannot convert to *v1.PersistentVolume: %v", t) - return - } - c.invalidatePredicatesForPv(pv) - } -} - -func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { - // You could have a PVC that points to a PV, but the PV object doesn't exist. - // So when the PV object gets added, we can recount. - invalidPredicates := sets.NewString() - - // PV types which impact MaxPDVolumeCountPredicate - if pv.Spec.AWSElasticBlockStore != nil { - invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred) - } - if pv.Spec.GCEPersistentDisk != nil { - invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred) - } - if pv.Spec.AzureDisk != nil { - invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred) - } - - if pv.Spec.CSI != nil && utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { - invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) - } - - // If PV contains zone related label, it may impact cached NoVolumeZoneConflict - for k := range pv.Labels { - if isZoneRegionLabel(k) { - invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) - break - } - } - - if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { - // Add/delete impacts the available PVs to choose from - invalidPredicates.Insert(predicates.CheckVolumeBindingPred) - } - - c.equivalencePodCache.InvalidatePredicates(invalidPredicates) -} - func (c *configFactory) onPvcAdd(obj interface{}) { - if c.enableEquivalenceClassCache { - pvc, ok := obj.(*v1.PersistentVolumeClaim) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", obj) - return - } - c.invalidatePredicatesForPvc(pvc) - } c.podQueue.MoveAllToActiveQueue() } @@ -621,83 +494,9 @@ func (c *configFactory) onPvcUpdate(old, new interface{}) { if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { return } - - if c.enableEquivalenceClassCache { - newPVC, ok := new.(*v1.PersistentVolumeClaim) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", new) - return - } - oldPVC, ok := old.(*v1.PersistentVolumeClaim) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", old) - return - } - c.invalidatePredicatesForPvcUpdate(oldPVC, newPVC) - } c.podQueue.MoveAllToActiveQueue() } -func (c *configFactory) onPvcDelete(obj interface{}) { - if c.enableEquivalenceClassCache { - var pvc *v1.PersistentVolumeClaim - switch t := obj.(type) { - case *v1.PersistentVolumeClaim: - pvc = t - case cache.DeletedFinalStateUnknown: - var ok bool - pvc, ok = t.Obj.(*v1.PersistentVolumeClaim) - if !ok { - klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t.Obj) - return - } - default: - klog.Errorf("cannot convert to *v1.PersistentVolumeClaim: %v", t) - return - } - c.invalidatePredicatesForPvc(pvc) - } -} - -func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) { - // We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod - - // The bound volume type may change - 
invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...) - - if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { - invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) - } - - // The bound volume's label may change - invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) - - if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { - // Add/delete impacts the available PVs to choose from - invalidPredicates.Insert(predicates.CheckVolumeBindingPred) - } - c.equivalencePodCache.InvalidatePredicates(invalidPredicates) -} - -func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) { - invalidPredicates := sets.NewString() - - if old.Spec.VolumeName != new.Spec.VolumeName { - if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { - // PVC volume binding has changed - invalidPredicates.Insert(predicates.CheckVolumeBindingPred) - } - // The bound volume type may change - invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) - - if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) { - invalidPredicates.Insert(predicates.MaxCSIVolumeCountPred) - } - } - - c.equivalencePodCache.InvalidatePredicates(invalidPredicates) -} - func (c *configFactory) onStorageClassAdd(obj interface{}) { sc, ok := obj.(*storagev1.StorageClass) if !ok { @@ -709,71 +508,20 @@ func (c *configFactory) onStorageClassAdd(obj interface{}) { // PVCs have specified StorageClass name, creating StorageClass objects // with late binding will cause predicates to pass, so we need to move pods // to active queue. - // We don't need to invalidate cached results because results will not be - // cached for pod that has unbound immediate PVCs. if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { c.podQueue.MoveAllToActiveQueue() } } -func (c *configFactory) onStorageClassDelete(obj interface{}) { - if c.enableEquivalenceClassCache { - var sc *storagev1.StorageClass - switch t := obj.(type) { - case *storagev1.StorageClass: - sc = t - case cache.DeletedFinalStateUnknown: - var ok bool - sc, ok = t.Obj.(*storagev1.StorageClass) - if !ok { - klog.Errorf("cannot convert to *storagev1.StorageClass: %v", t.Obj) - return - } - default: - klog.Errorf("cannot convert to *storagev1.StorageClass: %v", t) - return - } - c.invalidatePredicatesForStorageClass(sc) - } -} - -func (c *configFactory) invalidatePredicatesForStorageClass(sc *storagev1.StorageClass) { - invalidPredicates := sets.NewString() - - if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) { - if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer { - // Delete can cause predicates to fail - invalidPredicates.Insert(predicates.CheckVolumeBindingPred) - invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) - } - } - - c.equivalencePodCache.InvalidatePredicates(invalidPredicates) -} - func (c *configFactory) onServiceAdd(obj interface{}) { - if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) - } c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) { - if c.enableEquivalenceClassCache { - // TODO(resouer) We may need to invalidate this for specified group of pods only - oldService := oldObj.(*v1.Service) - newService := newObj.(*v1.Service) - if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) { - 
c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) - } - } c.podQueue.MoveAllToActiveQueue() } func (c *configFactory) onServiceDelete(obj interface{}) { - if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) - } c.podQueue.MoveAllToActiveQueue() } @@ -812,9 +560,6 @@ func (c *configFactory) addPodToCache(obj interface{}) { } c.podQueue.AssignedPodAdded(pod) - - // NOTE: Updating equivalence cache of addPodToCache has been - // handled optimistically in: pkg/scheduler/scheduler.go#assume() } func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { @@ -829,16 +574,10 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { return } - // NOTE: Updates must be written to scheduler cache before invalidating - // equivalence cache, because we could snapshot equivalence cache after the - // invalidation and then snapshot the cache itself. If the cache is - // snapshotted before updates are written, we would update equivalence - // cache with stale information which is based on snapshot of old cache. if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil { klog.Errorf("scheduler cache UpdatePod failed: %v", err) } - c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod) c.podQueue.AssignedPodUpdated(newPod) } @@ -883,27 +622,6 @@ func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) { } } -func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { - if c.enableEquivalenceClassCache { - // if the pod does not have bound node, updating equivalence cache is meaningless; - // if pod's bound node has been changed, that case should be handled by pod add & delete. - if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName { - if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) { - // MatchInterPodAffinity need to be reconsidered for this node, - // as well as all nodes in its same failure domain. - c.equivalencePodCache.InvalidatePredicates( - matchInterPodAffinitySet) - } - // if requested container resource changed, invalidate GeneralPredicates of this node - if !reflect.DeepEqual(predicates.GetResourceRequest(newPod), - predicates.GetResourceRequest(oldPod)) { - c.equivalencePodCache.InvalidatePredicatesOnNode( - newPod.Spec.NodeName, generalPredicatesSets) - } - } - } -} - func (c *configFactory) deletePodFromCache(obj interface{}) { var pod *v1.Pod switch t := obj.(type) { @@ -920,40 +638,14 @@ func (c *configFactory) deletePodFromCache(obj interface{}) { klog.Errorf("cannot convert to *v1.Pod: %v", t) return } - // NOTE: Updates must be written to scheduler cache before invalidating - // equivalence cache, because we could snapshot equivalence cache after the - // invalidation and then snapshot the cache itself. If the cache is - // snapshotted before updates are written, we would update equivalence - // cache with stale information which is based on snapshot of old cache. + if err := c.schedulerCache.RemovePod(pod); err != nil { klog.Errorf("scheduler cache RemovePod failed: %v", err) } - c.invalidateCachedPredicatesOnDeletePod(pod) c.podQueue.MoveAllToActiveQueue() } -func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { - if c.enableEquivalenceClassCache { - // part of this case is the same as pod add. 
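One idiom from the deleted handlers above survives in the kept pod and node handlers: an informer delete callback can receive its object wrapped in a cache.DeletedFinalStateUnknown tombstone when the watch missed the real delete event, so every handler unwraps it with the same type switch. A standalone sketch of that unwrapping (simplified; the pod type is just for illustration):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// extractPod unwraps the object handed to a delete callback, whether it is
// the object itself or a tombstone carrying the last known state.
func extractPod(obj interface{}) (*v1.Pod, error) {
	switch t := obj.(type) {
	case *v1.Pod:
		return t, nil // the common case
	case cache.DeletedFinalStateUnknown:
		pod, ok := t.Obj.(*v1.Pod)
		if !ok {
			return nil, fmt.Errorf("tombstone contained unexpected object: %v", t.Obj)
		}
		return pod, nil
	default:
		return nil, fmt.Errorf("cannot convert to *v1.Pod: %v", t)
	}
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo"}}
	tombstone := cache.DeletedFinalStateUnknown{Key: "ns/demo", Obj: pod}
	if p, err := extractPod(tombstone); err == nil {
		fmt.Println("unwrapped:", p.Name)
	}
}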
- c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName) - // MatchInterPodAffinity need to be reconsidered for this node, - // as well as all nodes in its same failure domain. - // TODO(resouer) can we just do this for nodes in the same failure domain - c.equivalencePodCache.InvalidatePredicates( - matchInterPodAffinitySet) - - // if this pod have these PV, cached result of disk conflict will become invalid. - for _, volume := range pod.Spec.Volumes { - if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil || - volume.RBD != nil || volume.ISCSI != nil { - c.equivalencePodCache.InvalidatePredicatesOnNode( - pod.Spec.NodeName, noDiskConflictSet) - } - } - } -} - func (c *configFactory) addNodeToCache(obj interface{}) { node, ok := obj.(*v1.Node) if !ok { @@ -961,19 +653,11 @@ func (c *configFactory) addNodeToCache(obj interface{}) { return } - // NOTE: Because the scheduler uses equivalence cache for nodes, we need - // to create it before adding node into scheduler cache. - if c.enableEquivalenceClassCache { - // GetNodeCache() will lazily create NodeCache for given node if it does not exist. - c.equivalencePodCache.GetNodeCache(node.GetName()) - } - if err := c.schedulerCache.AddNode(node); err != nil { klog.Errorf("scheduler cache AddNode failed: %v", err) } c.podQueue.MoveAllToActiveQueue() - // NOTE: add a new node does not affect existing predicates in equivalence cache } func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { @@ -988,16 +672,10 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { return } - // NOTE: Updates must be written to scheduler cache before invalidating - // equivalence cache, because we could snapshot equivalence cache after the - // invalidation and then snapshot the cache itself. If the cache is - // snapshotted before updates are written, we would update equivalence - // cache with stale information which is based on snapshot of old cache. if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil { klog.Errorf("scheduler cache UpdateNode failed: %v", err) } - c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode) // Only activate unschedulable pods if the node became more schedulable. // We skip the node property comparison when there is no unschedulable pods in the queue // to save processing cycles. 
We still trigger a move to active queue to cover the case @@ -1008,75 +686,6 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) { } } -func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) { - if c.enableEquivalenceClassCache { - // Begin to update equivalence cache based on node update - // TODO(resouer): think about lazily initialize this set - invalidPredicates := sets.NewString() - - if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) { - invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources" - } - if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) { - invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches" - for k, v := range oldNode.GetLabels() { - // any label can be topology key of pod, we have to invalidate in all cases - if v != newNode.GetLabels()[k] { - invalidPredicates.Insert(predicates.MatchInterPodAffinityPred) - } - // NoVolumeZoneConflict will only be affected by zone related label change - if isZoneRegionLabel(k) { - if v != newNode.GetLabels()[k] { - invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred) - } - } - } - } - - oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations()) - if oldErr != nil { - klog.Errorf("Failed to get taints from old node annotation for equivalence cache") - } - newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations()) - if newErr != nil { - klog.Errorf("Failed to get taints from new node annotation for equivalence cache") - } - if !reflect.DeepEqual(oldTaints, newTaints) || - !reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) { - invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred) - } - - if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) { - oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus) - newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus) - for _, cond := range oldNode.Status.Conditions { - oldConditions[cond.Type] = cond.Status - } - for _, cond := range newNode.Status.Conditions { - newConditions[cond.Type] = cond.Status - } - if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] { - invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred) - } - if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] { - invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred) - } - if oldConditions[v1.NodePIDPressure] != newConditions[v1.NodePIDPressure] { - invalidPredicates.Insert(predicates.CheckNodePIDPressurePred) - } - if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] || - oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] || - oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] { - invalidPredicates.Insert(predicates.CheckNodeConditionPred) - } - } - if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable { - invalidPredicates.Insert(predicates.CheckNodeConditionPred) - } - c.equivalencePodCache.InvalidatePredicatesOnNode(newNode.GetName(), invalidPredicates) - } -} - func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool { if nodeSpecUnschedulableChanged(newNode, oldNode) { return true @@ -1140,17 +749,10 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) { klog.Errorf("cannot convert to *v1.Node: %v", t) return } - // NOTE: Updates must be written to scheduler cache before 
invalidating - // equivalence cache, because we could snapshot equivalence cache after the - // invalidation and then snapshot the cache itself. If the cache is - // snapshotted before updates are written, we would update equivalence - // cache with stale information which is based on snapshot of old cache. + if err := c.schedulerCache.RemoveNode(node); err != nil { klog.Errorf("scheduler cache RemoveNode failed: %v", err) } - if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateAllPredicatesOnNode(node.GetName()) - } } // Create creates a scheduler with the default algorithm provider. @@ -1288,15 +890,8 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // TODO(bsalamat): the default registrar should be able to process config files. c.pluginSet = plugins.NewDefaultPluginSet(pluginsv1alpha1.NewPluginContext(), &c.schedulerCache) - // Init equivalence class cache - if c.enableEquivalenceClassCache { - c.equivalencePodCache = equivalence.NewCache(predicates.Ordering()) - klog.Info("Created equivalence class cache") - } - algo := core.NewGenericScheduler( c.schedulerCache, - c.equivalencePodCache, c.podQueue, predicateFuncs, predicateMetaProducer, @@ -1315,7 +910,6 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, podBackoff := util.CreateDefaultPodBackoff() return &Config{ SchedulerCache: c.schedulerCache, - Ecache: c.equivalencePodCache, // The scheduler only needs to consider schedulable nodes. NodeLister: &nodeLister{c.nodeLister}, Algorithm: algo, @@ -1514,16 +1108,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue _, err := c.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) if err != nil && errors.IsNotFound(err) { node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} - // NOTE: Updates must be written to scheduler cache before invalidating - // equivalence cache, because we could snapshot equivalence cache after the - // invalidation and then snapshot the cache itself. If the cache is - // snapshotted before updates are written, we would update equivalence - // cache with stale information which is based on snapshot of old cache. 
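The NOTE above, repeated at every cache-mutating call site this patch touches, encodes an ordering rule: write the authoritative scheduler cache first, invalidate the derived equivalence cache second. A hypothetical, deterministic sketch of the failure it guarded against (all names invented; the real cache layered additional safeguards on top of this rule):

package main

import "fmt"

func main() {
	primary := map[string]int{"node1": 1} // authoritative state
	derived := map[string]int{"node1": 1} // cache computed from primary

	// An in-flight scheduling cycle snapshots primary before any update.
	snapshot := primary["node1"]

	// Wrong order: invalidate the derived cache first...
	delete(derived, "node1")
	// ...then write the new authoritative state.
	primary["node1"] = 2

	// The in-flight cycle now writes its stale snapshot back, after the
	// invalidation, so nothing will ever clear it again.
	derived["node1"] = snapshot

	fmt.Printf("primary=%d derived=%d (stale)\n", primary["node1"], derived["node1"])
}

With the order the NOTE prescribes, any refill that starts after the invalidation reads the already-updated primary state; narrowing the remaining window for in-flight refills was handled inside the equivalence cache itself.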
c.schedulerCache.RemoveNode(&node) - // invalidate cached predicate for the node - if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateAllPredicatesOnNode(nodeName) - } } } } else { diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go index 7f45516e7a..c389983766 100644 --- a/pkg/scheduler/factory/factory_test.go +++ b/pkg/scheduler/factory/factory_test.go @@ -45,9 +45,8 @@ import ( ) const ( - enableEquivalenceCache = true - disablePodPreemption = false - bindTimeoutSeconds = 600 + disablePodPreemption = false + bindTimeoutSeconds = 600 ) func TestCreate(t *testing.T) { @@ -532,7 +531,6 @@ func newConfigFactory(client clientset.Interface, hardPodAffinitySymmetricWeight informerFactory.Policy().V1beta1().PodDisruptionBudgets(), informerFactory.Storage().V1().StorageClasses(), hardPodAffinitySymmetricWeight, - enableEquivalenceCache, disablePodPreemption, schedulerapi.DefaultPercentageOfNodesToScore, bindTimeoutSeconds, diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 0ebbd4f8ef..b05ff33eb2 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -131,22 +131,6 @@ var ( Help: "Total preemption attempts in the cluster till now", }) - equivalenceCacheLookups = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Subsystem: SchedulerSubsystem, - Name: "equiv_cache_lookups_total", - Help: "Total number of equivalence cache lookups, by whether or not a cache entry was found", - }, []string{"result"}) - EquivalenceCacheHits = equivalenceCacheLookups.With(prometheus.Labels{"result": "hit"}) - EquivalenceCacheMisses = equivalenceCacheLookups.With(prometheus.Labels{"result": "miss"}) - - EquivalenceCacheWrites = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Subsystem: SchedulerSubsystem, - Name: "equiv_cache_writes", - Help: "Total number of equivalence cache writes, by result", - }, []string{"result"}) - metricsList = []prometheus.Collector{ scheduleAttempts, SchedulingLatency, @@ -158,8 +142,6 @@ var ( SchedulingAlgorithmPremptionEvaluationDuration, PreemptionVictims, PreemptionAttempts, - equivalenceCacheLookups, - EquivalenceCacheWrites, } ) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index c2c60508b6..a9e2d12ec6 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -28,7 +28,6 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" appsinformers "k8s.io/client-go/informers/apps/v1" @@ -38,7 +37,6 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" @@ -70,7 +68,6 @@ func (sched *Scheduler) Cache() schedulerinternalcache.Cache { type schedulerOptions struct { schedulerName string hardPodAffinitySymmetricWeight int32 - enableEquivalenceClassCache bool disablePreemption bool percentageOfNodesToScore int32 bindTimeoutSeconds int64 @@ -93,13 +90,6 @@ func WithHardPodAffinitySymmetricWeight(hardPodAffinitySymmetricWeight int32) Op } } -// WithEquivalenceClassCacheEnabled sets enableEquivalenceClassCache for Scheduler, the default value is false -func 
WithEquivalenceClassCacheEnabled(enableEquivalenceClassCache bool) Option { - return func(o *schedulerOptions) { - o.enableEquivalenceClassCache = enableEquivalenceClassCache - } -} - // WithPreemptionDisabled sets disablePreemption for Scheduler, the default value is false func WithPreemptionDisabled(disablePreemption bool) Option { return func(o *schedulerOptions) { @@ -124,7 +114,6 @@ func WithBindTimeoutSeconds(bindTimeoutSeconds int64) Option { var defaultSchedulerOptions = schedulerOptions{ schedulerName: v1.DefaultSchedulerName, hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, - enableEquivalenceClassCache: false, disablePreemption: false, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, bindTimeoutSeconds: BindTimeoutSeconds, @@ -167,7 +156,6 @@ func New(client clientset.Interface, PdbInformer: pdbInformer, StorageClassInformer: storageClassInformer, HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight, - EnableEquivalenceClassCache: options.enableEquivalenceClassCache, DisablePreemption: options.disablePreemption, PercentageOfNodesToScore: options.percentageOfNodesToScore, BindTimeoutSeconds: options.bindTimeoutSeconds, @@ -370,12 +358,6 @@ func (sched *Scheduler) assumeVolumes(assumed *v1.Pod, host string) (allBound bo sched.recordSchedulingFailure(assumed, err, SchedulerError, fmt.Sprintf("AssumePodVolumes failed: %v", err)) } - // Invalidate ecache because assumed volumes could have affected the cached - // pvs for other pods - if sched.config.Ecache != nil { - invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred) - sched.config.Ecache.InvalidatePredicates(invalidPredicates) - } } return } @@ -416,11 +398,7 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // If the binding fails, scheduler will release resources allocated to assumed pod // immediately. assumed.Spec.NodeName = host - // NOTE: Updates must be written to scheduler cache before invalidating - // equivalence cache, because we could snapshot equivalence cache after the - // invalidation and then snapshot the cache itself. If the cache is - // snapshotted before updates are written, we would update equivalence - // cache with stale information which is based on snapshot of old cache. + if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf("scheduler cache AssumePod failed: %v", err) @@ -438,12 +416,6 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { sched.config.SchedulingQueue.DeleteNominatedPodIfExists(assumed) } - // Optimistically assume that the binding will succeed, so we need to invalidate affected - // predicates in equivalence cache. - // If the binding fails, these invalidated item will not break anything. 
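scheduler.New above accepts functional options, which is what keeps this deletion cheap: removing WithEquivalenceClassCacheEnabled only touches callers that actually passed it. A minimal, self-contained sketch of the pattern with invented types:

package main

import "fmt"

// options holds the defaults; exported With* constructors mutate it.
type options struct {
	name              string
	disablePreemption bool
}

// Option is a functional option: a closure that edits the options struct.
type Option func(*options)

func WithName(name string) Option {
	return func(o *options) { o.name = name }
}

func WithPreemptionDisabled(disabled bool) Option {
	return func(o *options) { o.disablePreemption = disabled }
}

// New applies zero or more options over the defaults.
func New(opts ...Option) options {
	o := options{name: "default-scheduler"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	// Dropping an option removes one constructor; call sites that never
	// passed it keep compiling unchanged.
	fmt.Printf("%+v\n", New(WithPreemptionDisabled(true)))
}

Contrast this with the positional parameters of initTestSchedulerWithOptions below, where removing the disableEquivalenceCache flag forces an edit at every single call site.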
- if sched.config.Ecache != nil { - sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host) - } return nil } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 51bae5d7ed..f0e0b4877f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -639,7 +639,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern algo := core.NewGenericScheduler( scache, nil, - nil, predicateMap, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{}, @@ -692,7 +691,6 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc algo := core.NewGenericScheduler( scache, nil, - nil, predicateMap, algorithm.EmptyPredicateMetadataProducer, []algorithm.PriorityConfig{}, diff --git a/test/integration/daemonset/daemonset_test.go b/test/integration/daemonset/daemonset_test.go index f73cbe93a9..38536a64eb 100644 --- a/test/integration/daemonset/daemonset_test.go +++ b/test/integration/daemonset/daemonset_test.go @@ -112,7 +112,6 @@ func setupScheduler( PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, - EnableEquivalenceClassCache: false, DisablePreemption: false, PercentageOfNodesToScore: 100, }) diff --git a/test/integration/scheduler/plugin_test.go b/test/integration/scheduler/plugin_test.go index 1fdf513d01..a2bcba025f 100644 --- a/test/integration/scheduler/plugin_test.go +++ b/test/integration/scheduler/plugin_test.go @@ -106,7 +106,7 @@ func TestReservePlugin(t *testing.T) { // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, initTestMaster(t, "reserve-plugin", nil), - false, nil, testPluginSet, false, true, time.Second) + false, nil, testPluginSet, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet @@ -157,7 +157,7 @@ func TestPrebindPlugin(t *testing.T) { // Create the master and the scheduler with the test plugin set. context := initTestSchedulerWithOptions(t, initTestMaster(t, "prebind-plugin", nil), - false, nil, testPluginSet, false, true, time.Second) + false, nil, testPluginSet, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet @@ -239,7 +239,7 @@ func TestContextCleanup(t *testing.T) { // Create the master and the scheduler with the test plugin set. 
context := initTestSchedulerWithOptions(t, initTestMaster(t, "plugin-context-cleanup", nil), - false, nil, testPluginSet, false, true, time.Second) + false, nil, testPluginSet, false, time.Second) defer cleanupTest(t, context) cs := context.clientSet diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go index f993d8342b..07fb23a4f6 100644 --- a/test/integration/scheduler/util.go +++ b/test/integration/scheduler/util.go @@ -33,8 +33,6 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" - utilfeature "k8s.io/apiserver/pkg/util/feature" - utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" @@ -46,7 +44,6 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/disruption" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" _ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" @@ -91,7 +88,6 @@ func createConfiguratorWithPodInformer( PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, - EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), DisablePreemption: false, PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, BindTimeoutSeconds: 600, @@ -149,7 +145,7 @@ func initTestScheduler( ) *TestContext { // Pod preemption is enabled by default scheduler configuration, but preemption only happens when PodPriority // feature gate is enabled at the same time. - return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, true, time.Second) + return initTestSchedulerWithOptions(t, context, setPodInformer, policy, nil, false, time.Second) } // initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default @@ -161,16 +157,8 @@ func initTestSchedulerWithOptions( policy *schedulerapi.Policy, pluginSet plugins.PluginSet, disablePreemption bool, - disableEquivalenceCache bool, resyncPeriod time.Duration, ) *TestContext { - if !disableEquivalenceCache { - defer utilfeaturetesting.SetFeatureGateDuringTest( - t, - utilfeature.DefaultFeatureGate, - features.EnableEquivalenceClassCache, true)() - } - // 1. Create scheduler context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, resyncPeriod) @@ -264,7 +252,7 @@ func initTest(t *testing.T, nsPrefix string) *TestContext { // configuration but with pod preemption disabled. func initTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext { return initTestSchedulerWithOptions( - t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, true, time.Second) + t, initTestMaster(t, nsPrefix, nil), true, nil, nil, true, time.Second) } // cleanupTest deletes the scheduler and the test namespace. 
It should be called diff --git a/test/integration/scheduler/volume_binding_test.go b/test/integration/scheduler/volume_binding_test.go index 7fa38f7b75..609a45958d 100644 --- a/test/integration/scheduler/volume_binding_test.go +++ b/test/integration/scheduler/volume_binding_test.go @@ -99,7 +99,7 @@ type testPVC struct { func TestVolumeBinding(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)() defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)() - config := setupCluster(t, "volume-scheduling-", 2, 0, 0, true) + config := setupCluster(t, "volume-scheduling-", 2, 0, 0) defer config.teardown() cases := map[string]struct { @@ -270,7 +270,7 @@ func TestVolumeBinding(t *testing.T) { func TestVolumeBindingRescheduling(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)() defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)() - config := setupCluster(t, "volume-scheduling-", 2, 0, 0, true) + config := setupCluster(t, "volume-scheduling-", 2, 0, 0) defer config.teardown() storageClassName := "local-storage" @@ -414,7 +414,7 @@ func TestVolumeBindingDynamicStressSlow(t *testing.T) { func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)() defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)() - config := setupCluster(t, "volume-binding-stress-", 1, schedulerResyncPeriod, provisionDelaySeconds, true) + config := setupCluster(t, "volume-binding-stress-", 1, schedulerResyncPeriod, provisionDelaySeconds) defer config.teardown() // Set max volume limit to the number of PVCs the test will create @@ -504,8 +504,7 @@ func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)() defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)() - // TODO: disable equivalence cache until kubernetes/kubernetes#67680 is fixed - config := setupCluster(t, "volume-pod-affinity-", numNodes, 0, 0, true) + config := setupCluster(t, "volume-pod-affinity-", numNodes, 0, 0) defer config.teardown() pods := []*v1.Pod{} @@ -632,7 +631,7 @@ func TestVolumeBindingWithAffinity(t *testing.T) { func TestPVAffinityConflict(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.VolumeScheduling, true)() defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)() - config := setupCluster(t, "volume-scheduling-", 3, 0, 0, true) + config := setupCluster(t, "volume-scheduling-", 3, 0, 0) defer config.teardown() pv := makePV("local-pv", classImmediate, "", "", node1) @@ -693,7 +692,7 @@ func TestPVAffinityConflict(t *testing.T) { func TestVolumeProvision(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, 
features.VolumeScheduling, true)() defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PersistentLocalVolumes, true)() - config := setupCluster(t, "volume-scheduling", 1, 0, 0, true) + config := setupCluster(t, "volume-scheduling", 1, 0, 0) defer config.teardown() cases := map[string]struct { @@ -889,9 +888,8 @@ func TestRescheduleProvisioning(t *testing.T) { } } -func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int, disableEquivalenceCache bool) *testConfig { - context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, disableEquivalenceCache, resyncPeriod) - +func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig { + context := initTestSchedulerWithOptions(t, initTestMaster(t, nsName, nil), false, nil, nil, false, resyncPeriod) clientset := context.clientSet ns := context.ns.Name diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 4137429ec4..939815747b 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -21,14 +21,12 @@ import ( "net/http/httptest" "k8s.io/api/core/v1" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" clientv1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" "k8s.io/klog" "k8s.io/kubernetes/pkg/api/legacyscheme" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/factory" @@ -94,6 +92,7 @@ func createSchedulerConfigurator( informerFactory informers.SharedInformerFactory, stopCh <-chan struct{}, ) factory.Configurator { + return factory.NewConfigFactory(&factory.ConfigFactoryArgs{ SchedulerName: v1.DefaultSchedulerName, Client: clientSet, @@ -108,7 +107,6 @@ func createSchedulerConfigurator( PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(), StorageClassInformer: informerFactory.Storage().V1().StorageClasses(), HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, - EnableEquivalenceClassCache: utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache), DisablePreemption: false, PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, StopCh: stopCh,
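One last idiom from the integration tests above: SetFeatureGateDuringTest flips a feature gate and returns its own undo function, and the trailing () in defer utilfeaturetesting.SetFeatureGateDuringTest(...)() hands that undo function to defer. A simplified stand-in (not the real helper) showing the mechanics:

package main

import "fmt"

var gates = map[string]bool{"VolumeScheduling": false}

// setDuringTest flips a gate and returns a closure restoring the old value.
func setDuringTest(name string, value bool) func() {
	old := gates[name]
	gates[name] = value
	return func() { gates[name] = old }
}

func main() {
	fmt.Println("before:", gates["VolumeScheduling"])
	func() {
		// defer evaluates setDuringTest(...) now (flipping the gate) and
		// defers only the returned restore closure.
		defer setDuringTest("VolumeScheduling", true)()
		fmt.Println("during:", gates["VolumeScheduling"])
	}()
	fmt.Println("after:", gates["VolumeScheduling"])
}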