Rename equiv. class invalidation functions.

Change the invalidation functions to have cleaner and more consistent
names.
pull/8/head
Jonathan Basseri 2018-05-23 11:41:57 -07:00
parent 5d13798e5c
commit ba08b05e28
4 changed files with 38 additions and 39 deletions

View File

@ -153,22 +153,8 @@ func (ec *EquivalenceCache) lookupResult(
return value, ok return value, ok
} }
// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid // InvalidatePredicates clears all cached results for the given predicates.
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) { func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
ec.mu.Lock()
defer ec.mu.Unlock()
for predicateKey := range predicateKeys {
delete(ec.cache[nodeName], predicateKey)
}
glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName)
}
// InvalidateCachedPredicateItemOfAllNodes marks all items of given
// predicateKeys, of all pods, on all node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
if len(predicateKeys) == 0 { if len(predicateKeys) == 0 {
return return
} }
@ -183,9 +169,21 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKey
glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys)
} }
// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node // InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
// as invalid func (ec *EquivalenceCache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { if len(predicateKeys) == 0 {
return
}
ec.mu.Lock()
defer ec.mu.Unlock()
for predicateKey := range predicateKeys {
delete(ec.cache[nodeName], predicateKey)
}
glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName)
}
// InvalidateAllPredicatesOnNode clears all cached results for one node.
func (ec *EquivalenceCache) InvalidateAllPredicatesOnNode(nodeName string) {
ec.mu.Lock() ec.mu.Lock()
defer ec.mu.Unlock() defer ec.mu.Unlock()
delete(ec.cache, nodeName) delete(ec.cache, nodeName)
@ -194,6 +192,7 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri
// InvalidateCachedPredicateItemForPodAdd is a wrapper of // InvalidateCachedPredicateItemForPodAdd is a wrapper of
// InvalidateCachedPredicateItem for pod add case // InvalidateCachedPredicateItem for pod add case
// TODO: This logic does not belong with the equivalence cache implementation.
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod // MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
// will not break the existing inter pod affinity. So we does not need to // will not break the existing inter pod affinity. So we does not need to
@ -227,7 +226,7 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
} }
} }
} }
ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates) ec.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
} }
// EquivalenceClassInfo holds equivalence hash which is used for checking // EquivalenceClassInfo holds equivalence hash which is used for checking

View File

