Add image states to scheduler cache

pull/8/head
Silvery Fu 2018-07-11 23:58:02 -07:00
parent 05293233cf
commit c3f111f74a
4 changed files with 121 additions and 87 deletions

View File

@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/features"
@ -59,6 +60,8 @@ type schedulerCache struct {
podStates map[string]*podState
nodes map[string]*NodeInfo
pdbs map[string]*policy.PodDisruptionBudget
// A map from image name to its imageState.
imageStates map[string]*imageState
}
type podState struct {
@ -69,6 +72,29 @@ type podState struct {
bindingFinished bool
}
// imageState is the scheduler cache's bookkeeping record for one image name.
// Entries live in schedulerCache.imageStates, keyed by image name.
type imageState struct {
// Size of the image. It is copied from the SizeBytes reported in the status
// of the node that first introduced the image name to the cache.
size int64
// A set of node names for nodes having this image present
nodes sets.String
}
// ImageStateSummary provides summarized information about the state of an image.
// It is a snapshot: the values are copied from an imageState when the summary
// is created and are not updated afterwards by the summary itself.
type ImageStateSummary struct {
// Size of the image
Size int64
// Used to track how many nodes have this image. This is the number of node
// names recorded for the image at the moment the summary was created.
NumNodes int
}
// createImageStateSummary builds a point-in-time ImageStateSummary from state:
// the image size is copied over and NumNodes is set to the number of node
// names currently recorded for the image. state must be non-nil.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *ImageStateSummary {
	summary := new(ImageStateSummary)
	summary.Size = state.size
	summary.NumNodes = len(state.nodes)
	return summary
}
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
return &schedulerCache{
ttl: ttl,
@ -79,6 +105,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
assumedPods: make(map[string]bool),
podStates: make(map[string]*podState),
pdbs: make(map[string]*policy.PodDisruptionBudget),
imageStates: make(map[string]*imageState),
}
}
@ -113,6 +140,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
cache.mu.Lock()
defer cache.mu.Unlock()
for name, info := range cache.nodes {
if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
// Transient scheduler info is reset here.
@ -394,7 +422,11 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
if !ok {
n = NewNodeInfo()
cache.nodes[node.Name] = n
} else {
cache.removeNodeImageStates(n.node)
}
cache.addNodeImageStates(node, n)
return n.SetNode(node)
}
@ -406,7 +438,11 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
if !ok {
n = NewNodeInfo()
cache.nodes[newNode.Name] = n
} else {
cache.removeNodeImageStates(n.node)
}
cache.addNodeImageStates(newNode, n)
return n.SetNode(newNode)
}
@ -425,9 +461,62 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
if len(n.pods) == 0 && n.node == nil {
delete(cache.nodes, node.Name)
}
cache.removeNodeImageStates(node)
return nil
}
// addNodeImageStates records every image name listed in node.Status.Images in
// the cache-wide imageStates map and installs a fresh per-node summary map on
// nodeInfo.
//
// For each image name, the node is added to the name's node set; a new
// imageState (sized from the image's SizeBytes) is created when the name has
// not been seen before. The summary stored for a name is taken the first time
// that name is encountered on this node.
//
// This function assumes the lock to scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInfo) {
	summaries := make(map[string]*ImageStateSummary)
	for i := range node.Status.Images {
		img := &node.Status.Images[i]
		for _, imageName := range img.Names {
			// Register this node against the image in the cache-wide map.
			entry, known := cache.imageStates[imageName]
			if known {
				entry.nodes.Insert(node.Name)
			} else {
				entry = &imageState{
					size:  img.SizeBytes,
					nodes: sets.NewString(node.Name),
				}
				cache.imageStates[imageName] = entry
			}

			// Only the first occurrence of a name contributes a summary.
			if _, seen := summaries[imageName]; seen {
				continue
			}
			summaries[imageName] = cache.createImageStateSummary(entry)
		}
	}
	nodeInfo.imageStates = summaries
}
// removeNodeImageStates removes the given node record from image entries having the node
// in imageStates cache. After the removal, if any image becomes free, i.e., the image
// is no longer available on any node, the image entry will be removed from imageStates.
//
// A nil node is a no-op. Image names listed on the node that have no entry in
// imageStates are skipped, so a node whose images were never recorded (or were
// already cleaned up) cannot cause a nil-pointer dereference.
//
// This function performs no locking itself; callers are presumably expected to
// hold the scheduler cache lock, as documented for addNodeImageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
	if node == nil {
		return
	}

	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			state, ok := cache.imageStates[name]
			if !ok {
				// Nothing is tracked under this name; state is nil here and
				// must not be inspected.
				continue
			}
			state.nodes.Delete(node.Name)
			if len(state.nodes) == 0 {
				// Remove the unused image to make sure the length of
				// imageStates represents the total number of different
				// images on all nodes
				delete(cache.imageStates, name)
			}
		}
	}
}
func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error {
cache.mu.Lock()
defer cache.mu.Unlock()

View File

@ -108,7 +108,7 @@ func TestAssumePodScheduled(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}, {
pods: []*v1.Pod{testPods[1], testPods[2]},
@ -125,7 +125,7 @@ func TestAssumePodScheduled(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1], testPods[2]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}, { // test non-zero request
pods: []*v1.Pod{testPods[3]},
@ -142,7 +142,7 @@ func TestAssumePodScheduled(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[3]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}, {
pods: []*v1.Pod{testPods[4]},
@ -160,7 +160,7 @@ func TestAssumePodScheduled(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[4]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}, {
pods: []*v1.Pod{testPods[4], testPods[5]},
@ -178,7 +178,7 @@ func TestAssumePodScheduled(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[4], testPods[5]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}, {
pods: []*v1.Pod{testPods[6]},
@ -195,7 +195,7 @@ func TestAssumePodScheduled(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[6]},
usedPorts: newHostPortInfoBuilder().build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
},
}
@ -275,7 +275,7 @@ func TestExpirePod(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}}
@ -328,7 +328,7 @@ func TestAddPodWillConfirm(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}}
@ -425,7 +425,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{updatedPod.DeepCopy()},
usedPorts: newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
},
}}
@ -481,7 +481,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}}
@ -537,7 +537,7 @@ func TestUpdatePod(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
}, {
requestedResource: &Resource{
MilliCPU: 100,
@ -551,7 +551,7 @@ func TestUpdatePod(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
}},
}}
@ -669,7 +669,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
}, {
requestedResource: &Resource{
MilliCPU: 100,
@ -683,7 +683,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
}},
}}
@ -761,7 +761,7 @@ func TestEphemeralStorageResource(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{podE},
usedPorts: schedutil.HostPortInfo{},
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
},
}
@ -808,7 +808,7 @@ func TestRemovePod(t *testing.T) {
allocatableResource: &Resource{},
pods: []*v1.Pod{basePod},
usedPorts: newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
imageSizes: map[string]int64{},
imageStates: make(map[string]*ImageStateSummary),
},
}}

