mirror of https://github.com/k3s-io/k3s
Merge pull request #65745 from silveryfu/image-locality-scoring

Automatic merge from submit-queue (batch tested with PRs 66011, 66111, 66106, 66039, 65745). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Enable adaptive scoring in ImageLocalityPriority

**What this PR does / why we need it**: This PR replaces the original, purely image-size-based scoring with an adaptive scoring scheme. The new scheme considers not only an image's size but also its `"spread"`, defined as follows:

> Given an image `i`, `spread_i = num_node_has_i / total_num_nodes`

The image then receives the score `score_i = size_i * spread_i`, as proposed by @resouer. The final node score is the sum of the image scores for all images found on the node that are mentioned in the pod spec.

The goal of this heuristic is to better _balance image locality with other scheduling policies_. In particular, it aims to mitigate and prevent the undesirable "node heating problem", _i.e._, pods getting assigned to the same or a few nodes due to preferred image locality. The larger an image's `spread`, the more weight image locality can safely carry for it, since more nodes can be expected to have the image.

The new image state information in the scheduler cache, enabled in this PR, also allows other potential heuristics to be explored.

**Special notes for your reviewer**: @resouer Additional unit tests are WIP.

**Release note**:
```release-note
NONE
```
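For concreteness, here is a minimal, self-contained sketch of the scoring arithmetic described above (not the scheduler's actual API; the helper names are only illustrative, and `maxPriority` and the threshold constants mirror the diff below):

```go
package main

import "fmt"

const (
	mb           int64 = 1024 * 1024
	minThreshold int64 = 23 * mb   // sums below this contribute nothing
	maxThreshold int64 = 1000 * mb // the score saturates at this point
	maxPriority  int64 = 10        // schedulerapi.MaxPriority
)

// scaledImageScore: an image's raw score is its size weighted by its spread,
// i.e. the fraction of nodes that already have the image.
func scaledImageScore(sizeBytes int64, numNodesWithImage, totalNumNodes int) int64 {
	spread := float64(numNodesWithImage) / float64(totalNumNodes)
	return int64(float64(sizeBytes) * spread)
}

// calculatePriority clamps the summed raw scores into the threshold range and
// scales them linearly onto [0, maxPriority].
func calculatePriority(sumScores int64) int64 {
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}
	return maxPriority * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

func main() {
	// A pod requesting one 250MB image that is present on 1 of 2 nodes:
	// raw score = 250M * 1/2 = 125M; priority = 10*(125M-23M)/(1000M-23M) = 1.
	sum := scaledImageScore(250*mb, 1, 2)
	fmt.Println(calculatePriority(sum)) // prints 1
}
```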
commit
b883f4cff8
@@ -26,11 +26,12 @@ import (
 	"k8s.io/kubernetes/pkg/util/parsers"
 )
 
-// This is a reasonable size range of all container images. 90%ile of images on dockerhub drops into this range.
+// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
+// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.
 const (
 	mb int64 = 1024 * 1024
-	minImgSize int64 = 23 * mb
-	maxImgSize int64 = 1000 * mb
+	minThreshold int64 = 23 * mb
+	maxThreshold int64 = 1000 * mb
 )
 
 // ImageLocalityPriorityMap is a priority function that favors nodes that already have requested pod container's images.
@@ -44,44 +45,55 @@ func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
 		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
 	}
 
-	sumSize := totalImageSize(nodeInfo, pod.Spec.Containers)
+	var score int
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
+	} else {
+		// if we are not able to parse priority metadata, skip this priority
+		score = 0
+	}
 
 	return schedulerapi.HostPriority{
 		Host:  node.Name,
-		Score: calculateScoreFromSize(sumSize),
+		Score: score,
 	}, nil
 }
 
-// calculateScoreFromSize calculates the priority of a node. sumSize is the sum of sizes of requested images on this node.
-// 1. Split image size range into 10 buckets.
-// 2. Decide the priority of a given sumSize based on which bucket it belongs to.
-func calculateScoreFromSize(sumSize int64) int {
-	switch {
-	case sumSize == 0 || sumSize < minImgSize:
-		// 0 means none of the images required by this pod are present on this
-		// node or the total size of the images present is too small to be taken into further consideration.
-		return 0
-
-	case sumSize >= maxImgSize:
-		// If existing images' total size is larger than max, just make it highest priority.
-		return schedulerapi.MaxPriority
-	}
-
-	return int((int64(schedulerapi.MaxPriority) * (sumSize - minImgSize) / (maxImgSize - minImgSize)) + 1)
+// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
+// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
+func calculatePriority(sumScores int64) int {
+	if sumScores < minThreshold {
+		sumScores = minThreshold
+	} else if sumScores > maxThreshold {
+		sumScores = maxThreshold
+	}
+
+	return int(int64(schedulerapi.MaxPriority) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
 }
 
-// totalImageSize returns the total image size of all the containers that are already on the node.
-func totalImageSize(nodeInfo *schedulercache.NodeInfo, containers []v1.Container) int64 {
-	var total int64
+// sumImageScores returns the sum of image scores of all the containers that are already on the node.
+// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
+// the final score. Note that init containers are not considered, since it's rare for users to deploy huge init containers.
+func sumImageScores(nodeInfo *schedulercache.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
+	var sum int64
+	imageStates := nodeInfo.ImageStates()
 
-	imageSizes := nodeInfo.ImageSizes()
 	for _, container := range containers {
-		if size, ok := imageSizes[normalizedImageName(container.Image)]; ok {
-			total += size
+		if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
+			sum += scaledImageScore(state, totalNumNodes)
 		}
 	}
 
-	return total
+	return sum
 }
 
+// scaledImageScore returns an adaptively scaled score for the given state of an image.
+// The size of the image is used as the base score, scaled by a factor which considers how many nodes the image has "spread" to.
+// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
+// a few nodes due to image locality.
+func scaledImageScore(imageState *schedulercache.ImageStateSummary, totalNumNodes int) int64 {
+	spread := float64(imageState.NumNodes) / float64(totalNumNodes)
+	return int64(float64(imageState.Size) * spread)
+}
+
 // normalizedImageName returns the CRI compliant name for a given image.
@@ -42,13 +42,13 @@ func TestImageLocalityPriority(t *testing.T) {
 		},
 	}
 
-	test40140 := v1.PodSpec{
+	test40300 := v1.PodSpec{
 		Containers: []v1.Container{
 			{
 				Image: "gcr.io/40",
 			},
 			{
-				Image: "gcr.io/140",
+				Image: "gcr.io/300",
 			},
 		},
 	}
@@ -64,7 +64,7 @@ func TestImageLocalityPriority(t *testing.T) {
 		},
 	}
 
-	node401402000 := v1.NodeStatus{
+	node403002000 := v1.NodeStatus{
 		Images: []v1.ContainerImage{
 			{
 				Names: []string{
@@ -76,10 +76,10 @@ func TestImageLocalityPriority(t *testing.T) {
 			},
 			{
 				Names: []string{
-					"gcr.io/140:" + parsers.DefaultImageTag,
-					"gcr.io/140:v1",
+					"gcr.io/300:" + parsers.DefaultImageTag,
+					"gcr.io/300:v1",
 				},
-				SizeBytes: int64(140 * mb),
+				SizeBytes: int64(300 * mb),
 			},
 			{
 				Names: []string{
@@ -120,29 +120,29 @@ func TestImageLocalityPriority(t *testing.T) {
 
 			// Node1
 			// Image: gcr.io/40:latest 40MB
-			// Score: (40M-23M)/97.7M + 1 = 1
+			// Score: 0 (40M/2 < 23M, min-threshold)
 
 			// Node2
 			// Image: gcr.io/250:latest 250MB
-			// Score: (250M-23M)/97.7M + 1 = 3
+			// Score: 10 * (250M/2 - 23M)/(1000M - 23M) = 1
 			pod:          &v1.Pod{Spec: test40250},
-			nodes:        []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)},
-			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 3}},
+			nodes:        []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)},
+			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 1}},
 			name:         "two images spread on two nodes, prefer the larger image one",
 		},
 		{
-			// Pod: gcr.io/40 gcr.io/140
+			// Pod: gcr.io/40 gcr.io/300
 
 			// Node1
-			// Image: gcr.io/40:latest 40MB, gcr.io/140:latest 140MB
-			// Score: (40M+140M-23M)/97.7M + 1 = 2
+			// Image: gcr.io/40:latest 40MB, gcr.io/300:latest 300MB
+			// Score: 10 * ((40M + 300M)/2 - 23M)/(1000M - 23M) = 1
 
 			// Node2
 			// Image: not present
 			// Score: 0
-			pod:          &v1.Pod{Spec: test40140},
-			nodes:        []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)},
-			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 2}, {Host: "machine2", Score: 0}},
+			pod:          &v1.Pod{Spec: test40300},
+			nodes:        []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)},
+			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 0}},
 			name:         "two images on one node, prefer this node",
 		},
 		{
@@ -150,13 +150,13 @@ func TestImageLocalityPriority(t *testing.T) {
 
 			// Node1
 			// Image: gcr.io/2000:latest 2000MB
-			// Score: 2000 > max score = 10
+			// Score: 10 (2000M/2 >= 1000M, max-threshold)
 
 			// Node2
 			// Image: gcr.io/10:latest 10MB
-			// Score: 10 < min score = 0
+			// Score: 0 (10M/2 < 23M, min-threshold)
 			pod:          &v1.Pod{Spec: testMinMax},
-			nodes:        []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)},
+			nodes:        []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)},
 			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: schedulerapi.MaxPriority}, {Host: "machine2", Score: 0}},
 			name:         "if exceed limit, use limit",
 		},
@@ -165,7 +165,7 @@ func TestImageLocalityPriority(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes)
-			list, err := priorityFunction(ImageLocalityPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
+			list, err := priorityFunction(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)})(test.pod, nodeNameToInfo, test.nodes)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
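A note on the arithmetic in the score comments above: each test case uses two nodes, and every image exists on exactly one of them, so each image's spread is 1/2 and its raw score is half its size. For `test40300` on machine1, for instance, the summed score is (40M + 300M)/2 = 170M, giving a priority of 10 * (170M - 23M) / (1000M - 23M) = 1 under integer division.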
@@ -52,6 +52,7 @@ type priorityMetadata struct {
 	podSelectors            []labels.Selector
 	controllerRef           *metav1.OwnerReference
 	podFirstServiceSelector labels.Selector
+	totalNumNodes           int
 }
 
 // PriorityMetadata is a PriorityMetadataProducer. Node info can be nil.
@@ -67,6 +68,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo
 		podSelectors:            getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister),
 		controllerRef:           priorityutil.GetControllerRef(pod),
 		podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister),
+		totalNumNodes:           len(nodeNameToInfo),
 	}
 }
@@ -19,6 +19,7 @@ go_library(
         "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
@@ -30,13 +31,13 @@ go_test(
     srcs = [
         "cache_test.go",
         "node_info_test.go",
+        "util_test.go",
    ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/features:go_default_library",
         "//pkg/scheduler/algorithm/priorities/util:go_default_library",
         "//pkg/scheduler/util:go_default_library",
-        "//pkg/util/parsers:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@@ -44,6 +45,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
     ],
@@ -23,6 +23,7 @@ import (
 
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/kubernetes/pkg/features"
@@ -59,6 +60,8 @@ type schedulerCache struct {
 	podStates map[string]*podState
 	nodes     map[string]*NodeInfo
 	pdbs      map[string]*policy.PodDisruptionBudget
+	// A map from image name to its imageState.
+	imageStates map[string]*imageState
 }
 
 type podState struct {
@@ -69,6 +72,29 @@ type podState struct {
 	bindingFinished bool
 }
 
+type imageState struct {
+	// Size of the image
+	size int64
+	// A set of node names for nodes having this image present
+	nodes sets.String
+}
+
+// ImageStateSummary provides summarized information about the state of an image.
+type ImageStateSummary struct {
+	// Size of the image
+	Size int64
+	// Used to track how many nodes have this image
+	NumNodes int
+}
+
+// createImageStateSummary returns a summarizing snapshot of the given image's state.
+func (cache *schedulerCache) createImageStateSummary(state *imageState) *ImageStateSummary {
+	return &ImageStateSummary{
+		Size:     state.size,
+		NumNodes: len(state.nodes),
+	}
+}
+
 func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
 	return &schedulerCache{
 		ttl:    ttl,
@@ -79,6 +105,7 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
 		assumedPods: make(map[string]bool),
 		podStates:   make(map[string]*podState),
 		pdbs:        make(map[string]*policy.PodDisruptionBudget),
+		imageStates: make(map[string]*imageState),
 	}
 }
 
@@ -113,6 +140,7 @@ func (cache *schedulerCache) Snapshot() *Snapshot {
 func (cache *schedulerCache) UpdateNodeNameToInfoMap(nodeNameToInfo map[string]*NodeInfo) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
+
 	for name, info := range cache.nodes {
 		if utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && info.TransientInfo != nil {
 			// Transient scheduler info is reset here.
@@ -394,7 +422,11 @@ func (cache *schedulerCache) AddNode(node *v1.Node) error {
 	if !ok {
 		n = NewNodeInfo()
 		cache.nodes[node.Name] = n
+	} else {
+		cache.removeNodeImageStates(n.node)
 	}
+
+	cache.addNodeImageStates(node, n)
 	return n.SetNode(node)
 }
 
@@ -406,7 +438,11 @@ func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) error {
 	if !ok {
 		n = NewNodeInfo()
 		cache.nodes[newNode.Name] = n
+	} else {
+		cache.removeNodeImageStates(n.node)
 	}
+
+	cache.addNodeImageStates(newNode, n)
 	return n.SetNode(newNode)
 }
 
@@ -425,9 +461,62 @@ func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
 	if len(n.pods) == 0 && n.node == nil {
 		delete(cache.nodes, node.Name)
 	}
+
+	cache.removeNodeImageStates(node)
 	return nil
 }
 
+// addNodeImageStates adds states of the images on the given node to the given nodeInfo and updates the imageStates in
+// the scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
+func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *NodeInfo) {
+	newSum := make(map[string]*ImageStateSummary)
+
+	for _, image := range node.Status.Images {
+		for _, name := range image.Names {
+			// update the entry in imageStates
+			state, ok := cache.imageStates[name]
+			if !ok {
+				state = &imageState{
+					size:  image.SizeBytes,
+					nodes: sets.NewString(node.Name),
+				}
+				cache.imageStates[name] = state
+			} else {
+				state.nodes.Insert(node.Name)
+			}
+			// create the ImageStateSummary for this image
+			if _, ok := newSum[name]; !ok {
+				newSum[name] = cache.createImageStateSummary(state)
+			}
+		}
+	}
+	nodeInfo.imageStates = newSum
+}
+
+// removeNodeImageStates removes the given node record from image entries having the node
+// in the imageStates cache. After the removal, if any image becomes free, i.e., the image
+// is no longer available on any node, the image entry will be removed from imageStates.
+func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
+	if node == nil {
+		return
+	}
+
+	for _, image := range node.Status.Images {
+		for _, name := range image.Names {
+			state, ok := cache.imageStates[name]
+			if ok {
+				state.nodes.Delete(node.Name)
+				if len(state.nodes) == 0 {
+					// Remove the unused image to make sure the length of
+					// imageStates represents the total number of different
+					// images on all nodes
+					delete(cache.imageStates, name)
+				}
+			}
+		}
+	}
+}
+
 func (cache *schedulerCache) AddPDB(pdb *policy.PodDisruptionBudget) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
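The two helpers added above keep a reference-counted view of image placement: each `imageState` holds the set of node names that have the image, and the entry is garbage-collected once the set empties. A stripped-down, hypothetical model of that bookkeeping (plain maps instead of `sets.String`; not the scheduler's API):

```go
package main

import "fmt"

// imageNodes maps an image name to the set of nodes that have it pulled.
type imageNodes map[string]map[string]struct{}

// addNode records that every image in images is present on node.
func (m imageNodes) addNode(node string, images []string) {
	for _, img := range images {
		if m[img] == nil {
			m[img] = map[string]struct{}{}
		}
		m[img][node] = struct{}{}
	}
}

// removeNode drops node from every image's set and garbage-collects
// entries that no longer exist on any node.
func (m imageNodes) removeNode(node string, images []string) {
	for _, img := range images {
		delete(m[img], node)
		if len(m[img]) == 0 {
			delete(m, img)
		}
	}
}

func main() {
	m := imageNodes{}
	m.addNode("node-0", []string{"gcr.io/10:v1"})
	m.addNode("node-1", []string{"gcr.io/10:v1", "gcr.io/200:v1"})
	fmt.Println(len(m["gcr.io/10:v1"])) // 2 -> the NumNodes used for spread
	m.removeNode("node-0", []string{"gcr.io/10:v1"})
	m.removeNode("node-1", []string{"gcr.io/10:v1", "gcr.io/200:v1"})
	fmt.Println(len(m)) // 0 -> unused entries are removed
}
```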
@@ -108,7 +108,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}, {
 		pods: []*v1.Pod{testPods[1], testPods[2]},
@@ -125,7 +125,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1], testPods[2]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}, { // test non-zero request
 		pods: []*v1.Pod{testPods[3]},
@@ -142,7 +142,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[3]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}, {
 		pods: []*v1.Pod{testPods[4]},
@@ -160,7 +160,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[4]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}, {
 		pods: []*v1.Pod{testPods[4], testPods[5]},
@@ -178,7 +178,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[4], testPods[5]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}, {
 		pods: []*v1.Pod{testPods[6]},
@@ -195,7 +195,7 @@ func TestAssumePodScheduled(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[6]},
 			usedPorts:           newHostPortInfoBuilder().build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	},
 }
@@ -275,7 +275,7 @@ func TestExpirePod(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}}
 
@@ -328,7 +328,7 @@ func TestAddPodWillConfirm(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}}
 
@@ -425,7 +425,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{updatedPod.DeepCopy()},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	},
 }}
@@ -481,7 +481,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{basePod},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}}
 
@@ -537,7 +537,7 @@ func TestUpdatePod(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		}, {
 			requestedResource: &Resource{
 				MilliCPU: 100,
@@ -551,7 +551,7 @@ func TestUpdatePod(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		}},
 	}}
 
@@ -669,7 +669,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[1]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		}, {
 			requestedResource: &Resource{
 				MilliCPU: 100,
@@ -683,7 +683,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{testPods[0]},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		}},
 	}}
 
@@ -761,7 +761,7 @@ func TestEphemeralStorageResource(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{podE},
 			usedPorts:           schedutil.HostPortInfo{},
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	},
 }
@@ -808,7 +808,7 @@ func TestRemovePod(t *testing.T) {
 			allocatableResource: &Resource{},
 			pods:                []*v1.Pod{basePod},
 			usedPorts:           newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
-			imageSizes:          map[string]int64{},
+			imageStates:         make(map[string]*ImageStateSummary),
 		},
 	}}
 
@@ -58,9 +58,10 @@ type NodeInfo struct {
 	taints    []v1.Taint
 	taintsErr error
 
-	// This is a map from image name to image size, also for checking image existence on the node
-	// Cache it here to avoid rebuilding the map during scheduling, e.g., in image_locality.go
-	imageSizes map[string]int64
+	// imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
+	// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
+	// state information.
+	imageStates map[string]*ImageStateSummary
 
 	// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
 	// scheduling cycle.
@@ -261,7 +262,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
 		TransientInfo: newTransientSchedulerInfo(),
 		generation:    nextGeneration(),
 		usedPorts:     make(util.HostPortInfo),
-		imageSizes:    make(map[string]int64),
+		imageStates:   make(map[string]*ImageStateSummary),
 	}
 	for _, pod := range pods {
 		ni.AddPod(pod)
@@ -293,12 +294,12 @@ func (n *NodeInfo) UsedPorts() util.HostPortInfo {
 	return n.usedPorts
 }
 
-// ImageSizes returns the image size information on this node.
-func (n *NodeInfo) ImageSizes() map[string]int64 {
+// ImageStates returns the state information of all images.
+func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary {
 	if n == nil {
 		return nil
 	}
-	return n.imageSizes
+	return n.imageStates
 }
 
 // PodsWithAffinity return all pods with (anti)affinity constraints on this node.
@@ -392,7 +393,7 @@ func (n *NodeInfo) Clone() *NodeInfo {
 		diskPressureCondition: n.diskPressureCondition,
 		pidPressureCondition:  n.pidPressureCondition,
 		usedPorts:             make(util.HostPortInfo),
-		imageSizes:            n.imageSizes,
+		imageStates:           make(map[string]*ImageStateSummary),
 		generation:            n.generation,
 	}
 	if len(n.pods) > 0 {
@@ -547,17 +548,6 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
 	}
 }
 
-func (n *NodeInfo) updateImageSizes() {
-	node := n.Node()
-	imageSizes := make(map[string]int64)
-	for _, image := range node.Status.Images {
-		for _, name := range image.Names {
-			imageSizes[name] = image.SizeBytes
-		}
-	}
-	n.imageSizes = imageSizes
-}
-
 // SetNode sets the overall node information.
 func (n *NodeInfo) SetNode(node *v1.Node) error {
 	n.node = node
@@ -579,7 +569,6 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
 		}
 	}
 	n.TransientInfo = newTransientSchedulerInfo()
-	n.updateImageSizes()
 	n.generation = nextGeneration()
 	return nil
 }
@@ -596,6 +585,7 @@ func (n *NodeInfo) RemoveNode(node *v1.Node) error {
 	n.memoryPressureCondition = v1.ConditionUnknown
 	n.diskPressureCondition = v1.ConditionUnknown
 	n.pidPressureCondition = v1.ConditionUnknown
+	n.imageStates = make(map[string]*ImageStateSummary)
 	n.generation = nextGeneration()
 	return nil
 }
@@ -26,7 +26,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/kubernetes/pkg/scheduler/util"
-	"k8s.io/kubernetes/pkg/util/parsers"
 )
 
 func TestNewResource(t *testing.T) {
@@ -241,46 +240,6 @@ func TestSetMaxResource(t *testing.T) {
 	}
 }
 
-func TestImageSizes(t *testing.T) {
-	ni := fakeNodeInfo()
-	ni.node = &v1.Node{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "test-node",
-		},
-		Status: v1.NodeStatus{
-			Images: []v1.ContainerImage{
-				{
-					Names: []string{
-						"gcr.io/10:" + parsers.DefaultImageTag,
-						"gcr.io/10:v1",
-					},
-					SizeBytes: int64(10 * 1024 * 1024),
-				},
-				{
-					Names: []string{
-						"gcr.io/50:" + parsers.DefaultImageTag,
-						"gcr.io/50:v1",
-					},
-					SizeBytes: int64(50 * 1024 * 1024),
-				},
-			},
-		},
-	}
-
-	ni.updateImageSizes()
-	expected := map[string]int64{
-		"gcr.io/10:" + parsers.DefaultImageTag: 10 * 1024 * 1024,
-		"gcr.io/10:v1":                         10 * 1024 * 1024,
-		"gcr.io/50:" + parsers.DefaultImageTag: 50 * 1024 * 1024,
-		"gcr.io/50:v1":                         50 * 1024 * 1024,
-	}
-
-	imageSizes := ni.ImageSizes()
-	if !reflect.DeepEqual(expected, imageSizes) {
-		t.Errorf("expected: %#v, got: %#v", expected, imageSizes)
-	}
-}
-
 func TestNewNodeInfo(t *testing.T) {
 	nodeName := "test-node"
 	pods := []*v1.Pod{
@@ -312,7 +271,7 @@ func TestNewNodeInfo(t *testing.T) {
 				{Protocol: "TCP", Port: 8080}: {},
 			},
 		},
-		imageSizes: map[string]int64{},
+		imageStates: map[string]*ImageStateSummary{},
 		pods: []*v1.Pod{
 			{
 				ObjectMeta: metav1.ObjectMeta{
@@ -401,9 +360,7 @@ func TestNodeInfoClone(t *testing.T) {
 				{Protocol: "TCP", Port: 8080}: {},
 			},
 		},
-		imageSizes: map[string]int64{
-			"gcr.io/10": 10 * 1024 * 1024,
-		},
+		imageStates: map[string]*ImageStateSummary{},
 		pods: []*v1.Pod{
 			{
 				ObjectMeta: metav1.ObjectMeta{
@@ -473,9 +430,7 @@ func TestNodeInfoClone(t *testing.T) {
 				{Protocol: "TCP", Port: 8080}: {},
 			},
 		},
-		imageSizes: map[string]int64{
-			"gcr.io/10": 10 * 1024 * 1024,
-		},
+		imageStates: map[string]*ImageStateSummary{},
 		pods: []*v1.Pod{
 			{
 				ObjectMeta: metav1.ObjectMeta{
@@ -633,7 +588,7 @@ func TestNodeInfoAddPod(t *testing.T) {
 				{Protocol: "TCP", Port: 8080}: {},
 			},
 		},
-		imageSizes: map[string]int64{},
+		imageStates: map[string]*ImageStateSummary{},
 		pods: []*v1.Pod{
 			{
 				ObjectMeta: metav1.ObjectMeta{
@@ -752,7 +707,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
 				{Protocol: "TCP", Port: 8080}: {},
 			},
 		},
-		imageSizes: map[string]int64{},
+		imageStates: map[string]*ImageStateSummary{},
 		pods: []*v1.Pod{
 			{
 				ObjectMeta: metav1.ObjectMeta{
@@ -868,7 +823,7 @@ func TestNodeInfoRemovePod(t *testing.T) {
 				{Protocol: "TCP", Port: 8080}: {},
 			},
 		},
-		imageSizes: map[string]int64{},
+		imageStates: map[string]*ImageStateSummary{},
 		pods: []*v1.Pod{
 			{
 				ObjectMeta: metav1.ObjectMeta{
@@ -16,7 +16,10 @@ limitations under the License.
 
 package cache
 
-import "k8s.io/api/core/v1"
+import (
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+)
 
 // CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names
 // and the values are the aggregated information for that node.
@@ -29,11 +32,47 @@ func CreateNodeNameToInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*NodeInfo {
 		}
 		nodeNameToInfo[nodeName].AddPod(pod)
 	}
+	imageExistenceMap := createImageExistenceMap(nodes)
+
 	for _, node := range nodes {
 		if _, ok := nodeNameToInfo[node.Name]; !ok {
 			nodeNameToInfo[node.Name] = NewNodeInfo()
 		}
-		nodeNameToInfo[node.Name].SetNode(node)
+		nodeInfo := nodeNameToInfo[node.Name]
+		nodeInfo.SetNode(node)
+		nodeInfo.imageStates = getNodeImageStates(node, imageExistenceMap)
 	}
 	return nodeNameToInfo
 }
+
+// getNodeImageStates returns the given node's image states based on the given imageExistence map.
+func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*ImageStateSummary {
+	imageStates := make(map[string]*ImageStateSummary)
+
+	for _, image := range node.Status.Images {
+		for _, name := range image.Names {
+			imageStates[name] = &ImageStateSummary{
+				Size:     image.SizeBytes,
+				NumNodes: len(imageExistenceMap[name]),
+			}
+		}
+	}
+	return imageStates
+}
+
+// createImageExistenceMap returns a map recording on which nodes the images exist, keyed by the images' names.
+func createImageExistenceMap(nodes []*v1.Node) map[string]sets.String {
+	imageExistenceMap := make(map[string]sets.String)
+	for _, node := range nodes {
+		for _, image := range node.Status.Images {
+			for _, name := range image.Names {
+				if _, ok := imageExistenceMap[name]; !ok {
+					imageExistenceMap[name] = sets.NewString(node.Name)
+				} else {
+					imageExistenceMap[name].Insert(node.Name)
+				}
+			}
+		}
+	}
+	return imageExistenceMap
}
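A design note on the snapshot path above: `createImageExistenceMap` is built once per snapshot with a single pass over all nodes, and `getNodeImageStates` then derives each node's summaries from it, so looking up an image's `NumNodes` is a map access rather than a rescan of every node. A rough, hypothetical sketch of that two-pass shape (plain maps instead of `sets.String`):

```go
package main

import "fmt"

// snapshot derives, per node, how many nodes in total have each of its images.
// Input: node name -> image names on that node (a stand-in for node.Status.Images).
func snapshot(nodes map[string][]string) map[string]map[string]int {
	// Pass 1: image -> set of nodes that have it.
	existence := map[string]map[string]bool{}
	for node, images := range nodes {
		for _, img := range images {
			if existence[img] == nil {
				existence[img] = map[string]bool{}
			}
			existence[img][node] = true
		}
	}
	// Pass 2: per-node summaries via O(1) lookups into the existence map.
	perNode := map[string]map[string]int{}
	for node, images := range nodes {
		perNode[node] = map[string]int{}
		for _, img := range images {
			perNode[node][img] = len(existence[img])
		}
	}
	return perNode
}

func main() {
	states := snapshot(map[string][]string{
		"node-0": {"gcr.io/10:v1"},
		"node-1": {"gcr.io/10:v1", "gcr.io/200:v1"},
	})
	fmt.Println(states["node-0"]["gcr.io/10:v1"])  // 2: the image exists on both nodes
	fmt.Println(states["node-1"]["gcr.io/200:v1"]) // 1
}
```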
@@ -0,0 +1,134 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"reflect"
+	"testing"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+)
+
+const mb int64 = 1024 * 1024
+
+func TestGetNodeImageStates(t *testing.T) {
+	tests := []struct {
+		node              *v1.Node
+		imageExistenceMap map[string]sets.String
+		expected          map[string]*ImageStateSummary
+	}{
+		{
+			node: &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{Name: "node-0"},
+				Status: v1.NodeStatus{
+					Images: []v1.ContainerImage{
+						{
+							Names: []string{
+								"gcr.io/10:v1",
+							},
+							SizeBytes: int64(10 * mb),
+						},
+						{
+							Names: []string{
+								"gcr.io/200:v1",
+							},
+							SizeBytes: int64(200 * mb),
+						},
+					},
+				},
+			},
+			imageExistenceMap: map[string]sets.String{
+				"gcr.io/10:v1":  sets.NewString("node-0", "node-1"),
+				"gcr.io/200:v1": sets.NewString("node-0"),
+			},
+			expected: map[string]*ImageStateSummary{
+				"gcr.io/10:v1": {
+					Size:     int64(10 * mb),
+					NumNodes: 2,
+				},
+				"gcr.io/200:v1": {
+					Size:     int64(200 * mb),
+					NumNodes: 1,
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		imageStates := getNodeImageStates(test.node, test.imageExistenceMap)
+		if !reflect.DeepEqual(test.expected, imageStates) {
+			t.Errorf("expected: %#v, got: %#v", test.expected, imageStates)
+		}
+	}
+}
+
+func TestCreateImageExistenceMap(t *testing.T) {
+	tests := []struct {
+		nodes    []*v1.Node
+		expected map[string]sets.String
+	}{
+		{
+			nodes: []*v1.Node{
+				{
+					ObjectMeta: metav1.ObjectMeta{Name: "node-0"},
+					Status: v1.NodeStatus{
+						Images: []v1.ContainerImage{
+							{
+								Names: []string{
+									"gcr.io/10:v1",
+								},
+								SizeBytes: int64(10 * mb),
+							},
+						},
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
+					Status: v1.NodeStatus{
+						Images: []v1.ContainerImage{
+							{
+								Names: []string{
+									"gcr.io/10:v1",
+								},
+								SizeBytes: int64(10 * mb),
+							},
+							{
+								Names: []string{
+									"gcr.io/200:v1",
+								},
+								SizeBytes: int64(200 * mb),
+							},
+						},
+					},
+				},
+			},
+			expected: map[string]sets.String{
+				"gcr.io/10:v1":  sets.NewString("node-0", "node-1"),
+				"gcr.io/200:v1": sets.NewString("node-1"),
+			},
+		},
+	}
+
+	for _, test := range tests {
+		imageMap := createImageExistenceMap(test.nodes)
+		if !reflect.DeepEqual(test.expected, imageMap) {
+			t.Errorf("expected: %#v, got: %#v", test.expected, imageMap)
+		}
+	}
+}