Merge pull request #30786 from coufon/add_time_series

Automatic merge from submit-queue

Add logging time series to benchmark test

This PR adds a new file, benchmark_util.go, which contains utility functions for benchmarks (we can migrate other benchmark-related functions into it).

The PR logs time-series data for the density benchmark test.
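
For reference, the tagged time-series output emitted by the new logDensityTimeSeries helper (in benchmark_util.go below) looks roughly like this; the timestamps, node name, and test name are illustrative, and resource_data is elided:

```
[Result:TimeSeries]
{
  "op_data": {
    "create":  [1471637000000000000, 1471637002000000000],
    "running": [1471637004000000000, 1471637007000000000]
  },
  "labels": {
    "node": "test-node",
    "test": "density_create_batch_10_0_0"
  },
  "version": "v1"
}
[Finish:TimeSeries]
```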
Kubernetes Submit Queue 2016-08-19 13:41:29 -07:00 committed by GitHub
commit 5f7875a9bc
6 changed files with 209 additions and 102 deletions

View File

@@ -78,27 +78,20 @@ func PrintPerfData(p *perftype.PerfData) {
// Notice that this function only cares about memory usage, because cpu usage information will be extracted from NodesCPUSummary.
func ResourceUsageToPerfDataWithLabels(usagePerNode ResourceUsagePerNode, labels map[string]string) *perftype.PerfData {
items := []perftype.DataItem{}
for node, usages := range usagePerNode {
for _, usages := range usagePerNode {
for c, usage := range usages {
newLabels := map[string]string{
"node": node,
"container": c,
"resource": "memory",
}
if labels != nil {
for k, v := range labels {
newLabels[k] = v
}
}
item := perftype.DataItem{
Data: map[string]float64{
"memory": float64(usage.MemoryUsageInBytes) / (1024 * 1024),
"workingset": float64(usage.MemoryWorkingSetInBytes) / (1024 * 1024),
"rss": float64(usage.MemoryRSSInBytes) / (1024 * 1024),
},
Unit: "MB",
Labels: newLabels,
Unit: "MB",
Labels: map[string]string{
"container": c,
"datatype": "resource",
"resource": "memory",
},
}
items = append(items, item)
}
@@ -106,34 +99,28 @@ func ResourceUsageToPerfDataWithLabels(usagePerNode ResourceUsagePerNode, labels
return &perftype.PerfData{
Version: currentKubeletPerfMetricsVersion,
DataItems: items,
Labels: labels,
}
}
// CPUUsageToPerfDataWithLabels transforms NodesCPUSummary to PerfData with additional labels.
func CPUUsageToPerfDataWithLabels(usagePerNode NodesCPUSummary, labels map[string]string) *perftype.PerfData {
items := []perftype.DataItem{}
for node, usages := range usagePerNode {
for _, usages := range usagePerNode {
for c, usage := range usages {
newLabels := map[string]string{
"node": node,
"container": c,
"resource": "cpu",
}
if labels != nil {
for k, v := range labels {
newLabels[k] = v
}
}
data := map[string]float64{}
for perc, value := range usage {
data[fmt.Sprintf("Perc%02.0f", perc*100)] = value * 1000
}
item := perftype.DataItem{
Data: data,
Unit: "mCPU",
Labels: newLabels,
Data: data,
Unit: "mCPU",
Labels: map[string]string{
"container": c,
"datatype": "resource",
"resource": "cpu",
},
}
items = append(items, item)
}
@@ -141,5 +128,6 @@ func CPUUsageToPerfDataWithLabels(usagePerNode NodesCPUSummary, labels map[strin
return &perftype.PerfData{
Version: currentKubeletPerfMetricsVersion,
DataItems: items,
Labels: labels,
}
}
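
Net effect of these two hunks: the per-node and caller-supplied labels are no longer copied into every DataItem; they are attached once at the dataset level via the new Labels field on PerfData (see the perftype change below). A minimal sketch of the new shape, with hypothetical label values, in the context of these packages:

```go
perfData := &perftype.PerfData{
	Version: currentKubeletPerfMetricsVersion,
	DataItems: []perftype.DataItem{
		{
			// Per-item labels now carry only what varies per item.
			Data: map[string]float64{"memory": 42.0},
			Unit: "MB",
			Labels: map[string]string{
				"container": "kubelet",
				"datatype":  "resource",
				"resource":  "memory",
			},
		},
	},
	// Shared labels ("node", "test", ...) are attached once per dataset.
	Labels: map[string]string{"node": "test-node", "test": "resource_10"},
}
```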

View File

@@ -31,7 +31,7 @@ type DataItem struct {
// should have the same unit.
Unit string `json:"unit"`
// Labels is the labels of the data item.
Labels map[string]string `json:"labels"`
Labels map[string]string `json:"labels,omitempty"`
}
// PerfData contains all data items generated in current test.
@@ -40,6 +40,8 @@ type PerfData struct {
// to detect metrics version change and decide what version to support.
Version string `json:"version"`
DataItems []DataItem `json:"dataItems"`
// Labels is the labels of the dataset.
Labels map[string]string `json:"labels,omitempty"`
}
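
One detail worth noting: with a plain `json:"labels"` tag, a nil map serializes as "labels": null, whereas omitempty drops the field entirely. A quick sketch of the difference (field order approximate):

```go
item := perftype.DataItem{Data: map[string]float64{"memory": 42}, Unit: "MB"}
b, _ := json.Marshal(item)
// Without omitempty: {"data":{"memory":42},"unit":"MB","labels":null}
// With omitempty:    {"data":{"memory":42},"unit":"MB"}
```
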
// PerfResultTag is the prefix of generated perfdata. Analyzing tools can find the perf result

View File

@@ -0,0 +1,104 @@
// +build linux
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"sort"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/perftype"
)
const (
// TODO(coufon): be consistent with perf_util.go version (not exposed)
currentTimeSeriesVersion = "v1"
TimeSeriesTag = "[Result:TimeSeries]"
TimeSeriesEnd = "[Finish:TimeSeries]"
)
type NodeTimeSeries struct {
// value in OperationData is an array of timestamps
OperationData map[string][]int64 `json:"op_data,omitempty"`
ResourceData map[string]*ResourceSeries `json:"resource_data,omitempty"`
Labels map[string]string `json:"labels"`
Version string `json:"version"`
}
// logDensityTimeSeries logs the time-series data of pod operations and resource usage
func logDensityTimeSeries(rc *ResourceCollector, create, watch map[string]unversioned.Time, testName string) {
timeSeries := &NodeTimeSeries{
Labels: map[string]string{
"node": framework.TestContext.NodeName,
"test": testName,
},
Version: currentTimeSeriesVersion,
}
// Attach operation time series.
timeSeries.OperationData = map[string][]int64{
"create": getCumulatedPodTimeSeries(create),
"running": getCumulatedPodTimeSeries(watch),
}
// Attach resource time series.
timeSeries.ResourceData = rc.GetResourceTimeSeries()
// Log time series with tags
framework.Logf("%s %s\n%s", TimeSeriesTag, framework.PrettyPrintJSON(timeSeries), TimeSeriesEnd)
}
type int64arr []int64
func (a int64arr) Len() int { return len(a) }
func (a int64arr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a int64arr) Less(i, j int) bool { return a[i] < a[j] }
// getCumulatedPodTimeSeries gets the cumulative pod number time series.
func getCumulatedPodTimeSeries(timePerPod map[string]unversioned.Time) []int64 {
timeSeries := make(int64arr, 0)
for _, ts := range timePerPod {
timeSeries = append(timeSeries, ts.Time.UnixNano())
}
// Sort all timestamps.
sort.Sort(timeSeries)
return timeSeries
}
// getLatencyPerfData returns perf data built from the given latency metrics
func getLatencyPerfData(latency framework.LatencyMetric, testName string) *perftype.PerfData {
return &perftype.PerfData{
Version: "v1",
DataItems: []perftype.DataItem{
{
Data: map[string]float64{
"Perc50": float64(latency.Perc50) / 1000000,
"Perc90": float64(latency.Perc90) / 1000000,
"Perc99": float64(latency.Perc99) / 1000000,
},
Unit: "ms",
Labels: map[string]string{
"datatype": "latency",
"latencytype": "test-e2e",
},
},
},
Labels: map[string]string{
"node": framework.TestContext.NodeName,
"test": testName,
},
}
}
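
The sorted timestamp array returned by getCumulatedPodTimeSeries doubles as an implicit cumulative counter: entry i is the time at which the (i+1)-th pod reached the given state. A consumer can recover the pod-count curve with a binary search; a small sketch (helper name hypothetical):

```go
// podsByTime returns how many pods had reached the state by time t,
// given the sorted UnixNano timestamps from getCumulatedPodTimeSeries.
func podsByTime(series []int64, t int64) int {
	// sort.Search returns the first index whose timestamp is after t,
	// which equals the number of pods at or before t.
	return sort.Search(len(series), func(i int) bool { return series[i] > t })
}
```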

View File

@@ -106,19 +106,20 @@ var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
itArg := testArg
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval",
itArg.podsNr, itArg.interval), func() {
batchLag, e2eLags := runDensityBatchTest(f, rc, itArg)
itArg.createMethod = "batch"
testName := itArg.getTestName()
batchLag, e2eLags := runDensityBatchTest(f, rc, itArg, false)
By("Verifying latency")
printAndVerifyLatency(batchLag, e2eLags, itArg, true)
logAndVerifyLatency(batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, true)
By("Verifying resource")
printAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, true)
logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, true)
})
}
})
Context("create a batch of pods [Benchmark]", func() {
Context("create a batch of pods", func() {
dTests := []densityTest{
{
podsNr: 10,
@@ -136,16 +137,17 @@ var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
for _, testArg := range dTests {
itArg := testArg
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval",
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval [Benchmark]",
itArg.podsNr, itArg.interval), func() {
batchLag, e2eLags := runDensityBatchTest(f, rc, itArg)
itArg.createMethod = "batch"
testName := itArg.getTestName()
batchLag, e2eLags := runDensityBatchTest(f, rc, itArg, true)
By("Verifying latency")
printAndVerifyLatency(batchLag, e2eLags, itArg, false)
logAndVerifyLatency(batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, false)
By("Verifying resource")
printAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, false)
logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, false)
})
}
})
@@ -175,19 +177,20 @@ var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
itArg := testArg
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods",
itArg.podsNr, itArg.bgPodsNr), func() {
itArg.createMethod = "sequence"
testName := itArg.getTestName()
batchlag, e2eLags := runDensitySeqTest(f, rc, itArg)
By("Verifying latency")
printAndVerifyLatency(batchlag, e2eLags, itArg, true)
logAndVerifyLatency(batchlag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, true)
By("Verifying resource")
printAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, true)
logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, true)
})
}
})
Context("create a sequence of pods [Benchmark]", func() {
Context("create a sequence of pods", func() {
dTests := []densityTest{
{
podsNr: 10,
@@ -205,16 +208,17 @@ var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
for _, testArg := range dTests {
itArg := testArg
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods",
It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods [Benchmark]",
itArg.podsNr, itArg.bgPodsNr), func() {
itArg.createMethod = "sequence"
testName := itArg.getTestName()
batchlag, e2eLags := runDensitySeqTest(f, rc, itArg)
By("Verifying latency")
printAndVerifyLatency(batchlag, e2eLags, itArg, false)
logAndVerifyLatency(batchlag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, false)
By("Verifying resource")
printAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, false)
logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, false)
})
}
})
@@ -227,6 +231,8 @@ type densityTest struct {
bgPodsNr int
// interval between creating pods (rate control)
interval time.Duration
// create pods in 'batch' or 'sequence'
createMethod string
// performance limits
cpuLimits framework.ContainersCPUSummary
memLimits framework.ResourceUsagePerContainer
@@ -234,8 +240,13 @@ type densityTest struct {
podBatchStartupLimit time.Duration
}
func (dt *densityTest) getTestName() string {
return fmt.Sprintf("density_create_%s_%d_%d_%d", dt.createMethod, dt.podsNr, dt.bgPodsNr, dt.interval.Nanoseconds()/1000000)
}
// runDensityBatchTest runs the density batch pod creation test
func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg densityTest) (time.Duration, []framework.PodLatencyData) {
func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg densityTest,
isLogTimeSeries bool) (time.Duration, []framework.PodLatencyData) {
const (
podType = "density_test_pod"
sleepBeforeCreatePods = 30 * time.Second
@@ -308,6 +319,13 @@ func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg
sort.Sort(framework.LatencySlice(e2eLags))
batchLag := lastRunning.Time.Sub(firstCreate.Time)
// Log time series data.
if isLogTimeSeries {
logDensityTimeSeries(rc, createTimes, watchTimes, testArg.getTestName())
}
// Log throughput data.
logPodCreateThroughput(batchLag, e2eLags, testArg.podsNr)
return batchLag, e2eLags
}
@@ -333,6 +351,9 @@ func runDensitySeqTest(f *framework.Framework, rc *ResourceCollector, testArg de
// create pods sequentially (back-to-back)
batchlag, e2eLags := createBatchPodSequential(f, testPods)
// Log throughput data.
logPodCreateThroughput(batchlag, e2eLags, testArg.podsNr)
return batchlag, e2eLags
}
@@ -454,31 +475,36 @@ func createBatchPodSequential(f *framework.Framework, pods []*api.Pod) (time.Dur
return batchLag, e2eLags
}
// printAndVerifyLatency verifies that whether pod creation latency satisfies the limit.
func printAndVerifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyData, testArg densityTest, isVerify bool) {
// logAndVerifyLatency verifies whether pod creation latency satisfies the limit.
func logAndVerifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyData, podStartupLimits framework.LatencyMetric,
podBatchStartupLimit time.Duration, testName string, isVerify bool) {
framework.PrintLatencies(e2eLags, "worst client e2e total latencies")
// TODO(coufon): do not trust `kubelet' metrics since they are not reset!
// TODO(coufon): do not trust 'kubelet' metrics since they are not reset!
latencyMetrics, _ := getPodStartLatency(kubeletAddr)
framework.Logf("Kubelet Prometheus metrics (not reset):\n%s", framework.PrettyPrintJSON(latencyMetrics))
// check whether e2e pod startup time is acceptable.
podCreateLatency := framework.PodStartupLatency{Latency: framework.ExtractLatencyMetrics(e2eLags)}
framework.Logf("Pod create latency: %s", framework.PrettyPrintJSON(podCreateLatency))
// calculate and log throughput
throughputBatch := float64(testArg.podsNr) / batchLag.Minutes()
framework.Logf("Batch creation throughput is %.1f pods/min", throughputBatch)
throughputSequential := 1.0 / e2eLags[len(e2eLags)-1].Latency.Minutes()
framework.Logf("Sequential creation throughput is %.1f pods/min", throughputSequential)
// log latency perf data
framework.PrintPerfData(getLatencyPerfData(podCreateLatency.Latency, testName))
if isVerify {
framework.ExpectNoError(verifyPodStartupLatency(testArg.podStartupLimits, podCreateLatency.Latency))
// check whether e2e pod startup time is acceptable.
framework.ExpectNoError(verifyPodStartupLatency(podStartupLimits, podCreateLatency.Latency))
// check batch pod creation latency
if testArg.podBatchStartupLimit > 0 {
Expect(batchLag <= testArg.podBatchStartupLimit).To(Equal(true), "Batch creation startup time %v exceed limit %v",
batchLag, testArg.podBatchStartupLimit)
if podBatchStartupLimit > 0 {
Expect(batchLag <= podBatchStartupLimit).To(Equal(true), "Batch creation startup time %v exceeds limit %v",
batchLag, podBatchStartupLimit)
}
}
}
// logPodCreateThroughput calculates and logs pod creation throughput.
func logPodCreateThroughput(batchLag time.Duration, e2eLags []framework.PodLatencyData, podsNr int) {
throughputBatch := float64(podsNr) / batchLag.Minutes()
framework.Logf("Batch creation throughput is %.1f pods/min", throughputBatch)
throughputSequential := 1.0 / e2eLags[len(e2eLags)-1].Latency.Minutes()
framework.Logf("Sequential creation throughput is %.1f pods/min", throughputSequential)
}
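
For reference, getTestName flattens the test parameters into a dashboard-friendly identifier, with the interval encoded in milliseconds; a quick sketch:

```go
dt := densityTest{createMethod: "batch", podsNr: 10, interval: 100 * time.Millisecond}
fmt.Println(dt.getTestName()) // density_create_batch_10_0_100
```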

View File

@@ -53,8 +53,6 @@ const (
cadvisorPort = 8090
// housekeeping interval of Cadvisor (second)
houseKeepingInterval = 1
// TODO(coufon): be consistent with perf_util.go version (not exposed)
currentTimeSeriesVersion = "v1"
)
var (
@@ -426,39 +424,22 @@ type ResourceSeries struct {
Units map[string]string `json:"unit"`
}
// Time series of resource usage per container
type ResourceSeriesPerContainer struct {
Data map[string]*ResourceSeries `json:"data"`
Labels map[string]string `json:"labels"`
Version string `json:"version"`
}
// GetResourceSeriesWithLabels gets the time series of resource usage of each container.
// TODO(coufon): the labels are to be re-defined based on benchmark dashboard.
func (r *ResourceCollector) GetResourceSeriesWithLabels(labels map[string]string) *ResourceSeriesPerContainer {
seriesPerContainer := &ResourceSeriesPerContainer{
Data: map[string]*ResourceSeries{},
Labels: map[string]string{
"node": framework.TestContext.NodeName,
},
Version: currentTimeSeriesVersion,
}
func (r *ResourceCollector) GetResourceTimeSeries() map[string]*ResourceSeries {
resourceSeries := make(map[string]*ResourceSeries)
for key, name := range systemContainers {
newSeries := &ResourceSeries{Units: map[string]string{
"cpu": "mCPU",
"memory": "MB",
}}
seriesPerContainer.Data[key] = newSeries
resourceSeries[key] = newSeries
for _, usage := range r.buffers[name] {
newSeries.Timestamp = append(newSeries.Timestamp, usage.Timestamp.UnixNano())
newSeries.CPUUsageInMilliCores = append(newSeries.CPUUsageInMilliCores, int64(usage.CPUUsageInCores*1000))
newSeries.MemoryRSSInMegaBytes = append(newSeries.MemoryRSSInMegaBytes, int64(float64(usage.MemoryUsageInBytes)/(1024*1024)))
}
}
for k, v := range labels {
seriesPerContainer.Labels[k] = v
}
return seriesPerContainer
return resourceSeries
}
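
GetResourceTimeSeries now returns the bare per-container series; the node label and version that the deleted ResourceSeriesPerContainer wrapper used to carry are attached by the caller instead. A sketch of the new division of labor, mirroring logDensityTimeSeries in benchmark_util.go (nodeName and testName stand in for the real values):

```go
timeSeries := &NodeTimeSeries{
	// Bare series keyed by container name, with no labels attached.
	ResourceData: rc.GetResourceTimeSeries(),
	// Node/test metadata now lives on the top-level time-series object.
	Labels:  map[string]string{"node": nodeName, "test": testName},
	Version: currentTimeSeriesVersion,
}
```
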
// Code for getting container name of docker, copied from pkg/kubelet/cm/container_manager_linux.go

View File

@@ -67,7 +67,7 @@ var _ = framework.KubeDescribe("Resource-usage [Serial] [Slow]", func() {
Context("regular resource usage tracking", func() {
rTests := []resourceTest{
{
pods: 10,
podsNr: 10,
cpuLimits: framework.ContainersCPUSummary{
stats.SystemContainerKubelet: {0.50: 0.25, 0.95: 0.30},
stats.SystemContainerRuntime: {0.50: 0.30, 0.95: 0.40},
@@ -82,45 +82,49 @@ var _ = framework.KubeDescribe("Resource-usage [Serial] [Slow]", func() {
for _, testArg := range rTests {
itArg := testArg
It(fmt.Sprintf("resource tracking for %d pods per node", itArg.pods), func() {
It(fmt.Sprintf("resource tracking for %d pods per node", itArg.podsNr), func() {
runResourceUsageTest(f, rc, itArg)
// Log and verify resource usage
printAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, true)
logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, itArg.getTestName(), true)
})
}
})
Context("regular resource usage tracking [Benchmark]", func() {
Context("regular resource usage tracking", func() {
rTests := []resourceTest{
{
pods: 10,
podsNr: 10,
},
{
pods: 35,
podsNr: 35,
},
{
pods: 105,
podsNr: 105,
},
}
for _, testArg := range rTests {
itArg := testArg
It(fmt.Sprintf("resource tracking for %d pods per node", itArg.pods), func() {
It(fmt.Sprintf("resource tracking for %d pods per node [Benchmark]", itArg.podsNr), func() {
runResourceUsageTest(f, rc, itArg)
// Log and verify resource usage
printAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, true)
logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, itArg.getTestName(), true)
})
}
})
})
type resourceTest struct {
pods int
podsNr int
cpuLimits framework.ContainersCPUSummary
memLimits framework.ResourceUsagePerContainer
}
func (rt *resourceTest) getTestName() string {
return fmt.Sprintf("resource_%d", rt.podsNr)
}
// runResourceUsageTest runs the resource usage test
func runResourceUsageTest(f *framework.Framework, rc *ResourceCollector, testArg resourceTest) {
const (
@@ -136,7 +140,7 @@ func runResourceUsageTest(f *framework.Framework, rc *ResourceCollector, testArg
defer rc.Stop()
By("Creating a batch of Pods")
pods := newTestPods(testArg.pods, ImageRegistry[pauseImage], "test_pod")
pods := newTestPods(testArg.podsNr, ImageRegistry[pauseImage], "test_pod")
f.PodClient().CreateBatch(pods)
// wait for a while to let the node be steady
@@ -168,9 +172,9 @@ func runResourceUsageTest(f *framework.Framework, rc *ResourceCollector, testArg
logPods(f.Client)
}
// printAndVerifyResource prints the resource usage as perf data and verifies whether resource usage satisfies the limit.
func printAndVerifyResource(f *framework.Framework, rc *ResourceCollector, cpuLimits framework.ContainersCPUSummary,
memLimits framework.ResourceUsagePerContainer, isVerify bool) {
// logAndVerifyResource logs the resource usage as perf data and verifies whether resource usage satisfies the limit.
func logAndVerifyResource(f *framework.Framework, rc *ResourceCollector, cpuLimits framework.ContainersCPUSummary,
memLimits framework.ResourceUsagePerContainer, testName string, isVerify bool) {
nodeName := framework.TestContext.NodeName
// Obtain memory PerfData
@@ -189,8 +193,10 @@ func printAndVerifyResource(f *framework.Framework, rc *ResourceCollector, cpuLi
cpuSummaryPerNode[nodeName] = cpuSummary
// Print resource usage
framework.PrintPerfData(framework.ResourceUsageToPerfData(usagePerNode))
framework.PrintPerfData(framework.CPUUsageToPerfData(cpuSummaryPerNode))
framework.PrintPerfData(framework.ResourceUsageToPerfDataWithLabels(usagePerNode,
map[string]string{"test": testName, "node": nodeName}))
framework.PrintPerfData(framework.CPUUsageToPerfDataWithLabels(cpuSummaryPerNode,
map[string]string{"test": testName, "node": nodeName}))
// Verify resource usage
if isVerify {