From e0ad2718913d8cf24e183ecb7d4f329b82795798 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 12 Feb 2019 11:07:29 -0800 Subject: [PATCH 1/2] Optimize scheduler cache snapshotting using an MRU structure. --- pkg/scheduler/core/generic_scheduler.go | 26 +-- pkg/scheduler/core/generic_scheduler_test.go | 2 +- pkg/scheduler/internal/cache/cache.go | 189 ++++++++++++++---- pkg/scheduler/internal/cache/cache_test.go | 2 +- pkg/scheduler/internal/cache/fake/BUILD | 1 - .../internal/cache/fake/fake_cache.go | 5 +- pkg/scheduler/internal/cache/interface.go | 12 +- 7 files changed, 178 insertions(+), 59 deletions(-) diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 59a4d40179..d11ee4f915 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -141,7 +141,7 @@ type genericScheduler struct { extenders []algorithm.SchedulerExtender lastNodeIndex uint64 alwaysCheckAllPredicates bool - cachedNodeInfoMap map[string]*schedulernodeinfo.NodeInfo + nodeInfoSnapshot schedulerinternalcache.NodeInfoSnapshot volumeBinder *volumebinder.VolumeBinder pvcLister corelisters.PersistentVolumeClaimLister pdbLister algorithm.PDBLister @@ -153,7 +153,7 @@ type genericScheduler struct { // functions. func (g *genericScheduler) snapshot() error { // Used for all fit and priority funcs. - return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap) + return g.cache.UpdateNodeInfoSnapshot(&g.nodeInfoSnapshot) } // Schedule tries to schedule the given pod to one of the nodes in the node list. @@ -210,8 +210,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister }, nil } - metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap) - priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) + metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap) + priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) if err != nil { return result, err } @@ -290,7 +290,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, if !ok || fitError == nil { return nil, nil, nil, nil } - if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) { + if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap) { klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) return nil, nil, nil, nil } @@ -311,7 +311,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, if err != nil { return nil, nil, nil, err } - nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, + nodeToVictims, err := selectNodesForPreemption(pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs) if err != nil { return nil, nil, nil, err @@ -335,7 +335,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, // nomination updates these pods and moves them to the active queue. It // lets scheduler find another place for them. 
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name) - if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok { + if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok { return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil } @@ -355,7 +355,7 @@ func (g *genericScheduler) processPreemptionWithExtenders( newNodeToVictims, err := extender.ProcessPreemption( pod, nodeToVictims, - g.cachedNodeInfoMap, + g.nodeInfoSnapshot.NodeInfoMap, ) if err != nil { if extender.IsIgnorable() { @@ -452,14 +452,14 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v ctx, cancel := context.WithCancel(context.Background()) // We can use the same metadata producer for all nodes. - meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) + meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap) checkNode := func(i int) { nodeName := g.cache.NodeTree().Next() fits, failedPredicates, err := podFitsOnNode( pod, meta, - g.cachedNodeInfoMap[nodeName], + g.nodeInfoSnapshot.NodeInfoMap[nodeName], g.predicates, g.schedulingQueue, g.alwaysCheckAllPredicates, @@ -476,7 +476,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v cancel() atomic.AddInt32(&filteredLen, -1) } else { - filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node() + filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node() } } else { predicateResultLock.Lock() @@ -500,7 +500,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v if !extender.IsInterested(pod) { continue } - filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap) + filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap) if err != nil { if extender.IsIgnorable() { klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", @@ -1193,7 +1193,7 @@ func NewGenericScheduler( priorityMetaProducer: priorityMetaProducer, pluginSet: pluginSet, extenders: extenders, - cachedNodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo), + nodeInfoSnapshot: schedulerinternalcache.NewNodeInfoSnapshot(), volumeBinder: volumeBinder, pvcLister: pvcLister, pdbLister: pdbLister, diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 73e3ce75fd..2efdfe2edd 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -513,7 +513,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes emptyPluginSet, nil, nil, nil, nil, false, false, schedulerapi.DefaultPercentageOfNodesToScore) - cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap) + cache.UpdateNodeInfoSnapshot(&s.(*genericScheduler).nodeInfoSnapshot) return s.(*genericScheduler) } diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 3245378869..790a3bcc6e 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -47,6 +47,15 @@ func New(ttl time.Duration, stop <-chan struct{}) Cache { return cache } +// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly +// linked list. When a NodeInfo is updated, it goes to the head of the list. +// The items closer to the head are the most recently updated items. 
+type nodeInfoListItem struct { + info *schedulernodeinfo.NodeInfo + next *nodeInfoListItem + prev *nodeInfoListItem +} + type schedulerCache struct { stop <-chan struct{} ttl time.Duration @@ -59,8 +68,11 @@ type schedulerCache struct { assumedPods map[string]bool // a map from pod key to podState. podStates map[string]*podState - nodes map[string]*schedulernodeinfo.NodeInfo - nodeTree *NodeTree + nodes map[string]*nodeInfoListItem + // headNode points to the most recently updated NodeInfo in "nodes". It is the + // head of the linked list. + headNode *nodeInfoListItem + nodeTree *NodeTree // A map from image name to its imageState. imageStates map[string]*imageState } @@ -94,7 +106,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul period: period, stop: stop, - nodes: make(map[string]*schedulernodeinfo.NodeInfo), + nodes: make(map[string]*nodeInfoListItem), nodeTree: newNodeTree(nil), assumedPods: make(map[string]bool), podStates: make(map[string]*podState), @@ -102,15 +114,82 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul } } -// Snapshot takes a snapshot of the current schedulerinternalcache. The method has performance impact, -// and should be only used in non-critical path. +// newNodeInfoListItem initializes a new nodeInfoListItem. +func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem { + return &nodeInfoListItem{ + info: ni, + } +} + +// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it. +func NewNodeInfoSnapshot() NodeInfoSnapshot { + return NodeInfoSnapshot{ + NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo), + } +} + +// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly +// linked list. The head is the most recently updated NodeInfo. +// We assume cache lock is already acquired. +func (cache *schedulerCache) moveNodeInfoToHead(name string) { + ni, ok := cache.nodes[name] + if !ok { + klog.Errorf("No NodeInfo with name %v found in the cache", name) + return + } + // if the node info list item is already at the head, we are done. + if ni == cache.headNode { + return + } + + if ni.prev != nil { + ni.prev.next = ni.next + } + if ni.next != nil { + ni.next.prev = ni.prev + } + if cache.headNode != nil { + cache.headNode.prev = ni + } + ni.next = cache.headNode + ni.prev = nil + cache.headNode = ni +} + +// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly +// linked list. +// We assume cache lock is already acquired. +func (cache *schedulerCache) removeNodeInfoFromList(name string) { + ni, ok := cache.nodes[name] + if !ok { + klog.Errorf("No NodeInfo with name %v found in the cache", name) + return + } + + if ni.prev != nil { + ni.prev.next = ni.next + } + if ni.next != nil { + ni.next.prev = ni.prev + } + // if the removed item was at the head, we must update the head. + if ni == cache.headNode { + cache.headNode = ni.next + } + delete(cache.nodes, name) +} + +// Snapshot takes a snapshot of the current scheduler cache. This is used for +// debugging purposes only and shouldn't be confused with UpdateNodeInfoSnapshot +// function. +// This method is expensive, and should be only used in non-critical path. 
func (cache *schedulerCache) Snapshot() *Snapshot { cache.mu.RLock() defer cache.mu.RUnlock() nodes := make(map[string]*schedulernodeinfo.NodeInfo, len(cache.nodes)) for k, v := range cache.nodes { - nodes[k] = v.Clone() + nodes[k] = v.info.Clone() } assumedPods := make(map[string]bool, len(cache.assumedPods)) @@ -124,22 +203,43 @@ func (cache *schedulerCache) Snapshot() *Snapshot { } } -func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) error { +// UpdateNodeInfoSnapshot takes a snapshot of the cached NodeInfo map. This is called at +// the beginning of every scheduling cycle. +// This function tracks the generation number of NodeInfo and updates only the +// entries of an existing snapshot that have changed after the snapshot was taken. +func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error { cache.mu.Lock() defer cache.mu.Unlock() + balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) - for name, info := range cache.nodes { - if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil { - // Transient scheduler info is reset here. - info.TransientInfo.ResetTransientSchedulerInfo() + // Get the last generation of the snapshot. + snapshotGeneration := nodeSnapshot.Generation + + // Start from the head of the NodeInfo doubly linked list and update the snapshot + // with NodeInfos updated after the last snapshot. + for node := cache.headNode; node != nil; node = node.next { + if node.info.GetGeneration() <= snapshotGeneration { + // All the remaining nodes were updated before the existing snapshot. We are done. + break } - if current, ok := nodeNameToInfo[name]; !ok || current.GetGeneration() != info.GetGeneration() { - nodeNameToInfo[name] = info.Clone() + if balancedVolumesEnabled && node.info.TransientInfo != nil { + // Transient scheduler info is reset here. + node.info.TransientInfo.ResetTransientSchedulerInfo() + } + if np := node.info.Node(); np != nil { + nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone() + } } - for name := range nodeNameToInfo { - if _, ok := cache.nodes[name]; !ok { - delete(nodeNameToInfo, name) + // Update the snapshot generation with the latest NodeInfo generation. + if cache.headNode != nil { + nodeSnapshot.Generation = cache.headNode.info.GetGeneration() + } + + if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) { + for name := range nodeSnapshot.NodeInfoMap { + if _, ok := cache.nodes[name]; !ok { + delete(nodeSnapshot.NodeInfoMap, name) + } } } return nil @@ -157,12 +257,12 @@ func (cache *schedulerCache) FilteredList(podFilter algorithm.PodFilter, selecto // can avoid expensive array growth without wasting too much memory by // pre-allocating capacity. 
maxSize := 0 - for _, info := range cache.nodes { - maxSize += len(info.Pods()) + for _, n := range cache.nodes { + maxSize += len(n.info.Pods()) } pods := make([]*v1.Pod, 0, maxSize) - for _, info := range cache.nodes { - for _, pod := range info.Pods() { + for _, n := range cache.nodes { + for _, pod := range n.info.Pods() { if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) { pods = append(pods, pod) } @@ -249,10 +349,11 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error { func (cache *schedulerCache) addPod(pod *v1.Pod) { n, ok := cache.nodes[pod.Spec.NodeName] if !ok { - n = schedulernodeinfo.NewNodeInfo() + n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo()) cache.nodes[pod.Spec.NodeName] = n } - n.AddPod(pod) + n.info.AddPod(pod) + cache.moveNodeInfoToHead(pod.Spec.NodeName) } // Assumes that lock is already acquired. @@ -266,12 +367,17 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error { // Assumes that lock is already acquired. func (cache *schedulerCache) removePod(pod *v1.Pod) error { - n := cache.nodes[pod.Spec.NodeName] - if err := n.RemovePod(pod); err != nil { + n, ok := cache.nodes[pod.Spec.NodeName] + if !ok { + return fmt.Errorf("node %v is not found", pod.Spec.NodeName) + } + if err := n.info.RemovePod(pod); err != nil { return err } - if len(n.Pods()) == 0 && n.Node() == nil { - delete(cache.nodes, pod.Spec.NodeName) + if len(n.info.Pods()) == 0 && n.info.Node() == nil { + cache.removeNodeInfoFromList(pod.Spec.NodeName) + } else { + cache.moveNodeInfoToHead(pod.Spec.NodeName) } return nil } @@ -407,15 +513,16 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error { n, ok := cache.nodes[node.Name] if !ok { - n = schedulernodeinfo.NewNodeInfo() + n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo()) cache.nodes[node.Name] = n } else { - cache.removeNodeImageStates(n.Node()) + cache.removeNodeImageStates(n.info.Node()) } + cache.moveNodeInfoToHead(node.Name) cache.nodeTree.AddNode(node) - cache.addNodeImageStates(node, n) - return n.SetNode(node) + cache.addNodeImageStates(node, n.info) + return n.info.SetNode(node) } func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { @@ -424,31 +531,37 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error { n, ok := cache.nodes[newNode.Name] if !ok { - n = schedulernodeinfo.NewNodeInfo() + n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo()) cache.nodes[newNode.Name] = n } else { - cache.removeNodeImageStates(n.Node()) + cache.removeNodeImageStates(n.info.Node()) } + cache.moveNodeInfoToHead(newNode.Name) cache.nodeTree.UpdateNode(oldNode, newNode) - cache.addNodeImageStates(newNode, n) - return n.SetNode(newNode) + cache.addNodeImageStates(newNode, n.info) + return n.info.SetNode(newNode) } func (cache *schedulerCache) RemoveNode(node *v1.Node) error { cache.mu.Lock() defer cache.mu.Unlock() - n := cache.nodes[node.Name] - if err := n.RemoveNode(node); err != nil { + n, ok := cache.nodes[node.Name] + if !ok { + return fmt.Errorf("node %v is not found", node.Name) + } + if err := n.info.RemoveNode(node); err != nil { return err } // We remove NodeInfo for this node only if there aren't any pods on this node. // We can't do it unconditionally, because notifications about pods are delivered // in a different watch, and thus can potentially be observed later, even though // they happened before node removal. 
- if len(n.Pods()) == 0 && n.Node() == nil { - delete(cache.nodes, node.Name) + if len(n.info.Pods()) == 0 && n.info.Node() == nil { + cache.removeNodeInfoFromList(node.Name) + } else { + cache.moveNodeInfoToHead(node.Name) } cache.nodeTree.RemoveNode(node) diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index e7f15291c6..e3eb74163e 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -1132,7 +1132,7 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { cachedNodes := map[string]*schedulernodeinfo.NodeInfo{} - cache.UpdateNodeNameToInfoMap(cachedNodes) + cache.SnapshotNodeInfo(cachedNodes) } } diff --git a/pkg/scheduler/internal/cache/fake/BUILD b/pkg/scheduler/internal/cache/fake/BUILD index 06c1d501a8..caed79bb71 100644 --- a/pkg/scheduler/internal/cache/fake/BUILD +++ b/pkg/scheduler/internal/cache/fake/BUILD @@ -8,7 +8,6 @@ go_library( deps = [ "//pkg/scheduler/algorithm:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", - "//pkg/scheduler/nodeinfo:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", ], diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go index 05873d35cb..fe814d8766 100644 --- a/pkg/scheduler/internal/cache/fake/fake_cache.go +++ b/pkg/scheduler/internal/cache/fake/fake_cache.go @@ -21,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) // Cache is used for testing @@ -75,8 +74,8 @@ func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil } // RemoveNode is a fake method for testing. func (c *Cache) RemoveNode(node *v1.Node) error { return nil } -// UpdateNodeNameToInfoMap is a fake method for testing. -func (c *Cache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error { +// UpdateNodeInfoSnapshot is a fake method for testing. +func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulerinternalcache.NodeInfoSnapshot) error { return nil } diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go index 204659d862..699818b1e6 100644 --- a/pkg/scheduler/internal/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -95,10 +95,10 @@ type Cache interface { // RemoveNode removes overall information about node. RemoveNode(node *v1.Node) error - // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache. + // UpdateNodeInfoSnapshot updates the passed infoSnapshot to the current contents of Cache. // The node info contains aggregated information of pods scheduled (including assumed to be) // on this node. - UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error + UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error // List lists all cached pods (including assumed ones). List(labels.Selector) ([]*v1.Pod, error) @@ -118,3 +118,11 @@ type Snapshot struct { AssumedPods map[string]bool Nodes map[string]*schedulernodeinfo.NodeInfo } + +// NodeInfoSnapshot is a snapshot of cache NodeInfo. The scheduler takes a +// snapshot at the beginning of each scheduling cycle and uses it for its +// operations in that cycle. 
+type NodeInfoSnapshot struct { + NodeInfoMap map[string]*schedulernodeinfo.NodeInfo + Generation int64 +} From 337cb7036cc260264ebc8e4ad2925040de75ba65 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 12 Feb 2019 17:15:29 -0800 Subject: [PATCH 2/2] Add tests for the new cache snapshotting mechanism. --- pkg/scheduler/internal/cache/cache_test.go | 307 +++++++++++++++++++-- 1 file changed, 283 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index e3eb74163e..1964b00e1b 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -35,16 +35,19 @@ import ( schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) -func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *schedulernodeinfo.NodeInfo) { +func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) { + if (actual == nil) != (expected == nil) { + t.Error("One of the actual or expected is nil and the other is not!") + } // Ignore generation field. if actual != nil { - actual.SetGeneration(0) + actual.info.SetGeneration(0) } if expected != nil { expected.SetGeneration(0) } - if !reflect.DeepEqual(actual, expected) { - t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected) + if actual != nil && !reflect.DeepEqual(actual.info, expected) { + t.Errorf("#%d: node info get=%s, want=%s", testcase, actual.info, expected) } } @@ -372,21 +375,27 @@ func TestSnapshot(t *testing.T) { cache := newSchedulerCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { - t.Fatalf("assumePod failed: %v", err) + t.Errorf("assumePod failed: %v", err) } } for _, podToAdd := range tt.podsToAdd { if err := cache.AddPod(podToAdd); err != nil { - t.Fatalf("AddPod failed: %v", err) + t.Errorf("AddPod failed: %v", err) } } snapshot := cache.Snapshot() - if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) { - t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes) + if len(snapshot.Nodes) != len(cache.nodes) { + t.Errorf("Unequal number of nodes in the cache and its snapshot. 
expected: %v, got: %v", len(cache.nodes), len(snapshot.Nodes)) + } + for name, ni := range snapshot.Nodes { + nItem := cache.nodes[name] + if !reflect.DeepEqual(ni, nItem.info) { + t.Errorf("expect \n%+v; got \n%+v", nItem.info, ni) + } } if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) { - t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods) + t.Errorf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods) } } } @@ -765,7 +774,7 @@ func TestEphemeralStorageResource(t *testing.T) { n = cache.nodes[nodeName] if n != nil { - t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n) + t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info) } } } @@ -810,7 +819,7 @@ func TestRemovePod(t *testing.T) { n = cache.nodes[nodeName] if n != nil { - t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n) + t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info) } } } @@ -864,7 +873,7 @@ func TestForgetPod(t *testing.T) { } cache.cleanupAssumedPods(now.Add(2 * ttl)) if n := cache.nodes[nodeName]; n != nil { - t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n) + t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info) } } } @@ -1062,16 +1071,16 @@ func TestNodeOperators(t *testing.T) { } // Generations are globally unique. We check in our unit tests that they are incremented correctly. - expected.SetGeneration(got.GetGeneration()) - if !reflect.DeepEqual(got, expected) { + expected.SetGeneration(got.info.GetGeneration()) + if !reflect.DeepEqual(got.info, expected) { t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected) } // Case 2: dump cached nodes successfully. - cachedNodes := map[string]*schedulernodeinfo.NodeInfo{} - cache.UpdateNodeNameToInfoMap(cachedNodes) - newNode, found := cachedNodes[node.Name] - if !found || len(cachedNodes) != 1 { + cachedNodes := NewNodeInfoSnapshot() + cache.UpdateNodeInfoSnapshot(&cachedNodes) + newNode, found := cachedNodes.NodeInfoMap[node.Name] + if !found || len(cachedNodes.NodeInfoMap) != 1 { t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes) } expected.SetGeneration(newNode.GetGeneration()) @@ -1091,12 +1100,12 @@ func TestNodeOperators(t *testing.T) { if !found { t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name) } - if got.GetGeneration() <= expected.GetGeneration() { - t.Errorf("Generation is not incremented. got: %v, expected: %v", got.GetGeneration(), expected.GetGeneration()) + if got.info.GetGeneration() <= expected.GetGeneration() { + t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration()) } - expected.SetGeneration(got.GetGeneration()) + expected.SetGeneration(got.info.GetGeneration()) - if !reflect.DeepEqual(got, expected) { + if !reflect.DeepEqual(got.info, expected) { t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected) } // Check nodeTree after update @@ -1117,6 +1126,256 @@ func TestNodeOperators(t *testing.T) { } } +// TestSchedulerCache_UpdateNodeInfoSnapshot tests the UpdateNodeInfoSnapshot function of the cache. +func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { + // Create a few nodes to be used in tests. 
+ nodes := []*v1.Node{} + for i := 0; i < 10; i++ { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-node%v", i), + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("100m"), + }, + }, + } + nodes = append(nodes, node) + } + // Create a few nodes as updated versions of the above nodes + updatedNodes := []*v1.Node{} + for _, n := range nodes { + updatedNode := n.DeepCopy() + updatedNode.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("500m"), + } + updatedNodes = append(updatedNodes, updatedNode) + } + + // Create a few pods for tests. + pods := []*v1.Pod{} + for i := 0; i < 10; i++ { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pod%v", i), + Namespace: "test-ns", + UID: types.UID(fmt.Sprintf("test-puid%v", i)), + }, + Spec: v1.PodSpec{ + NodeName: fmt.Sprintf("test-node%v", i), + }, + } + pods = append(pods, pod) + } + // Create a few pods as updated versions of the above pods. + updatedPods := []*v1.Pod{} + for _, p := range pods { + updatedPod := p.DeepCopy() + priority := int32(1000) + updatedPod.Spec.Priority = &priority + updatedPods = append(updatedPods, updatedPod) + } + + var cache *schedulerCache + var snapshot NodeInfoSnapshot + type operation = func() + + addNode := func(i int) operation { + return func() { + cache.AddNode(nodes[i]) + } + } + removeNode := func(i int) operation { + return func() { + cache.RemoveNode(nodes[i]) + } + } + updateNode := func(i int) operation { + return func() { + cache.UpdateNode(nodes[i], updatedNodes[i]) + } + } + addPod := func(i int) operation { + return func() { + cache.AddPod(pods[i]) + } + } + removePod := func(i int) operation { + return func() { + cache.RemovePod(pods[i]) + } + } + updatePod := func(i int) operation { + return func() { + cache.UpdatePod(pods[i], updatedPods[i]) + } + } + updateSnapshot := func() operation { + return func() { + cache.UpdateNodeInfoSnapshot(&snapshot) + if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil { + t.Error(err) + } + } + } + + tests := []struct { + name string + operations []operation + expected []*v1.Node + }{ + { + name: "Empty cache", + operations: []operation{}, + expected: []*v1.Node{}, + }, + { + name: "Single node", + operations: []operation{addNode(1)}, + expected: []*v1.Node{nodes[1]}, + }, + { + name: "Add node, remove it, add it again", + operations: []operation{ + addNode(1), updateSnapshot(), removeNode(1), addNode(1), + }, + expected: []*v1.Node{nodes[1]}, + }, + { + name: "Add a few nodes, and snapshot in the middle", + operations: []operation{ + addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2), + updateSnapshot(), addNode(3), + }, + expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]}, + }, + { + name: "Add a few nodes, and snapshot in the end", + operations: []operation{ + addNode(0), addNode(2), addNode(5), addNode(6), + }, + expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]}, + }, + { + name: "Remove non-existing node", + operations: []operation{ + addNode(0), addNode(1), updateSnapshot(), removeNode(8), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + }, + { + name: "Update some nodes", + operations: []operation{ + addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1), + }, + expected: []*v1.Node{nodes[1], nodes[5], nodes[0]}, + }, + { + name: "Add a few nodes, and remove 
all of them", + operations: []operation{ + addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(), + removeNode(0), removeNode(2), removeNode(5), removeNode(6), + }, + expected: []*v1.Node{}, + }, + { + name: "Add a few nodes, and remove some of them", + operations: []operation{ + addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(), + removeNode(0), removeNode(6), + }, + expected: []*v1.Node{nodes[5], nodes[2]}, + }, + { + name: "Add a few nodes, remove all of them, and add more", + operations: []operation{ + addNode(2), addNode(5), addNode(6), updateSnapshot(), + removeNode(2), removeNode(5), removeNode(6), updateSnapshot(), + addNode(7), addNode(9), + }, + expected: []*v1.Node{nodes[9], nodes[7]}, + }, + { + name: "Update nodes in particular order", + operations: []operation{ + addNode(8), updateNode(2), updateNode(8), updateSnapshot(), + addNode(1), + }, + expected: []*v1.Node{nodes[1], nodes[8], nodes[2]}, + }, + { + name: "Add some nodes and some pods", + operations: []operation{ + addNode(0), addNode(2), addNode(8), updateSnapshot(), + addPod(8), addPod(2), + }, + expected: []*v1.Node{nodes[2], nodes[8], nodes[0]}, + }, + { + name: "Updating a pod moves its node to the head", + operations: []operation{ + addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0), + }, + expected: []*v1.Node{nodes[0], nodes[4], nodes[2]}, + }, + { + name: "Remove pod from non-existing node", + operations: []operation{ + addNode(0), addPod(0), addNode(2), updateSnapshot(), removePod(3), + }, + expected: []*v1.Node{nodes[2], nodes[0]}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cache = newSchedulerCache(time.Second, time.Second, nil) + snapshot = NewNodeInfoSnapshot() + + for _, op := range test.operations { + op() + } + + if len(test.expected) != len(cache.nodes) { + t.Errorf("unexpected number of nodes. Expected: %v, got: %v", len(test.expected), len(cache.nodes)) + } + var i int + // Check that cache is in the expected state. + for node := cache.headNode; node != nil; node = node.next { + if node.info.Node().Name != test.expected[i].Name { + t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i) + } + i++ + } + // Make sure we visited all the cached nodes in the above for loop. + if i != len(cache.nodes) { + t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i) + } + + // Always update the snapshot at the end of operations and compare it. + cache.UpdateNodeInfoSnapshot(&snapshot) + if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil { + t.Error(err) + } + }) + } +} + +func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *NodeInfoSnapshot) error { + if len(snapshot.NodeInfoMap) != len(cache.nodes) { + return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap)) + } + for name, ni := range cache.nodes { + if !reflect.DeepEqual(snapshot.NodeInfoMap[name], ni.info) { + return fmt.Errorf("unexpected node info. 
Expected: %v, got: %v", ni.info, snapshot.NodeInfoMap[name]) + } + } + return nil +} + func BenchmarkList1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() @@ -1131,8 +1390,8 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() for n := 0; n < b.N; n++ { - cachedNodes := map[string]*schedulernodeinfo.NodeInfo{} - cache.SnapshotNodeInfo(cachedNodes) + cachedNodes := NewNodeInfoSnapshot() + cache.UpdateNodeInfoSnapshot(&cachedNodes) } }
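The mechanism these two patches implement can be read independently of the scheduler: keep each cached entry in a doubly linked list ordered by most-recent update, stamp every mutation with a monotonically increasing generation, and refresh a long-lived snapshot by walking from the head of the list and stopping at the first entry that is not newer than the snapshot. The Go program below is a minimal, self-contained sketch of that technique for experimentation, not code from the patches; the type and function names (item, cache, snapshot, update, refresh) and the string payload are invented for the example and stand in for nodeInfoListItem, schedulerCache, NodeInfoSnapshot, and UpdateNodeInfoSnapshot above.

package main

import "fmt"

// item is an entry in the cache, linked into a doubly linked list ordered by recency of update; the head is the most recently updated entry.
type item struct {
	name       string
	generation int64
	payload    string
	prev, next *item
}

// cache stores items by name and keeps the MRU list head plus a monotonically increasing generation counter.
type cache struct {
	items      map[string]*item
	head       *item
	generation int64
}

// snapshot is an incrementally refreshed copy of the cache contents.
type snapshot struct {
	data       map[string]string
	generation int64 // generation of the newest item copied so far
}

// unlink detaches an item from the MRU list.
func (c *cache) unlink(it *item) {
	if it.prev != nil {
		it.prev.next = it.next
	}
	if it.next != nil {
		it.next.prev = it.prev
	}
	if c.head == it {
		c.head = it.next
	}
	it.prev, it.next = nil, nil
}

// update inserts or updates an entry, stamps it with a new generation, and moves it to the head of the MRU list.
func (c *cache) update(name, payload string) {
	c.generation++
	it, ok := c.items[name]
	if !ok {
		it = &item{name: name}
		c.items[name] = it
	} else {
		c.unlink(it)
	}
	it.payload = payload
	it.generation = c.generation
	it.next = c.head
	if c.head != nil {
		c.head.prev = it
	}
	c.head = it
}

// remove deletes an entry from both the map and the MRU list.
func (c *cache) remove(name string) {
	if it, ok := c.items[name]; ok {
		c.unlink(it)
		delete(c.items, name)
	}
}

// refresh copies only the entries updated since the snapshot was last refreshed: it walks from the head and stops at the first item that is not newer than the snapshot.
func (c *cache) refresh(s *snapshot) {
	if s.data == nil {
		s.data = map[string]string{}
	}
	for it := c.head; it != nil; it = it.next {
		if it.generation <= s.generation {
			break // everything from here on is already in the snapshot
		}
		s.data[it.name] = it.payload
	}
	if c.head != nil {
		s.generation = c.head.generation
	}
	// Drop entries that were removed from the cache since the last refresh.
	if len(s.data) > len(c.items) {
		for name := range s.data {
			if _, ok := c.items[name]; !ok {
				delete(s.data, name)
			}
		}
	}
}

func main() {
	c := &cache{items: map[string]*item{}}
	var s snapshot
	c.update("node-1", "v1")
	c.update("node-2", "v1")
	c.refresh(&s) // copies both entries
	c.update("node-1", "v2")
	c.refresh(&s) // copies only node-1; node-2 is left untouched
	fmt.Println(s.data, s.generation) // map[node-1:v2 node-2:v1] 3
}

The early break in refresh is what turns the per-cycle snapshot cost from work proportional to the total number of nodes into work proportional to the number of nodes that changed since the previous cycle, which is the point of maintaining the MRU ordering; the final cleanup loop mirrors how the patch drops nodes that were removed from the cache between snapshots.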