Merge pull request #74041 from bsalamat/optimize_snapshot

Optimize scheduler cache snapshot to improve scheduling throughput
Kubernetes Prow Robot 2019-02-20 19:24:59 -08:00 committed by GitHub
commit 50328e0ba7
7 changed files with 460 additions and 82 deletions
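
For orientation before the per-file diffs: the change replaces the scheduler's cachedNodeInfoMap, which was refreshed by iterating every cached node each cycle, with a NodeInfoSnapshot that is brought up to date by visiting only the nodes that changed. Below is a minimal, self-contained Go sketch of the generation-compare idea in its simplest form (a full map walk); toyNodeInfo, toySnapshot and refresh are illustrative names only, not scheduler types. The diff's real gain comes from additionally keeping the cached nodes in a doubly linked list ordered by generation so the walk can stop early, which is illustrated after the cache.go hunk further down.

package main

import "fmt"

// toyNodeInfo stands in for a cached NodeInfo: every mutation bumps its
// generation, a monotonically increasing counter.
type toyNodeInfo struct {
	name       string
	generation int64
	podCount   int
}

// toySnapshot stands in for NodeInfoSnapshot: copies of the node infos plus
// the highest generation the snapshot has absorbed so far.
type toySnapshot struct {
	nodes      map[string]toyNodeInfo
	generation int64
}

// refresh copies only the entries that changed since the last refresh and
// drops entries for nodes that no longer exist in the cache.
func refresh(cache map[string]*toyNodeInfo, snap *toySnapshot) {
	maxGen := snap.generation
	for name, info := range cache {
		if info.generation > snap.generation {
			snap.nodes[name] = *info
		}
		if info.generation > maxGen {
			maxGen = info.generation
		}
	}
	for name := range snap.nodes {
		if _, ok := cache[name]; !ok {
			delete(snap.nodes, name)
		}
	}
	snap.generation = maxGen
}

func main() {
	cache := map[string]*toyNodeInfo{
		"node-a": {name: "node-a", generation: 1},
		"node-b": {name: "node-b", generation: 2},
	}
	snap := &toySnapshot{nodes: map[string]toyNodeInfo{}}
	refresh(cache, snap) // first cycle: both nodes are copied

	cache["node-b"].podCount = 5 // node-b changes between cycles
	cache["node-b"].generation = 3

	refresh(cache, snap) // second cycle: only node-b is re-copied
	fmt.Println(snap.nodes["node-b"].podCount, snap.generation) // prints: 5 3
}

The removed UpdateNodeNameToInfoMap already compared generations, but it still walked every cached node on every cycle; ordering the nodes by generation is what lets the new UpdateNodeInfoSnapshot stop early, and that is where the throughput gain in the title comes from.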

View File

@@ -141,7 +141,7 @@ type genericScheduler struct {
extenders []algorithm.SchedulerExtender
lastNodeIndex uint64
alwaysCheckAllPredicates bool
cachedNodeInfoMap map[string]*schedulernodeinfo.NodeInfo
nodeInfoSnapshot schedulerinternalcache.NodeInfoSnapshot
volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister algorithm.PDBLister
@@ -153,7 +153,7 @@ type genericScheduler struct {
// functions.
func (g *genericScheduler) snapshot() error {
// Used for all fit and priority funcs.
return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
return g.cache.UpdateNodeInfoSnapshot(&g.nodeInfoSnapshot)
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
@@ -210,8 +210,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
}, nil
}
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return result, err
}
@@ -290,7 +290,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
if !ok || fitError == nil {
return nil, nil, nil, nil
}
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
@@ -311,7 +311,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
if err != nil {
return nil, nil, nil, err
}
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
nodeToVictims, err := selectNodesForPreemption(pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
@@ -335,7 +335,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
}
@@ -355,7 +355,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
newNodeToVictims, err := extender.ProcessPreemption(
pod,
nodeToVictims,
g.cachedNodeInfoMap,
g.nodeInfoSnapshot.NodeInfoMap,
)
if err != nil {
if extender.IsIgnorable() {
@@ -452,14 +452,14 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
ctx, cancel := context.WithCancel(context.Background())
// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
checkNode := func(i int) {
nodeName := g.cache.NodeTree().Next()
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.nodeInfoSnapshot.NodeInfoMap[nodeName],
g.predicates,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
@@ -476,7 +476,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
}
} else {
predicateResultLock.Lock()
@@ -500,7 +500,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
if err != nil {
if extender.IsIgnorable() {
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
@@ -1193,7 +1193,7 @@ func NewGenericScheduler(
priorityMetaProducer: priorityMetaProducer,
pluginSet: pluginSet,
extenders: extenders,
cachedNodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
nodeInfoSnapshot: schedulerinternalcache.NewNodeInfoSnapshot(),
volumeBinder: volumeBinder,
pvcLister: pvcLister,
pdbLister: pdbLister,

View File

@@ -513,7 +513,7 @@ func makeScheduler(predicates map[string]algorithmpredicates.FitPredicate, nodes
emptyPluginSet,
nil, nil, nil, nil, false, false,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateNodeNameToInfoMap(s.(*genericScheduler).cachedNodeInfoMap)
cache.UpdateNodeInfoSnapshot(&s.(*genericScheduler).nodeInfoSnapshot)
return s.(*genericScheduler)
}

View File

@@ -47,6 +47,15 @@ func New(ttl time.Duration, stop <-chan struct{}) Cache {
return cache
}
// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
type nodeInfoListItem struct {
info *schedulernodeinfo.NodeInfo
next *nodeInfoListItem
prev *nodeInfoListItem
}
type schedulerCache struct {
stop <-chan struct{}
ttl time.Duration
@@ -59,8 +68,11 @@ type schedulerCache struct {
assumedPods map[string]bool
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*schedulernodeinfo.NodeInfo
nodeTree *NodeTree
nodes map[string]*nodeInfoListItem
// headNode points to the most recently updated NodeInfo in "nodes". It is the
// head of the linked list.
headNode *nodeInfoListItem
nodeTree *NodeTree
// A map from image name to its imageState.
imageStates map[string]*imageState
}
@@ -94,7 +106,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
period: period,
stop: stop,
nodes: make(map[string]*schedulernodeinfo.NodeInfo),
nodes: make(map[string]*nodeInfoListItem),
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
@@ -102,15 +114,82 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
}
}
// Snapshot takes a snapshot of the current schedulerinternalcache. The method has performance impact,
// and should be only used in non-critical path.
// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *schedulernodeinfo.NodeInfo) *nodeInfoListItem {
return &nodeInfoListItem{
info: ni,
}
}
// NewNodeInfoSnapshot initializes a NodeInfoSnapshot struct and returns it.
func NewNodeInfoSnapshot() NodeInfoSnapshot {
return NodeInfoSnapshot{
NodeInfoMap: make(map[string]*schedulernodeinfo.NodeInfo),
}
}
// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired.
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
ni, ok := cache.nodes[name]
if !ok {
klog.Errorf("No NodeInfo with name %v found in the cache", name)
return
}
// if the node info list item is already at the head, we are done.
if ni == cache.headNode {
return
}
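// Detach the item from its current position in the list (a no-op for a newly created item).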
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
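// Attach the item at the head of the list.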
if cache.headNode != nil {
cache.headNode.prev = ni
}
ni.next = cache.headNode
ni.prev = nil
cache.headNode = ni
}
// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume cache lock is already acquired.
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
ni, ok := cache.nodes[name]
if !ok {
klog.Errorf("No NodeInfo with name %v found in the cache", name)
return
}
if ni.prev != nil {
ni.prev.next = ni.next
}
if ni.next != nil {
ni.next.prev = ni.prev
}
// if the removed item was at the head, we must update the head.
if ni == cache.headNode {
cache.headNode = ni.next
}
delete(cache.nodes, name)
}
// Snapshot takes a snapshot of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with the UpdateNodeInfoSnapshot
// function.
// This method is expensive and should only be used in non-critical paths.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make(map[string]*schedulernodeinfo.NodeInfo, len(cache.nodes))
for k, v := range cache.nodes {
nodes[k] = v.Clone()
nodes[k] = v.info.Clone()
}
assumedPods := make(map[string]bool, len(cache.assumedPods))
@@ -124,22 +203,43 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
}
}
func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) error {
// UpdateNodeInfoSnapshot takes a snapshot of the cached NodeInfo map. This is called at
// the beginning of every scheduling cycle.
// It tracks the generation number of each NodeInfo and updates only the entries of an
// existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error {
cache.mu.Lock()
defer cache.mu.Unlock()
balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
for name, info := range cache.nodes {
if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
// Transient scheduler info is reset here.
info.TransientInfo.ResetTransientSchedulerInfo()
// Get the last generation of the snapshot.
snapshotGeneration := nodeSnapshot.Generation
// Start from the head of the NodeInfo doubly linked list and update the snapshot
// with the NodeInfos that were updated after the last snapshot was taken.
for node := cache.headNode; node != nil; node = node.next {
if node.info.GetGeneration() <= snapshotGeneration {
// All the remaining nodes were last updated before the existing snapshot was taken. We are done.
break
}
if current, ok := nodeNameToInfo[name]; !ok || current.GetGeneration() != info.GetGeneration() {
nodeNameToInfo[name] = info.Clone()
if balancedVolumesEnabled && node.info.TransientInfo != nil {
// Transient scheduler info is reset here.
node.info.TransientInfo.ResetTransientSchedulerInfo()
}
if np := node.info.Node(); np != nil {
nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
}
}
for name := range nodeNameToInfo {
if _, ok := cache.nodes[name]; !ok {
delete(nodeNameToInfo, name)
// Update the snapshot generation with the latest NodeInfo generation.
if cache.headNode != nil {
nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
}
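// The snapshot can only grow larger than the cache if nodes were removed from the cache
// since the last update; only in that case do we scan the snapshot to drop stale entries.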
if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
for name := range nodeSnapshot.NodeInfoMap {
if _, ok := cache.nodes[name]; !ok {
delete(nodeSnapshot.NodeInfoMap, name)
}
}
}
return nil
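
The early break above relies on an ordering invariant maintained elsewhere in this file: every add/update path bumps the NodeInfo generation and calls moveNodeInfoToHead, so the list is always sorted by descending generation. Once the walk reaches an item whose generation is not newer than the snapshot's, everything behind it is older still. A small self-contained illustration of that traversal pattern, using toy types rather than the cache's:

package main

import "fmt"

// toyItem plays the role of nodeInfoListItem: the newest item sits at the head.
type toyItem struct {
	name string
	gen  int64
	next *toyItem
}

// changedSince walks from the head and stops at the first item that is not
// newer than the given generation; the rest of the list is older by construction.
func changedSince(head *toyItem, gen int64) []string {
	var changed []string
	for it := head; it != nil; it = it.next {
		if it.gen <= gen {
			break
		}
		changed = append(changed, it.name)
	}
	return changed
}

func main() {
	// Head-first list in descending generation order: node-c(7) -> node-b(5) -> node-a(2).
	a := &toyItem{name: "node-a", gen: 2}
	b := &toyItem{name: "node-b", gen: 5, next: a}
	c := &toyItem{name: "node-c", gen: 7, next: b}
	fmt.Println(changedSince(c, 5)) // prints: [node-c]
}
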
@@ -157,12 +257,12 @@ func (cache *schedulerCache) FilteredList(podFilter algorithm.PodFilter, selecto
// can avoid expensive array growth without wasting too much memory by
// pre-allocating capacity.
maxSize := 0
for _, info := range cache.nodes {
maxSize += len(info.Pods())
for _, n := range cache.nodes {
maxSize += len(n.info.Pods())
}
pods := make([]*v1.Pod, 0, maxSize)
for _, info := range cache.nodes {
for _, pod := range info.Pods() {
for _, n := range cache.nodes {
for _, pod := range n.info.Pods() {
if podFilter(pod) && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
}
@@ -249,10 +349,11 @@ func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = schedulernodeinfo.NewNodeInfo()
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[pod.Spec.NodeName] = n
}
n.AddPod(pod)
n.info.AddPod(pod)
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
// Assumes that lock is already acquired.
@@ -266,12 +367,17 @@ func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
// Assumes that lock is already acquired.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
n := cache.nodes[pod.Spec.NodeName]
if err := n.RemovePod(pod); err != nil {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
return fmt.Errorf("node %v is not found", pod.Spec.NodeName)
}
if err := n.info.RemovePod(pod); err != nil {
return err
}
if len(n.Pods()) == 0 && n.Node() == nil {
delete(cache.nodes, pod.Spec.NodeName)
if len(n.info.Pods()) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(pod.Spec.NodeName)
} else {
cache.moveNodeInfoToHead(pod.Spec.NodeName)
}
return nil
}
@@ -407,15 +513,16 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
n, ok := cache.nodes[node.Name]
if !ok {
n = schedulernodeinfo.NewNodeInfo()
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.Node())
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(node.Name)
cache.nodeTree.AddNode(node)
cache.addNodeImageStates(node, n)
return n.SetNode(node)
cache.addNodeImageStates(node, n.info)
return n.info.SetNode(node)
}
func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
@@ -424,31 +531,37 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
n, ok := cache.nodes[newNode.Name]
if !ok {
n = schedulernodeinfo.NewNodeInfo()
n = newNodeInfoListItem(schedulernodeinfo.NewNodeInfo())
cache.nodes[newNode.Name] = n
} else {
cache.removeNodeImageStates(n.Node())
cache.removeNodeImageStates(n.info.Node())
}
cache.moveNodeInfoToHead(newNode.Name)
cache.nodeTree.UpdateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n)
return n.SetNode(newNode)
cache.addNodeImageStates(newNode, n.info)
return n.info.SetNode(newNode)
}
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
cache.mu.Lock()
defer cache.mu.Unlock()
n := cache.nodes[node.Name]
if err := n.RemoveNode(node); err != nil {
n, ok := cache.nodes[node.Name]
if !ok {
return fmt.Errorf("node %v is not found", node.Name)
}
if err := n.info.RemoveNode(node); err != nil {
return err
}
// We remove NodeInfo for this node only if there aren't any pods on this node.
// We can't do it unconditionally, because notifications about pods are delivered
// in a different watch, and thus can potentially be observed later, even though
// they happened before node removal.
if len(n.Pods()) == 0 && n.Node() == nil {
delete(cache.nodes, node.Name)
if len(n.info.Pods()) == 0 && n.info.Node() == nil {
cache.removeNodeInfoFromList(node.Name)
} else {
cache.moveNodeInfoToHead(node.Name)
}
cache.nodeTree.RemoveNode(node)

View File

@@ -35,16 +35,19 @@ import (
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
func deepEqualWithoutGeneration(t *testing.T, testcase int, actual, expected *schedulernodeinfo.NodeInfo) {
func deepEqualWithoutGeneration(t *testing.T, testcase int, actual *nodeInfoListItem, expected *schedulernodeinfo.NodeInfo) {
if (actual == nil) != (expected == nil) {
t.Error("One of the actual or expeted is nil and the other is not!")
}
// Ignore generation field.
if actual != nil {
actual.SetGeneration(0)
actual.info.SetGeneration(0)
}
if expected != nil {
expected.SetGeneration(0)
}
if !reflect.DeepEqual(actual, expected) {
t.Errorf("#%d: node info get=%s, want=%s", testcase, actual, expected)
if actual != nil && !reflect.DeepEqual(actual.info, expected) {
t.Errorf("#%d: node info get=%s, want=%s", testcase, actual.info, expected)
}
}
@@ -372,21 +375,27 @@ func TestSnapshot(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
t.Errorf("assumePod failed: %v", err)
}
}
for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
t.Errorf("AddPod failed: %v", err)
}
}
snapshot := cache.Snapshot()
if !reflect.DeepEqual(snapshot.Nodes, cache.nodes) {
t.Fatalf("expect \n%+v; got \n%+v", cache.nodes, snapshot.Nodes)
if len(snapshot.Nodes) != len(cache.nodes) {
t.Errorf("Unequal number of nodes in the cache and its snapshot. expeted: %v, got: %v", len(cache.nodes), len(snapshot.Nodes))
}
for name, ni := range snapshot.Nodes {
nItem := cache.nodes[name]
if !reflect.DeepEqual(ni, nItem.info) {
t.Errorf("expect \n%+v; got \n%+v", nItem.info, ni)
}
}
if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
t.Fatalf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
t.Errorf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
}
}
}
@@ -765,7 +774,7 @@ func TestEphemeralStorageResource(t *testing.T) {
n = cache.nodes[nodeName]
if n != nil {
t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n)
t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
}
}
}
@@ -810,7 +819,7 @@ func TestRemovePod(t *testing.T) {
n = cache.nodes[nodeName]
if n != nil {
t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n)
t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
}
}
}
@@ -864,7 +873,7 @@ func TestForgetPod(t *testing.T) {
}
cache.cleanupAssumedPods(now.Add(2 * ttl))
if n := cache.nodes[nodeName]; n != nil {
t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n)
t.Errorf("#%d: expecting pod deleted and nil node info, get=%s", i, n.info)
}
}
}
@@ -1062,16 +1071,16 @@ func TestNodeOperators(t *testing.T) {
}
// Generations are globally unique. We check in our unit tests that they are incremented correctly.
expected.SetGeneration(got.GetGeneration())
if !reflect.DeepEqual(got, expected) {
expected.SetGeneration(got.info.GetGeneration())
if !reflect.DeepEqual(got.info, expected) {
t.Errorf("Failed to add node into schedulercache:\n got: %+v \nexpected: %+v", got, expected)
}
// Case 2: dump cached nodes successfully.
cachedNodes := map[string]*schedulernodeinfo.NodeInfo{}
cache.UpdateNodeNameToInfoMap(cachedNodes)
newNode, found := cachedNodes[node.Name]
if !found || len(cachedNodes) != 1 {
cachedNodes := NewNodeInfoSnapshot()
cache.UpdateNodeInfoSnapshot(&cachedNodes)
newNode, found := cachedNodes.NodeInfoMap[node.Name]
if !found || len(cachedNodes.NodeInfoMap) != 1 {
t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
}
expected.SetGeneration(newNode.GetGeneration())
@@ -1091,12 +1100,12 @@ func TestNodeOperators(t *testing.T) {
if !found {
t.Errorf("Failed to find node %v in schedulernodeinfo after UpdateNode.", node.Name)
}
if got.GetGeneration() <= expected.GetGeneration() {
t.Errorf("Generation is not incremented. got: %v, expected: %v", got.GetGeneration(), expected.GetGeneration())
if got.info.GetGeneration() <= expected.GetGeneration() {
t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.GetGeneration(), expected.GetGeneration())
}
expected.SetGeneration(got.GetGeneration())
expected.SetGeneration(got.info.GetGeneration())
if !reflect.DeepEqual(got, expected) {
if !reflect.DeepEqual(got.info, expected) {
t.Errorf("Failed to update node in schedulernodeinfo:\n got: %+v \nexpected: %+v", got, expected)
}
// Check nodeTree after update
@@ -1117,6 +1126,256 @@ func TestNodeOperators(t *testing.T) {
}
}
// TestSchedulerCache_UpdateNodeInfoSnapshot tests UpdateNodeInfoSnapshot function of cache.
func TestSchedulerCache_UpdateNodeInfoSnapshot(t *testing.T) {
// Create a few nodes to be used in tests.
nodes := []*v1.Node{}
for i := 0; i < 10; i++ {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-node%v", i),
},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("100m"),
},
},
}
nodes = append(nodes, node)
}
// Create a few nodes as updated versions of the above nodes
updatedNodes := []*v1.Node{}
for _, n := range nodes {
updatedNode := n.DeepCopy()
updatedNode.Status.Allocatable = v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("500m"),
}
updatedNodes = append(updatedNodes, updatedNode)
}
// Create a few pods for tests.
pods := []*v1.Pod{}
for i := 0; i < 10; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("test-pod%v", i),
Namespace: "test-ns",
UID: types.UID(fmt.Sprintf("test-puid%v", i)),
},
Spec: v1.PodSpec{
NodeName: fmt.Sprintf("test-node%v", i),
},
}
pods = append(pods, pod)
}
// Create a few pods as updated versions of the above pods.
updatedPods := []*v1.Pod{}
for _, p := range pods {
updatedPod := p.DeepCopy()
priority := int32(1000)
updatedPod.Spec.Priority = &priority
updatedPods = append(updatedPods, updatedPod)
}
var cache *schedulerCache
var snapshot NodeInfoSnapshot
type operation = func()
addNode := func(i int) operation {
return func() {
cache.AddNode(nodes[i])
}
}
removeNode := func(i int) operation {
return func() {
cache.RemoveNode(nodes[i])
}
}
updateNode := func(i int) operation {
return func() {
cache.UpdateNode(nodes[i], updatedNodes[i])
}
}
addPod := func(i int) operation {
return func() {
cache.AddPod(pods[i])
}
}
removePod := func(i int) operation {
return func() {
cache.RemovePod(pods[i])
}
}
updatePod := func(i int) operation {
return func() {
cache.UpdatePod(pods[i], updatedPods[i])
}
}
updateSnapshot := func() operation {
return func() {
cache.UpdateNodeInfoSnapshot(&snapshot)
if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil {
t.Error(err)
}
}
}
tests := []struct {
name string
operations []operation
expected []*v1.Node
}{
{
name: "Empty cache",
operations: []operation{},
expected: []*v1.Node{},
},
{
name: "Single node",
operations: []operation{addNode(1)},
expected: []*v1.Node{nodes[1]},
},
{
name: "Add node, remove it, add it again",
operations: []operation{
addNode(1), updateSnapshot(), removeNode(1), addNode(1),
},
expected: []*v1.Node{nodes[1]},
},
{
name: "Add a few nodes, and snapshot in the middle",
operations: []operation{
addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2),
updateSnapshot(), addNode(3),
},
expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
},
{
name: "Add a few nodes, and snapshot in the end",
operations: []operation{
addNode(0), addNode(2), addNode(5), addNode(6),
},
expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
},
{
name: "Remove non-existing node",
operations: []operation{
addNode(0), addNode(1), updateSnapshot(), removeNode(8),
},
expected: []*v1.Node{nodes[1], nodes[0]},
},
{
name: "Update some nodes",
operations: []operation{
addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1),
},
expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
},
{
name: "Add a few nodes, and remove all of them",
operations: []operation{
addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
removeNode(0), removeNode(2), removeNode(5), removeNode(6),
},
expected: []*v1.Node{},
},
{
name: "Add a few nodes, and remove some of them",
operations: []operation{
addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
removeNode(0), removeNode(6),
},
expected: []*v1.Node{nodes[5], nodes[2]},
},
{
name: "Add a few nodes, remove all of them, and add more",
operations: []operation{
addNode(2), addNode(5), addNode(6), updateSnapshot(),
removeNode(2), removeNode(5), removeNode(6), updateSnapshot(),
addNode(7), addNode(9),
},
expected: []*v1.Node{nodes[9], nodes[7]},
},
{
name: "Update nodes in particular order",
operations: []operation{
addNode(8), updateNode(2), updateNode(8), updateSnapshot(),
addNode(1),
},
expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
},
{
name: "Add some nodes and some pods",
operations: []operation{
addNode(0), addNode(2), addNode(8), updateSnapshot(),
addPod(8), addPod(2),
},
expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
},
{
name: "Updating a pod moves its node to the head",
operations: []operation{
addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0),
},
expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
},
{
name: "Remove pod from non-existing node",
operations: []operation{
addNode(0), addPod(0), addNode(2), updateSnapshot(), removePod(3),
},
expected: []*v1.Node{nodes[2], nodes[0]},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cache = newSchedulerCache(time.Second, time.Second, nil)
snapshot = NewNodeInfoSnapshot()
for _, op := range test.operations {
op()
}
if len(test.expected) != len(cache.nodes) {
t.Errorf("unexpected number of nodes. Expected: %v, got: %v", len(test.expected), len(cache.nodes))
}
var i int
// Check that cache is in the expected state.
for node := cache.headNode; node != nil; node = node.next {
if node.info.Node().Name != test.expected[i].Name {
t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
}
i++
}
// Make sure we visited all the cached nodes in the above for loop.
if i != len(cache.nodes) {
t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
}
// Always update the snapshot at the end of operations and compare it.
cache.UpdateNodeInfoSnapshot(&snapshot)
if err := compareCacheWithNodeInfoSnapshot(cache, &snapshot); err != nil {
t.Error(err)
}
})
}
}
func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *NodeInfoSnapshot) error {
if len(snapshot.NodeInfoMap) != len(cache.nodes) {
return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.NodeInfoMap))
}
for name, ni := range cache.nodes {
if !reflect.DeepEqual(snapshot.NodeInfoMap[name], ni.info) {
return fmt.Errorf("unexpected node info. Expected: %v, got: %v", ni.info, snapshot.NodeInfoMap[name])
}
}
return nil
}
func BenchmarkList1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
@@ -1131,8 +1390,8 @@ func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
cache := setupCacheOf1kNodes30kPods(b)
b.ResetTimer()
for n := 0; n < b.N; n++ {
cachedNodes := map[string]*schedulernodeinfo.NodeInfo{}
cache.UpdateNodeNameToInfoMap(cachedNodes)
cachedNodes := NewNodeInfoSnapshot()
cache.UpdateNodeInfoSnapshot(&cachedNodes)
}
}

View File

@@ -8,7 +8,6 @@ go_library(
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
],

View File

@@ -21,7 +21,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
// Cache is used for testing
@@ -75,8 +74,8 @@ func (c *Cache) UpdateNode(oldNode, newNode *v1.Node) error { return nil }
// RemoveNode is a fake method for testing.
func (c *Cache) RemoveNode(node *v1.Node) error { return nil }
// UpdateNodeNameToInfoMap is a fake method for testing.
func (c *Cache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error {
// UpdateNodeInfoSnapshot is a fake method for testing.
func (c *Cache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulerinternalcache.NodeInfoSnapshot) error {
return nil
}

View File

@@ -95,10 +95,10 @@ type Cache interface {
// RemoveNode removes overall information about node.
RemoveNode(node *v1.Node) error
// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
// UpdateNodeInfoSnapshot updates the passed infoSnapshot to the current contents of Cache.
// The node info contains aggregated information of pods scheduled (including assumed to be)
// on this node.
UpdateNodeNameToInfoMap(infoMap map[string]*schedulernodeinfo.NodeInfo) error
UpdateNodeInfoSnapshot(nodeSnapshot *NodeInfoSnapshot) error
// List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*v1.Pod, error)
@@ -118,3 +118,11 @@ type Snapshot struct {
AssumedPods map[string]bool
Nodes map[string]*schedulernodeinfo.NodeInfo
}
// NodeInfoSnapshot is a snapshot of cache NodeInfo. The scheduler takes a
// snapshot at the beginning of each scheduling cycle and uses it for its
// operations in that cycle.
type NodeInfoSnapshot struct {
NodeInfoMap map[string]*schedulernodeinfo.NodeInfo
Generation int64
}
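
As a closing usage note, the sketch below shows the call pattern the new interface method is designed for, mirroring what genericScheduler.snapshot() does in the first file. scheduleOneCycle and main are hypothetical illustration code (they would have to live inside the k8s.io/kubernetes module, since pkg/scheduler/internal/cache is an internal package); New, NewNodeInfoSnapshot, UpdateNodeInfoSnapshot, NodeInfoMap and Node are the identifiers defined or used by this PR.

package main

import (
	"fmt"
	"time"

	schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)

// scheduleOneCycle is a hypothetical helper, not part of this PR. It refreshes
// the snapshot once at the beginning of a cycle and then reads the stable
// NodeInfoMap instead of hitting the lock-protected live cache for every node.
func scheduleOneCycle(c schedulerinternalcache.Cache, snapshot *schedulerinternalcache.NodeInfoSnapshot) error {
	if err := c.UpdateNodeInfoSnapshot(snapshot); err != nil {
		return err
	}
	for _, info := range snapshot.NodeInfoMap {
		if node := info.Node(); node != nil {
			_ = node.Name // placeholder for predicate/priority work against the snapshot
		}
	}
	return nil
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	c := schedulerinternalcache.New(30*time.Second, stop)
	snapshot := schedulerinternalcache.NewNodeInfoSnapshot()
	if err := scheduleOneCycle(c, &snapshot); err != nil {
		fmt.Println(err)
	}
}

Compared with the removed UpdateNodeNameToInfoMap, which iterated every cached node on every cycle to decide what to clone, the generation-ordered list lets the new method skip the unchanged tail entirely.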