@ -493,7 +493,7 @@ func TestLookupResult(t *testing.T) {
if test.expectedPredicateKeyMiss { if test.expectedPredicateKeyMiss {
predicateKeys := sets.NewString() predicateKeys := sets.NewString()
predicateKeys.Insert(test.predicateKey) predicateKeys.Insert(test.predicateKey)
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys)
} }
// calculate predicate with equivalence cache // calculate predicate with equivalence cache
result, ok := ecache.lookupResult(test.podName, result, ok := ecache.lookupResult(test.podName,
@ -709,7 +709,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
} }
// invalidate cached predicate for all nodes // invalidate cached predicate for all nodes
ecache.InvalidateCachedPredicateItemOfAllNodes(sets.NewString(testPredicate)) ecache.InvalidatePredicates(sets.NewString(testPredicate))
// there should be no cached predicate any more // there should be no cached predicate any more
for _, test := range tests { for _, test := range tests {
@ -782,7 +782,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
for _, test := range tests { for _, test := range tests {
// invalidate cached predicate for all nodes // invalidate cached predicate for all nodes
ecache.InvalidateAllCachedPredicateItemOfNode(test.nodeName) ecache.InvalidateAllPredicatesOnNode(test.nodeName)
if _, exist := ecache.cache[test.nodeName]; exist { if _, exist := ecache.cache[test.nodeName]; exist {
t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName) t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
break break
@ -857,7 +857,7 @@ func TestEquivalenceCacheInvalidationRace(t *testing.T) {
if err := cache.AddPod(pod); err != nil { if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod to cache: %v", err) t.Errorf("Could not add pod to cache: %v", err)
} }
eCache.InvalidateAllCachedPredicateItemOfNode("machine1") eCache.InvalidateAllPredicatesOnNode("machine1")
mockCache.cacheInvalidated <- struct{}{} mockCache.cacheInvalidated <- struct{}{}
}() }()

View File

@ -410,7 +410,7 @@ func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.Persist
break break
} }
} }
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
} }
// isZoneRegionLabel check if given key of label is zone or region label. // isZoneRegionLabel check if given key of label is zone or region label.
@ -468,7 +468,7 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
invalidPredicates.Insert(predicates.CheckVolumeBindingPred) invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
} }
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
} }
func (c *configFactory) onPvcAdd(obj interface{}) { func (c *configFactory) onPvcAdd(obj interface{}) {
@ -538,7 +538,7 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim
// Add/delete impacts the available PVs to choose from // Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBindingPred) invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
} }
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
} }
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) { func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
@ -553,12 +553,12 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...) invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
} }
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) c.equivalencePodCache.InvalidatePredicates(invalidPredicates)
} }
func (c *configFactory) onServiceAdd(obj interface{}) { func (c *configFactory) onServiceAdd(obj interface{}) {
if c.enableEquivalenceClassCache { if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
} }
c.podQueue.MoveAllToActiveQueue() c.podQueue.MoveAllToActiveQueue()
} }
@ -569,7 +569,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{})
oldService := oldObj.(*v1.Service) oldService := oldObj.(*v1.Service)
newService := newObj.(*v1.Service) newService := newObj.(*v1.Service)
if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) { if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
} }
} }
c.podQueue.MoveAllToActiveQueue() c.podQueue.MoveAllToActiveQueue()
@ -577,7 +577,7 @@ func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{})
func (c *configFactory) onServiceDelete(obj interface{}) { func (c *configFactory) onServiceDelete(obj interface{}) {
if c.enableEquivalenceClassCache { if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet) c.equivalencePodCache.InvalidatePredicates(serviceAffinitySet)
} }
c.podQueue.MoveAllToActiveQueue() c.podQueue.MoveAllToActiveQueue()
} }
@ -694,13 +694,13 @@ func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, ol
if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) { if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
// MatchInterPodAffinity need to be reconsidered for this node, // MatchInterPodAffinity need to be reconsidered for this node,
// as well as all nodes in its same failure domain. // as well as all nodes in its same failure domain.
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( c.equivalencePodCache.InvalidatePredicates(
matchInterPodAffinitySet) matchInterPodAffinitySet)
} }
// if requested container resource changed, invalidate GeneralPredicates of this node // if requested container resource changed, invalidate GeneralPredicates of this node
if !reflect.DeepEqual(predicates.GetResourceRequest(newPod), if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
predicates.GetResourceRequest(oldPod)) { predicates.GetResourceRequest(oldPod)) {
c.equivalencePodCache.InvalidateCachedPredicateItem( c.equivalencePodCache.InvalidatePredicatesOnNode(
newPod.Spec.NodeName, generalPredicatesSets) newPod.Spec.NodeName, generalPredicatesSets)
} }
} }
@ -741,14 +741,14 @@ func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
// MatchInterPodAffinity need to be reconsidered for this node, // MatchInterPodAffinity need to be reconsidered for this node,
// as well as all nodes in its same failure domain. // as well as all nodes in its same failure domain.
// TODO(resouer) can we just do this for nodes in the same failure domain // TODO(resouer) can we just do this for nodes in the same failure domain
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes( c.equivalencePodCache.InvalidatePredicates(
matchInterPodAffinitySet) matchInterPodAffinitySet)
// if this pod have these PV, cached result of disk conflict will become invalid. // if this pod have these PV, cached result of disk conflict will become invalid.
for _, volume := range pod.Spec.Volumes { for _, volume := range pod.Spec.Volumes {
if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil || if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
volume.RBD != nil || volume.ISCSI != nil { volume.RBD != nil || volume.ISCSI != nil {
c.equivalencePodCache.InvalidateCachedPredicateItem( c.equivalencePodCache.InvalidatePredicatesOnNode(
pod.Spec.NodeName, noDiskConflictSet) pod.Spec.NodeName, noDiskConflictSet)
} }
} }
@ -858,7 +858,7 @@ func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node,
if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable { if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
invalidPredicates.Insert(predicates.CheckNodeConditionPred) invalidPredicates.Insert(predicates.CheckNodeConditionPred)
} }
c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates) c.equivalencePodCache.InvalidatePredicatesOnNode(newNode.GetName(), invalidPredicates)
} }
} }
@ -885,7 +885,7 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
glog.Errorf("scheduler cache RemoveNode failed: %v", err) glog.Errorf("scheduler cache RemoveNode failed: %v", err)
} }
if c.enableEquivalenceClassCache { if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(node.GetName()) c.equivalencePodCache.InvalidateAllPredicatesOnNode(node.GetName())
} }
} }
@ -1315,7 +1315,7 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
c.schedulerCache.RemoveNode(&node) c.schedulerCache.RemoveNode(&node)
// invalidate cached predicate for the node // invalidate cached predicate for the node
if c.enableEquivalenceClassCache { if c.enableEquivalenceClassCache {
c.equivalencePodCache.InvalidateAllCachedPredicateItemOfNode(nodeName) c.equivalencePodCache.InvalidateAllPredicatesOnNode(nodeName)
} }
} }
} }

View File

@ -284,7 +284,7 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
if bindingRequired { if bindingRequired {
if sched.config.Ecache != nil { if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred) invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates) sched.config.Ecache.InvalidatePredicates(invalidPredicates)
} }
// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue // bindVolumesWorker() will update the Pod object to put it back in the scheduler queue