View File

@ -58,9 +58,10 @@ type NodeInfo struct {
taints []v1.Taint
taintsErr error
// This is a map from image name to image size, also for checking image existence on the node
// Cache it here to avoid rebuilding the map during scheduling, e.g., in image_locality.go
imageSizes map[string]int64
// imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
// state information.
imageStates map[string]*ImageStateSummary
// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
// scheduling cycle.
@ -261,7 +262,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
TransientInfo: newTransientSchedulerInfo(),
generation: nextGeneration(),
usedPorts: make(util.HostPortInfo),
imageSizes: make(map[string]int64),
imageStates: make(map[string]*ImageStateSummary),
}
for _, pod := range pods {
ni.AddPod(pod)
@ -293,12 +294,12 @@ func (n *NodeInfo) UsedPorts() util.HostPortInfo {
return n.usedPorts
}
// ImageSizes returns the image size information on this node.
func (n *NodeInfo) ImageSizes() map[string]int64 {
// ImageStates returns the state information of all images.
func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary {
if n == nil {
return nil
}
return n.imageSizes
return n.imageStates
}
// PodsWithAffinity return all pods with (anti)affinity constraints on this node.
@ -392,7 +393,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
diskPressureCondition: n.diskPressureCondition,
pidPressureCondition: n.pidPressureCondition,
usedPorts: make(util.HostPortInfo),
imageSizes: n.imageSizes,
imageStates: make(map[string]*ImageStateSummary),
generation: n.generation,
}
if len(n.pods) > 0 {
@ -547,17 +548,6 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
}
}
// updateImageSizes rebuilds n.imageSizes from the node returned by n.Node():
// every name of every image in the node's status is mapped to that image's
// SizeBytes. The previous map is replaced, not merged.
func (n *NodeInfo) updateImageSizes() {
	sizes := make(map[string]int64)
	for _, img := range n.Node().Status.Images {
		for _, imageName := range img.Names {
			sizes[imageName] = img.SizeBytes
		}
	}
	n.imageSizes = sizes
}
// SetNode sets the overall node information.
func (n *NodeInfo) SetNode(node *v1.Node) error {
n.node = node
@ -579,7 +569,6 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
}
}
n.TransientInfo = newTransientSchedulerInfo()
n.updateImageSizes()
n.generation = nextGeneration()
return nil
}
@ -596,6 +585,7 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
n.memoryPressureCondition = v1.ConditionUnknown
n.diskPressureCondition = v1.ConditionUnknown
n.pidPressureCondition = v1.ConditionUnknown
n.imageStates = make(map[string]*ImageStateSummary)
n.generation = nextGeneration()
return nil
}

