Add NodeTree to the scheduler cache

pull/8/head
Bobby (Babak) Salamat 2018-07-27 11:56:29 -07:00
parent c1896c97ea
commit a5045d107e
5 changed files with 42 additions and 13 deletions

View File

@ -59,6 +59,7 @@ type schedulerCache struct {
// a map from pod key to podState.
podStates map[string]*podState
nodes map[string]*NodeInfo
nodeTree *NodeTree
pdbs map[string]*policy.PodDisruptionBudget
// A map from image name to its imageState.
imageStates map[string]*imageState
@ -102,6 +103,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
stop: stop,
nodes: make(map[string]*NodeInfo),
nodeTree: newNodeTree(nil),
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
pdbs: make(map[string]*policy.PodDisruptionBudget),
@ -426,6 +428,7 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
cache.removeNodeImageStates(n.node)
}
cache.nodeTree.AddNode(node)
cache.addNodeImageStates(node, n)
return n.SetNode(node)
}
@ -442,6 +445,7 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
cache.removeNodeImageStates(n.node)
}
cache.nodeTree.UpdateNode(oldNode, newNode)
cache.addNodeImageStates(newNode, n)
return n.SetNode(newNode)
}
@ -462,6 +466,7 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
delete(cache.nodes, node.Name)
}
cache.nodeTree.RemoveNode(node)
cache.removeNodeImageStates(node)
return nil
}
@ -598,3 +603,7 @@ func (cache *schedulerCache) expirePod(key string, ps *podState) error {
delete(cache.podStates, key)
return nil
}
func (cache *schedulerCache) NodeTree() *NodeTree {
return cache.nodeTree
}

View File

@ -1065,6 +1065,9 @@ func TestNodeOperators(t *testing.T) {
if !found {
t.Errorf("Failed to find node %v in schedulercache.", node.Name)
}
if cache.nodeTree.NumNodes != 1 || cache.nodeTree.Next() != node.Name {
t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
}
// Generations are globally unique. We check in our unit tests that they are incremented correctly.
expected.generation = got.generation
@ -1100,12 +1103,21 @@ func TestNodeOperators(t *testing.T) {
if !reflect.DeepEqual(got, expected) {
t.Errorf("Failed to update node in schedulercache:\n got: %+v \nexpected: %+v", got, expected)
}
// Check nodeTree after update
if cache.nodeTree.NumNodes != 1 || cache.nodeTree.Next() != node.Name {
t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
}
// Case 4: the node can not be removed if pods is not empty.
cache.RemoveNode(node)
if _, found := cache.nodes[node.Name]; !found {
t.Errorf("The node %v should not be removed if pods is not empty.", node.Name)
}
// Check nodeTree after remove. The node should be removed from the nodeTree even if there are
// still pods on it.
if cache.nodeTree.NumNodes != 0 || cache.nodeTree.Next() != "" {
t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
}
}
}

View File

@ -125,6 +125,9 @@ type Cache interface {
// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
IsUpToDate(n *NodeInfo) bool
// NodeTree returns a node tree structure
NodeTree() *NodeTree
}
// Snapshot is a snapshot of cache state

View File

@ -21,8 +21,8 @@ import (
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/staging/src/k8s.io/apimachinery/pkg/util/sets"
"github.com/golang/glog"
)
@ -33,7 +33,7 @@ type NodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
ExhaustedZones sets.String // set of zones that all of their nodes are returned by next()
exhaustedZones sets.String // set of zones that all of their nodes are returned by next()
NumNodes int
mu sync.RWMutex
}
@ -62,7 +62,7 @@ func (na *nodeArray) next() (nodeName string, exhausted bool) {
func newNodeTree(nodes []*v1.Node) *NodeTree {
nt := &NodeTree{
tree: make(map[string]*nodeArray),
ExhaustedZones: sets.NewString(),
exhaustedZones: sets.NewString(),
}
for _, n := range nodes {
nt.AddNode(n)
@ -83,18 +83,20 @@ func (nt *NodeTree) addNode(n *v1.Node) {
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na.nodes {
if nodeName == n.Name {
glog.Warningf("node %v already exist in the NodeTree", n.Name)
return
}
}
na.nodes = append(na.nodes, n.Name)
nt.tree[zone] = na
} else {
nt.zones = append(nt.zones, zone)
nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
}
glog.V(5).Infof("Added node %v in group %v to NodeTree", n.Name, zone)
nt.NumNodes++
}
// RemoveNode removes a node from the NodeTree.
func (nt *NodeTree) RemoveNode(n *v1.Node) error {
nt.mu.Lock()
defer nt.mu.Unlock()
@ -106,19 +108,18 @@ func (nt *NodeTree) removeNode(n *v1.Node) error {
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na.nodes {
if nodeName == n.Name {
// delete without preserving order
na.nodes[i] = na.nodes[len(na.nodes)-1]
na.nodes = na.nodes[:len(na.nodes)-1]
nt.tree[zone] = na
na.nodes = append(na.nodes[:i], na.nodes[i+1:]...)
if len(na.nodes) == 0 {
nt.removeZone(zone)
}
glog.V(5).Infof("Removed node %v in group %v from NodeTree", n.Name, zone)
nt.NumNodes--
return nil
}
}
}
return fmt.Errorf("node %v in zone %v was not found", n.Name, zone)
glog.Errorf("Node %v in group %v was not found", n.Name, zone)
return fmt.Errorf("node %v in group %v was not found", n.Name, zone)
}
// removeZone removes a zone from tree.
@ -132,6 +133,7 @@ func (nt *NodeTree) removeZone(zone string) {
}
}
// UpdateNode updates a node in the NodeTree.
func (nt *NodeTree) UpdateNode(old, new *v1.Node) {
var oldZone string
if old != nil {
@ -153,7 +155,7 @@ func (nt *NodeTree) resetExhausted() {
for _, na := range nt.tree {
na.lastIndex = 0
}
nt.ExhaustedZones = sets.NewString()
nt.exhaustedZones = sets.NewString()
}
// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
@ -174,8 +176,8 @@ func (nt *NodeTree) Next() string {
// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
nodeName, exhausted := nt.tree[zone].next()
if exhausted {
nt.ExhaustedZones.Insert(zone)
if len(nt.ExhaustedZones) == len(nt.zones) { // all zones are exhausted. we should reset.
nt.exhaustedZones.Insert(zone)
if len(nt.exhaustedZones) == len(nt.zones) { // all zones are exhausted. we should reset.
nt.resetExhausted()
}
} else {

View File

@ -106,5 +106,8 @@ func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
return &schedulercache.Snapshot{}
}
// IsUpToDate is a fake mthod for testing
// IsUpToDate is a fake method for testing
func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true }
// NodeTree is a fake method for testing.
func (f *FakeCache) NodeTree() *schedulercache.NodeTree { return nil }