From 9b068706209c5133e65e9c64bb791a86d6507cbc Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Tue, 8 May 2018 15:10:40 -0700 Subject: [PATCH 1/3] Clean up names and comments in equivalence cache. --- pkg/scheduler/core/equivalence_cache.go | 111 +++++++++++-------- pkg/scheduler/core/equivalence_cache_test.go | 26 ++--- pkg/scheduler/core/generic_scheduler.go | 6 +- 3 files changed, 78 insertions(+), 65 deletions(-) diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go index da50d2a81f..6e080a256f 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence_cache.go @@ -31,22 +31,30 @@ import ( "github.com/golang/glog" ) -// EquivalenceCache holds: -// 1. a map of AlgorithmCache with node name as key -// 2. function to get equivalence pod +// EquivalenceCache saves and reuses the output of predicate functions. Use +// RunPredicate to get or update the cached results. An appropriate Invalidate* +// function should be called when some predicate results are no longer valid. +// +// Internally, results are keyed by node name, predicate name, and "equivalence +// class". (Equivalence class is defined in the `EquivalenceClassInfo` type.) +// Saved results will be reused until an appropriate invalidation function is +// called. type EquivalenceCache struct { - mu sync.RWMutex - algorithmCache map[string]AlgorithmCache + mu sync.RWMutex + cache nodeMap } -// The AlgorithmCache stores PredicateMap with predicate name as key, PredicateMap as value. -type AlgorithmCache map[string]PredicateMap +// nodeMap stores PredicateCaches with node name as the key. +type nodeMap map[string]predicateMap -// PredicateMap stores HostPrediacte with equivalence hash as key -type PredicateMap map[uint64]HostPredicate +// predicateMap stores resultMaps with predicate name as the key. +type predicateMap map[string]resultMap -// HostPredicate is the cached predicate result -type HostPredicate struct { +// resultMap stores PredicateResult with pod equivalence hash as the key. +type resultMap map[uint64]predicateResult + +// predicateResult stores the output of a FitPredicate. +type predicateResult struct { Fit bool FailReasons []algorithm.PredicateFailureReason } @@ -55,12 +63,12 @@ type HostPredicate struct { // result from previous scheduling. func NewEquivalenceCache() *EquivalenceCache { return &EquivalenceCache{ - algorithmCache: make(map[string]AlgorithmCache), + cache: make(nodeMap), } } -// RunPredicate will return a cached predicate result. In case of a cache miss, the predicate will -// be run and its results cached for the next call. +// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be +// run and its results cached for the next call. // // NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale. func (ec *EquivalenceCache) RunPredicate( @@ -69,7 +77,7 @@ func (ec *EquivalenceCache) RunPredicate( pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, - equivClassInfo *equivalenceClassInfo, + equivClassInfo *EquivalenceClassInfo, cache schedulercache.Cache, ) (bool, []algorithm.PredicateFailureReason, error) { if nodeInfo == nil || nodeInfo.Node() == nil { @@ -111,20 +119,20 @@ func (ec *EquivalenceCache) updateResult( return } nodeName := nodeInfo.Node().GetName() - if _, exist := ec.algorithmCache[nodeName]; !exist { - ec.algorithmCache[nodeName] = AlgorithmCache{} + if _, exist := ec.cache[nodeName]; !exist { + ec.cache[nodeName] = make(predicateMap) } - predicateItem := HostPredicate{ + predicateItem := predicateResult{ Fit: fit, FailReasons: reasons, } // if cached predicate map already exists, just update the predicate by key - if predicateMap, ok := ec.algorithmCache[nodeName][predicateKey]; ok { + if predicates, ok := ec.cache[nodeName][predicateKey]; ok { // maps in golang are references, no need to add them back - predicateMap[equivalenceHash] = predicateItem + predicates[equivalenceHash] = predicateItem } else { - ec.algorithmCache[nodeName][predicateKey] = - PredicateMap{ + ec.cache[nodeName][predicateKey] = + resultMap{ equivalenceHash: predicateItem, } } @@ -143,11 +151,11 @@ func (ec *EquivalenceCache) lookupResult( defer ec.mu.RUnlock() glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, podName, nodeName) - if hostPredicate, exist := ec.algorithmCache[nodeName][predicateKey][equivalenceHash]; exist { - if hostPredicate.Fit { + if result, exist := ec.cache[nodeName][predicateKey][equivalenceHash]; exist { + if result.Fit { return true, []algorithm.PredicateFailureReason{}, false } - return false, hostPredicate.FailReasons, false + return false, result.FailReasons, false } // is invalid return false, []algorithm.PredicateFailureReason{}, true @@ -161,40 +169,43 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predi ec.mu.Lock() defer ec.mu.Unlock() for predicateKey := range predicateKeys { - delete(ec.algorithmCache[nodeName], predicateKey) + delete(ec.cache[nodeName], predicateKey) } glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) } -// InvalidateCachedPredicateItemOfAllNodes marks all items of given predicateKeys, of all pods, on all node as invalid +// InvalidateCachedPredicateItemOfAllNodes marks all items of given +// predicateKeys, of all pods, on all node as invalid func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) { if len(predicateKeys) == 0 { return } ec.mu.Lock() defer ec.mu.Unlock() - // algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates - for _, algorithmCache := range ec.algorithmCache { + // ec.cache uses nodeName as key, so we just iterate it and invalid given predicates + for _, predicates := range ec.cache { for predicateKey := range predicateKeys { - delete(algorithmCache, predicateKey) + delete(predicates, predicateKey) } } glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) } -// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid +// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node +// as invalid func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { ec.mu.Lock() defer ec.mu.Unlock() - delete(ec.algorithmCache, nodeName) + delete(ec.cache, nodeName) glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) } -// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case +// InvalidateCachedPredicateItemForPodAdd is a wrapper of +// InvalidateCachedPredicateItem for pod add case func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { // MatchInterPodAffinity: we assume scheduler can make sure newly bound pod - // will not break the existing inter pod affinity. So we does not need to invalidate - // MatchInterPodAffinity when pod added. + // will not break the existing inter pod affinity. So we does not need to + // invalidate MatchInterPodAffinity when pod added. // // But when a pod is deleted, existing inter pod affinity may become invalid. // (e.g. this pod was preferred by some else, or vice versa) @@ -227,22 +238,23 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates) } -// equivalenceClassInfo holds equivalence hash which is used for checking equivalence cache. -// We will pass this to podFitsOnNode to ensure equivalence hash is only calculated per schedule. -type equivalenceClassInfo struct { +// EquivalenceClassInfo holds equivalence hash which is used for checking +// equivalence cache. We will pass this to podFitsOnNode to ensure equivalence +// hash is only calculated per schedule. +type EquivalenceClassInfo struct { // Equivalence hash. hash uint64 } -// getEquivalenceClassInfo returns a hash of the given pod. -// The hashing function returns the same value for any two pods that are -// equivalent from the perspective of scheduling. -func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo { - equivalencePod := getEquivalenceHash(pod) +// GetEquivalenceClassInfo returns a hash of the given pod. The hashing function +// returns the same value for any two pods that are equivalent from the +// perspective of scheduling. +func (ec *EquivalenceCache) GetEquivalenceClassInfo(pod *v1.Pod) *EquivalenceClassInfo { + equivalencePod := getEquivalencePod(pod) if equivalencePod != nil { hash := fnv.New32a() hashutil.DeepHashObject(hash, equivalencePod) - return &equivalenceClassInfo{ + return &EquivalenceClassInfo{ hash: uint64(hash.Sum32()), } } @@ -254,9 +266,9 @@ func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceCla // include any Pod field which is used by a FitPredicate. // // NOTE: For equivalence hash to be formally correct, lists and maps in the -// equivalencePod should be normalized. (e.g. by sorting them) However, the -// vast majority of equivalent pod classes are expected to be created from a -// single pod template, so they will all have the same ordering. +// equivalencePod should be normalized. (e.g. by sorting them) However, the vast +// majority of equivalent pod classes are expected to be created from a single +// pod template, so they will all have the same ordering. type equivalencePod struct { Namespace *string Labels map[string]string @@ -269,8 +281,9 @@ type equivalencePod struct { Volumes []v1.Volume // See note about ordering } -// getEquivalenceHash returns the equivalencePod for a Pod. -func getEquivalenceHash(pod *v1.Pod) *equivalencePod { +// getEquivalencePod returns a normalized representation of a pod so that two +// "equivalent" pods will hash to the same value. +func getEquivalencePod(pod *v1.Pod) *equivalencePod { ep := &equivalencePod{ Namespace: &pod.Namespace, Labels: pod.Labels, diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index 412e258510..efbcf43e9a 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -251,7 +251,7 @@ func TestRunPredicate(t *testing.T) { meta := algorithm.EmptyPredicateMetadataProducer(nil, nil) ecache := NewEquivalenceCache() - equivClass := ecache.getEquivalenceClassInfo(pod) + equivClass := ecache.GetEquivalenceClassInfo(pod) if test.expectCacheHit { ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node) } @@ -311,7 +311,7 @@ func TestUpdateResult(t *testing.T) { reasons []algorithm.PredicateFailureReason equivalenceHash uint64 expectPredicateMap bool - expectCacheItem HostPredicate + expectCacheItem predicateResult cache schedulercache.Cache }{ { @@ -322,7 +322,7 @@ func TestUpdateResult(t *testing.T) { fit: true, equivalenceHash: 123, expectPredicateMap: false, - expectCacheItem: HostPredicate{ + expectCacheItem: predicateResult{ Fit: true, }, cache: &upToDateCache{}, @@ -335,7 +335,7 @@ func TestUpdateResult(t *testing.T) { fit: false, equivalenceHash: 123, expectPredicateMap: true, - expectCacheItem: HostPredicate{ + expectCacheItem: predicateResult{ Fit: false, }, cache: &upToDateCache{}, @@ -344,12 +344,12 @@ func TestUpdateResult(t *testing.T) { for _, test := range tests { ecache := NewEquivalenceCache() if test.expectPredicateMap { - ecache.algorithmCache[test.nodeName] = AlgorithmCache{} - predicateItem := HostPredicate{ + ecache.cache[test.nodeName] = make(predicateMap) + predicateItem := predicateResult{ Fit: true, } - ecache.algorithmCache[test.nodeName][test.predicateKey] = - PredicateMap{ + ecache.cache[test.nodeName][test.predicateKey] = + resultMap{ test.equivalenceHash: predicateItem, } } @@ -366,7 +366,7 @@ func TestUpdateResult(t *testing.T) { node, ) - cachedMapItem, ok := ecache.algorithmCache[test.nodeName][test.predicateKey] + cachedMapItem, ok := ecache.cache[test.nodeName][test.predicateKey] if !ok { t.Errorf("Failed: %s, can't find expected cache item: %v", test.name, test.expectCacheItem) @@ -618,7 +618,7 @@ func TestGetEquivalenceHash(t *testing.T) { t.Run(test.name, func(t *testing.T) { for i, podInfo := range test.podInfoList { testPod := podInfo.pod - eclassInfo := ecache.getEquivalenceClassInfo(testPod) + eclassInfo := ecache.GetEquivalenceClassInfo(testPod) if eclassInfo == nil && podInfo.hashIsValid { t.Errorf("Failed: pod %v is expected to have valid hash", testPod) } @@ -708,7 +708,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { // there should be no cached predicate any more for _, test := range tests { - if algorithmCache, exist := ecache.algorithmCache[test.nodeName]; exist { + if algorithmCache, exist := ecache.cache[test.nodeName]; exist { if _, exist := algorithmCache[testPredicate]; exist { t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated", testPredicate, test.nodeName) @@ -778,7 +778,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { for _, test := range tests { // invalidate cached predicate for all nodes ecache.InvalidateAllCachedPredicateItemOfNode(test.nodeName) - if _, exist := ecache.algorithmCache[test.nodeName]; exist { + if _, exist := ecache.cache[test.nodeName]; exist { t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName) break } @@ -788,7 +788,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { func BenchmarkEquivalenceHash(b *testing.B) { pod := makeBasicPod("test") for i := 0; i < b.N; i++ { - getEquivalenceHash(pod) + getEquivalencePod(pod) } } diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 960054b0d1..83c21f45c3 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -342,10 +342,10 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v // We can use the same metadata producer for all nodes. meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) - var equivCacheInfo *equivalenceClassInfo + var equivCacheInfo *EquivalenceClassInfo if g.equivalenceCache != nil { // getEquivalenceClassInfo will return immediately if no equivalence pod found - equivCacheInfo = g.equivalenceCache.getEquivalenceClassInfo(pod) + equivCacheInfo = g.equivalenceCache.GetEquivalenceClassInfo(pod) } checkNode := func(i int) { @@ -462,7 +462,7 @@ func podFitsOnNode( ecache *EquivalenceCache, queue SchedulingQueue, alwaysCheckAllPredicates bool, - equivCacheInfo *equivalenceClassInfo, + equivCacheInfo *EquivalenceClassInfo, ) (bool, []algorithm.PredicateFailureReason, error) { var ( eCacheAvailable bool From 5d13798e5c33ce02834823cdaf58a7c319cb2e1a Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Tue, 8 May 2018 16:00:37 -0700 Subject: [PATCH 2/3] Change the return of EquivalenceClass.lookupResult. This makes the lookup behave like a normal map lookup, so it is easier for readers to follow the logic. It also inverts the "invalid" bool to an "ok" bool because `!invalid` is a double negative. --- pkg/scheduler/core/equivalence_cache.go | 24 ++++------- pkg/scheduler/core/equivalence_cache_test.go | 45 +++++++++++--------- 2 files changed, 33 insertions(+), 36 deletions(-) diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go index 6e080a256f..b2d56ff2a0 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence_cache.go @@ -85,9 +85,9 @@ func (ec *EquivalenceCache) RunPredicate( return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") } - fit, reasons, invalid := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash) - if !invalid { - return fit, reasons, nil + result, ok := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash) + if ok { + return result.Fit, result.FailReasons, nil } fit, reasons, err := pred(pod, meta, nodeInfo) if err != nil { @@ -139,26 +139,18 @@ func (ec *EquivalenceCache) updateResult( glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem) } -// lookupResult returns cached predicate results: -// 1. if pod fit -// 2. reasons if pod did not fit -// 3. if cache item is not found +// lookupResult returns cached predicate results and a bool saying whether a +// cache entry was found. func (ec *EquivalenceCache) lookupResult( podName, nodeName, predicateKey string, equivalenceHash uint64, -) (bool, []algorithm.PredicateFailureReason, bool) { +) (value predicateResult, ok bool) { ec.mu.RLock() defer ec.mu.RUnlock() glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, podName, nodeName) - if result, exist := ec.cache[nodeName][predicateKey][equivalenceHash]; exist { - if result.Fit { - return true, []algorithm.PredicateFailureReason{}, false - } - return false, result.FailReasons, false - } - // is invalid - return false, []algorithm.PredicateFailureReason{}, true + value, ok = ec.cache[nodeName][predicateKey][equivalenceHash] + return value, ok } // InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index efbcf43e9a..b745f1c81a 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -287,14 +287,14 @@ func TestRunPredicate(t *testing.T) { if !test.expectCacheHit && test.pred.callCount == 0 { t.Errorf("Predicate should be called") } - _, _, invalid := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) - if invalid && test.expectCacheWrite { + _, ok := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) + if !ok && test.expectCacheWrite { t.Errorf("Cache write should happen") } - if !test.expectCacheHit && test.expectCacheWrite && invalid { + if !test.expectCacheHit && test.expectCacheWrite && !ok { t.Errorf("Cache write should happen") } - if !test.expectCacheHit && !test.expectCacheWrite && !invalid { + if !test.expectCacheHit && !test.expectCacheWrite && ok { t.Errorf("Cache write should not happen") } }) @@ -396,8 +396,8 @@ func TestLookupResult(t *testing.T) { equivalenceHashForUpdatePredicate uint64 equivalenceHashForCalPredicate uint64 cachedItem predicateItemType - expectedInvalidPredicateKey bool - expectedInvalidEquivalenceHash bool + expectedPredicateKeyMiss bool + expectedEquivalenceHashMiss bool expectedPredicateItem predicateItemType cache schedulercache.Cache }{ @@ -412,7 +412,7 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, - expectedInvalidPredicateKey: true, + expectedPredicateKeyMiss: true, expectedPredicateItem: predicateItemType{ fit: false, reasons: []algorithm.PredicateFailureReason{}, @@ -429,7 +429,7 @@ func TestLookupResult(t *testing.T) { cachedItem: predicateItemType{ fit: true, }, - expectedInvalidPredicateKey: false, + expectedPredicateKeyMiss: false, expectedPredicateItem: predicateItemType{ fit: true, reasons: []algorithm.PredicateFailureReason{}, @@ -447,7 +447,7 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, - expectedInvalidPredicateKey: false, + expectedPredicateKeyMiss: false, expectedPredicateItem: predicateItemType{ fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, @@ -465,8 +465,8 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, - expectedInvalidPredicateKey: false, - expectedInvalidEquivalenceHash: true, + expectedPredicateKeyMiss: false, + expectedEquivalenceHashMiss: true, expectedPredicateItem: predicateItemType{ fit: false, reasons: []algorithm.PredicateFailureReason{}, @@ -490,27 +490,32 @@ func TestLookupResult(t *testing.T) { node, ) // if we want to do invalid, invalid the cached item - if test.expectedInvalidPredicateKey { + if test.expectedPredicateKeyMiss { predicateKeys := sets.NewString() predicateKeys.Insert(test.predicateKey) ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) } // calculate predicate with equivalence cache - fit, reasons, invalid := ecache.lookupResult(test.podName, + result, ok := ecache.lookupResult(test.podName, test.nodeName, test.predicateKey, test.equivalenceHashForCalPredicate, ) - // returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash + fit, reasons := result.Fit, result.FailReasons + // returned invalid should match expectedPredicateKeyMiss or expectedEquivalenceHashMiss if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { - if invalid != test.expectedInvalidEquivalenceHash { - t.Errorf("Failed: %s, expected invalid: %v, but got: %v", - test.name, test.expectedInvalidEquivalenceHash, invalid) + if ok && test.expectedEquivalenceHashMiss { + t.Errorf("Failed: %s, expected (equivalence hash) cache miss", test.name) + } + if !ok && !test.expectedEquivalenceHashMiss { + t.Errorf("Failed: %s, expected (equivalence hash) cache hit", test.name) } } else { - if invalid != test.expectedInvalidPredicateKey { - t.Errorf("Failed: %s, expected invalid: %v, but got: %v", - test.name, test.expectedInvalidPredicateKey, invalid) + if ok && test.expectedPredicateKeyMiss { + t.Errorf("Failed: %s, expected (predicate key) cache miss", test.name) + } + if !ok && !test.expectedPredicateKeyMiss { + t.Errorf("Failed: %s, expected (predicate key) cache hit", test.name) } } // returned predicate result should match expected predicate item From ba08b05e286e7102dbc4a14638f440e8f42a171f Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Wed, 23 May 2018 11:41:57 -0700 Subject: [PATCH 3/3] Rename equiv. class invalidation functions. Change the invalidation functions to have cleaner and more consistent names. --- pkg/scheduler/core/equivalence_cache.go | 39 ++++++++++---------- pkg/scheduler/core/equivalence_cache_test.go | 8 ++-- pkg/scheduler/factory/factory.go | 28 +++++++------- pkg/scheduler/scheduler.go | 2 +- 4 files changed, 38 insertions(+), 39 deletions(-) diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go index b2d56ff2a0..032b53e4a1 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence_cache.go @@ -153,22 +153,8 @@ func (ec *EquivalenceCache) lookupResult( return value, ok } -// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid -func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) { - if len(predicateKeys) == 0 { - return - } - ec.mu.Lock() - defer ec.mu.Unlock() - for predicateKey := range predicateKeys { - delete(ec.cache[nodeName], predicateKey) - } - glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) -} - -// InvalidateCachedPredicateItemOfAllNodes marks all items of given -// predicateKeys, of all pods, on all node as invalid -func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) { +// InvalidatePredicates clears all cached results for the given predicates. +func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) { if len(predicateKeys) == 0 { return } @@ -183,9 +169,21 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKey glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) } -// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node -// as invalid -func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { +// InvalidatePredicatesOnNode clears cached results for the given predicates on one node. +func (ec *EquivalenceCache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) { + if len(predicateKeys) == 0 { + return + } + ec.mu.Lock() + defer ec.mu.Unlock() + for predicateKey := range predicateKeys { + delete(ec.cache[nodeName], predicateKey) + } + glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) +} + +// InvalidateAllPredicatesOnNode clears all cached results for one node. +func (ec *EquivalenceCache) InvalidateAllPredicatesOnNode(nodeName string) { ec.mu.Lock() defer ec.mu.Unlock() delete(ec.cache, nodeName) @@ -194,6 +192,7 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri // InvalidateCachedPredicateItemForPodAdd is a wrapper of // InvalidateCachedPredicateItem for pod add case +// TODO: This logic does not belong with the equivalence cache implementation. func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { // MatchInterPodAffinity: we assume scheduler can make sure newly bound pod // will not break the existing inter pod affinity. So we does not need to @@ -227,7 +226,7 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, } } } - ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates) + ec.InvalidatePredicatesOnNode(nodeName, invalidPredicates) } // EquivalenceClassInfo holds equivalence hash which is used for checking diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index b745f1c81a..a1f3c8221c 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -493,7 +493,7 @@ func TestLookupResult(t *testing.T) { if test.expectedPredicateKeyMiss { predicateKeys := sets.NewString() predicateKeys.Insert(test.predicateKey) - ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) + ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys) } // calculate predicate with equivalence cache result, ok := ecache.lookupResult(test.podName, @@ -709,7 +709,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { } // invalidate cached predicate for all nodes - ecache.InvalidateCachedPredicateItemOfAllNodes(sets.NewString(testPredicate)) + ecache.InvalidatePredicates(sets.NewString(testPredicate)) // there should be no cached predicate any more for _, test := range tests { @@ -782,7 +782,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { for _, test := range tests { // invalidate cached predicate for all nodes - ecache.InvalidateAllCachedPredicateItemOfNode(test.nodeName) + ecache.InvalidateAllPredicatesOnNode(test.nodeName) if _, exist := ecache.cache[test.nodeName]; exist { t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName) break @@ -857,7 +857,7 @@ func TestEquivalenceCacheInvalidationRace(t *testing.T) { if err := cache.AddPod(pod); err != nil { t.Errorf("Could not add pod to cache: %v", err) } - eCache.InvalidateAllCachedPredicateItemOfNode("machine1") + eCache.InvalidateAllPredicatesOnNode("machine1") mockCache.cacheInvalidated <- struct{}{} }() diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index fb5cc22fcb..340cd9f6ff 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -410,7 +410,7 @@ func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.Persist break } } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } // isZoneRegionLabel check if given key of label is zone or region label. @@ -468,7 +468,7 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { invalidPredicates.Insert(predicates.CheckVolumeBindingPred) } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } func (c *configFactory) onPvcAdd(obj interface{}) { @@ -538,7 +538,7 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim // Add/delete impacts the available PVs to choose from invalidPredicates.Insert(predicates.CheckVolumeBindingPred) } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) { @@ -553,12 +553,12 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } func (c *configFactory) onServiceAdd(obj interface{}) { if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) } c.podQueue.MoveAllToActiveQueue() } @@ -569,7 +569,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) oldService := oldObj.(*v1.Service) newService := newObj.(*v1.Service) if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) { - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) } } c.podQueue.MoveAllToActiveQueue() @@ -577,7 +577,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) func (c *configFactory) onServiceDelete(obj interface{}) { if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) } c.podQueue.MoveAllToActiveQueue() } @@ -694,13 +694,13 @@ func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, ol if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) { // MatchInterPodAffinity need to be reconsidered for this node, // as well as all nodes in its same failure domain. - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( + c.equivalencePodCache.InvalidatePredicates( matchInterPodAffinitySet) } // if requested container resource changed, invalidate GeneralPredicates of this node if !reflect.DeepEqual(predicates.GetResourceRequest(newPod), predicates.GetResourceRequest(oldPod)) { - c.equivalencePodCache.InvalidateCachedPredicateItem( + c.equivalencePodCache.InvalidatePredicatesOnNode( newPod.Spec.NodeName, generalPredicatesSets) } } @@ -741,14 +741,14 @@ func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { // MatchInterPodAffinity need to be reconsidered for this node, // as well as all nodes in its same failure domain. // TODO(resouer) can we just do this for nodes in the same failure domain - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( + c.equivalencePodCache.InvalidatePredicates( matchInterPodAffinitySet) // if this pod have these PV, cached result of disk conflict will become invalid. for _, volume := range pod.Spec.Volumes { if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil || volume.RBD != nil || volume.ISCSI != nil { - c.equivalencePodCache.InvalidateCachedPredicateItem( + c.equivalencePodCache.InvalidatePredicatesOnNode( pod.Spec.NodeName, noDiskConflictSet) } } @@ -858,7 +858,7 @@ func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable { invalidPredicates.Insert(predicates.CheckNodeConditionPred) } - c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates) + c.equivalencePodCache.InvalidatePredicatesOnNode(newNode.GetName(), invalidPredicates) } } @@ -885,7 +885,7 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) { glog.Errorf("scheduler cache RemoveNode failed: %v", err) } if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(node.GetName()) + c.equivalencePodCache.InvalidateAllPredicatesOnNode(node.GetName()) } } @@ -1315,7 +1315,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue c.schedulerCache.RemoveNode(&node) // invalidate cached predicate for the node if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(nodeName) + c.equivalencePodCache.InvalidateAllPredicatesOnNode(nodeName) } } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 98beabe5c5..b7990da15a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -284,7 +284,7 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error if bindingRequired { if sched.config.Ecache != nil { invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred) - sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + sched.config.Ecache.InvalidatePredicates(invalidPredicates) } // bindVolumesWorker() will update the Pod object to put it back in the scheduler queue