Merge pull request #8809 from rrati/performance-gathering-7572

Performance gathering 7572
Brian Grant 2015-06-03 12:37:17 -07:00
commit 8c0b1c7db4
5 changed files with 369 additions and 97 deletions


@@ -19,6 +19,7 @@ package e2e
import (
"fmt"
"math"
"os"
"strconv"
"time"
@@ -45,6 +46,7 @@ var _ = Describe("Density", func() {
var minionCount int
var RCName string
var ns string
var uuid string
BeforeEach(func() {
var err error
@@ -57,6 +59,9 @@ var _ = Describe("Density", func() {
nsForTesting, err := createTestingNS("density", c)
ns = nsForTesting.Name
expectNoError(err)
uuid = string(util.NewUUID())
expectNoError(os.Mkdir(uuid, 0777))
expectNoError(writePerfData(c, uuid, "before"))
})
AfterEach(func() {
@@ -76,6 +81,8 @@ var _ = Describe("Density", func() {
Failf("Couldn't delete ns %s", err)
}
expectNoError(writePerfData(c, uuid, "after"))
// Verify latency metrics
// TODO: Update threshold to 1s once we reach this goal
// TODO: We should reset metrics before the test. Currently previous tests influence latency metrics.
@@ -89,16 +96,18 @@ var _ = Describe("Density", func() {
type Density struct {
skip bool
podsPerMinion int
/* Controls how often the apiserver is polled for pods */
interval int
}
densityTests := []Density{
// This test should always run, even if larger densities are skipped.
{podsPerMinion: 3, skip: false},
{podsPerMinion: 30, skip: false},
{podsPerMinion: 3, skip: false, interval: 10},
{podsPerMinion: 30, skip: false, interval: 10},
// More than 30 pods per node is outside our v1.0 goals.
// We might want to enable those tests in the future.
{podsPerMinion: 50, skip: true},
{podsPerMinion: 100, skip: true},
{podsPerMinion: 50, skip: true, interval: 10},
{podsPerMinion: 100, skip: true, interval: 1},
}
for _, testArg := range densityTests {
@@ -112,8 +121,19 @@ var _ = Describe("Density", func() {
itArg := testArg
It(name, func() {
totalPods := itArg.podsPerMinion * minionCount
nameStr := strconv.Itoa(totalPods) + "-" + string(util.NewUUID())
RCName = "my-hostname-density" + nameStr
RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid
fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid))
expectNoError(err)
defer fileHndl.Close()
config := RCConfig{Client: c,
Image: "gcr.io/google_containers/pause:go",
Name: RCName,
Namespace: ns,
PollInterval: itArg.interval,
PodStatusFile: fileHndl,
Replicas: totalPods,
}
// Create a listener for events.
events := make([](*api.Event), 0)
@@ -139,7 +159,7 @@ var _ = Describe("Density", func() {
// Start the replication controller.
startTime := time.Now()
expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods))
expectNoError(RunRC(config))
e2eStartupTime := time.Now().Sub(startTime)
Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)

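For context: the pod_states.csv handle created above is handed to RunRC through the new PodStatusFile field, and each poll appends one comma-separated row using the Fprintf format added to util.go below. A row therefore looks roughly like this (the leading timestamp is the queue item's createTime from time.Now().String(); the counts are purely illustrative):

2015-06-03 12:37:17.000000000 -0700 PDT, 42, running, 3, pending, 1, waiting, 0, inactive, 0, unknown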
test/e2e/fifo_queue.go (new file)

@@ -0,0 +1,75 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"sync"
"time"
)
type QueueItem struct {
createTime string
value interface{}
}
type QueueItems struct {
pos int
mutex *sync.Mutex
list []QueueItem
}
type FifoQueue QueueItems
func (fq *FifoQueue) Push(elem interface{}) {
fq.mutex.Lock()
fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
fq.mutex.Unlock()
}
func (fq *FifoQueue) Pop() QueueItem {
fq.mutex.Lock()
var val QueueItem
if len(fq.list)-1 >= fq.pos {
val = fq.list[fq.pos]
fq.pos++
}
fq.mutex.Unlock()
return val
}
func (fq FifoQueue) Len() int {
return len(fq.list[fq.pos:])
}
func (fq *FifoQueue) First() QueueItem {
return fq.list[fq.pos]
}
func (fq *FifoQueue) Last() QueueItem {
return fq.list[len(fq.list)-1]
}
func (fq *FifoQueue) Reset() {
fq.pos = 0
}
func newFifoQueue() *FifoQueue {
tmp := new(FifoQueue)
tmp.mutex = &sync.Mutex{}
tmp.pos = 0
return tmp
}
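As a reading aid, here is a minimal sketch of how this queue is intended to be used; it mirrors the producer/consumer pattern RunRC adopts further down, assumes the surrounding e2e package (Logf, api.Pod), and uses an empty pod slice purely as an illustrative payload:

q := newFifoQueue()

// Producer side: push a snapshot of whatever should be sampled.
q.Push([]*api.Pod{})

// Consumer side: drain whatever has accumulated so far.
for q.Len() > 0 {
	item := q.Pop()
	pods := item.value.([]*api.Pod) // value holds exactly what was pushed
	Logf("snapshot taken at %s contained %d pods", item.createTime, len(pods))
}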


@@ -149,7 +149,13 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
// Once every 1-2 minutes perform scale of RC.
for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
if !rcExist {
expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: image,
Replicas: size,
}
expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
rcExist = true
}
// Scale RC to a random size between 0.5x and 1.5x of the original size.
@@ -187,7 +193,13 @@ func createRCGroup(c *client.Client, ns, groupName string, size, count, batchSiz
defer GinkgoRecover()
defer wg.Done()
name := groupName + "-" + strconv.Itoa(i)
expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s for the first time", name, ns))
config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: image,
Replicas: size,
}
expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s for the first time", name, ns))
}(i)
}
wg.Wait()


@@ -107,7 +107,15 @@ var _ = Describe("Scale", func() {
for i := 0; i < itArg.rcsPerThread; i++ {
name := "my-short-lived-pod" + string(util.NewUUID())
n := itArg.podsPerMinion * minionCount
expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n))
config := RCConfig{Client: c,
Name: name,
Namespace: ns,
Image: "gcr.io/google_containers/pause:go",
Replicas: n,
}
expectNoError(RunRC(config))
podsLaunched += n
Logf("Launched %v pods so far...", podsLaunched)
err := DeleteRC(c, ns, name)


@@ -19,8 +19,10 @@ package e2e
import (
"bytes"
"fmt"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"os/exec"
"path/filepath"
@@ -132,6 +134,16 @@ func (s *podStore) Stop() {
close(s.stopCh)
}
type RCConfig struct {
Client *client.Client
Image string
Name string
Namespace string
PollInterval int
PodStatusFile *os.File
Replicas int
}
func Logf(format string, a ...interface{}) {
fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
}
@@ -700,15 +712,27 @@ func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
}
// RunRC Launches (and verifies correctness) of a Replication Controller
// It will waits for all pods it spawns to become "Running".
// and will wait for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling cleanup).
func RunRC(c *client.Client, name string, ns, image string, replicas int) error {
func RunRC(config RCConfig) error {
var last int
c := config.Client
name := config.Name
ns := config.Namespace
image := config.Image
replicas := config.Replicas
interval := config.PollInterval
maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
current := 0
same := 0
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
podLists := newFifoQueue()
// Default to 10 second polling/check interval
if interval <= 0 {
interval = 10
}
By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name))
rc := &api.ReplicationController{
@@ -741,112 +765,170 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
return fmt.Errorf("Error creating replication controller: %v", err)
}
Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas)
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
podStore := newPodStore(c, ns, label, fields.Everything())
defer podStore.Stop()
pods := podStore.List()
current = len(pods)
failCount := 24
// Create a goroutine to periodically query for the list of pods
stop := make(chan struct{})
go func(stop <-chan struct{}, ns string, label labels.Selector, interval int) {
for {
select {
case <-stop:
return
default:
podLists.Push(podStore.List())
time.Sleep(time.Duration(interval) * time.Second)
}
}
}(stop, ns, label, interval)
defer close(stop)
// Look for all the replicas to be created by the replication
// controller. Stop looking once all replicas are found or no new
// replicas have appeared for a number of consecutive checks.
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
// Some new pods must be created within 2 minutes, so determine the
// number of checks needed to ensure a timeout within that period.
// 2 minutes is a generous amount of time to see new pods created
// in the system even if it is under load.
failCount := int(math.Max(1.0, 120.0/float64(interval)))
for same < failCount && current < replicas {
Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
}
// Wait just longer than one interval so information already
// queued can be processed promptly
time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
if same >= failCount {
return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
}
// Greedily read all existing entries in the queue until
// all pods are found submitted or the queue is empty. If
// the queue is empty, stop trying to process entries until
// there is something new in the queue to process.
for podLists.Len() > 0 && current < replicas {
item := podLists.Pop()
pods := item.value.([]*api.Pod)
current = len(pods)
Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
}
last = current
time.Sleep(5 * time.Second)
pods = podStore.List()
current = len(pods)
if same >= failCount {
return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
}
last = current
}
}
if current != replicas {
return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
}
Logf("%v Controller %s in ns %s: Found %d pods out of %d", time.Now(), name, ns, current, replicas)
// Look for all the replicas to be in a Running state. Stop looking
// once all replicas are Running or no new replicas have entered
// Running for a number of consecutive checks.
By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures))
// Some pods must newly transition to the Running state within
// 100 seconds, so determine the number of checks needed to ensure
// a timeout within that period. 100 seconds is a generous amount
// of time to see a change in the system even if it is under load.
failCount = int(math.Max(1.0, 100.0/float64(interval)))
same = 0
last = 0
failCount = 20
current = 0
oldPods := make([]*api.Pod, 0)
var oldPods []*api.Pod
podLists.Reset()
foundAllPods := false
for same < failCount && current < replicas {
current = 0
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
time.Sleep(5 * time.Second)
// Wait just longer than one interval so information already
// queued can be processed promptly
time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
currentPods := podStore.List()
for _, p := range currentPods {
if p.Status.Phase == api.PodRunning {
current++
for _, v := range FailedContainers(*p) {
failedContainers = failedContainers + v.restarts
// Greedily read all existing entries in the queue until
// either all pods are running or the queue is empty. If
// the queue is empty we need to stop looking for entries
// and wait for a new entry to process
for podLists.Len() > 0 && current < replicas {
item := podLists.Pop()
current = 0
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
currentPods := item.value.([]*api.Pod)
for _, p := range currentPods {
if p.Status.Phase == api.PodRunning {
current++
for _, v := range FailedContainers(p) {
failedContainers = failedContainers + v.restarts
}
} else if p.Status.Phase == api.PodPending {
if p.Spec.NodeName == "" {
waiting++
} else {
pending++
}
} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
inactive++
} else if p.Status.Phase == api.PodUnknown {
unknown++
}
} else if p.Status.Phase == api.PodPending {
if p.Spec.NodeName == "" {
waiting++
} else {
pending++
}
} else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
inactive++
} else if p.Status.Phase == api.PodUnknown {
unknown++
}
}
Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
if len(currentPods) != len(pods) {
Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
if config.PodStatusFile != nil {
fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
}
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(pods))
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
if foundAllPods && len(currentPods) != len(oldPods) {
// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if same >= failCount {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if last < current {
same = 0
} else if last == current {
same++
} else if current < last {
// Most times this happens because a few nodes have kubelet problems, and their pods are
// stuck in pending.
errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
Logf("%v, pods currently in pending:", errorStr)
Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
return fmt.Errorf(errorStr)
}
last = current
oldPods = currentPods
// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
Logf("%v, pods that changed since the last iteration:", errorStr)
Diff(oldPods, currentPods).Print(util.NewStringSet())
return fmt.Errorf(errorStr)
}
if same >= failCount {
if failedContainers > maxContainerFailures {
return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
// Most times this happens because a few nodes have kubelet problems, and their pods are
// stuck in pending.
errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
Logf("%v, pods currently in pending:", errorStr)
Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
return fmt.Errorf(errorStr)
}
if !foundAllPods {
foundAllPods = len(currentPods) == replicas
}
last = current
oldPods = currentPods
if failedContainers > maxContainerFailures {
return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
}
}
}
if current != replicas {
@@ -908,7 +990,7 @@ func DeleteRC(c *client.Client, ns, name string) error {
// information for containers that have failed or been restarted.
// A map is returned where the key is the containerID and the value is a
// struct containing the restart and failure information
func FailedContainers(pod api.Pod) map[string]ContainerFailures {
func FailedContainers(pod *api.Pod) map[string]ContainerFailures {
var state ContainerFailures
states := make(map[string]ContainerFailures)
@@ -1048,7 +1130,7 @@ func (a LatencyMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a LatencyMetricByLatency) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw()
body, err := getMetrics(c)
if err != nil {
return nil, err
}
@@ -1118,3 +1200,78 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou
return len(badMetrics), nil
}
// Retrieve metrics information
func getMetrics(c *client.Client) (string, error) {
body, err := c.Get().AbsPath("/metrics").DoRaw()
if err != nil {
return "", err
}
return string(body), nil
}
// Retrieve debug information
func getDebugInfo(c *client.Client) (map[string]string, error) {
data := make(map[string]string)
for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
if err != nil {
Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
continue
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
Logf("Warning: Error trying to read %s debug data: %v", key, err)
}
data[key] = string(body)
}
return data, nil
}
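Put differently, for each profile key this performs a plain HTTP GET against the apiserver's pprof handler; the request is roughly of the form below (host and scheme come from the client configuration, so this is only an illustration):

GET https://<apiserver>/debug/pprof/heap?debug=2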
func writePerfData(c *client.Client, dirName string, postfix string) error {
fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
handler, err := os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
metrics, err := getMetrics(c)
if err != nil {
return fmt.Errorf("Error retrieving metrics: %v", err)
}
_, err = handler.WriteString(metrics)
if err != nil {
return fmt.Errorf("Error writing metrics: %v", err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
debug, err := getDebugInfo(c)
if err != nil {
return fmt.Errorf("Error retrieving debug information: %v", err)
}
for key, value := range debug {
fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
handler, err = os.Create(fname)
if err != nil {
return fmt.Errorf("Error creating file '%s': %v", fname, err)
}
_, err = handler.WriteString(value)
if err != nil {
return fmt.Errorf("Error writing %s: %v", key, err)
}
err = handler.Close()
if err != nil {
return fmt.Errorf("Error closing '%s': %v", fname, err)
}
}
return nil
}
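Taken together with the density test changes above, one run should leave a directory named after the test's uuid containing files along these lines (the names follow the Sprintf patterns in this diff, with "before" and "after" as the postfixes the test passes in):

<uuid>/pod_states.csv
<uuid>/metrics_before.txt         <uuid>/metrics_after.txt
<uuid>/block_before.txt           <uuid>/block_after.txt
<uuid>/goroutine_before.txt       <uuid>/goroutine_after.txt
<uuid>/heap_before.txt            <uuid>/heap_after.txt
<uuid>/threadcreate_before.txt    <uuid>/threadcreate_after.txt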