From ba08b05e286e7102dbc4a14638f440e8f42a171f Mon Sep 17 00:00:00 2001 From: Jonathan Basseri Date: Wed, 23 May 2018 11:41:57 -0700 Subject: [PATCH] Rename equiv. class invalidation functions. Change the invalidation functions to have cleaner and more consistent names. --- pkg/scheduler/core/equivalence_cache.go | 39 ++++++++++---------- pkg/scheduler/core/equivalence_cache_test.go | 8 ++-- pkg/scheduler/factory/factory.go | 28 +++++++------- pkg/scheduler/scheduler.go | 2 +- 4 files changed, 38 insertions(+), 39 deletions(-) diff --git a/pkg/scheduler/core/equivalence_cache.go b/pkg/scheduler/core/equivalence_cache.go index b2d56ff2a0..032b53e4a1 100644 --- a/pkg/scheduler/core/equivalence_cache.go +++ b/pkg/scheduler/core/equivalence_cache.go @@ -153,22 +153,8 @@ func (ec *EquivalenceCache) lookupResult( return value, ok } -// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid -func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) { - if len(predicateKeys) == 0 { - return - } - ec.mu.Lock() - defer ec.mu.Unlock() - for predicateKey := range predicateKeys { - delete(ec.cache[nodeName], predicateKey) - } - glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) -} - -// InvalidateCachedPredicateItemOfAllNodes marks all items of given -// predicateKeys, of all pods, on all node as invalid -func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) { +// InvalidatePredicates clears all cached results for the given predicates. +func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) { if len(predicateKeys) == 0 { return } @@ -183,9 +169,21 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKey glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) } -// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node -// as invalid -func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { +// InvalidatePredicatesOnNode clears cached results for the given predicates on one node. +func (ec *EquivalenceCache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) { + if len(predicateKeys) == 0 { + return + } + ec.mu.Lock() + defer ec.mu.Unlock() + for predicateKey := range predicateKeys { + delete(ec.cache[nodeName], predicateKey) + } + glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) +} + +// InvalidateAllPredicatesOnNode clears all cached results for one node. +func (ec *EquivalenceCache) InvalidateAllPredicatesOnNode(nodeName string) { ec.mu.Lock() defer ec.mu.Unlock() delete(ec.cache, nodeName) @@ -194,6 +192,7 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri // InvalidateCachedPredicateItemForPodAdd is a wrapper of // InvalidateCachedPredicateItem for pod add case +// TODO: This logic does not belong with the equivalence cache implementation. func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { // MatchInterPodAffinity: we assume scheduler can make sure newly bound pod // will not break the existing inter pod affinity. So we does not need to @@ -227,7 +226,7 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, } } } - ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates) + ec.InvalidatePredicatesOnNode(nodeName, invalidPredicates) } // EquivalenceClassInfo holds equivalence hash which is used for checking diff --git a/pkg/scheduler/core/equivalence_cache_test.go b/pkg/scheduler/core/equivalence_cache_test.go index b745f1c81a..a1f3c8221c 100644 --- a/pkg/scheduler/core/equivalence_cache_test.go +++ b/pkg/scheduler/core/equivalence_cache_test.go @@ -493,7 +493,7 @@ func TestLookupResult(t *testing.T) { if test.expectedPredicateKeyMiss { predicateKeys := sets.NewString() predicateKeys.Insert(test.predicateKey) - ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) + ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys) } // calculate predicate with equivalence cache result, ok := ecache.lookupResult(test.podName, @@ -709,7 +709,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { } // invalidate cached predicate for all nodes - ecache.InvalidateCachedPredicateItemOfAllNodes(sets.NewString(testPredicate)) + ecache.InvalidatePredicates(sets.NewString(testPredicate)) // there should be no cached predicate any more for _, test := range tests { @@ -782,7 +782,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { for _, test := range tests { // invalidate cached predicate for all nodes - ecache.InvalidateAllCachedPredicateItemOfNode(test.nodeName) + ecache.InvalidateAllPredicatesOnNode(test.nodeName) if _, exist := ecache.cache[test.nodeName]; exist { t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName) break @@ -857,7 +857,7 @@ func TestEquivalenceCacheInvalidationRace(t *testing.T) { if err := cache.AddPod(pod); err != nil { t.Errorf("Could not add pod to cache: %v", err) } - eCache.InvalidateAllCachedPredicateItemOfNode("machine1") + eCache.InvalidateAllPredicatesOnNode("machine1") mockCache.cacheInvalidated <- struct{}{} }() diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index fb5cc22fcb..340cd9f6ff 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -410,7 +410,7 @@ func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.Persist break } } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } // isZoneRegionLabel check if given key of label is zone or region label. @@ -468,7 +468,7 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) { invalidPredicates.Insert(predicates.CheckVolumeBindingPred) } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } func (c *configFactory) onPvcAdd(obj interface{}) { @@ -538,7 +538,7 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim // Add/delete impacts the available PVs to choose from invalidPredicates.Insert(predicates.CheckVolumeBindingPred) } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) { @@ -553,12 +553,12 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) } - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + c.equivalencePodCache.InvalidatePredicates(invalidPredicates) } func (c *configFactory) onServiceAdd(obj interface{}) { if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) } c.podQueue.MoveAllToActiveQueue() } @@ -569,7 +569,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) oldService := oldObj.(*v1.Service) newService := newObj.(*v1.Service) if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) { - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) } } c.podQueue.MoveAllToActiveQueue() @@ -577,7 +577,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) func (c *configFactory) onServiceDelete(obj interface{}) { if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) + c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet) } c.podQueue.MoveAllToActiveQueue() } @@ -694,13 +694,13 @@ func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, ol if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) { // MatchInterPodAffinity need to be reconsidered for this node, // as well as all nodes in its same failure domain. - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( + c.equivalencePodCache.InvalidatePredicates( matchInterPodAffinitySet) } // if requested container resource changed, invalidate GeneralPredicates of this node if !reflect.DeepEqual(predicates.GetResourceRequest(newPod), predicates.GetResourceRequest(oldPod)) { - c.equivalencePodCache.InvalidateCachedPredicateItem( + c.equivalencePodCache.InvalidatePredicatesOnNode( newPod.Spec.NodeName, generalPredicatesSets) } } @@ -741,14 +741,14 @@ func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) { // MatchInterPodAffinity need to be reconsidered for this node, // as well as all nodes in its same failure domain. // TODO(resouer) can we just do this for nodes in the same failure domain - c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( + c.equivalencePodCache.InvalidatePredicates( matchInterPodAffinitySet) // if this pod have these PV, cached result of disk conflict will become invalid. for _, volume := range pod.Spec.Volumes { if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil || volume.RBD != nil || volume.ISCSI != nil { - c.equivalencePodCache.InvalidateCachedPredicateItem( + c.equivalencePodCache.InvalidatePredicatesOnNode( pod.Spec.NodeName, noDiskConflictSet) } } @@ -858,7 +858,7 @@ func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable { invalidPredicates.Insert(predicates.CheckNodeConditionPred) } - c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates) + c.equivalencePodCache.InvalidatePredicatesOnNode(newNode.GetName(), invalidPredicates) } } @@ -885,7 +885,7 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) { glog.Errorf("scheduler cache RemoveNode failed: %v", err) } if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(node.GetName()) + c.equivalencePodCache.InvalidateAllPredicatesOnNode(node.GetName()) } } @@ -1315,7 +1315,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue c.schedulerCache.RemoveNode(&node) // invalidate cached predicate for the node if c.enableEquivalenceClassCache { - c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(nodeName) + c.equivalencePodCache.InvalidateAllPredicatesOnNode(nodeName) } } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 98beabe5c5..b7990da15a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -284,7 +284,7 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error if bindingRequired { if sched.config.Ecache != nil { invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred) - sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) + sched.config.Ecache.InvalidatePredicates(invalidPredicates) } // bindVolumesWorker() will update the Pod object to put it back in the scheduler queue