diff --git a/pkg/scheduler/cache/BUILD b/pkg/scheduler/cache/BUILD index 98d474beba..3b63cc0ed7 100644 --- a/pkg/scheduler/cache/BUILD +++ b/pkg/scheduler/cache/BUILD @@ -50,7 +50,6 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", ], ) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 5fc3980492..0be6512849 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -555,13 +555,6 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi return pdbs, nil } -func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool { - cache.mu.RLock() - defer cache.mu.RUnlock() - node, ok := cache.nodes[n.Node().Name] - return ok && n.generation == node.generation -} - func (cache *schedulerCache) run() { go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) } diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 45b79c0e6e..01837ca569 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" @@ -1336,26 +1335,3 @@ func TestPDBOperations(t *testing.T) { } } } - -func TestIsUpToDate(t *testing.T) { - cache := New(time.Duration(0), wait.NeverStop) - if err := cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}); err != nil { - t.Errorf("Could not add node: %v", err) - } - s := cache.Snapshot() - node := s.Nodes["n1"] - if !cache.IsUpToDate(node) { - t.Errorf("Node incorrectly marked as stale") - } - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", UID: "p1"}, Spec: v1.PodSpec{NodeName: "n1"}} - if err := cache.AddPod(pod); err != nil { - t.Errorf("Could not add pod: %v", err) - } - if cache.IsUpToDate(node) { - t.Errorf("Node incorrectly marked as up to date") - } - badNode := &NodeInfo{node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}} - if cache.IsUpToDate(badNode) { - t.Errorf("Nonexistant node incorrectly marked as up to date") - } -} diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 21eba905ef..2f61d541dd 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -123,9 +123,6 @@ type Cache interface { // Snapshot takes a snapshot on current cache Snapshot() *Snapshot - // IsUpToDate returns true if the given NodeInfo matches the current data in the cache. 
- IsUpToDate(n *NodeInfo) bool - // NodeTree returns a node tree structure NodeTree() *NodeTree } diff --git a/pkg/scheduler/core/equivalence/BUILD b/pkg/scheduler/core/equivalence/BUILD index 1b9f81db3c..1a7887563f 100644 --- a/pkg/scheduler/core/equivalence/BUILD +++ b/pkg/scheduler/core/equivalence/BUILD @@ -25,7 +25,6 @@ go_test( "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/algorithm/predicates:go_default_library", "//pkg/scheduler/cache:go_default_library", - "//pkg/scheduler/testing:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/scheduler/core/equivalence/eqivalence.go b/pkg/scheduler/core/equivalence/eqivalence.go index 4ee59a3f35..f0a65e9f2a 100644 --- a/pkg/scheduler/core/equivalence/eqivalence.go +++ b/pkg/scheduler/core/equivalence/eqivalence.go @@ -35,7 +35,7 @@ import ( "github.com/golang/glog" ) -// nodeMap stores a *Cache for each node. +// nodeMap stores a *NodeCache for each node. type nodeMap map[string]*NodeCache // Cache is a thread safe map saves and reuses the output of predicate functions, @@ -47,14 +47,20 @@ type nodeMap map[string]*NodeCache type Cache struct { // NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while // the reality is lock contention in first level cache is rare. - mu sync.RWMutex - nodeToCache nodeMap + mu sync.RWMutex + nodeToCache nodeMap + predicateIDMap map[string]int } // NewCache create an empty equiv class cache. -func NewCache() *Cache { +func NewCache(predicates []string) *Cache { + predicateIDMap := make(map[string]int, len(predicates)) + for id, predicate := range predicates { + predicateIDMap[predicate] = id + } return &Cache{ - nodeToCache: make(nodeMap), + nodeToCache: make(nodeMap), + predicateIDMap: predicateIDMap, } } @@ -70,15 +76,46 @@ func NewCache() *Cache { type NodeCache struct { mu sync.RWMutex cache predicateMap + // generation is the current generation of the node cache, incremented on node + // invalidation. + generation uint64 + // snapshotGeneration saves a snapshot of the node cache generation. + snapshotGeneration uint64 + // predicateGenerations stores generation numbers for predicates, incremented on + // predicate invalidation. Created on first update. Use 0 if it does not + // exist. + predicateGenerations []uint64 + // snapshotPredicateGenerations saves a snapshot of the generation numbers for predicates. + snapshotPredicateGenerations []uint64 } // newNodeCache returns an empty NodeCache. -func newNodeCache() *NodeCache { +func newNodeCache(n int) *NodeCache { return &NodeCache{ - cache: make(predicateMap), + cache: make(predicateMap, n), + predicateGenerations: make([]uint64, n), + snapshotPredicateGenerations: make([]uint64, n), } } +// Snapshot snapshots the current generations of the cache. +// NOTE: Because we snapshot the generations of all node caches before using them +// and these operations are serialized, we can save the snapshot as a member of +// the node cache itself. +func (c *Cache) Snapshot() { + c.mu.RLock() + defer c.mu.RUnlock() + for _, n := range c.nodeToCache { + n.mu.Lock() + // snapshot predicate generations + copy(n.snapshotPredicateGenerations, n.predicateGenerations) + // snapshot node generation + n.snapshotGeneration = n.generation + n.mu.Unlock() + } + return +} + // GetNodeCache returns the existing NodeCache for given node if present. 
Otherwise, // it creates the NodeCache and returns it. // The boolean flag is true if the value was loaded, false if created. @@ -86,12 +123,32 @@ func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) { c.mu.Lock() defer c.mu.Unlock() if nodeCache, exists = c.nodeToCache[name]; !exists { - nodeCache = newNodeCache() + nodeCache = newNodeCache(len(c.predicateIDMap)) c.nodeToCache[name] = nodeCache } return } +// LoadNodeCache returns the existing NodeCache for given node, nil if not +// present. +func (c *Cache) LoadNodeCache(node string) *NodeCache { + c.mu.RLock() + defer c.mu.RUnlock() + return c.nodeToCache[node] +} + +func (c *Cache) predicateKeysToIDs(predicateKeys sets.String) []int { + predicateIDs := make([]int, 0, len(predicateKeys)) + for predicateKey := range predicateKeys { + if id, ok := c.predicateIDMap[predicateKey]; ok { + predicateIDs = append(predicateIDs, id) + } else { + glog.Errorf("predicate key %q not found", predicateKey) + } + } + return predicateIDs +} + // InvalidatePredicates clears all cached results for the given predicates. func (c *Cache) InvalidatePredicates(predicateKeys sets.String) { if len(predicateKeys) == 0 { @@ -99,10 +156,12 @@ func (c *Cache) InvalidatePredicates(predicateKeys sets.String) { } c.mu.RLock() defer c.mu.RUnlock() + predicateIDs := c.predicateKeysToIDs(predicateKeys) for _, n := range c.nodeToCache { - n.invalidatePreds(predicateKeys) + n.invalidatePreds(predicateIDs) } glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys) + } // InvalidatePredicatesOnNode clears cached results for the given predicates on one node. @@ -112,17 +171,20 @@ func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.S } c.mu.RLock() defer c.mu.RUnlock() + predicateIDs := c.predicateKeysToIDs(predicateKeys) if n, ok := c.nodeToCache[nodeName]; ok { - n.invalidatePreds(predicateKeys) + n.invalidatePreds(predicateIDs) } glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys) } // InvalidateAllPredicatesOnNode clears all cached results for one node. func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) { - c.mu.Lock() - defer c.mu.Unlock() - delete(c.nodeToCache, nodeName) + c.mu.RLock() + defer c.mu.RUnlock() + if node, ok := c.nodeToCache[nodeName]; ok { + node.invalidate() + } glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName) } @@ -191,8 +253,8 @@ func NewClass(pod *v1.Pod) *Class { return nil } -// predicateMap stores resultMaps with predicate name as the key. -type predicateMap map[string]resultMap +// predicateMap stores resultMaps with predicate ID as the key. +type predicateMap []resultMap // resultMap stores PredicateResult with pod equivalence hash as the key. type resultMap map[uint64]predicateResult @@ -206,22 +268,22 @@ type predicateResult struct { // RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be // run and its results cached for the next call. // -// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale. +// NOTE: RunPredicate will not update the equivalence cache if generation does not match live version. 
func (n *NodeCache) RunPredicate( pred algorithm.FitPredicate, predicateKey string, + predicateID int, pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, equivClass *Class, - cache schedulercache.Cache, ) (bool, []algorithm.PredicateFailureReason, error) { if nodeInfo == nil || nodeInfo.Node() == nil { // This may happen during tests. return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") } - result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash) + result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, predicateID, equivClass.hash) if ok { return result.Fit, result.FailReasons, nil } @@ -229,19 +291,17 @@ func (n *NodeCache) RunPredicate( if err != nil { return fit, reasons, err } - if cache != nil { - n.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo) - } + n.updateResult(pod.GetName(), predicateKey, predicateID, fit, reasons, equivClass.hash, nodeInfo) return fit, reasons, nil } // updateResult updates the cached result of a predicate. func (n *NodeCache) updateResult( podName, predicateKey string, + predicateID int, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64, - cache schedulercache.Cache, nodeInfo *schedulercache.NodeInfo, ) { if nodeInfo == nil || nodeInfo.Node() == nil { @@ -249,11 +309,6 @@ func (n *NodeCache) updateResult( metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc() return } - // Skip update if NodeInfo is stale. - if !cache.IsUpToDate(nodeInfo) { - metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc() - return - } predicateItem := predicateResult{ Fit: fit, @@ -262,16 +317,24 @@ func (n *NodeCache) updateResult( n.mu.Lock() defer n.mu.Unlock() + if (n.snapshotGeneration != n.generation) || (n.snapshotPredicateGenerations[predicateID] != n.predicateGenerations[predicateID]) { + // The generation of the node or the predicate has been updated since we last + // took a snapshot, which means we received an invalidation request in the + // meantime. The cache may be stale; skip the update. + metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc() + return + } // If cached predicate map already exists, just update the predicate by key - if predicates, ok := n.cache[predicateKey]; ok { + if predicates := n.cache[predicateID]; predicates != nil { // maps in golang are references, no need to add them back predicates[equivalenceHash] = predicateItem } else { - n.cache[predicateKey] = + n.cache[predicateID] = resultMap{ equivalenceHash: predicateItem, } } + n.predicateGenerations[predicateID]++ glog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v", nodeInfo.Node().Name, predicateKey, podName, predicateItem) @@ -281,11 +344,12 @@ // cache entry was found. func (n *NodeCache) lookupResult( podName, nodeName, predicateKey string, + predicateID int, equivalenceHash uint64, ) (value predicateResult, ok bool) { n.mu.RLock() defer n.mu.RUnlock() - value, ok = n.cache[predicateKey][equivalenceHash] + value, ok = n.cache[predicateID][equivalenceHash] if ok { metrics.EquivalenceCacheHits.Inc() } else { @@ -294,15 +358,24 @@ return value, ok } -// invalidatePreds deletes cached predicates by given keys. -func (n *NodeCache) invalidatePreds(predicateKeys sets.String) { +// invalidatePreds deletes cached predicates by given IDs. 
+func (n *NodeCache) invalidatePreds(predicateIDs []int) { n.mu.Lock() defer n.mu.Unlock() - for predicateKey := range predicateKeys { - delete(n.cache, predicateKey) + for _, predicateID := range predicateIDs { + n.cache[predicateID] = nil + n.predicateGenerations[predicateID]++ } } +// invalidate invalidates node cache. +func (n *NodeCache) invalidate() { + n.mu.Lock() + defer n.mu.Unlock() + n.cache = make(predicateMap, len(n.cache)) + n.generation++ +} + // equivalencePod is the set of pod attributes which must match for two pods to // be considered equivalent for scheduling purposes. For correctness, this must // include any Pod field which is used by a FitPredicate. diff --git a/pkg/scheduler/core/equivalence/eqivalence_test.go b/pkg/scheduler/core/equivalence/eqivalence_test.go index 47e1d95774..35111918bd 100644 --- a/pkg/scheduler/core/equivalence/eqivalence_test.go +++ b/pkg/scheduler/core/equivalence/eqivalence_test.go @@ -28,7 +28,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" - schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" ) // makeBasicPod returns a Pod object with many of the fields populated. @@ -155,16 +154,6 @@ type predicateItemType struct { reasons []algorithm.PredicateFailureReason } -// upToDateCache is a fake Cache where IsUpToDate always returns true. -type upToDateCache = schedulertesting.FakeCache - -// staleNodeCache is a fake Cache where IsUpToDate always returns false. -type staleNodeCache struct { - schedulertesting.FakeCache -} - -func (c *staleNodeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return false } - // mockPredicate provides an algorithm.FitPredicate with pre-set return values. 
type mockPredicate struct { fit bool @@ -182,7 +171,6 @@ func TestRunPredicate(t *testing.T) { tests := []struct { name string pred mockPredicate - cache schedulercache.Cache expectFit, expectCacheHit, expectCacheWrite bool expectedReasons []algorithm.PredicateFailureReason expectedError string @@ -190,7 +178,6 @@ func TestRunPredicate(t *testing.T) { { name: "pod fits/cache hit", pred: mockPredicate{}, - cache: &upToDateCache{}, expectFit: true, expectCacheHit: true, expectCacheWrite: false, @@ -198,23 +185,13 @@ func TestRunPredicate(t *testing.T) { { name: "pod fits/cache miss", pred: mockPredicate{fit: true}, - cache: &upToDateCache{}, expectFit: true, expectCacheHit: false, expectCacheWrite: true, }, - { - name: "pod fits/cache miss/no write", - pred: mockPredicate{fit: true}, - cache: &staleNodeCache{}, - expectFit: true, - expectCacheHit: false, - expectCacheWrite: false, - }, { name: "pod doesn't fit/cache miss", pred: mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}}, - cache: &upToDateCache{}, expectFit: false, expectCacheHit: false, expectCacheWrite: true, @@ -223,7 +200,6 @@ func TestRunPredicate(t *testing.T) { { name: "pod doesn't fit/cache hit", pred: mockPredicate{}, - cache: &upToDateCache{}, expectFit: false, expectCacheHit: true, expectCacheWrite: false, @@ -232,7 +208,6 @@ func TestRunPredicate(t *testing.T) { { name: "predicate error", pred: mockPredicate{err: errors.New("This is expected")}, - cache: &upToDateCache{}, expectFit: false, expectCacheHit: false, expectCacheWrite: false, @@ -240,6 +215,8 @@ func TestRunPredicate(t *testing.T) { }, } + predicatesOrdering := []string{"testPredicate"} + predicateID := 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { node := schedulercache.NewNodeInfo() @@ -249,15 +226,16 @@ func TestRunPredicate(t *testing.T) { meta := algorithm.EmptyPredicateMetadataProducer(nil, nil) // Initialize and populate equivalence class cache. 
- ecache := NewCache() + ecache := NewCache(predicatesOrdering) + ecache.Snapshot() nodeCache, _ := ecache.GetNodeCache(testNode.Name) equivClass := NewClass(pod) if test.expectCacheHit { - nodeCache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node) + nodeCache.updateResult(pod.Name, "testPredicate", predicateID, test.expectFit, test.expectedReasons, equivClass.hash, node) } - fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache) + fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", predicateID, pod, meta, node, equivClass) if err != nil { if err.Error() != test.expectedError { @@ -288,7 +266,7 @@ func TestRunPredicate(t *testing.T) { if !test.expectCacheHit && test.pred.callCount == 0 { t.Errorf("Predicate should be called") } - _, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash) + _, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", predicateID, equivClass.hash) if !ok && test.expectCacheWrite { t.Errorf("Cache write should happen") } @@ -303,22 +281,24 @@ func TestRunPredicate(t *testing.T) { } func TestUpdateResult(t *testing.T) { + predicatesOrdering := []string{"GeneralPredicates"} tests := []struct { name string pod string predicateKey string + predicateID int nodeName string fit bool reasons []algorithm.PredicateFailureReason equivalenceHash uint64 expectPredicateMap bool expectCacheItem predicateResult - cache schedulercache.Cache }{ { name: "test 1", pod: "testPod", predicateKey: "GeneralPredicates", + predicateID: 0, nodeName: "node1", fit: true, equivalenceHash: 123, @@ -326,12 +306,12 @@ func TestUpdateResult(t *testing.T) { expectCacheItem: predicateResult{ Fit: true, }, - cache: &upToDateCache{}, }, { name: "test 2", pod: "testPod", predicateKey: "GeneralPredicates", + predicateID: 0, nodeName: "node2", fit: false, equivalenceHash: 123, @@ -339,7 +319,6 @@ func TestUpdateResult(t *testing.T) { expectCacheItem: predicateResult{ Fit: false, }, - cache: &upToDateCache{}, }, } for _, test := range tests { @@ -349,14 +328,14 @@ func TestUpdateResult(t *testing.T) { node.SetNode(testNode) // Initialize and populate equivalence class cache. 
- ecache := NewCache() + ecache := NewCache(predicatesOrdering) nodeCache, _ := ecache.GetNodeCache(testNode.Name) if test.expectPredicateMap { predicateItem := predicateResult{ Fit: true, } - nodeCache.cache[test.predicateKey] = + nodeCache.cache[test.predicateID] = resultMap{ test.equivalenceHash: predicateItem, } @@ -365,15 +344,15 @@ func TestUpdateResult(t *testing.T) { nodeCache.updateResult( test.pod, test.predicateKey, + test.predicateID, test.fit, test.reasons, test.equivalenceHash, - test.cache, node, ) - cachedMapItem, ok := nodeCache.cache[test.predicateKey] - if !ok { + cachedMapItem := nodeCache.cache[test.predicateID] + if cachedMapItem == nil { t.Errorf("can't find expected cache item: %v", test.expectCacheItem) } else { if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) { @@ -394,18 +373,19 @@ func slicesEqual(a, b []algorithm.PredicateFailureReason) bool { } func TestLookupResult(t *testing.T) { + predicatesOrdering := []string{"GeneralPredicates"} tests := []struct { name string podName string nodeName string predicateKey string + predicateID int equivalenceHashForUpdatePredicate uint64 equivalenceHashForCalPredicate uint64 cachedItem predicateItemType expectedPredicateKeyMiss bool expectedEquivalenceHashMiss bool expectedPredicateItem predicateItemType - cache schedulercache.Cache }{ { name: "test 1", @@ -414,6 +394,7 @@ func TestLookupResult(t *testing.T) { equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 123, predicateKey: "GeneralPredicates", + predicateID: 0, cachedItem: predicateItemType{ fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, @@ -423,7 +404,6 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{}, }, - cache: &upToDateCache{}, }, { name: "test 2", @@ -432,6 +412,7 @@ func TestLookupResult(t *testing.T) { equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 123, predicateKey: "GeneralPredicates", + predicateID: 0, cachedItem: predicateItemType{ fit: true, }, @@ -440,7 +421,6 @@ func TestLookupResult(t *testing.T) { fit: true, reasons: []algorithm.PredicateFailureReason{}, }, - cache: &upToDateCache{}, }, { name: "test 3", @@ -449,6 +429,7 @@ func TestLookupResult(t *testing.T) { equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 123, predicateKey: "GeneralPredicates", + predicateID: 0, cachedItem: predicateItemType{ fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, @@ -458,7 +439,6 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, - cache: &upToDateCache{}, }, { name: "test 4", @@ -467,6 +447,7 @@ func TestLookupResult(t *testing.T) { equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 456, predicateKey: "GeneralPredicates", + predicateID: 0, cachedItem: predicateItemType{ fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, @@ -477,7 +458,6 @@ func TestLookupResult(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{}, }, - cache: &upToDateCache{}, }, } @@ -486,7 +466,7 @@ func TestLookupResult(t *testing.T) { testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}} // Initialize and populate equivalence class cache. 
- ecache := NewCache() + ecache := NewCache(predicatesOrdering) nodeCache, _ := ecache.GetNodeCache(testNode.Name) node := schedulercache.NewNodeInfo() @@ -495,10 +475,10 @@ func TestLookupResult(t *testing.T) { nodeCache.updateResult( test.podName, test.predicateKey, + test.predicateID, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, - test.cache, node, ) // if we want to do invalid, invalid the cached item @@ -511,6 +491,7 @@ func TestLookupResult(t *testing.T) { result, ok := nodeCache.lookupResult(test.podName, test.nodeName, test.predicateKey, + test.predicateID, test.equivalenceHashForCalPredicate, ) fit, reasons := result.Fit, result.FailReasons @@ -659,6 +640,8 @@ func TestGetEquivalenceHash(t *testing.T) { func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { testPredicate := "GeneralPredicates" + testPredicateID := 0 + predicatesOrdering := []string{testPredicate} // tests is used to initialize all nodes tests := []struct { name string @@ -666,7 +649,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { nodeName string equivalenceHashForUpdatePredicate uint64 cachedItem predicateItemType - cache schedulercache.Cache }{ { name: "hash predicate 123 not fits host ports", @@ -679,7 +661,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { predicates.ErrPodNotFitsHostPorts, }, }, - cache: &upToDateCache{}, }, { name: "hash predicate 456 not fits host ports", @@ -692,7 +673,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { predicates.ErrPodNotFitsHostPorts, }, }, - cache: &upToDateCache{}, }, { name: "hash predicate 123 fits", @@ -702,10 +682,9 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { cachedItem: predicateItemType{ fit: true, }, - cache: &upToDateCache{}, }, } - ecache := NewCache() + ecache := NewCache(predicatesOrdering) for _, test := range tests { node := schedulercache.NewNodeInfo() @@ -717,10 +696,10 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { nodeCache.updateResult( test.podName, testPredicate, + testPredicateID, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, - test.cache, node, ) } @@ -732,7 +711,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist { - if _, exist := nodeCache.cache[testPredicate]; exist { + if cache := nodeCache.cache[testPredicateID]; cache != nil { t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated", testPredicate, test.nodeName) } @@ -743,6 +722,8 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { testPredicate := "GeneralPredicates" + testPredicateID := 0 + predicatesOrdering := []string{testPredicate} // tests is used to initialize all nodes tests := []struct { name string @@ -750,7 +731,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { nodeName string equivalenceHashForUpdatePredicate uint64 cachedItem predicateItemType - cache schedulercache.Cache }{ { name: "hash predicate 123 not fits host ports", @@ -761,7 +741,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, - cache: &upToDateCache{}, }, { name: "hash predicate 456 not fits host ports", @@ -772,7 +751,6 @@ func 
TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { fit: false, reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, }, - cache: &upToDateCache{}, }, { name: "hash predicate 123 fits host ports", @@ -782,10 +760,9 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { cachedItem: predicateItemType{ fit: true, }, - cache: &upToDateCache{}, }, } - ecache := NewCache() + ecache := NewCache(predicatesOrdering) for _, test := range tests { node := schedulercache.NewNodeInfo() @@ -797,19 +774,21 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { nodeCache.updateResult( test.podName, testPredicate, + testPredicateID, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate, - test.cache, node, ) } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + oldNodeCache, _ := ecache.GetNodeCache(test.nodeName) + oldGeneration := oldNodeCache.generation // invalidate cached predicate for all nodes ecache.InvalidateAllPredicatesOnNode(test.nodeName) - if _, ok := ecache.GetNodeCache(test.nodeName); ok { + if n, _ := ecache.GetNodeCache(test.nodeName); oldGeneration == n.generation { t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName) } }) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 1ab9e576a3..c48a5abea9 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -111,6 +111,25 @@ type genericScheduler struct { percentageOfNodesToScore int32 } +// snapshot snapshots the equivalence cache and node infos for all fit and priority +// functions. +func (g *genericScheduler) snapshot() error { + // IMPORTANT NOTE: We must snapshot the equivalence cache before snapshotting + // the scheduler cache, otherwise stale data may be written into the equivalence + // cache, e.g. + // 1. snapshot cache + // 2. event arrives, updating cache and invalidating predicates or whole node cache + // 3. snapshot ecache + // 4. evaluate predicates + // 5. stale result will be written to ecache + if g.equivalenceCache != nil { + g.equivalenceCache.Snapshot() + } + + // Used for all fit and priority funcs. + return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) +} + // Schedule tries to schedule the given pod to one of the nodes in the node list. // If it succeeds, it will return the name of the node. // If it fails, it will return a FitError error with reasons. @@ -130,8 +149,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister return "", ErrNoNodesAvailable } - // Used for all fit and priority funcs. 
- err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) + err = g.snapshot() if err != nil { return "", err } @@ -227,7 +245,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, if !ok || fitError == nil { return nil, nil, nil, nil } - err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) + err := g.snapshot() if err != nil { return nil, nil, nil, err } @@ -396,14 +414,13 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v var nodeCache *equivalence.NodeCache nodeName := g.cache.NodeTree().Next() if g.equivalenceCache != nil { - nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName) + nodeCache = g.equivalenceCache.LoadNodeCache(nodeName) } fits, failedPredicates, err := podFitsOnNode( pod, meta, g.cachedNodeInfoMap[nodeName], g.predicates, - g.cache, nodeCache, g.schedulingQueue, g.alwaysCheckAllPredicates, @@ -516,7 +533,6 @@ func podFitsOnNode( meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, - cache schedulercache.Cache, nodeCache *equivalence.NodeCache, queue SchedulingQueue, alwaysCheckAllPredicates bool, @@ -558,7 +574,7 @@ func podFitsOnNode( // TODO(bsalamat): consider using eCache and adding proper eCache invalidations // when pods are nominated or their nominations change. eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded - for _, predicateKey := range predicates.Ordering() { + for predicateID, predicateKey := range predicates.Ordering() { var ( fit bool reasons []algorithm.PredicateFailureReason @@ -567,7 +583,7 @@ func podFitsOnNode( //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric if predicate, exist := predicateFuncs[predicateKey]; exist { if eCacheAvailable { - fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache) + fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass) } else { fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) } @@ -991,7 +1007,7 @@ func selectVictimsOnNode( // that we should check is if the "pod" is failing to schedule due to pod affinity // failure. // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance. 
- if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil); !fits { + if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits { if err != nil { glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } @@ -1005,7 +1021,7 @@ func selectVictimsOnNode( violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs) reprievePod := func(p *v1.Pod) bool { addPod(p) - fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil) + fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil) if !fits { removePod(p) victims = append(victims, p) diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index cd7827b487..e050a85936 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -1474,7 +1474,10 @@ func TestCacheInvalidationRace(t *testing.T) { cacheInvalidated: make(chan struct{}), } - eCache := equivalence.NewCache() + ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} + algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) + eCache := equivalence.NewCache(algorithmpredicates.Ordering()) + eCache.GetNodeCache(testNode.Name) // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before // the equivalence cache would be updated. go func() { @@ -1490,8 +1493,6 @@ func TestCacheInvalidationRace(t *testing.T) { }() // Set up the scheduler. - ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} - algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) scheduler := NewGenericScheduler( @@ -1522,3 +1523,84 @@ func TestCacheInvalidationRace(t *testing.T) { t.Errorf("Predicate should have been called twice. Was called %d times.", callCount) } } + +// TestCacheInvalidationRace2 tests that cache invalidation is correctly handled +// when an invalidation event happens while a predicate is running. +func TestCacheInvalidationRace2(t *testing.T) { + // Create a predicate that returns false the first time and true on subsequent calls. + var ( + podWillFit = false + callCount int + cycleStart = make(chan struct{}) + cacheInvalidated = make(chan struct{}) + once sync.Once + ) + testPredicate := func(pod *v1.Pod, + meta algorithm.PredicateMetadata, + nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) { + callCount++ + once.Do(func() { + cycleStart <- struct{}{} + <-cacheInvalidated + }) + if !podWillFit { + podWillFit = true + return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil + } + return true, nil, nil + } + + // Set up the mock cache. 
+ cache := schedulercache.New(time.Duration(0), wait.NeverStop) + testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}} + cache.AddNode(testNode) + + ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate} + algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"}) + eCache := equivalence.NewCache(algorithmpredicates.Ordering()) + eCache.GetNodeCache(testNode.Name) + // Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before + // the equivalence cache would be updated. + go func() { + <-cycleStart + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"}, + Spec: v1.PodSpec{NodeName: "machine1"}} + if err := cache.AddPod(pod); err != nil { + t.Errorf("Could not add pod to cache: %v", err) + } + eCache.InvalidateAllPredicatesOnNode("machine1") + cacheInvalidated <- struct{}{} + }() + + // Set up the scheduler. + prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}} + pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{}) + scheduler := NewGenericScheduler( + cache, + eCache, + NewSchedulingQueue(), + ps, + algorithm.EmptyPredicateMetadataProducer, + prioritizers, + algorithm.EmptyPriorityMetadataProducer, + nil, nil, pvcLister, true, false, + schedulerapi.DefaultPercentageOfNodesToScore) + + // First scheduling attempt should fail. + nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"})) + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}} + machine, err := scheduler.Schedule(pod, nodeLister) + if machine != "" || err == nil { + t.Error("First scheduling attempt did not fail") + } + + // Second scheduling attempt should succeed because cache was invalidated. + _, err = scheduler.Schedule(pod, nodeLister) + if err != nil { + t.Errorf("Second scheduling attempt failed: %v", err) + } + if callCount != 2 { + t.Errorf("Predicate should have been called twice. Was called %d times.", callCount) + } +} diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index e72c1cfe6f..d3b9a916a9 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -849,15 +849,17 @@ func (c *configFactory) addNodeToCache(obj interface{}) { return } - if err := c.schedulerCache.AddNode(node); err != nil { - glog.Errorf("scheduler cache AddNode failed: %v", err) - } - + // NOTE: Because the scheduler uses equivalence cache for nodes, we need + // to create it before adding node into scheduler cache. if c.enableEquivalenceClassCache { // GetNodeCache() will lazily create NodeCache for given node if it does not exist. 
c.equivalencePodCache.GetNodeCache(node.GetName()) } + if err := c.schedulerCache.AddNode(node); err != nil { + glog.Errorf("scheduler cache AddNode failed: %v", err) + } + c.podQueue.MoveAllToActiveQueue() // NOTE: add a new node does not affect existing predicates in equivalence cache } @@ -1166,7 +1168,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, // Init equivalence class cache if c.enableEquivalenceClassCache { - c.equivalencePodCache = equivalence.NewCache() + c.equivalencePodCache = equivalence.NewCache(predicates.Ordering()) glog.Info("Created equivalence class cache") } diff --git a/pkg/scheduler/testing/fake_cache.go b/pkg/scheduler/testing/fake_cache.go index f03a491cf2..b9f2bdf61a 100644 --- a/pkg/scheduler/testing/fake_cache.go +++ b/pkg/scheduler/testing/fake_cache.go @@ -106,8 +106,5 @@ func (f *FakeCache) Snapshot() *schedulercache.Snapshot { return &schedulercache.Snapshot{} } -// IsUpToDate is a fake method for testing -func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true } - // NodeTree is a fake method for testing. func (f *FakeCache) NodeTree() *schedulercache.NodeTree { return nil }