From 13b8d947fc109dd42f641ff65e36b0a5a96c052b Mon Sep 17 00:00:00 2001
From: Robert Rati
Date: Tue, 26 May 2015 10:24:46 -0400
Subject: [PATCH] Revert "Revert "Added metrics/debug gathering methods to utils and used them in density ...""

This reverts commit 70500a64a71ecf78de7a6d3adf50950a6c771405.
---
 test/e2e/density.go    |  34 ++++-
 test/e2e/fifo_queue.go |  75 +++++++++++
 test/e2e/load.go       |  16 ++-
 test/e2e/scale.go      |  10 +-
 test/e2e/util.go       | 293 +++++++++++++++++++++++++++++------------
 5 files changed, 333 insertions(+), 95 deletions(-)
 create mode 100644 test/e2e/fifo_queue.go

diff --git a/test/e2e/density.go b/test/e2e/density.go
index 701b3720ba..e64a6b647d 100644
--- a/test/e2e/density.go
+++ b/test/e2e/density.go
@@ -19,6 +19,7 @@ package e2e
 import (
     "fmt"
     "math"
+    "os"
     "strconv"
     "time"
 
@@ -45,6 +46,7 @@ var _ = Describe("Density", func() {
     var minionCount int
     var RCName string
     var ns string
+    var uuid string
 
     BeforeEach(func() {
         var err error
@@ -57,6 +59,9 @@ var _ = Describe("Density", func() {
         nsForTesting, err := createTestingNS("density", c)
         ns = nsForTesting.Name
         expectNoError(err)
+        uuid = string(util.NewUUID())
+        expectNoError(os.Mkdir(uuid, 0777))
+        expectNoError(writePerfData(c, uuid, "before"))
     })
 
     AfterEach(func() {
@@ -76,6 +81,8 @@ var _ = Describe("Density", func() {
             Failf("Couldn't delete ns %s", err)
         }
 
+        expectNoError(writePerfData(c, uuid, "after"))
+
         // Verify latency metrics
         // TODO: Update threshold to 1s once we reach this goal
         // TODO: We should reset metrics before the test. Currently previous tests influence latency metrics.
@@ -89,16 +96,18 @@ var _ = Describe("Density", func() {
     type Density struct {
         skip          bool
         podsPerMinion int
+        /* Controls how often the apiserver is polled for pods */
+        interval int
     }
 
     densityTests := []Density{
         // This test should always run, even if larger densities are skipped.
-        {podsPerMinion: 3, skip: false},
-        {podsPerMinion: 30, skip: false},
+        {podsPerMinion: 3, skip: false, interval: 10},
+        {podsPerMinion: 30, skip: false, interval: 10},
         // More than 30 pods per node is outside our v1.0 goals.
         // We might want to enable those tests in the future.
-        {podsPerMinion: 50, skip: true},
-        {podsPerMinion: 100, skip: true},
+        {podsPerMinion: 50, skip: true, interval: 10},
+        {podsPerMinion: 100, skip: true, interval: 1},
     }
 
     for _, testArg := range densityTests {
@@ -112,8 +121,19 @@ var _ = Describe("Density", func() {
         itArg := testArg
         It(name, func() {
             totalPods := itArg.podsPerMinion * minionCount
-            nameStr := strconv.Itoa(totalPods) + "-" + string(util.NewUUID())
-            RCName = "my-hostname-density" + nameStr
+            RCName = "density" + strconv.Itoa(totalPods) + "-" + uuid
+            fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid))
+            expectNoError(err)
+            defer fileHndl.Close()
+
+            config := RCConfig{Client: c,
+                Image:         "gcr.io/google_containers/pause:go",
+                Name:          RCName,
+                Namespace:     ns,
+                PollInterval:  itArg.interval,
+                PodStatusFile: fileHndl,
+                Replicas:      totalPods,
+            }
 
             // Create a listener for events.
             events := make([](*api.Event), 0)
@@ -139,7 +159,7 @@ var _ = Describe("Density", func() {
             // Start the replication controller.
             startTime := time.Now()
-            expectNoError(RunRC(c, RCName, ns, "gcr.io/google_containers/pause:go", totalPods))
+            expectNoError(RunRC(config))
             e2eStartupTime := time.Now().Sub(startTime)
             Logf("E2E startup time for %d pods: %v", totalPods, e2eStartupTime)
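
The density hunks above are the only caller that exercises every field of the new RCConfig struct. Condensed for orientation, the new call pattern looks roughly like the following (a sketch assembled from the hunks above, not an additional hunk; RCConfig and RunRC are defined in the util.go changes further down):

    // uuid is the per-run scratch directory created in BeforeEach above.
    fileHndl, err := os.Create(fmt.Sprintf("%s/pod_states.csv", uuid))
    expectNoError(err)
    defer fileHndl.Close()

    config := RCConfig{
        Client:        c,
        Image:         "gcr.io/google_containers/pause:go",
        Name:          RCName,
        Namespace:     ns,
        PollInterval:  itArg.interval, // seconds between apiserver pod-list samples
        PodStatusFile: fileHndl,       // per-sample CSV written by RunRC
        Replicas:      totalPods,
    }
    expectNoError(RunRC(config))

With interval: 10, RunRC (below) samples the pod list every 10 seconds and gives up after int(120/10) = 12 consecutive samples without progress while waiting for pods to be created, then int(100/10) = 10 while waiting for them to reach Running.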
diff --git a/test/e2e/fifo_queue.go b/test/e2e/fifo_queue.go
new file mode 100644
index 0000000000..0942c37df7
--- /dev/null
+++ b/test/e2e/fifo_queue.go
@@ -0,0 +1,75 @@
+/*
+Copyright 2014 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+    "sync"
+    "time"
+)
+
+type QueueItem struct {
+    createTime string
+    value      interface{}
+}
+
+type QueueItems struct {
+    pos   int
+    mutex *sync.Mutex
+    list  []QueueItem
+}
+
+type FifoQueue QueueItems
+
+func (fq *FifoQueue) Push(elem interface{}) {
+    fq.mutex.Lock()
+    fq.list = append(fq.list, QueueItem{time.Now().String(), elem})
+    fq.mutex.Unlock()
+}
+
+func (fq *FifoQueue) Pop() QueueItem {
+    fq.mutex.Lock()
+    var val QueueItem
+    if len(fq.list)-1 >= fq.pos {
+        val = fq.list[fq.pos]
+        fq.pos++
+    }
+    fq.mutex.Unlock()
+    return val
+}
+
+func (fq FifoQueue) Len() int {
+    return len(fq.list[fq.pos:])
+}
+
+func (fq *FifoQueue) First() QueueItem {
+    return fq.list[fq.pos]
+}
+
+func (fq *FifoQueue) Last() QueueItem {
+    return fq.list[len(fq.list)-1]
+}
+
+func (fq *FifoQueue) Reset() {
+    fq.pos = 0
+}
+
+func newFifoQueue() *FifoQueue {
+    tmp := new(FifoQueue)
+    tmp.mutex = &sync.Mutex{}
+    tmp.pos = 0
+    return tmp
+}
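
RunRC, further down in util.go, is the only consumer of this queue in the patch: a background goroutine pushes timestamped pod-list samples while the main loop drains them in batches. A minimal sketch of that producer/consumer pattern using just the methods added above (the podStore and the 10-second tick stand in for the values RunRC wires up):

    q := newFifoQueue()

    // Producer: push a timestamped sample on every tick until stopped.
    stop := make(chan struct{})
    go func() {
        for {
            select {
            case <-stop:
                return
            default:
                q.Push(podStore.List())
                time.Sleep(10 * time.Second)
            }
        }
    }()
    defer close(stop)

    // Consumer: drain whatever samples accumulated since the last pass.
    for q.Len() > 0 {
        item := q.Pop()
        pods := item.value.([]*api.Pod)
        Logf("sample taken at %s contained %d pods", item.createTime, len(pods))
    }

Push and Pop take the mutex; Len, First, Last, and Reset do not, so the patch only calls those from the single draining goroutine.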
diff --git a/test/e2e/load.go b/test/e2e/load.go
index a8498cd7c3..c61c2dbbfc 100644
--- a/test/e2e/load.go
+++ b/test/e2e/load.go
@@ -149,7 +149,13 @@ func playWithRC(c *client.Client, wg *sync.WaitGroup, ns, name string, size int)
     // Once every 1-2 minutes perform scale of RC.
     for start := time.Now(); time.Since(start) < simulationTime; time.Sleep(time.Duration(60+rand.Intn(60)) * time.Second) {
         if !rcExist {
-            expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
+            config := RCConfig{Client: c,
+                Name:      name,
+                Namespace: ns,
+                Image:     image,
+                Replicas:  size,
+            }
+            expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s", name, ns))
             rcExist = true
         }
         // Scale RC to a random size between 0.5x and 1.5x of the original size.
@@ -187,7 +193,13 @@ func createRCGroup(c *client.Client, ns, groupName string, size, count, batchSiz
             defer GinkgoRecover()
             defer wg.Done()
             name := groupName + "-" + strconv.Itoa(i)
-            expectNoError(RunRC(c, name, ns, image, size), fmt.Sprintf("creating rc %s in namespace %s for the first time", name, ns))
+            config := RCConfig{Client: c,
+                Name:      name,
+                Namespace: ns,
+                Image:     image,
+                Replicas:  size,
+            }
+            expectNoError(RunRC(config), fmt.Sprintf("creating rc %s in namespace %s for the first time", name, ns))
         }(i)
     }
     wg.Wait()
diff --git a/test/e2e/scale.go b/test/e2e/scale.go
index a787c28f73..52f1496b6f 100644
--- a/test/e2e/scale.go
+++ b/test/e2e/scale.go
@@ -107,7 +107,15 @@ var _ = Describe("Scale", func() {
                 for i := 0; i < itArg.rcsPerThread; i++ {
                     name := "my-short-lived-pod" + string(util.NewUUID())
                     n := itArg.podsPerMinion * minionCount
-                    expectNoError(RunRC(c, name, ns, "gcr.io/google_containers/pause:go", n))
+
+                    config := RCConfig{Client: c,
+                        Name:      name,
+                        Namespace: ns,
+                        Image:     "gcr.io/google_containers/pause:go",
+                        Replicas:  n,
+                    }
+
+                    expectNoError(RunRC(config))
                     podsLaunched += n
                     Logf("Launched %v pods so far...", podsLaunched)
                     err := DeleteRC(c, ns, name)
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 89811290c4..dacff45b76 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -19,8 +19,10 @@ package e2e
 import (
     "bytes"
     "fmt"
+    "io/ioutil"
     "math"
     "math/rand"
+    "net/http"
     "os"
     "os/exec"
     "path/filepath"
@@ -132,6 +134,16 @@ func (s *podStore) Stop() {
     close(s.stopCh)
 }
 
+type RCConfig struct {
+    Client        *client.Client
+    Image         string
+    Name          string
+    Namespace     string
+    PollInterval  int
+    PodStatusFile *os.File
+    Replicas      int
+}
+
 func Logf(format string, a ...interface{}) {
     fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
 }
@@ -703,12 +715,20 @@ func Diff(oldPods []*api.Pod, curPods []*api.Pod) PodDiff {
 // It will waits for all pods it spawns to become "Running".
 // It's the caller's responsibility to clean up externally (i.e. use the
 // namespace lifecycle for handling cleanup).
-func RunRC(c *client.Client, name string, ns, image string, replicas int) error {
+func RunRC(config RCConfig) error {
     var last int
+    c := config.Client
+    name := config.Name
+    ns := config.Namespace
+    image := config.Image
+    replicas := config.Replicas
+    interval := config.PollInterval
     maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
     current := 0
     same := 0
+    label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
+    podLists := newFifoQueue()
 
     By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name))
     rc := &api.ReplicationController{
@@ -741,32 +761,49 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
         return fmt.Errorf("Error creating replication controller: %v", err)
     }
     Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas)
-
-    By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
-    label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
     podStore := newPodStore(c, ns, label, fields.Everything())
     defer podStore.Stop()
-    pods := podStore.List()
-    current = len(pods)
-    failCount := 24
+
+    // Create a routine to query for the list of pods
+    stop := make(chan struct{})
+    go func(stop <-chan struct{}, n string, ns string, l labels.Selector, i int) {
+        for {
+            select {
+            case <-stop:
+                return
+            default:
+                podLists.Push(podStore.List())
+                time.Sleep(time.Duration(i) * time.Second)
+            }
+        }
+    }(stop, name, ns, label, interval)
+    defer close(stop)
+
+    By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
+    failCount := int(120.0 / float64(interval))
     for same < failCount && current < replicas {
-        Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
-        if last < current {
-            same = 0
-        } else if last == current {
-            same++
-        } else if current < last {
-            return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
-        }
+        time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
 
-        if same >= failCount {
-            return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
-        }
+        // Greedily read all existing entries in the queue until
+        // all pods are found submitted or the queue is empty
+        for podLists.Len() > 0 && current < replicas {
+            item := podLists.Pop()
+            pods := item.value.([]*api.Pod)
+            current = len(pods)
+            Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
+            if last < current {
+                same = 0
+            } else if last == current {
+                same++
+            } else if current < last {
+                return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
+            }
 
-        last = current
-        time.Sleep(5 * time.Second)
-        pods = podStore.List()
-        current = len(pods)
+            if same >= failCount {
+                return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
+            }
+            last = current
+        }
     }
     if current != replicas {
         return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
     }
@@ -776,77 +813,92 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
     By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures))
     same = 0
     last = 0
-    failCount = 20
+    failCount = int(100.0 / float64(interval))
     current = 0
-    oldPods := make([]*api.Pod, 0)
+    var oldPods []*api.Pod
+    podLists.Reset()
+    foundAllPods := false
     for same < failCount && current < replicas {
-        current = 0
-        waiting := 0
-        pending := 0
-        unknown := 0
-        inactive := 0
-        failedContainers := 0
-        time.Sleep(5 * time.Second)
+        time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
 
-        currentPods := podStore.List()
-        for _, p := range currentPods {
-            if p.Status.Phase == api.PodRunning {
-                current++
-                for _, v := range FailedContainers(*p) {
-                    failedContainers = failedContainers + v.restarts
+        // Greedily read all existing entries in the queue until
+        // either all pods are running or the queue is empty
+        for podLists.Len() > 0 && current < replicas {
+            item := podLists.Pop()
+            current = 0
+            waiting := 0
+            pending := 0
+            unknown := 0
+            inactive := 0
+            failedContainers := 0
+            currentPods := item.value.([]*api.Pod)
+            for _, p := range currentPods {
+                if p.Status.Phase == api.PodRunning {
+                    current++
+                    for _, v := range FailedContainers(p) {
+                        failedContainers = failedContainers + v.restarts
+                    }
+                } else if p.Status.Phase == api.PodPending {
+                    if p.Spec.NodeName == "" {
+                        waiting++
+                    } else {
+                        pending++
+                    }
+                } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
+                    inactive++
+                } else if p.Status.Phase == api.PodUnknown {
+                    unknown++
                 }
-            } else if p.Status.Phase == api.PodPending {
-                if p.Spec.NodeName == "" {
-                    waiting++
-                } else {
-                    pending++
-                }
-            } else if p.Status.Phase == api.PodSucceeded || p.Status.Phase == api.PodFailed {
-                inactive++
-            } else if p.Status.Phase == api.PodUnknown {
-                unknown++
             }
-        }
         Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
-        if len(currentPods) != len(pods) {
+            Logf("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown ", current, pending, waiting, inactive, unknown)
+            if config.PodStatusFile != nil {
+                fmt.Fprintf(config.PodStatusFile, "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n", item.createTime, current, pending, waiting, inactive, unknown)
+            }
-            // This failure mode includes:
-            // kubelet is dead, so node controller deleted pods and rc creates more
-            // - diagnose by noting the pod diff below.
-            // pod is unhealthy, so replication controller creates another to take its place
-            // - diagnose by comparing the previous "2 Pod states" lines for inactive pods
-            errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(pods))
-            Logf("%v, pods that changed since the last iteration:", errorStr)
-            Diff(oldPods, currentPods).Print(util.NewStringSet())
-            return fmt.Errorf(errorStr)
-        }
-        if last < current {
-            same = 0
-        } else if last == current {
-            same++
-        } else if current < last {
+            if foundAllPods && len(currentPods) != len(oldPods) {
-            // The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
-            errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
-            Logf("%v, pods that changed since the last iteration:", errorStr)
-            Diff(oldPods, currentPods).Print(util.NewStringSet())
-            return fmt.Errorf(errorStr)
-        }
-        if same >= failCount {
+                // This failure mode includes:
+                // kubelet is dead, so node controller deleted pods and rc creates more
+                // - diagnose by noting the pod diff below.
+                // pod is unhealthy, so replication controller creates another to take its place
+                // - diagnose by comparing the previous "2 Pod states" lines for inactive pods
+                errorStr := fmt.Sprintf("Number of reported pods changed: %d vs %d", len(currentPods), len(oldPods))
+                Logf("%v, pods that changed since the last iteration:", errorStr)
+                Diff(oldPods, currentPods).Print(util.NewStringSet())
+                return fmt.Errorf(errorStr)
+            }
+            if last < current {
+                same = 0
+            } else if last == current {
+                same++
+            } else if current < last {
-            // Most times this happens because a few nodes have kubelet problems, and their pods are
-            // stuck in pending.
-            errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
-            Logf("%v, pods currently in pending:", errorStr)
-            Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
-            return fmt.Errorf(errorStr)
-        }
-        last = current
-        oldPods = currentPods
+                // The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
+                errorStr := fmt.Sprintf("Number of running pods dropped from %d to %d", last, current)
+                Logf("%v, pods that changed since the last iteration:", errorStr)
+                Diff(oldPods, currentPods).Print(util.NewStringSet())
+                return fmt.Errorf(errorStr)
+            }
+            if same >= failCount {
-        if failedContainers > maxContainerFailures {
-            return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
+                // Most times this happens because a few nodes have kubelet problems, and their pods are
+                // stuck in pending.
+                errorStr := fmt.Sprintf("No pods started for the last %d checks", failCount)
+                Logf("%v, pods currently in pending:", errorStr)
+                Diff(currentPods, make([]*api.Pod, 0)).Print(util.NewStringSet(string(api.PodRunning)))
+                return fmt.Errorf(errorStr)
+            }
+
+            if !foundAllPods {
+                foundAllPods = len(currentPods) == replicas
+            }
+            last = current
+            oldPods = currentPods
+
+            if failedContainers > maxContainerFailures {
+                return fmt.Errorf("%d containers failed which is more than allowed %d", failedContainers, maxContainerFailures)
+            }
         }
     }
     if current != replicas {
@@ -908,7 +960,7 @@ func DeleteRC(c *client.Client, ns, name string) error {
 // information for containers that have failed or been restarted.
 // A map is returned where the key is the containerID and the value is a
 // struct containing the restart and failure information
-func FailedContainers(pod api.Pod) map[string]ContainerFailures {
+func FailedContainers(pod *api.Pod) map[string]ContainerFailures {
     var state ContainerFailures
     states := make(map[string]ContainerFailures)
 
@@ -1048,7 +1100,7 @@ func (a LatencyMetricByLatency) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
 func (a LatencyMetricByLatency) Less(i, j int) bool { return a[i].Latency < a[j].Latency }
 
 func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
-    body, err := c.Get().AbsPath("/metrics").DoRaw()
+    body, err := getMetrics(c)
     if err != nil {
         return nil, err
     }
@@ -1118,3 +1170,74 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou
     return len(badMetrics), nil
 }
+
+// Retrieve metrics information
+func getMetrics(c *client.Client) (string, error) {
+    body, err := c.Get().AbsPath("/metrics").DoRaw()
+    if err != nil {
+        return "", err
+    }
+    return string(body), nil
+}
+
+// Retrieve debug information
+func getDebugInfo(c *client.Client) (map[string]string, error) {
+    data := make(map[string]string)
+    for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
+        resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
+        body, err := ioutil.ReadAll(resp.Body)
+        resp.Body.Close()
+        if err != nil {
+            Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
+        }
+        data[key] = string(body)
+    }
+    return data, nil
+}
+
+func writePerfData(c *client.Client, dirName string, postfix string) error {
+    fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
+
+    handler, err := os.Create(fname)
+    if err != nil {
+        return fmt.Errorf("Error creating file '%s': %v", fname, err)
+    }
+
+    metrics, err := getMetrics(c)
+    if err != nil {
+        return fmt.Errorf("Error retrieving metrics: %v", err)
+    }
+
+    _, err = handler.WriteString(metrics)
+    if err != nil {
+        return fmt.Errorf("Error writing metrics: %v", err)
+    }
+
+    err = handler.Close()
+    if err != nil {
+        return fmt.Errorf("Error closing '%s': %v", fname, err)
+    }
+
+    debug, err := getDebugInfo(c)
+    if err != nil {
+        return fmt.Errorf("Error retrieving debug information: %v", err)
+    }
+
+    for key, value := range debug {
+        fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
+        handler, err = os.Create(fname)
+        if err != nil {
+            return fmt.Errorf("Error creating file '%s': %v", fname, err)
+        }
+        _, err = handler.WriteString(value)
+        if err != nil {
+            return fmt.Errorf("Error writing %s: %v", key, err)
+        }
+
+        err = handler.Close()
+        if err != nil {
+            return fmt.Errorf("Error closing '%s': %v", fname, err)
+        }
+    }
+    return nil
+}
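
Taken together, the new util.go helpers let a test snapshot apiserver state around a run: getMetrics fetches /metrics, getDebugInfo fetches the block, goroutine, heap and threadcreate pprof profiles, and writePerfData writes each payload to its own file in a caller-supplied directory (metrics_<postfix>.txt plus one <profile>_<postfix>.txt per profile). A sketch of the intended wiring, mirroring what the density.go hunks at the top of this patch do in BeforeEach/AfterEach:

    // Before the run: create a scratch directory and take a baseline snapshot.
    uuid := string(util.NewUUID())
    expectNoError(os.Mkdir(uuid, 0777))
    expectNoError(writePerfData(c, uuid, "before"))

    // ... run the workload under test ...

    // After the run: a second snapshot for comparison, e.g. uuid/metrics_after.txt,
    // uuid/heap_after.txt, uuid/goroutine_after.txt, and so on.
    expectNoError(writePerfData(c, uuid, "after"))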