View File

@ -26,7 +26,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/util/parsers"
)
func TestNewResource(t *testing.T) {
@ -241,46 +240,6 @@ func TestSetMaxResource(t *testing.T) {
}
}
func TestImageSizes(t *testing.T) {
ni := fakeNodeInfo()
ni.node = &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
},
Status: v1.NodeStatus{
Images: []v1.ContainerImage{
{
Names: []string{
"gcr.io/10:" + parsers.DefaultImageTag,
"gcr.io/10:v1",
},
SizeBytes: int64(10 * 1024 * 1024),
},
{
Names: []string{
"gcr.io/50:" + parsers.DefaultImageTag,
"gcr.io/50:v1",
},
SizeBytes: int64(50 * 1024 * 1024),
},
},
},
}
ni.updateImageSizes()
expected := map[string]int64{
"gcr.io/10:" + parsers.DefaultImageTag: 10 * 1024 * 1024,
"gcr.io/10:v1": 10 * 1024 * 1024,
"gcr.io/50:" + parsers.DefaultImageTag: 50 * 1024 * 1024,
"gcr.io/50:v1": 50 * 1024 * 1024,
}
imageSizes := ni.ImageSizes()
if !reflect.DeepEqual(expected, imageSizes) {
t.Errorf("expected: %#v, got: %#v", expected, imageSizes)
}
}
func TestNewNodeInfo(t *testing.T) {
nodeName := "test-node"
pods := []*v1.Pod{
@ -312,7 +271,7 @@ func TestNewNodeInfo(t *testing.T) {
{Protocol: "TCP", Port: 8080}: {},
},
},
imageSizes: map[string]int64{},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@ -401,9 +360,7 @@ func TestNodeInfoClone(t *testing.T) {
{Protocol: "TCP", Port: 8080}: {},
},
},
imageSizes: map[string]int64{
"gcr.io/10": 10 * 1024 * 1024,
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@ -473,9 +430,7 @@ func TestNodeInfoClone(t *testing.T) {
{Protocol: "TCP", Port: 8080}: {},
},
},
imageSizes: map[string]int64{
"gcr.io/10": 10 * 1024 * 1024,
},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@ -633,7 +588,7 @@ func TestNodeInfoAddPod(t *testing.T) {
{Protocol: "TCP", Port: 8080}: {},
},
},
imageSizes: map[string]int64{},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@ -752,7 +707,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
{Protocol: "TCP", Port: 8080}: {},
},
},
imageSizes: map[string]int64{},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
@ -868,7 +823,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
{Protocol: "TCP", Port: 8080}: {},
},
},
imageSizes: map[string]int64{},
imageStates: map[string]*ImageStateSummary{},
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{