diff --git a/pkg/scheduler/algorithm/priorities/image_locality.go b/pkg/scheduler/algorithm/priorities/image_locality.go
index 00820f2b4e..041e52d4fc 100644
--- a/pkg/scheduler/algorithm/priorities/image_locality.go
+++ b/pkg/scheduler/algorithm/priorities/image_locality.go
@@ -26,11 +26,12 @@ import (
 	"k8s.io/kubernetes/pkg/util/parsers"
 )
 
-// This is a reasonable size range of all container images. 90%ile of images on dockerhub drops into this range.
+// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
+// container images compressed and stored in registries; the 90th percentile of images on DockerHub falls within this range.
 const (
-	mb         int64 = 1024 * 1024
-	minImgSize int64 = 23 * mb
-	maxImgSize int64 = 1000 * mb
+	mb           int64 = 1024 * 1024
+	minThreshold int64 = 23 * mb
+	maxThreshold int64 = 1000 * mb
 )
 
 // ImageLocalityPriorityMap is a priority function that favors nodes that already have requested pod container's images.
@@ -44,44 +45,55 @@ func ImageLocalityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *scheduler
 		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
 	}
 
-	sumSize := totalImageSize(nodeInfo, pod.Spec.Containers)
+	var score int
+	if priorityMeta, ok := meta.(*priorityMetadata); ok {
+		score = calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, priorityMeta.totalNumNodes))
+	} else {
+		// if we are not able to parse priority metadata, skip this priority
+		score = 0
+	}
 
 	return schedulerapi.HostPriority{
 		Host:  node.Name,
-		Score: calculateScoreFromSize(sumSize),
+		Score: score,
 	}, nil
 }
 
-// calculateScoreFromSize calculates the priority of a node. sumSize is sum size of requested images on this node.
-// 1. Split image size range into 10 buckets.
-// 2. Decide the priority of a given sumSize based on which bucket it belongs to.
-func calculateScoreFromSize(sumSize int64) int {
-	switch {
-	case sumSize == 0 || sumSize < minImgSize:
-		// 0 means none of the images required by this pod are present on this
-		// node or the total size of the images present are too small to be taken into further consideration.
-		return 0
-
-	case sumSize >= maxImgSize:
-		// If existing images' total size is larger than max, just make it highest priority.
-		return schedulerapi.MaxPriority
+// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
+// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
+func calculatePriority(sumScores int64) int {
+	if sumScores < minThreshold {
+		sumScores = minThreshold
+	} else if sumScores > maxThreshold {
+		sumScores = maxThreshold
 	}
 
-	return int((int64(schedulerapi.MaxPriority) * (sumSize - minImgSize) / (maxImgSize - minImgSize)) + 1)
+	return int(int64(schedulerapi.MaxPriority) * (sumScores - minThreshold) / (maxThreshold - minThreshold))
 }
 
-// totalImageSize returns the total image size of all the containers that are already on the node.
-func totalImageSize(nodeInfo *schedulercache.NodeInfo, containers []v1.Container) int64 {
-	var total int64
+// sumImageScores returns the sum of image scores of all the containers that are already on the node.
+// Each image receives a raw score equal to its size, scaled by scaledImageScore. The raw scores are later used to calculate
+// the final score. Note that init containers are not considered, as it is rare for users to deploy huge init containers.
+func sumImageScores(nodeInfo *schedulercache.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
+	var sum int64
+	imageStates := nodeInfo.ImageStates()
 
-	imageSizes := nodeInfo.ImageSizes()
 	for _, container := range containers {
-		if size, ok := imageSizes[normalizedImageName(container.Image)]; ok {
-			total += size
+		if state, ok := imageStates[normalizedImageName(container.Image)]; ok {
+			sum += scaledImageScore(state, totalNumNodes)
 		}
 	}
 
-	return total
+	return sum
+}
+
+// scaledImageScore returns an adaptively scaled score for the given state of an image.
+// The size of the image is used as the base score, scaled by a factor which considers how many nodes the image has "spread" to.
+// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
+// a few nodes due to image locality.
+func scaledImageScore(imageState *schedulercache.ImageStateSummary, totalNumNodes int) int64 {
+	spread := float64(imageState.NumNodes) / float64(totalNumNodes)
+	return int64(float64(imageState.Size) * spread)
 }
 
 // normalizedImageName returns the CRI compliant name for a given image.
diff --git a/pkg/scheduler/algorithm/priorities/image_locality_test.go b/pkg/scheduler/algorithm/priorities/image_locality_test.go
index a8f76a03d3..58dc02a233 100644
--- a/pkg/scheduler/algorithm/priorities/image_locality_test.go
+++ b/pkg/scheduler/algorithm/priorities/image_locality_test.go
@@ -42,13 +42,13 @@ func TestImageLocalityPriority(t *testing.T) {
 		},
 	}
 
-	test40140 := v1.PodSpec{
+	test40300 := v1.PodSpec{
 		Containers: []v1.Container{
 			{
 				Image: "gcr.io/40",
 			},
 			{
-				Image: "gcr.io/140",
+				Image: "gcr.io/300",
 			},
 		},
 	}
@@ -64,7 +64,7 @@ func TestImageLocalityPriority(t *testing.T) {
 		},
 	}
 
-	node401402000 := v1.NodeStatus{
+	node403002000 := v1.NodeStatus{
 		Images: []v1.ContainerImage{
 			{
 				Names: []string{
@@ -76,10 +76,10 @@ func TestImageLocalityPriority(t *testing.T) {
 			},
 			{
 				Names: []string{
-					"gcr.io/140:" + parsers.DefaultImageTag,
-					"gcr.io/140:v1",
+					"gcr.io/300:" + parsers.DefaultImageTag,
+					"gcr.io/300:v1",
 				},
-				SizeBytes: int64(140 * mb),
+				SizeBytes: int64(300 * mb),
 			},
 			{
 				Names: []string{
@@ -120,29 +120,29 @@ func TestImageLocalityPriority(t *testing.T) {
 
 			// Node1
 			// Image: gcr.io/40:latest 40MB
-			// Score: (40M-23M)/97.7M + 1 = 1
+			// Score: 0 (40M/2 < 23M, min-threshold)
 
 			// Node2
 			// Image: gcr.io/250:latest 250MB
-			// Score: (250M-23M)/97.7M + 1 = 3
+			// Score: 10 * (250M/2 - 23M)/(1000M - 23M) = 1
 			pod:          &v1.Pod{Spec: test40250},
-			nodes:        []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)},
-			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 3}},
+			nodes:        []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)},
+			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 1}},
 			name:         "two images spread on two nodes, prefer the larger image one",
 		},
 		{
-			// Pod: gcr.io/40 gcr.io/140
+			// Pod: gcr.io/40 gcr.io/300
 
 			// Node1
-			// Image: gcr.io/40:latest 40MB, gcr.io/140:latest 140MB
-			// Score: (40M+140M-23M)/97.7M + 1 = 2
+			// Image: gcr.io/40:latest 40MB, gcr.io/300:latest 300MB
+			// Score: 10 * ((40M + 300M)/2 - 23M)/(1000M - 23M) = 1
 
 			// Node2
 			// Image: not present
 			// Score: 0
-			pod:          &v1.Pod{Spec: test40140},
-			nodes:        []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)},
-			expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 2}, {Host: 
"machine2", Score: 0}}, + pod: &v1.Pod{Spec: test40300}, + nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 0}}, name: "two images on one node, prefer this node", }, { @@ -150,13 +150,13 @@ func TestImageLocalityPriority(t *testing.T) { // Node1 // Image: gcr.io/2000:latest 2000MB - // Score: 2000 > max score = 10 + // Score: 10 (2000M/2 >= 1000M, max-threshold) // Node2 // Image: gcr.io/10:latest 10MB - // Score: 10 < min score = 0 + // Score: 0 (10M/2 < 23M, min-threshold) pod: &v1.Pod{Spec: testMinMax}, - nodes: []*v1.Node{makeImageNode("machine1", node401402000), makeImageNode("machine2", node25010)}, + nodes: []*v1.Node{makeImageNode("machine1", node403002000), makeImageNode("machine2", node25010)}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: schedulerapi.MaxPriority}, {Host: "machine2", Score: 0}}, name: "if exceed limit, use limit", }, @@ -165,7 +165,7 @@ func TestImageLocalityPriority(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, test.nodes) - list, err := priorityFunction(ImageLocalityPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) + list, err := priorityFunction(ImageLocalityPriorityMap, nil, &priorityMetadata{totalNumNodes: len(test.nodes)})(test.pod, nodeNameToInfo, test.nodes) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/pkg/scheduler/algorithm/priorities/metadata.go b/pkg/scheduler/algorithm/priorities/metadata.go index 3cb0f2f442..fe824b91c3 100644 --- a/pkg/scheduler/algorithm/priorities/metadata.go +++ b/pkg/scheduler/algorithm/priorities/metadata.go @@ -52,6 +52,7 @@ type priorityMetadata struct { podSelectors []labels.Selector controllerRef *metav1.OwnerReference podFirstServiceSelector labels.Selector + totalNumNodes int } // PriorityMetadata is a PriorityMetadataProducer. Node info can be nil. @@ -67,6 +68,7 @@ func (pmf *PriorityMetadataFactory) PriorityMetadata(pod *v1.Pod, nodeNameToInfo podSelectors: getSelectors(pod, pmf.serviceLister, pmf.controllerLister, pmf.replicaSetLister, pmf.statefulSetLister), controllerRef: priorityutil.GetControllerRef(pod), podFirstServiceSelector: getFirstServiceSelector(pod, pmf.serviceLister), + totalNumNodes: len(nodeNameToInfo), } } diff --git a/pkg/scheduler/cache/util.go b/pkg/scheduler/cache/util.go index c3037dcc3d..5a252b6402 100644 --- a/pkg/scheduler/cache/util.go +++ b/pkg/scheduler/cache/util.go @@ -16,7 +16,10 @@ limitations under the License. package cache -import "k8s.io/api/core/v1" +import ( + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" +) // CreateNodeNameToInfoMap obtains a list of pods and pivots that list into a map where the keys are node names // and the values are the aggregated information for that node. 
@@ -29,11 +32,47 @@ func CreateNodeNameToInfoMap(pods []*v1.Pod, nodes []*v1.Node) map[string]*NodeI } nodeNameToInfo[nodeName].AddPod(pod) } + imageExistenceMap := createImageExistenceMap(nodes) + for _, node := range nodes { if _, ok := nodeNameToInfo[node.Name]; !ok { nodeNameToInfo[node.Name] = NewNodeInfo() } - nodeNameToInfo[node.Name].SetNode(node) + nodeInfo := nodeNameToInfo[node.Name] + nodeInfo.SetNode(node) + nodeInfo.imageStates = getNodeImageStates(node, imageExistenceMap) } return nodeNameToInfo } + +// getNodeImageStates returns the given node's image states based on the given imageExistence map. +func getNodeImageStates(node *v1.Node, imageExistenceMap map[string]sets.String) map[string]*ImageStateSummary { + imageStates := make(map[string]*ImageStateSummary) + + for _, image := range node.Status.Images { + for _, name := range image.Names { + imageStates[name] = &ImageStateSummary{ + Size: image.SizeBytes, + NumNodes: len(imageExistenceMap[name]), + } + } + } + return imageStates +} + +// createImageExistenceMap returns a map recording on which nodes the images exist, keyed by the images' names. +func createImageExistenceMap(nodes []*v1.Node) map[string]sets.String { + imageExistenceMap := make(map[string]sets.String) + for _, node := range nodes { + for _, image := range node.Status.Images { + for _, name := range image.Names { + if _, ok := imageExistenceMap[name]; !ok { + imageExistenceMap[name] = sets.NewString(node.Name) + } else { + imageExistenceMap[name].Insert(node.Name) + } + } + } + } + return imageExistenceMap +} diff --git a/pkg/scheduler/cache/util_test.go b/pkg/scheduler/cache/util_test.go new file mode 100644 index 0000000000..7b0b5f111c --- /dev/null +++ b/pkg/scheduler/cache/util_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cache + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" +) + +const mb int64 = 1024 * 1024 + +func TestGetNodeImageStates(t *testing.T) { + tests := []struct { + node *v1.Node + imageExistenceMap map[string]sets.String + expected map[string]*ImageStateSummary + }{ + { + node: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-0"}, + Status: v1.NodeStatus{ + Images: []v1.ContainerImage{ + { + Names: []string{ + "gcr.io/10:v1", + }, + SizeBytes: int64(10 * mb), + }, + { + Names: []string{ + "gcr.io/200:v1", + }, + SizeBytes: int64(200 * mb), + }, + }, + }, + }, + imageExistenceMap: map[string]sets.String{ + "gcr.io/10:v1": sets.NewString("node-0", "node-1"), + "gcr.io/200:v1": sets.NewString("node-0"), + }, + expected: map[string]*ImageStateSummary{ + "gcr.io/10:v1": { + Size: int64(10 * mb), + NumNodes: 2, + }, + "gcr.io/200:v1": { + Size: int64(200 * mb), + NumNodes: 1, + }, + }, + }, + } + + for _, test := range tests { + imageStates := getNodeImageStates(test.node, test.imageExistenceMap) + if !reflect.DeepEqual(test.expected, imageStates) { + t.Errorf("expected: %#v, got: %#v", test.expected, imageStates) + } + } +} + +func TestCreateImageExistenceMap(t *testing.T) { + tests := []struct { + nodes []*v1.Node + expected map[string]sets.String + }{ + { + nodes: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node-0"}, + Status: v1.NodeStatus{ + Images: []v1.ContainerImage{ + { + Names: []string{ + "gcr.io/10:v1", + }, + SizeBytes: int64(10 * mb), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Status: v1.NodeStatus{ + Images: []v1.ContainerImage{ + { + Names: []string{ + "gcr.io/10:v1", + }, + SizeBytes: int64(10 * mb), + }, + { + Names: []string{ + "gcr.io/200:v1", + }, + SizeBytes: int64(200 * mb), + }, + }, + }, + }, + }, + expected: map[string]sets.String{ + "gcr.io/10:v1": sets.NewString("node-0", "node-1"), + "gcr.io/200:v1": sets.NewString("node-1"), + }, + }, + } + + for _, test := range tests { + imageMap := createImageExistenceMap(test.nodes) + if !reflect.DeepEqual(test.expected, imageMap) { + t.Errorf("expected: %#v, got: %#v", test.expected, imageMap) + } + } +}
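For readers following the scoring change outside the scheduler codebase, below is a minimal, self-contained Go sketch (not part of the patch) that mirrors the new calculatePriority and scaledImageScore logic. It uses local stand-ins for schedulercache.ImageStateSummary and for schedulerapi.MaxPriority (assumed to be 10, matching the worked examples in the updated test comments), and reproduces the arithmetic of the "two images on one node, prefer this node" case.

package main

import "fmt"

// Local stand-ins for the values used by the patch; for illustration only.
const (
	mb           int64 = 1024 * 1024
	minThreshold int64 = 23 * mb
	maxThreshold int64 = 1000 * mb
	maxPriority  int64 = 10 // stand-in for schedulerapi.MaxPriority
)

// imageState is a simplified stand-in for schedulercache.ImageStateSummary.
type imageState struct {
	size     int64 // image size in bytes, as reported in the node status
	numNodes int   // number of nodes that already have the image
}

// scaledScore mirrors scaledImageScore: the image size scaled by the fraction
// of nodes that already hold the image ("spread").
func scaledScore(s imageState, totalNumNodes int) int64 {
	spread := float64(s.numNodes) / float64(totalNumNodes)
	return int64(float64(s.size) * spread)
}

// priority mirrors calculatePriority: clamp the summed score to the threshold
// range, then map it linearly onto [0, maxPriority].
func priority(sumScores int64) int64 {
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}
	return maxPriority * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

func main() {
	// The "two images on one node" test case: a pod requesting gcr.io/40 (40MB)
	// and gcr.io/300 (300MB), both present on one of two nodes.
	totalNodes := 2
	sum := scaledScore(imageState{size: 40 * mb, numNodes: 1}, totalNodes) +
		scaledScore(imageState{size: 300 * mb, numNodes: 1}, totalNodes)
	fmt.Println(priority(sum)) // 10 * (170M - 23M) / (1000M - 23M) = 1
}

The spread factor (NumNodes / totalNumNodes) is what damps the score of images that exist on only a few nodes, so nodes holding a rarely replicated image no longer receive an outsized locality bonus; this is how the patch mitigates the node heating problem described in the scaledImageScore comment.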