/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package autoscaling

import (
	"encoding/json"
	"fmt"
	"math"
	"strings"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"k8s.io/klog"
)

const (
	memoryReservationTimeout = 5 * time.Minute
	largeResizeTimeout       = 8 * time.Minute
	largeScaleUpTimeout      = 10 * time.Minute
	largeScaleDownTimeout    = 20 * time.Minute
	minute                   = 1 * time.Minute

	maxNodes = 1000
)

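// clusterPredicates describes the cluster state expected once a scale-up or
// scale-down test has settled; currently only the node count is checked.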
type clusterPredicates struct {
	nodes int
}

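// scaleUpTestConfig bundles the starting cluster state (node and pod counts),
// the RC used to create pending pods, and the cluster state expected after the
// autoscaler has reacted.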
type scaleUpTestConfig struct {
	initialNodes   int
	initialPods    int
	extraPods      *testutils.RCConfig
	expectedResult *clusterPredicates
}

var _ = framework.KubeDescribe("Cluster size autoscaler scalability [Slow]", func() {
	f := framework.NewDefaultFramework("autoscaling")
	var c clientset.Interface
	var nodeCount int
	var coresPerNode int
	var memCapacityMb int
	var originalSizes map[string]int
	var sum int

	BeforeEach(func() {
		framework.SkipUnlessProviderIs("gce", "gke", "kubemark")

		// Check if Cluster Autoscaler is enabled by trying to get its ConfigMap.
		_, err := f.ClientSet.CoreV1().ConfigMaps("kube-system").Get("cluster-autoscaler-status", metav1.GetOptions{})
		if err != nil {
			framework.Skipf("test expects Cluster Autoscaler to be enabled")
		}

		c = f.ClientSet
		if originalSizes == nil {
			originalSizes = make(map[string]int)
			sum = 0
			for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
				size, err := framework.GroupSize(mig)
				framework.ExpectNoError(err)
				By(fmt.Sprintf("Initial size of %s: %d", mig, size))
				originalSizes[mig] = size
				sum += size
			}
		}

		framework.ExpectNoError(framework.WaitForReadyNodes(c, sum, scaleUpTimeout))

		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
		nodeCount = len(nodes.Items)
		Expect(nodeCount).NotTo(BeZero())
		cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU]
		mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory]
		coresPerNode = int((&cpu).MilliValue() / 1000)
		memCapacityMb = int((&mem).Value() / 1024 / 1024)

		Expect(nodeCount).Should(Equal(sum))

		if framework.ProviderIs("gke") {
			val, err := isAutoscalerEnabled(3)
			framework.ExpectNoError(err)
			if !val {
				err = enableAutoscaler("default-pool", 3, 5)
				framework.ExpectNoError(err)
			}
		}
	})

	AfterEach(func() {
		By("Restoring initial size of the cluster")
		setMigSizes(originalSizes)
		framework.ExpectNoError(framework.WaitForReadyNodes(c, nodeCount, scaleDownTimeout))
		nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
		framework.ExpectNoError(err)
		s := time.Now()
	makeSchedulableLoop:
		for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
			for _, n := range nodes.Items {
				err = makeNodeSchedulable(c, &n, true)
				switch err.(type) {
				case CriticalAddonsOnlyError:
					continue makeSchedulableLoop
				default:
					framework.ExpectNoError(err)
				}
			}
			break
		}
		klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
	})

It("should scale up at all [Feature:ClusterAutoscalerScalability1]", func() {
|
|
perNodeReservation := int(float64(memCapacityMb) * 0.95)
|
|
replicasPerNode := 10
|
|
|
|
additionalNodes := maxNodes - nodeCount
|
|
replicas := additionalNodes * replicasPerNode
|
|
additionalReservation := additionalNodes * perNodeReservation
|
|
|
|
// saturate cluster
|
|
reservationCleanup := ReserveMemory(f, "some-pod", nodeCount*2, nodeCount*perNodeReservation, true, memoryReservationTimeout)
|
|
defer reservationCleanup()
|
|
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
|
|
|
|
// configure pending pods & expected scale up
|
|
rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout)
|
|
expectedResult := createClusterPredicates(nodeCount + additionalNodes)
|
|
config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
|
|
|
|
// run test
|
|
testCleanup := simpleScaleUpTest(f, config)
|
|
defer testCleanup()
|
|
})
|
|
|
|
It("should scale up twice [Feature:ClusterAutoscalerScalability2]", func() {
|
|
perNodeReservation := int(float64(memCapacityMb) * 0.95)
|
|
replicasPerNode := 10
|
|
additionalNodes1 := int(math.Ceil(0.7 * maxNodes))
|
|
additionalNodes2 := int(math.Ceil(0.25 * maxNodes))
|
|
if additionalNodes1+additionalNodes2 > maxNodes {
|
|
additionalNodes2 = maxNodes - additionalNodes1
|
|
}
|
|
|
|
replicas1 := additionalNodes1 * replicasPerNode
|
|
replicas2 := additionalNodes2 * replicasPerNode
|
|
|
|
klog.Infof("cores per node: %v", coresPerNode)
|
|
|
|
// saturate cluster
|
|
initialReplicas := nodeCount
|
|
reservationCleanup := ReserveMemory(f, "some-pod", initialReplicas, nodeCount*perNodeReservation, true, memoryReservationTimeout)
|
|
defer reservationCleanup()
|
|
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
|
|
|
|
klog.Infof("Reserved successfully")
|
|
|
|
// configure pending pods & expected scale up #1
|
|
rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout)
|
|
expectedResult := createClusterPredicates(nodeCount + additionalNodes1)
|
|
config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
|
|
|
|
// run test #1
|
|
tolerateUnreadyNodes := additionalNodes1 / 20
|
|
tolerateUnreadyPods := (initialReplicas + replicas1) / 20
|
|
testCleanup1 := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
|
|
defer testCleanup1()
|
|
|
|
klog.Infof("Scaled up once")
|
|
|
|
// configure pending pods & expected scale up #2
|
|
rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout)
|
|
expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2)
|
|
config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, nodeCount+additionalNodes2, rcConfig2, expectedResult2)
|
|
|
|
// run test #2
|
|
tolerateUnreadyNodes = maxNodes / 20
|
|
tolerateUnreadyPods = (initialReplicas + replicas1 + replicas2) / 20
|
|
testCleanup2 := simpleScaleUpTestWithTolerance(f, config2, tolerateUnreadyNodes, tolerateUnreadyPods)
|
|
defer testCleanup2()
|
|
|
|
klog.Infof("Scaled up twice")
|
|
})
|
|
|
|
It("should scale down empty nodes [Feature:ClusterAutoscalerScalability3]", func() {
|
|
perNodeReservation := int(float64(memCapacityMb) * 0.7)
|
|
replicas := int(math.Ceil(maxNodes * 0.7))
|
|
totalNodes := maxNodes
|
|
|
|
// resize cluster to totalNodes
|
|
newSizes := map[string]int{
|
|
anyKey(originalSizes): totalNodes,
|
|
}
|
|
setMigSizes(newSizes)
|
|
framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
|
|
|
|
// run replicas
|
|
rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout)
|
|
expectedResult := createClusterPredicates(totalNodes)
|
|
config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult)
|
|
tolerateUnreadyNodes := totalNodes / 10
|
|
tolerateUnreadyPods := replicas / 10
|
|
testCleanup := simpleScaleUpTestWithTolerance(f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
|
|
defer testCleanup()
|
|
|
|
// check if empty nodes are scaled down
|
|
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
|
|
func(size int) bool {
|
|
return size <= replicas+3 // leaving space for non-evictable kube-system pods
|
|
}, scaleDownTimeout))
|
|
})
|
|
|
|
It("should scale down underutilized nodes [Feature:ClusterAutoscalerScalability4]", func() {
|
|
perPodReservation := int(float64(memCapacityMb) * 0.01)
|
|
// underutilizedNodes are 10% full
|
|
underutilizedPerNodeReplicas := 10
|
|
// fullNodes are 70% full
|
|
fullPerNodeReplicas := 70
|
|
totalNodes := maxNodes
|
|
underutilizedRatio := 0.3
|
|
maxDelta := 30
|
|
|
|
// resize cluster to totalNodes
|
|
newSizes := map[string]int{
|
|
anyKey(originalSizes): totalNodes,
|
|
}
|
|
setMigSizes(newSizes)
|
|
|
|
framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
|
|
|
|
// annotate all nodes with no-scale-down
|
|
ScaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled"
|
|
|
|
nodes, err := f.ClientSet.CoreV1().Nodes().List(metav1.ListOptions{
|
|
FieldSelector: fields.Set{
|
|
"spec.unschedulable": "false",
|
|
}.AsSelector().String(),
|
|
})
|
|
|
|
framework.ExpectNoError(err)
|
|
framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "true"))
|
|
|
|
// distribute pods using replication controllers taking up space that should
|
|
// be empty after pods are distributed
|
|
underutilizedNodesNum := int(float64(maxNodes) * underutilizedRatio)
|
|
fullNodesNum := totalNodes - underutilizedNodesNum
|
|
|
|
podDistribution := []podBatch{
|
|
{numNodes: fullNodesNum, podsPerNode: fullPerNodeReplicas},
|
|
{numNodes: underutilizedNodesNum, podsPerNode: underutilizedPerNodeReplicas}}
|
|
|
|
cleanup := distributeLoad(f, f.Namespace.Name, "10-70", podDistribution, perPodReservation,
|
|
int(0.95*float64(memCapacityMb)), map[string]string{}, largeScaleUpTimeout)
|
|
defer cleanup()
|
|
|
|
// enable scale down again
|
|
framework.ExpectNoError(addAnnotation(f, nodes.Items, ScaleDownDisabledKey, "false"))
|
|
|
|
// wait for scale down to start. Node deletion takes a long time, so we just
|
|
// wait for maximum of 30 nodes deleted
|
|
nodesToScaleDownCount := int(float64(totalNodes) * 0.1)
|
|
if nodesToScaleDownCount > maxDelta {
|
|
nodesToScaleDownCount = maxDelta
|
|
}
|
|
expectedSize := totalNodes - nodesToScaleDownCount
|
|
timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout
|
|
framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet, func(size int) bool {
|
|
return size <= expectedSize
|
|
}, timeout))
|
|
})
|
|
|
|
It("shouldn't scale down with underutilized nodes due to host port conflicts [Feature:ClusterAutoscalerScalability5]", func() {
|
|
fullReservation := int(float64(memCapacityMb) * 0.9)
|
|
hostPortPodReservation := int(float64(memCapacityMb) * 0.3)
|
|
totalNodes := maxNodes
|
|
reservedPort := 4321
|
|
|
|
// resize cluster to totalNodes
|
|
newSizes := map[string]int{
|
|
anyKey(originalSizes): totalNodes,
|
|
}
|
|
setMigSizes(newSizes)
|
|
framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, totalNodes, largeResizeTimeout))
|
|
divider := int(float64(totalNodes) * 0.7)
|
|
fullNodesCount := divider
|
|
underutilizedNodesCount := totalNodes - fullNodesCount
|
|
|
|
By("Reserving full nodes")
|
|
// run RC1 w/o host port
|
|
cleanup := ReserveMemory(f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2)
|
|
defer cleanup()
|
|
|
|
By("Reserving host ports on remaining nodes")
|
|
// run RC2 w/ host port
|
|
cleanup2 := createHostPortPodsWithMemory(f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout)
|
|
defer cleanup2()
|
|
|
|
waitForAllCaPodsReadyInNamespace(f, c)
|
|
// wait and check scale down doesn't occur
|
|
By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes()))
|
|
time.Sleep(scaleDownTimeout)
|
|
|
|
By("Checking if the number of nodes is as expected")
|
|
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
|
|
klog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes)
|
|
Expect(len(nodes.Items)).Should(Equal(totalNodes))
|
|
})
|
|
|
|
Specify("CA ignores unschedulable pods while scheduling schedulable pods [Feature:ClusterAutoscalerScalability6]", func() {
|
|
// Start a number of pods saturating existing nodes.
|
|
perNodeReservation := int(float64(memCapacityMb) * 0.80)
|
|
replicasPerNode := 10
|
|
initialPodReplicas := nodeCount * replicasPerNode
|
|
initialPodsTotalMemory := nodeCount * perNodeReservation
|
|
reservationCleanup := ReserveMemory(f, "initial-pod", initialPodReplicas, initialPodsTotalMemory, true /* wait for pods to run */, memoryReservationTimeout)
|
|
defer reservationCleanup()
|
|
framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, c))
|
|
|
|
// Configure a number of unschedulable pods.
|
|
unschedulableMemReservation := memCapacityMb * 2
|
|
unschedulablePodReplicas := 1000
|
|
totalMemReservation := unschedulableMemReservation * unschedulablePodReplicas
|
|
timeToWait := 5 * time.Minute
|
|
podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
|
|
framework.RunRC(*podsConfig) // Ignore error (it will occur because pods are unschedulable)
|
|
defer framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, podsConfig.Name)
|
|
|
|
// Ensure that no new nodes have been added so far.
|
|
Expect(framework.NumberOfReadyNodes(f.ClientSet)).To(Equal(nodeCount))
|
|
|
|
// Start a number of schedulable pods to ensure CA reacts.
|
|
additionalNodes := maxNodes - nodeCount
|
|
replicas := additionalNodes * replicasPerNode
|
|
totalMemory := additionalNodes * perNodeReservation
|
|
rcConfig := reserveMemoryRCConfig(f, "extra-pod", replicas, totalMemory, largeScaleUpTimeout)
|
|
expectedResult := createClusterPredicates(nodeCount + additionalNodes)
|
|
config := createScaleUpTestConfig(nodeCount, initialPodReplicas, rcConfig, expectedResult)
|
|
|
|
// Test that scale up happens, allowing 1000 unschedulable pods not to be scheduled.
|
|
testCleanup := simpleScaleUpTestWithTolerance(f, config, 0, unschedulablePodReplicas)
|
|
defer testCleanup()
|
|
})
|
|
|
|
})

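// anyKey returns an arbitrary key of the given map, or the empty string if the
// map is empty. The tests use it to pick a single instance group to resize.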
func anyKey(input map[string]int) string {
	for k := range input {
		return k
	}
	return ""
}

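// simpleScaleUpTestWithTolerance runs the RC described by config, waits for the
// cluster to reach the expected node count and for the pods to become ready
// (tolerating the given numbers of missing nodes and pods), and returns a
// cleanup function that deletes the RC.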
func simpleScaleUpTestWithTolerance(f *framework.Framework, config *scaleUpTestConfig, tolerateMissingNodeCount int, tolerateMissingPodCount int) func() error {
	// resize cluster to start size
	// run rc based on config
	By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name))
	start := time.Now()
	framework.ExpectNoError(framework.RunRC(*config.extraPods))
	// check results
	if tolerateMissingNodeCount > 0 {
		// Tolerate some number of nodes not to be created.
		minExpectedNodeCount := config.expectedResult.nodes - tolerateMissingNodeCount
		framework.ExpectNoError(WaitForClusterSizeFunc(f.ClientSet,
			func(size int) bool { return size >= minExpectedNodeCount }, scaleUpTimeout))
	} else {
		framework.ExpectNoError(framework.WaitForReadyNodes(f.ClientSet, config.expectedResult.nodes, scaleUpTimeout))
	}
	klog.Infof("cluster is increased")
	if tolerateMissingPodCount > 0 {
		framework.ExpectNoError(waitForCaPodsReadyInNamespace(f, f.ClientSet, tolerateMissingPodCount))
	} else {
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	}
	timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, config.extraPods.Name)
	}
}

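// simpleScaleUpTest is simpleScaleUpTestWithTolerance with zero tolerance for
// missing nodes and pods.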
func simpleScaleUpTest(f *framework.Framework, config *scaleUpTestConfig) func() error {
	return simpleScaleUpTestWithTolerance(f, config, 0, 0)
}

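// reserveMemoryRCConfig builds an RCConfig that runs the given number of pause
// pod replicas whose memory requests sum to roughly the given number of megabytes.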
func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig {
	return &testutils.RCConfig{
		Client:         f.ClientSet,
		InternalClient: f.InternalClientset,
		Name:           id,
		Namespace:      f.Namespace.Name,
		Timeout:        timeout,
		Image:          imageutils.GetPauseImageName(),
		Replicas:       replicas,
		MemRequest:     int64(1024 * 1024 * megabytes / replicas),
	}
}

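// createScaleUpTestConfig assembles a scaleUpTestConfig from the initial node
// and pod counts, the RC creating the pending pods, and the expected result.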
func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig {
	return &scaleUpTestConfig{
		initialNodes:   nodes,
		initialPods:    pods,
		extraPods:      extraPods,
		expectedResult: expectedResult,
	}
}

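// createClusterPredicates returns clusterPredicates expecting the given node count.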
func createClusterPredicates(nodes int) *clusterPredicates {
	return &clusterPredicates{
		nodes: nodes,
	}
}

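// addAnnotation sets the given annotation key/value on each of the provided
// nodes using a strategic merge patch.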
func addAnnotation(f *framework.Framework, nodes []v1.Node, key, value string) error {
	for _, node := range nodes {
		oldData, err := json.Marshal(node)
		if err != nil {
			return err
		}

		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		node.Annotations[key] = value

		newData, err := json.Marshal(node)
		if err != nil {
			return err
		}

		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
		if err != nil {
			return err
		}

		_, err = f.ClientSet.CoreV1().Nodes().Patch(string(node.Name), types.StrategicMergePatchType, patchBytes)
		if err != nil {
			return err
		}
	}
	return nil
}

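// createHostPortPodsWithMemory runs an RC of pause pods that each claim the
// given host port and a share of the total memory reservation, and returns a
// cleanup function that deletes the RC.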
func createHostPortPodsWithMemory(f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error {
	By("Running RC which reserves host port and memory")
	request := int64(1024 * 1024 * megabytes / replicas)
	config := &testutils.RCConfig{
		Client:         f.ClientSet,
		InternalClient: f.InternalClientset,
		Name:           id,
		Namespace:      f.Namespace.Name,
		Timeout:        timeout,
		Image:          imageutils.GetPauseImageName(),
		Replicas:       replicas,
		HostPorts:      map[string]int{"port1": port},
		MemRequest:     request,
	}
	err := framework.RunRC(*config)
	framework.ExpectNoError(err)
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
	}
}

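// podBatch describes a group of nodes that should each run the given number of pods.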
type podBatch struct {
	numNodes    int
	podsPerNode int
}

// distributeLoad distributes the pods in the way described by podDistribution,
// assuming all pods will have the same memory reservation and all nodes the same
// memory capacity. This allows us to generate the load on the cluster in the exact
// way that we want.
//
// To achieve this we do the following:
// 1. Create replication controllers that eat up all the space that should be
// empty after setup, making sure they end up on different nodes by specifying
// a conflicting host port
// 2. Create the target RC that will generate the load on the cluster
// 3. Remove the RCs created in step 1.
func distributeLoad(f *framework.Framework, namespace string, id string, podDistribution []podBatch,
	podMemRequestMegabytes int, nodeMemCapacity int, labels map[string]string, timeout time.Duration) func() error {
	port := 8013
	// Create load-distribution RCs with one pod per node, reserving all remaining
	// memory to force the distribution of pods for the target RCs.
	// The load-distribution RCs will be deleted on function return.
	totalPods := 0
	for i, podBatch := range podDistribution {
		totalPods += podBatch.numNodes * podBatch.podsPerNode
		remainingMem := nodeMemCapacity - podBatch.podsPerNode*podMemRequestMegabytes
		replicas := podBatch.numNodes
		cleanup := createHostPortPodsWithMemory(f, fmt.Sprintf("load-distribution%d", i), replicas, port, remainingMem*replicas, timeout)
		defer cleanup()
	}
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	// Create the target RC
	rcConfig := reserveMemoryRCConfig(f, id, totalPods, totalPods*podMemRequestMegabytes, timeout)
	framework.ExpectNoError(framework.RunRC(*rcConfig))
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(f, f.ClientSet))
	return func() error {
		return framework.DeleteRCAndWaitForGC(f.ClientSet, f.Namespace.Name, id)
	}
}

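// timeTrack logs how long the operation identified by name has taken since start.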
func timeTrack(start time.Time, name string) {
	elapsed := time.Since(start)
	klog.Infof("%s took %s", name, elapsed)
}