From 337cb7036cc260264ebc8e4ad2925040de75ba65 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 12 Feb 2019 17:15:29 -0800 Subject: [PATCH] Add tests for the new cache snapshotting mechanism. --- pkg/scheduler/internal/cache/cache_test.go | 307 +++++++++++++++++++-- 1 file changed, 283 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/internal/cache/cache_test.go b/pkg/scheduler/internal/cache/cache_test.go index e3eb74163e..1964b00e1b 100644 --- a/pkg/scheduler/internal/cache/cache_test.go +++ b/pkg/scheduler/internal/cache/cache_test.go @@ -35,16 +35,19 @@ import ( schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) -func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *schedulernodeinfo.NodeInfo) { +func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) { + if (actual == nil) != (expected == nil) { + t.Error("One of the actual or expeted is nil and the other is not!") + } // Ignore generation field. if actual != nil { - actual.SetGeneration(0) + actual.info.SetGeneration(0) } if expected != nil { expected.SetGeneration(0) } - if !reflect.DeepEqual(actual, expected) { - t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected) + if actual != nil && !reflect.DeepEqual(actual.info, expected) { + t.Errorf("#%d: node info get=%s, want=%s", testcase, actual.info, expected) } } @@ -372,21 +375,27 @@ func TestSnapshot(t *testing.T) { cache := newSchedulerCache(ttl, time.Second, nil) for _, podToAssume := range tt.podsToAssume { if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil { - t.Fatalf("assumePod failed: %v", err) + t.Errorf("assumePod failed: %v", err) } } for _, podToAdd := range tt.podsToAdd { if err := cache.AddPod(podToAdd); err != nil { - t.Fatalf("AddPod failed: %v", err) + t.Errorf("AddPod failed: %v", err) } } snapshot := cache.Snapshot() - if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) { - t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes) + if len(snapshot.Nodes) != len(cache.nodes) { + t.Errorf("Unequal number of nodes in the cache and its snapshot. expeted: %v, got: %v", len(cache.nodes), len(snapshot.Nodes)) + } + for name, ni := range snapshot.Nodes { + nItem := cache.nodes[name] + if !reflect.DeepEqual(ni, nItem.info) { + t.Errorf("expect \n%+v; got \n%+v", nItem.info, ni) + } } if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) { - t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods) + t.Errorf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods) } } } @@ -765,7 +774,7 @@ func TestEphemeralStorageResource(t *testing.T) { n = cache.nodes[nodeName] if n != nil { - t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n) + t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info) } } } @@ -810,7 +819,7 @@ func TestRemovePod(t *testing.T) { n = cache.nodes[nodeName] if n != nil { - t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n) + t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info) } } } @@ -864,7 +873,7 @@ func TestForgetPod(t *testing.T) { } cache.cleanupAssumedPods(now.Add(2 * ttl)) if n := cache.nodes[nodeName]; n != nil { - t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n) + t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info) } } } @@ -1062,16 +1071,16 @@ func TestNodeOperators(t *testing.T) { } // Generations are globally unique. We check in our unit tests that they are incremented correctly. - expected.SetGeneration(got.GetGeneration()) - if !reflect.DeepEqual(got, expected) { + expected.SetGeneration(got.info.GetGeneration()) + if !reflect.DeepEqual(got.info, expected) { t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected) } // Case 2: dump cached nodes successfully. - cachedNodes := map[string]*schedulernodeinfo.NodeInfo{} - cache.UpdateNodeNameToInfoMap(cachedNodes) - newNode, found := cachedNodes[node.Name] - if !found || len(cachedNodes) != 1 { + cachedNodes := NewNodeInfoSnapshot() + cache.UpdateNodeInfoSnapshot(&cachedNodes) + newNode, found := cachedNodes.NodeInfoMap[node.Name] + if !found || len(cachedNodes.NodeInfoMap) != 1 { t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes) } expected.SetGeneration(newNode.GetGeneration()) @@ -1091,12 +1100,12 @@ func TestNodeOperators(t *testing.T) { if !found { t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name) } - if got.GetGeneration() <= expected.GetGeneration() { - t.Errorf("Generation is not incremented. got: %v, expected: %v", got.GetGeneration(), expected.GetGeneration()) + if got.info.GetGeneration() <= expected.GetGeneration() { + t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration()) } - expected.SetGeneration(got.GetGeneration()) + expected.SetGeneration(got.info.GetGeneration()) - if !reflect.DeepEqual(got, expected) { + if !reflect.DeepEqual(got.info, expected) { t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected) } // Check nodeTree after update @@ -1117,6 +1126,256 @@ func TestNodeOperators(t *testing.T) { } } +// TestSchedulerCache_UpdateNodeInfoSnapshot tests UpdateNodeInfoSnapshot function of cache. +func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) { + // Create a few nodes to be used in tests. + nodes := []*v1.Node{} + for i := 0; i < 10; i++ { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-node%v", i), + }, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("100m"), + }, + }, + } + nodes = append(nodes, node) + } + // Create a few nodes as updated versions of the above nodes + updatedNodes := []*v1.Node{} + for _, n := range nodes { + updatedNode := n.DeepCopy() + updatedNode.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("500m"), + } + updatedNodes = append(updatedNodes, updatedNode) + } + + // Create a few pods for tests. + pods := []*v1.Pod{} + for i := 0; i < 10; i++ { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pod%v", i), + Namespace: "test-ns", + UID: types.UID(fmt.Sprintf("test-puid%v", i)), + }, + Spec: v1.PodSpec{ + NodeName: fmt.Sprintf("test-node%v", i), + }, + } + pods = append(pods, pod) + } + // Create a few pods as updated versions of the above pods. + updatedPods := []*v1.Pod{} + for _, p := range pods { + updatedPod := p.DeepCopy() + priority := int32(1000) + updatedPod.Spec.Priority = &priority + updatedPods = append(updatedPods, updatedPod) + } + + var cache *schedulerCache + var snapshot NodeInfoSnapshot + type operation = func() + + addNode := func(i int) operation { + return func() { + cache.AddNode(nodes[i]) + } + } + removeNode := func(i int) operation { + return func() { + cache.RemoveNode(nodes[i]) + } + } + updateNode := func(i int) operation { + return func() { + cache.UpdateNode(nodes[i], updatedNodes[i]) + } + } + addPod := func(i int) operation { + return func() { + cache.AddPod(pods[i]) + } + } + removePod := func(i int) operation { + return func() { + cache.RemovePod(pods[i]) + } + } + updatePod := func(i int) operation { + return func() { + cache.UpdatePod(pods[i], updatedPods[i]) + } + } + updateSnapshot := func() operation { + return func() { + cache.UpdateNodeInfoSnapshot(&snapshot) + if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil { + t.Error(err) + } + } + } + + tests := []struct { + name string + operations []operation + expected []*v1.Node + }{ + { + name: "Empty cache", + operations: []operation{}, + expected: []*v1.Node{}, + }, + { + name: "Single node", + operations: []operation{addNode(1)}, + expected: []*v1.Node{nodes[1]}, + }, + { + name: "Add node, remove it, add it again", + operations: []operation{ + addNode(1), updateSnapshot(), removeNode(1), addNode(1), + }, + expected: []*v1.Node{nodes[1]}, + }, + { + name: "Add a few nodes, and snapshot in the middle", + operations: []operation{ + addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2), + updateSnapshot(), addNode(3), + }, + expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]}, + }, + { + name: "Add a few nodes, and snapshot in the end", + operations: []operation{ + addNode(0), addNode(2), addNode(5), addNode(6), + }, + expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]}, + }, + { + name: "Remove non-existing node", + operations: []operation{ + addNode(0), addNode(1), updateSnapshot(), removeNode(8), + }, + expected: []*v1.Node{nodes[1], nodes[0]}, + }, + { + name: "Update some nodes", + operations: []operation{ + addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1), + }, + expected: []*v1.Node{nodes[1], nodes[5], nodes[0]}, + }, + { + name: "Add a few nodes, and remove all of them", + operations: []operation{ + addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(), + removeNode(0), removeNode(2), removeNode(5), removeNode(6), + }, + expected: []*v1.Node{}, + }, + { + name: "Add a few nodes, and remove some of them", + operations: []operation{ + addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(), + removeNode(0), removeNode(6), + }, + expected: []*v1.Node{nodes[5], nodes[2]}, + }, + { + name: "Add a few nodes, remove all of them, and add more", + operations: []operation{ + addNode(2), addNode(5), addNode(6), updateSnapshot(), + removeNode(2), removeNode(5), removeNode(6), updateSnapshot(), + addNode(7), addNode(9), + }, + expected: []*v1.Node{nodes[9], nodes[7]}, + }, + { + name: "Update nodes in particular order", + operations: []operation{ + addNode(8), updateNode(2), updateNode(8), updateSnapshot(), + addNode(1), + }, + expected: []*v1.Node{nodes[1], nodes[8], nodes[2]}, + }, + { + name: "Add some nodes and some pods", + operations: []operation{ + addNode(0), addNode(2), addNode(8), updateSnapshot(), + addPod(8), addPod(2), + }, + expected: []*v1.Node{nodes[2], nodes[8], nodes[0]}, + }, + { + name: "Updating a pod moves its node to the head", + operations: []operation{ + addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0), + }, + expected: []*v1.Node{nodes[0], nodes[4], nodes[2]}, + }, + { + name: "Remove pod from non-existing node", + operations: []operation{ + addNode(0), addPod(0), addNode(2), updateSnapshot(), removePod(3), + }, + expected: []*v1.Node{nodes[2], nodes[0]}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cache = newSchedulerCache(time.Second, time.Second, nil) + snapshot = NewNodeInfoSnapshot() + + for _, op := range test.operations { + op() + } + + if len(test.expected) != len(cache.nodes) { + t.Errorf("unexpected number of nodes. Expected: %v, got: %v", len(test.expected), len(cache.nodes)) + } + var i int + // Check that cache is in the expected state. + for node := cache.headNode; node != nil; node = node.next { + if node.info.Node().Name != test.expected[i].Name { + t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i) + } + i++ + } + // Make sure we visited all the cached nodes in the above for loop. + if i != len(cache.nodes) { + t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i) + } + + // Always update the snapshot at the end of operations and compare it. + cache.UpdateNodeInfoSnapshot(&snapshot) + if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil { + t.Error(err) + } + }) + } +} + +func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *NodeInfoSnapshot) error { + if len(snapshot.NodeInfoMap) != len(cache.nodes) { + return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap)) + } + for name, ni := range cache.nodes { + if !reflect.DeepEqual(snapshot.NodeInfoMap[name], ni.info) { + return fmt.Errorf("unexpected node info. Expected: %v, got: %v", ni.info, snapshot.NodeInfoMap[name]) + } + } + return nil +} + func BenchmarkList1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() @@ -1131,8 +1390,8 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) { cache := setupCacheOf1kNodes30kPods(b) b.ResetTimer() for n := 0; n < b.N; n++ { - cachedNodes := map[string]*schedulernodeinfo.NodeInfo{} - cache.SnapshotNodeInfo(cachedNodes) + cachedNodes := NewNodeInfoSnapshot() + cache.UpdateNodeInfoSnapshot(&cachedNodes) } }