diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go
index 5ec9d9fdcc..c145f2d86f 100644
--- a/pkg/scheduler/core/equivalence/eqivalence.go
+++ b/pkg/scheduler/core/equivalence/eqivalence.go
@@ -33,25 +33,136 @@ import (
 	"github.com/golang/glog"
 )
 
-// Cache saves and reuses the output of predicate functions. Use RunPredicate to
-// get or update the cached results. An appropriate Invalidate* function should
-// be called when some predicate results are no longer valid.
+// nodeMap stores a *NodeCache for each node.
+type nodeMap map[string]*NodeCache
+
+// Cache is a thread-safe map that saves and reuses the output of predicate functions.
+// It uses node name as the key to access those cached results.
 //
-// Internally, results are keyed by node name, predicate name, and "equivalence
+// Internally, results are keyed by predicate name, and "equivalence
 // class". (Equivalence class is defined in the `Class` type.) Saved results
 // will be reused until an appropriate invalidation function is called.
 type Cache struct {
+	// NOTE(harry): Theoretically sync.Map has better performance in machines with 8+ CPUs, but
+	// in practice lock contention on this first-level cache is rare.
-	mu    sync.RWMutex
-	cache nodeMap
+	mu          sync.RWMutex
+	nodeToCache nodeMap
 }
 
-// NewCache returns an empty Cache.
+// NewCache creates an empty equivalence class cache.
 func NewCache() *Cache {
 	return &Cache{
-		cache: make(nodeMap),
+		nodeToCache: make(nodeMap),
 	}
 }
 
+// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
+// get or update the cached results. An appropriate Invalidate* function should
+// be called when some predicate results are no longer valid.
+//
+// Internally, results are keyed by predicate name, and "equivalence
+// class". (Equivalence class is defined in the `Class` type.) Saved results
+// will be reused until an appropriate invalidation function is called.
+//
+// NodeCache objects are thread safe: each NodeCache guards its own entries with its own lock.
+type NodeCache struct {
+	mu    sync.RWMutex
+	cache predicateMap
+}
+
+// newNodeCache returns an empty NodeCache.
+func newNodeCache() *NodeCache {
+	return &NodeCache{
+		cache: make(predicateMap),
+	}
+}
+
+// GetNodeCache returns the existing NodeCache for the given node if present. Otherwise,
+// it creates the NodeCache and returns it.
+// The boolean flag is true if the value was loaded, false if created.
+func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if nodeCache, exists = c.nodeToCache[name]; !exists {
+		nodeCache = newNodeCache()
+		c.nodeToCache[name] = nodeCache
+	}
+	return
+}
+
+// InvalidatePredicates clears all cached results for the given predicates.
+func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
+	if len(predicateKeys) == 0 {
+		return
+	}
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	for _, n := range c.nodeToCache {
+		n.invalidatePreds(predicateKeys)
+	}
+	glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
+}
+
+// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
+func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
+	if len(predicateKeys) == 0 {
+		return
+	}
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if n, ok := c.nodeToCache[nodeName]; ok {
+		n.invalidatePreds(predicateKeys)
+	}
+	glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
+}
+
+// InvalidateAllPredicatesOnNode clears all cached results for one node.
+func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.nodeToCache, nodeName)
+	glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
+}
+
+// InvalidateCachedPredicateItemForPodAdd is a wrapper of
+// InvalidateCachedPredicateItem for the pod add case.
+// TODO: This does not belong with the equivalence cache implementation.
+func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
+	// MatchInterPodAffinity: we assume the scheduler can make sure a newly bound pod
+	// will not break the existing inter pod affinity, so we do not need to
+	// invalidate MatchInterPodAffinity when a pod is added.
+	//
+	// But when a pod is deleted, existing inter pod affinity may become invalid.
+	// (e.g. this pod was preferred by some other pod, or vice versa)
+	//
+	// NOTE: the assumptions above will not hold once we implement features like
+	// RequiredDuringSchedulingRequiredDuringExecution.
+
+	// NoDiskConflict: the newly scheduled pod fits alongside the existing pods on this node,
+	// so it will also fit the equivalence classes of the existing pods.
+
+	// GeneralPredicates: will always be affected by adding a new pod
+	invalidPredicates := sets.NewString(predicates.GeneralPred)
+
+	// MaxPDVolumeCountPredicate: we check the volumes of the pod to make the decision.
+	for _, vol := range pod.Spec.Volumes {
+		if vol.PersistentVolumeClaim != nil {
+			invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
+		} else {
+			if vol.AWSElasticBlockStore != nil {
+				invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
+			}
+			if vol.GCEPersistentDisk != nil {
+				invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
+			}
+			if vol.AzureDisk != nil {
+				invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
+			}
+		}
+	}
+	c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
+}
+
 // Class represents a set of pods which are equivalent from the perspective of
 // the scheduler. i.e. the scheduler would make the same decision for any pod
 // from the same class.
@@ -78,9 +189,6 @@ func NewClass(pod *v1.Pod) *Class {
 	return nil
 }
 
-// nodeMap stores PredicateCaches with node name as the key.
-type nodeMap map[string]predicateMap
-
 // predicateMap stores resultMaps with predicate name as the key.
 type predicateMap map[string]resultMap
 
@@ -97,7 +205,7 @@ type predicateResult struct {
 // run and its results cached for the next call.
 //
 // NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
-func (c *Cache) RunPredicate(
+func (n *NodeCache) RunPredicate(
 	pred algorithm.FitPredicate,
 	predicateKey string,
 	pod *v1.Pod,
@@ -111,7 +219,7 @@ func (c *Cache) RunPredicate(
 		return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
 	}
 
-	result, ok := c.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
+	result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
 	if ok {
 		return result.Fit, result.FailReasons, nil
 	}
@@ -120,13 +228,13 @@ func (c *Cache) RunPredicate(
 		return fit, reasons, err
 	}
 	if cache != nil {
-		c.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
+		n.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
 	}
 	return fit, reasons, nil
 }
 
 // updateResult updates the cached result of a predicate.
-func (c *Cache) updateResult(
+func (n *NodeCache) updateResult(
 	podName, predicateKey string,
 	fit bool,
 	reasons []algorithm.PredicateFailureReason,
@@ -134,8 +242,6 @@ func (c *Cache) updateResult(
 	cache schedulercache.Cache,
 	nodeInfo *schedulercache.NodeInfo,
 ) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
 	if nodeInfo == nil || nodeInfo.Node() == nil {
 		// This may happen during tests.
 		return
@@ -144,114 +250,48 @@ func (c *Cache) updateResult(
 	if !cache.IsUpToDate(nodeInfo) {
 		return
 	}
-	nodeName := nodeInfo.Node().GetName()
-	if _, exist := c.cache[nodeName]; !exist {
-		c.cache[nodeName] = make(predicateMap)
-	}
+
 	predicateItem := predicateResult{
 		Fit:         fit,
 		FailReasons: reasons,
 	}
-	// if cached predicate map already exists, just update the predicate by key
-	if predicates, ok := c.cache[nodeName][predicateKey]; ok {
+
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	// If cached predicate map already exists, just update the predicate by key
+	if predicates, ok := n.cache[predicateKey]; ok {
 		// maps in golang are references, no need to add them back
 		predicates[equivalenceHash] = predicateItem
 	} else {
-		c.cache[nodeName][predicateKey] =
+		n.cache[predicateKey] =
 			resultMap{
 				equivalenceHash: predicateItem,
 			}
 	}
-	glog.V(5).Infof("Cache update: node=%s,predicate=%s,pod=%s,value=%v", nodeName, predicateKey, podName, predicateItem)
+
+	glog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
+		nodeInfo.Node().Name, predicateKey, podName, predicateItem)
 }
 
 // lookupResult returns cached predicate results and a bool saying whether a
 // cache entry was found.
-func (c *Cache) lookupResult(
+func (n *NodeCache) lookupResult(
 	podName, nodeName, predicateKey string,
 	equivalenceHash uint64,
 ) (value predicateResult, ok bool) {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	glog.V(5).Infof("Cache lookup: node=%s,predicate=%s,pod=%s", nodeName, predicateKey, podName)
-	value, ok = c.cache[nodeName][predicateKey][equivalenceHash]
+	n.mu.RLock()
+	defer n.mu.RUnlock()
+	value, ok = n.cache[predicateKey][equivalenceHash]
 	return value, ok
 }
 
-// InvalidatePredicates clears all cached results for the given predicates.
-func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
-	if len(predicateKeys) == 0 {
-		return
-	}
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	// c.cache uses nodeName as key, so we just iterate it and invalid given predicates
-	for _, predicates := range c.cache {
-		for predicateKey := range predicateKeys {
-			delete(predicates, predicateKey)
-		}
-	}
-	glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
-}
-
-// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
-func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
-	if len(predicateKeys) == 0 {
-		return
-	}
-	c.mu.Lock()
-	defer c.mu.Unlock()
+// invalidatePreds deletes cached predicates by given keys.
+func (n *NodeCache) invalidatePreds(predicateKeys sets.String) {
+	n.mu.Lock()
+	defer n.mu.Unlock()
 	for predicateKey := range predicateKeys {
-		delete(c.cache[nodeName], predicateKey)
+		delete(n.cache, predicateKey)
 	}
-	glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
-}
-
-// InvalidateAllPredicatesOnNode clears all cached results for one node.
-func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	delete(c.cache, nodeName)
-	glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
-}
-
-// InvalidateCachedPredicateItemForPodAdd is a wrapper of
-// InvalidateCachedPredicateItem for pod add case
-// TODO: This does not belong with the equivalence cache implementation.
-func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
-	// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
-	// will not break the existing inter pod affinity. So we does not need to
-	// invalidate MatchInterPodAffinity when pod added.
-	//
-	// But when a pod is deleted, existing inter pod affinity may become invalid.
-	// (e.g. this pod was preferred by some else, or vice versa)
-	//
-	// NOTE: assumptions above will not stand when we implemented features like
-	// RequiredDuringSchedulingRequiredDuringExecution.
-
-	// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
-	// it will also fits to equivalence class of existing pods
-
-	// GeneralPredicates: will always be affected by adding a new pod
-	invalidPredicates := sets.NewString(predicates.GeneralPred)
-
-	// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
-	for _, vol := range pod.Spec.Volumes {
-		if vol.PersistentVolumeClaim != nil {
-			invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
-		} else {
-			if vol.AWSElasticBlockStore != nil {
-				invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
-			}
-			if vol.GCEPersistentDisk != nil {
-				invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
-			}
-			if vol.AzureDisk != nil {
-				invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
-			}
-		}
-	}
-	c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
-}
 
 // equivalencePod is the set of pod attributes which must match for two pods to
diff --git a/pkg/scheduler/core/equivalence/eqivalence_test.go b/pkg/scheduler/core/equivalence/eqivalence_test.go
index a7c8fc3360..8d5dafe8cd 100644
--- a/pkg/scheduler/core/equivalence/eqivalence_test.go
+++ b/pkg/scheduler/core/equivalence/eqivalence_test.go
@@ -243,17 +243,21 @@ func TestRunPredicate(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			node := schedulercache.NewNodeInfo()
-			node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}})
+			testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
+			node.SetNode(testNode)
 			pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
 			meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
 
+			// Initialize and populate equivalence class cache.
 			ecache := NewCache()
+			nodeCache, _ := ecache.GetNodeCache(testNode.Name)
+
 			equivClass := NewClass(pod)
 			if test.expectCacheHit {
-				ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
+				nodeCache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
 			}
-			fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
+			fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
 
 			if err != nil {
 				if err.Error() != test.expectedError {
@@ -284,7 +288,7 @@ func TestRunPredicate(t *testing.T) {
 			if !test.expectCacheHit && test.pred.callCount == 0 {
 				t.Errorf("Predicate should be called")
 			}
-			_, ok := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
+			_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
 			if !ok && test.expectCacheWrite {
 				t.Errorf("Cache write should happen")
 			}
@@ -339,21 +343,25 @@ func TestUpdateResult(t *testing.T) {
 		},
 	}
 	for _, test := range tests {
+		node := schedulercache.NewNodeInfo()
+		testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
+		node.SetNode(testNode)
+
+		// Initialize and populate equivalence class cache.
 		ecache := NewCache()
+		nodeCache, _ := ecache.GetNodeCache(testNode.Name)
+
 		if test.expectPredicateMap {
-			ecache.cache[test.nodeName] = make(predicateMap)
 			predicateItem := predicateResult{
 				Fit: true,
 			}
-			ecache.cache[test.nodeName][test.predicateKey] =
+			nodeCache.cache[test.predicateKey] =
 				resultMap{
 					test.equivalenceHash: predicateItem,
 				}
 		}
 
-		node := schedulercache.NewNodeInfo()
-		node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
-		ecache.updateResult(
+		nodeCache.updateResult(
 			test.pod,
 			test.predicateKey,
 			test.fit,
@@ -363,7 +371,7 @@ func TestUpdateResult(t *testing.T) {
 			node,
 		)
 
-		cachedMapItem, ok := ecache.cache[test.nodeName][test.predicateKey]
+		cachedMapItem, ok := nodeCache.cache[test.predicateKey]
 		if !ok {
 			t.Errorf("Failed: %s, can't find expected cache item: %v",
 				test.name, test.expectCacheItem)
@@ -473,11 +481,16 @@ func TestLookupResult(t *testing.T) {
 	}
 
 	for _, test := range tests {
+		testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
+
+		// Initialize and populate equivalence class cache.
 		ecache := NewCache()
+		nodeCache, _ := ecache.GetNodeCache(testNode.Name)
+
 		node := schedulercache.NewNodeInfo()
-		node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
+		node.SetNode(testNode)
 		// set cached item to equivalence cache
-		ecache.updateResult(
+		nodeCache.updateResult(
 			test.podName,
 			test.predicateKey,
 			test.cachedItem.fit,
@@ -493,7 +506,7 @@ func TestLookupResult(t *testing.T) {
 			ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys)
 		}
 		// calculate predicate with equivalence cache
-		result, ok := ecache.lookupResult(test.podName,
+		result, ok := nodeCache.lookupResult(test.podName,
 			test.nodeName,
 			test.predicateKey,
 			test.equivalenceHashForCalPredicate,
@@ -689,9 +702,12 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 
 	for _, test := range tests {
 		node := schedulercache.NewNodeInfo()
-		node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
+		testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
+		node.SetNode(testNode)
+
+		nodeCache, _ := ecache.GetNodeCache(testNode.Name)
 		// set cached item to equivalence cache
-		ecache.updateResult(
+		nodeCache.updateResult(
 			test.podName,
 			testPredicate,
 			test.cachedItem.fit,
@@ -707,8 +723,8 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 
 	// there should be no cached predicate any more
 	for _, test := range tests {
-		if algorithmCache, exist := ecache.cache[test.nodeName]; exist {
-			if _, exist := algorithmCache[testPredicate]; exist {
+		if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist {
+			if _, exist := nodeCache.cache[testPredicate]; exist {
 				t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
 					testPredicate, test.nodeName)
 				break
@@ -761,9 +777,12 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 
 	for _, test := range tests {
 		node := schedulercache.NewNodeInfo()
-		node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
+		testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
+		node.SetNode(testNode)
+
+		nodeCache, _ := ecache.GetNodeCache(testNode.Name)
 		// set cached item to equivalence cache
-		ecache.updateResult(
+		nodeCache.updateResult(
 			test.podName,
 			testPredicate,
 			test.cachedItem.fit,
@@ -775,10 +794,10 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		// invalidate cached predicate for all nodes
+		// invalidate all cached predicates for this node
 		ecache.InvalidateAllPredicatesOnNode(test.nodeName)
-		if _, exist := ecache.cache[test.nodeName]; exist {
-			t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
+		if _, ok := ecache.GetNodeCache(test.nodeName); ok {
+			t.Errorf("Failed: node: %v should not be found in internal cache", test.nodeName)
 			break
 		}
 	}
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go
index ad25d6b86a..dd0e36f8fe 100644
--- a/pkg/scheduler/core/generic_scheduler.go
+++ b/pkg/scheduler/core/generic_scheduler.go
@@ -356,20 +356,25 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 		meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
 
 		var equivClass *equivalence.Class
+
 		if g.equivalenceCache != nil {
 			// getEquivalenceClassInfo will return immediately if no equivalence pod found
 			equivClass = equivalence.NewClass(pod)
 		}
 
 		checkNode := func(i int) {
+			var nodeCache *equivalence.NodeCache
 			nodeName := nodes[i].Name
+			if g.equivalenceCache != nil {
+				nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
+			}
 			fits, failedPredicates, err := podFitsOnNode(
 				pod,
 				meta,
 				g.cachedNodeInfoMap[nodeName],
 				g.predicates,
 				g.cache,
-				g.equivalenceCache,
+				nodeCache,
 				g.schedulingQueue,
 				g.alwaysCheckAllPredicates,
 				equivClass,
@@ -472,7 +477,7 @@ func podFitsOnNode(
 	info *schedulercache.NodeInfo,
 	predicateFuncs map[string]algorithm.FitPredicate,
 	cache schedulercache.Cache,
-	ecache *equivalence.Cache,
+	nodeCache *equivalence.NodeCache,
 	queue SchedulingQueue,
 	alwaysCheckAllPredicates bool,
 	equivClass *equivalence.Class,
@@ -512,7 +517,7 @@ func podFitsOnNode(
 		// Bypass eCache if node has any nominated pods.
 		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
 		// when pods are nominated or their nominations change.
-		eCacheAvailable = equivClass != nil && !podsAdded
+		eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
 		for _, predicateKey := range predicates.Ordering() {
 			var (
 				fit     bool
@@ -522,7 +527,7 @@ func podFitsOnNode(
 			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
 			if predicate, exist := predicateFuncs[predicateKey]; exist {
 				if eCacheAvailable {
-					fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
+					fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
 				} else {
 					fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
 				}
diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go
index 6a6a1ee27a..bce8017296 100644
--- a/pkg/scheduler/core/generic_scheduler_test.go
+++ b/pkg/scheduler/core/generic_scheduler_test.go
@@ -1405,7 +1405,8 @@ func TestCacheInvalidationRace(t *testing.T) {
 	// Set up the mock cache.
 	cache := schedulercache.New(time.Duration(0), wait.NeverStop)
-	cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
+	testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
+	cache.AddNode(testNode)
 	mockCache := &syncingMockCache{
 		Cache:      cache,
 		cycleStart: make(chan struct{}),
diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go
index 93640d6f0e..b0a395509e 100644
--- a/pkg/scheduler/factory/factory.go
+++ b/pkg/scheduler/factory/factory.go
@@ -772,6 +772,11 @@ func (c *configFactory) addNodeToCache(obj interface{}) {
 		glog.Errorf("scheduler cache AddNode failed: %v", err)
 	}
 
+	if c.enableEquivalenceClassCache {
+		// GetNodeCache() will lazily create a NodeCache for the given node if one does not exist.
+		c.equivalencePodCache.GetNodeCache(node.GetName())
+	}
+
 	c.podQueue.MoveAllToActiveQueue()
 	// NOTE: add a new node does not affect existing predicates in equivalence cache
 }
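
Reviewer note: the essence of this patch is the two-level locking layout (an outer Cache that only guards the node map, and a per-node NodeCache that guards its own predicate results). Below is a minimal, standalone Go sketch of that layout. The names topCache, nodeEntry, getNodeEntry, store, and load are hypothetical and are not identifiers from this patch; only the structure mirrors Cache, NodeCache, and GetNodeCache above.

package main

import (
	"fmt"
	"sync"
)

// nodeEntry mirrors NodeCache: it owns its own lock, so predicate results for
// different nodes can be read and written without contending on one mutex.
type nodeEntry struct {
	mu      sync.RWMutex
	results map[string]bool // predicateKey -> cached fit result
}

// topCache mirrors Cache: the outer lock only guards the node map itself.
type topCache struct {
	mu    sync.RWMutex
	nodes map[string]*nodeEntry
}

func newTopCache() *topCache {
	return &topCache{nodes: make(map[string]*nodeEntry)}
}

// getNodeEntry lazily creates the per-node entry, like Cache.GetNodeCache.
func (c *topCache) getNodeEntry(nodeName string) *nodeEntry {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.nodes[nodeName]
	if !ok {
		e = &nodeEntry{results: make(map[string]bool)}
		c.nodes[nodeName] = e
	}
	return e
}

// store and load take only the per-node lock, so goroutines working on
// different nodes never block each other.
func (e *nodeEntry) store(predicateKey string, fit bool) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.results[predicateKey] = fit
}

func (e *nodeEntry) load(predicateKey string) (bool, bool) {
	e.mu.RLock()
	defer e.mu.RUnlock()
	fit, ok := e.results[predicateKey]
	return fit, ok
}

func main() {
	cache := newTopCache()
	cache.getNodeEntry("node-1").store("GeneralPredicates", true)
	if fit, ok := cache.getNodeEntry("node-1").load("GeneralPredicates"); ok {
		fmt.Printf("cached result for node-1/GeneralPredicates: %v\n", fit)
	}
}

Because lookups and updates for different nodes only take their own per-node lock, parallel predicate evaluation across nodes no longer serializes on a single cache-wide mutex, which appears to be the motivation for splitting Cache into NodeCache in this change.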