Implemented first version of load test

As for now it creates a bunch of Replication Controllers and play with them

Partially addresses #3139
Piotr Szczesniak 2015-05-05 16:48:50 +02:00
parent a98770e1ba
commit aa116e2033
3 changed files with 204 additions and 37 deletions

View File

@ -105,6 +105,10 @@ type RetryParams struct {
interval, timeout time.Duration
func NewRetryParams(interval, timeout time.Duration) *RetryParams {
return &RetryParams{interval, timeout}
// ResizeCondition is a closure around Resize that facilitates retries via util.wait
func ResizeCondition(r Resizer, precondition *ResizePrecondition, namespace, name string, count uint) wait.ConditionFunc {
return func() (bool, error) {

test/e2e/load.go Normal file
View File

@ -0,0 +1,151 @@
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
package e2e
import (
. ""
. ""
const (
image = ""
simulationTime = 20 * time.Minute
smallRCSize = 5
mediumRCSize = 30
bigRCSize = 250
// This test suite can take a long time to run, so by default it is added to
// the ginkgo.skip list (see driver.go).
// To run this suite you must explicitly ask for it by setting the
// -t/--test flag or ginkgo.focus flag.
var _ = Describe("Load", func() {
var c *client.Client
var nodeCount int
var ns string
BeforeEach(func() {
var err error
c, err = loadClient()
nodes, err := c.Nodes().List(labels.Everything(), fields.Everything())
nodeCount = len(nodes.Items)
nsForTesting, err := createTestingNS("load", c)
ns = nsForTesting.Name
// TODO add flag that allows to skip cleanup on failure
AfterEach(func() {
By(fmt.Sprintf("Destroying namespace for this suite %v", ns))
if err := c.Namespaces().Delete(ns); err != nil {
Failf("Couldn't delete ns %s", err)
type Load struct {
skip bool
podsPerNode int
loadTests := []Load{
{podsPerNode: 30, skip: true},
for _, testArg := range loadTests {
name := fmt.Sprintf("should be able to handle %v pods per node", testArg.podsPerNode)
if testArg.skip {
name = "[Skipped] " + name
It(name, func() {
totalPods := testArg.podsPerNode * nodeCount
smallRCCount, mediumRCCount, bigRCCount := computeRCCounts(totalPods)
threads := smallRCCount + mediumRCCount + bigRCCount
var wg sync.WaitGroup
// Run RC load for all kinds of RC.
runRCLoad(c, &wg, ns, smallRCSize, smallRCCount)
runRCLoad(c, &wg, ns, mediumRCSize, mediumRCCount)
runRCLoad(c, &wg, ns, bigRCSize, bigRCCount)
// Wait for all the pods from all the RC's to return.
// TODO verify latency metrics
func computeRCCounts(total int) (int, int, int) {
// Small RCs owns ~0.5 of total number of pods, medium and big RCs ~0.25 each.
// For example for 3000 pods (100 nodes, 30 pods per node) there are:
// - 500 small RCs each 5 pods
// - 25 medium RCs each 30 pods
// - 3 big RCs each 250 pods
bigRCCount := total / 4 / bigRCSize
mediumRCCount := (total - bigRCCount*bigRCSize) / 3 / mediumRCSize
smallRCCount := (total - bigRCCount*bigRCSize - mediumRCCount*mediumRCSize) / smallRCSize
return smallRCCount, mediumRCCount, bigRCCount
// The function creates a RC and then every few second resize it and with 0.1 probability deletes it.
func playWithRC(c *client.Client, wg *sync.WaitGroup, ns string, size int) {
defer GinkgoRecover()
defer wg.Done()
rcExist := false
var name string
// Once every 1-2 minutes perform resize of RC.
for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
if !rcExist {
name = "load-test-" + string(util.NewUUID())
expectNoError(RunRC(c, name, ns, image, size))
rcExist = true
// Resize RC to a random size between 0.5x and 1.5x of the original size.
newSize := uint(rand.Intn(size+1) + size/2)
expectNoError(ResizeRC(c, ns, name, newSize))
// With probability 0.1 remove this RC.
if rand.Intn(10) == 0 {
expectNoError(DeleteRC(c, ns, name))
rcExist = false
if rcExist {
expectNoError(DeleteRC(c, ns, name))
func runRCLoad(c *client.Client, wg *sync.WaitGroup, ns string, size, count int) {
By(fmt.Sprintf("Running %v Replication Controllers with size %v and playing with them", count, size))
for i := 0; i < count; i++ {
go playWithRC(c, wg, ns, size)

View File

@ -36,9 +36,9 @@ import (
@ -306,7 +306,6 @@ func validateController(c *client.Client, containerImage string, replicas int, c
Failf("Timed out after %v seconds waiting for %s pods to reach valid state", podStartTimeout.Seconds(), testname)
// kubectlCmd runs the kubectl executable.
// kubectlCmd runs the kubectl executable.
func kubectlCmd(args ...string) *exec.Cmd {
defaultArgs := []string{}
@ -411,41 +410,6 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
// Delete a Replication Controller and all pods it spawned
func DeleteRC(c *client.Client, ns, name string) error {
rc, err := c.ReplicationControllers(ns).Get(name)
if err != nil {
return fmt.Errorf("Failed to find replication controller %s in namespace %s: %v", name, ns, err)
rc.Spec.Replicas = 0
if _, err := c.ReplicationControllers(ns).Update(rc); err != nil {
return fmt.Errorf("Failed to resize replication controller %s to zero: %v", name, err)
// Wait up to 20 minutes until all replicas are killed.
endTime := time.Now().Add(time.Minute * 20)
for {
if time.Now().After(endTime) {
return fmt.Errorf("Timeout while waiting for replication controller %s replicas to 0", name)
remainingTime := endTime.Sub(time.Now())
err := wait.Poll(time.Second, remainingTime, client.ControllerHasDesiredReplicas(c, rc))
if err != nil {
Logf("Error while waiting for replication controller %s replicas to read 0: %v", name, err)
} else {
// Delete the replication controller.
if err := c.ReplicationControllers(ns).Delete(name); err != nil {
return fmt.Errorf("Failed to delete replication controller %s: %v", name, err)
return nil
// RunRC Launches (and verifies correctness) of a Replication Controller
// It will waits for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
@ -576,6 +540,54 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
return nil
func ResizeRC(c *client.Client, ns, name string, size uint) error {
By(fmt.Sprintf("Resizing replication controller %s in namespace %s to %d", name, ns, size))
resizer, err := kubectl.ResizerFor("ReplicationController", kubectl.NewResizerClient(c))
if err != nil {
return err
waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
if err = resizer.Resize(ns, name, size, nil, nil, waitForReplicas); err != nil {
return err
return waitForRCPodsRunning(c, ns, name)
// Wait up to 10 minutes for pods to become Running.
func waitForRCPodsRunning(c *client.Client, ns, rcName string) error {
running := false
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": rcName}))
for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
pods, err := listPods(c, ns, label, fields.Everything())
if err != nil {
Logf("Error listing pods: %v", err)
for _, p := range pods.Items {
if p.Status.Phase != api.PodRunning {
running = true
if !running {
return fmt.Errorf("Timeout while waiting for replication controller %s pods to be running", rcName)
return nil
// Delete a Replication Controller and all pods it spawned
func DeleteRC(c *client.Client, ns, name string) error {
By(fmt.Sprintf("Deleting replication controller %s in namespace %s", name, ns))
reaper, err := kubectl.ReaperFor("ReplicationController", c)
if err != nil {
return err
_, err = reaper.Stop(ns, name, api.NewDeleteOptions(0))
return err
// Convenient wrapper around listing pods supporting retries.
func listPods(c *client.Client, namespace string, label labels.Selector, field fields.Selector) (*api.PodList, error) {
maxRetries := 4