Merge pull request #50450 from aleksandra-malinowska/scalability-tests-clean
Add Cluster Autoscaler scalability test suite This suite is intended for manually testing Cluster Autoscaler on large clusters. It isn't supposed to be run automatically (at least for now). It can be run on Kubemark (with #50440) with the following setup: - start Kubemark with NUM_NODES=1 (as we require there to be exactly 1 replica per hollow-node replication controller in this setup) - set kubemark-master machine type manually to appropriate type for the Kubemark cluster size. Maximum Kubemark cluster size reached in test run is defined by maxNodes constant, so for maxNodes=1000, please upgrade to n1-standard-32. Adjust if modifying maxNodes. - start Cluster Autoscaler pod in the external cluster using image built from version with Kubemark cloud provider (release pending) - for grabbing metrics from ClusterAutoscaler (with #50382), add "--include-cluster-autoscaler=true" parameter in addition to regular flags for gathering components' metrics/resource usage during e2e tests
go_library(
name = "go_default_library",
name = "go_default_library",
srcs = [
srcs = [
@ -33,9 +34,11 @@ go_library(
@ -0,0 +1,458 @@
package autoscaling
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
testutils "k8s.io/kubernetes/test/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
const (
largeResizeTimeout = 10 * time.Minute
largeScaleUpTimeout = 10 * time.Minute
largeScaleDownTimeout = 20 * time.Minute
minute = 1 * time.Minute
maxNodes = 1000
type clusterPredicates struct {
nodes int
type scaleUpTestConfig struct {
initialNodes int
initialPods int
extraPods *testutils.RCConfig
expectedResult *clusterPredicates
var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", func() {
f := framework.NewDefaultFramework("autoscaling")
var c clientset.Interface
var nodeCount int
var coresPerNode int
var memCapacityMb int
var originalSizes map[string]int
var sum int
BeforeEach(func() {
framework.SkipUnlessProviderIs("gce", "gke", "kubemark")
c = f.ClientSet
if originalSizes == nil {
originalSizes = make(map[string]int)
sum = 0
for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
size, err := framework.GroupSize(mig)
By(fmt.Sprintf("Initial size of %s: %d", mig, size))
originalSizes[mig] = size
sum += size
framework.ExpectNoError(framework.WaitForClusterSize(c, sum, scaleUpTimeout))
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
nodeCount = len(nodes.Items)
cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU]
mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory]
coresPerNode = int((&cpu).MilliValue() / 1000)
memCapacityMb = int((&mem).Value() / 1024 / 1024)
if framework.ProviderIs("gke") {
val, err := isAutoscalerEnabled(3)
if !val {
err = enableAutoscaler("default-pool", 3, 5)
AfterEach(func() {
By(fmt.Sprintf("Restoring initial size of the cluster"))
framework.ExpectNoError(framework.WaitForClusterSize(c, nodeCount, scaleDownTimeout))
nodes, err := c.Core().Nodes().List(metav1.ListOptions{})
s := time.Now()
for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
for _, n := range nodes.Items {
err = makeNodeSchedulable(c, &n, true)
switch err.(type) {
case CriticalAddonsOnlyError:
continue makeSchedulableLoop
glog.Infof("Made nodes schedulable again in %v", time.Now().Sub(s).String())
It("should scale up at all [Feature:ClusterAutoscalerScalability1]", func() {
perNodeReservation := int(float64(memCapacityMb) * 0.95)
replicasPerNode := 10
additionalNodes := maxNodes - nodeCount
replicas := additionalNodes * replicasPerNode
additionalReservation := additionalNodes * perNodeReservation
// saturate cluster
reservationCleanup := ReserveMemory(f, "some-pod", nodeCount*2, nodeCount*perNodeReservation, true, scaleUpTimeout)
defer reservationCleanup()
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
// configure pending pods & expected scale up
rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout)
expectedResult := createClusterPredicates(nodeCount + additionalNodes)
config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
// run test
testCleanup := simpleScaleUpTest(f, config)
defer testCleanup()
It("should scale up twice [Feature:ClusterAutoscalerScalability2]", func() {
perNodeReservation := int(float64(memCapacityMb) * 0.95)
replicasPerNode := 10
additionalNodes1 := int(0.7 * maxNodes)
additionalNodes2 := int(0.25 * maxNodes)
replicas1 := additionalNodes1 * replicasPerNode
replicas2 := additionalNodes2 * replicasPerNode
glog.Infof("cores per node: %v", coresPerNode)
// saturate cluster
reservationCleanup := ReserveMemory(f, "some-pod", nodeCount, nodeCount*perNodeReservation, true, scaleUpTimeout)
defer reservationCleanup()
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
glog.Infof("Reserved successfully")
// configure pending pods & expected scale up #1
rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout)
expectedResult := createClusterPredicates(nodeCount + additionalNodes1)
config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
epsilon := 0.05
// run test #1
testCleanup1 := simpleScaleUpTestWithEpsilon(f, config, epsilon)
defer testCleanup1()
glog.Infof("Scaled up once")
// configure pending pods & expected scale up #2
rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout)
expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2)
config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, nodeCount+additionalNodes2, rcConfig2, expectedResult2)
// run test #2
testCleanup2 := simpleScaleUpTestWithEpsilon(f, config2, epsilon)
defer testCleanup2()
glog.Infof("Scaled up twice")
It("should scale down empty nodes [Feature:ClusterAutoscalerScalability3]", func() {
perNodeReservation := int(float64(memCapacityMb) * 0.7)
replicas := int(float64(maxNodes) * 0.7)
totalNodes := maxNodes
// resize cluster to totalNodes
newSizes := map[string]int{
anyKey(originalSizes): totalNodes,
framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, totalNodes, largeResizeTimeout))
// run replicas
rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout)
expectedResult := createClusterPredicates(totalNodes)
config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult)
testCleanup := simpleScaleUpTestWithEpsilon(f, config, 0.1)
defer testCleanup()
// check if empty nodes are scaled down
func(size int) bool {
return size <= replicas+3 // leaving space for non-evictable kube-system pods
}, scaleDownTimeout))
It("should scale down underutilized nodes [Feature:ClusterAutoscalerScalability4]", func() {
underutilizedReservation := int64(float64(memCapacityMb) * 0.01)
fullReservation := int64(float64(memCapacityMb) * 0.8)
perNodeReplicas := 10
totalNodes := maxNodes
// resize cluster to totalNodes
newSizes := map[string]int{
anyKey(originalSizes): totalNodes,
framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, totalNodes, largeResizeTimeout))
// annotate all nodes with no-scale-down
ScaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled"
nodes, err := f.ClientSet.Core().Nodes().List(metav1.ListOptions{
FieldSelector: fields.Set{
"spec.unschedulable": "false",
framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "true"))
// distribute pods (using taints)
divider := int(float64(len(nodes.Items)) * 0.7)
fullNodes := nodes.Items[:divider]
underutilizedNodes := nodes.Items[divider:]
framework.ExpectNoError(makeUnschedulable(f, underutilizedNodes))
testId2 := "full"
labels2 := map[string]string{"test_id": testId2}
cleanup2, err := runReplicatedPodOnEachNodeWithCleanup(f, fullNodes, f.Namespace.Name, 1, "filling-pod", labels2, fullReservation)
defer cleanup2()
framework.ExpectNoError(makeUnschedulable(f, fullNodes))
testId := "underutilized"
labels := map[string]string{"test_id": testId}
cleanup, err := runReplicatedPodOnEachNodeWithCleanup(f, underutilizedNodes, f.Namespace.Name, perNodeReplicas, "underutilizing-pod", labels, underutilizedReservation)
defer cleanup()
framework.ExpectNoError(makeSchedulable(f, nodes.Items))
framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "false"))
// wait for scale down
expectedSize := int(float64(totalNodes) * 0.85)
nodesToScaleDownCount := totalNodes - expectedSize
timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool {
return size <= expectedSize
}, timeout))
It("shouldn't scale down with underutilized nodes due to host port conflicts [Feature:ClusterAutoscalerScalability5]", func() {
fullReservation := int(float64(memCapacityMb) * 0.9)
hostPortPodReservation := int(float64(memCapacityMb) * 0.3)
totalNodes := maxNodes
reservedPort := 4321
// resize cluster to totalNodes
newSizes := map[string]int{
anyKey(originalSizes): totalNodes,
framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, totalNodes, largeResizeTimeout))
divider := int(float64(totalNodes) * 0.7)
fullNodesCount := divider
underutilizedNodesCount := totalNodes - fullNodesCount
By("Reserving full nodes")
// run RC1 w/o host port
cleanup := ReserveMemory(f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2)
defer cleanup()
By("Reserving host ports on remaining nodes")
// run RC2 w/ host port
cleanup2 := createHostPortPodsWithMemory(f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout)
defer cleanup2()
waitForAllCaPodsReadyInNamespace(f, c)
// wait and check scale down doesn't occur
By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes()))
By("Checking if the number of nodes is as expected")
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
glog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes)
func makeUnschedulable(f *framework.Framework, nodes []v1.Node) error {
for _, node := range nodes {
err := makeNodeUnschedulable(f.ClientSet, &node)
if err != nil {
return err
return nil
func makeSchedulable(f *framework.Framework, nodes []v1.Node) error {
for _, node := range nodes {
err := makeNodeSchedulable(f.ClientSet, &node, false)
if err != nil {
return err
return nil
func anyKey(input map[string]int) string {
for k := range input {
return k
return ""
func simpleScaleUpTestWithEpsilon(f *framework.Framework, config *scaleUpTestConfig, epsilon float64) func() error {
// resize cluster to start size
// run rc based on config
By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name))
start := time.Now()
// check results
if epsilon > 0 && epsilon < 1 {
// Tolerate some number of nodes not to be created.
minExpectedNodeCount := int(float64(config.expectedResult.nodes) - epsilon*float64(config.expectedResult.nodes))
func(size int) bool { return size >= minExpectedNodeCount }, scaleUpTimeout))
} else {
framework.ExpectNoError(framework.WaitForClusterSize(f.ClientSet, config.expectedResult.nodes, scaleUpTimeout))
glog.Infof("cluster is increased")
if epsilon > 0 && epsilon < 0 {
framework.ExpectNoError(waitForCaPodsReadyInNamespace(f, f.ClientSet, int(epsilon*float64(config.extraPods.Replicas)+1)))
} else {
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, config.extraPods.Name)
func simpleScaleUpTest(f *framework.Framework, config *scaleUpTestConfig) func() error {
return simpleScaleUpTestWithEpsilon(f, config, 0)
func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig {
return &testutils.RCConfig{
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: f.Namespace.Name,
Timeout: timeout,
Image: framework.GetPauseImageName(f.ClientSet),
Replicas: replicas,
MemRequest: int64(1024 * 1024 * megabytes / replicas),
func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig {
return &scaleUpTestConfig{
initialNodes: nodes,
initialPods: pods,
extraPods: extraPods,
expectedResult: expectedResult,
func createClusterPredicates(nodes int) *clusterPredicates {
return &clusterPredicates{
nodes: nodes,
func addAnnotation(f *framework.Framework, nodes []v1.Node, key, value string) error {
for _, node := range nodes {
oldData, err := json.Marshal(node)
if err != nil {
return err
if node.Annotations == nil {
node.Annotations = make(map[string]string)
node.Annotations[key] = value
newData, err := json.Marshal(node)
if err != nil {
return err
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
if err != nil {
return err
_, err = f.ClientSet.Core().Nodes().Patch(string(node.Name), types.StrategicMergePatchType, patchBytes)
if err != nil {
return err
return nil
func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error {
By(fmt.Sprintf("Running RC which reserves host port and memory"))
request := int64(1024 * 1024 * megabytes / replicas)
config := &testutils.RCConfig{
Client: f.ClientSet,
InternalClient: f.InternalClientset,
Name: id,
Namespace: f.Namespace.Name,
Timeout: timeout,
Image: framework.GetPauseImageName(f.ClientSet),
Replicas: replicas,
HostPorts: map[string]int{"port1": port},
MemRequest: request,
err := framework.RunRC(*config)
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, id)
func timeTrack(start time.Time, name string) {
elapsed := time.Since(start)
glog.Infof("%s took %s", name, elapsed)
var _ = SIGDescribe("Cluster size autoscaling [Slow]", func() {
unmanagedNodes := nodeCount - status.ready
unmanagedNodes := nodeCount - status.ready
By("Schedule more pods than can fit and wait for cluster to scale-up")
By("Schedule more pods than can fit and wait for cluster to scale-up")
ReserveMemory(f, "memory-reservation", 100, nodeCount*memCapacityMb, false, 1*time.Second)
ReserveMemory(f, "memory-reservation", 100, nodeCount*memCapacityMb, false, 1*time.Second)
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, "memory-reservation")
@ -719,7 +719,7 @@ func runDrainTest(f *framework.Framework, migSizes map[string]int, namespace str
numPods := len(nodes.Items) * podsPerNode
numPods := len(nodes.Items) * podsPerNode
testId := string(uuid.NewUUID()) // So that we can label and find pods
testId := string(uuid.NewUUID()) // So that we can label and find pods
labelMap := map[string]string{"test_id": testId}
labelMap := map[string]string{"test_id": testId}
framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap))
framework.ExpectNoError(runReplicatedPodOnEachNode(f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap, 0))
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, "reschedulable-pods")
defer framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, "reschedulable-pods")
@ -907,7 +907,7 @@ func doPut(url, content string) (string, error) {
return strBody, nil
return strBody, nil
func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) {
func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) func() error {
By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes))
By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes))
request := int64(1024 * 1024 * megabytes / replicas)
request := int64(1024 * 1024 * megabytes / replicas)
config := &testutils.RCConfig{
config := &testutils.RCConfig{
@ -929,9 +929,12 @@ func ReserveMemory(f *framework.Framework, id string, replicas, megabytes int, e
if expectRunning {
if expectRunning {
return func() error {
return framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, f.Namespace.Name, id)
framework.Failf("Failed to reserve memory within timeout")
framework.Failf("Failed to reserve memory within timeout")
return nil
// WaitForClusterSize waits until the cluster size matches the given function.
// WaitForClusterSize waits until the cluster size matches the given function.
@ -961,7 +964,7 @@ func WaitForClusterSizeFuncWithUnready(c clientset.Interface, sizeFunc func(int)
glog.Infof("Cluster has reached the desired size")
glog.Infof("Cluster has reached the desired size")
return nil
return nil
glog.Infof("Waiting for cluster, current size %d, not ready nodes %d", numNodes, numNodes-numReady)
glog.Infof("Waiting for cluster with func, current size %d, not ready nodes %d", numNodes, numNodes-numReady)
return fmt.Errorf("timeout waiting %v for appropriate cluster size", timeout)
return fmt.Errorf("timeout waiting %v for appropriate cluster size", timeout)
@ -1201,7 +1204,7 @@ func buildAntiAffinity(labels map[string]string) *v1.Affinity {
// 3. for each node:
// 3. for each node:
// 3a. enable scheduling on that node
// 3a. enable scheduling on that node
// 3b. increase number of replicas in RC by podsPerNode
// 3b. increase number of replicas in RC by podsPerNode
func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string) error {
func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) error {
By("Run a pod on each node")
By("Run a pod on each node")
for _, node := range nodes {
for _, node := range nodes {
err := makeNodeUnschedulable(f.ClientSet, &node)
err := makeNodeUnschedulable(f.ClientSet, &node)
@ -1223,6 +1226,7 @@ func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespa
Image: framework.GetPauseImageName(f.ClientSet),
Image: framework.GetPauseImageName(f.ClientSet),
Replicas: 0,
Replicas: 0,
Labels: labels,
Labels: labels,
MemRequest: memRequest,
err := framework.RunRC(*config)
err := framework.RunRC(*config)
if err != nil {
if err != nil {
@ -1274,6 +1278,14 @@ func runReplicatedPodOnEachNode(f *framework.Framework, nodes []v1.Node, namespa
return nil
return nil
// wrap runReplicatedPodOnEachNode to return cleanup
func runReplicatedPodOnEachNodeWithCleanup(f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) (func(), error) {
err := runReplicatedPodOnEachNode(f, nodes, namespace, podsPerNode, id, labels, memRequest)
return func() {
framework.DeleteRCAndPods(f.ClientSet, f.InternalClientset, namespace, id)
}, err
// Increase cluster size by newNodesForScaledownTests to create some unused nodes
// Increase cluster size by newNodesForScaledownTests to create some unused nodes
// that can be later removed by cluster autoscaler.
that can be later removed by cluster autoscaler.
func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[string]int) int {
func manuallyIncreaseClusterSize(f *framework.Framework, originalSizes map[string]int) int {
Reference in New Issue