Add resource monitoring of kube-system pods

pull/6/head
gmarek 2015-10-29 15:49:23 +01:00
parent fcbf1c1012
commit cfe391d4ee
5 changed files with 211 additions and 11 deletions

View File

@ -112,6 +112,7 @@ var _ = Describe("Density", func() {
framework := NewFramework("density")
framework.NamespaceDeletionTimeout = time.Hour
framework.GatherKubeSystemResourceUsageData = true
BeforeEach(func() {
c = framework.Client

View File

@ -39,6 +39,11 @@ type Framework struct {
Namespace *api.Namespace
Client *client.Client
NamespaceDeletionTimeout time.Duration
// If set to true, the framework will start a goroutine monitoring resource usage of system add-ons.
// It will gather the data from all nodes once per minute and print a summary during afterEach.
GatherKubeSystemResourceUsageData bool
gatherer containerResourceGatherer
}
// NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
@ -75,6 +80,10 @@ func (f *Framework) beforeEach() {
} else {
Logf("Skipping waiting for service account")
}
if f.GatherKubeSystemResourceUsageData {
f.gatherer.startGatheringData(c, time.Minute)
}
}
// afterEach deletes the namespace, after reading its events.
@ -113,6 +122,10 @@ func (f *Framework) afterEach() {
} else {
Logf("Found DeleteNamespace=false, skipping namespace deletion!")
}
if f.GatherKubeSystemResourceUsageData {
f.gatherer.stopAndPrintData([]int{50, 90, 99, 100})
}
// Paranoia-- prevent reuse!
f.Namespace = nil
f.Client = nil

View File

@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"sort"
"strconv"
@ -209,8 +210,10 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsa
return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryUsageInBytes > rhs.MemoryUsageInBytes && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes
}
type resourceUsagePerContainer map[string]*containerResourceUsage
// getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint
// and returns the resource usage of targetContainers for the past
// and returns the resource usage of all containerNames for the past
// cpuInterval.
// The acceptable range of the interval is 2s~120s. Be warned that as the
// interval (and #containers) increases, the size of kubelet's response
@ -223,7 +226,19 @@ func (r *containerResourceUsage) isStrictlyGreaterThan(rhs *containerResourceUsa
// Note that this is an approximation and may not be accurate, hence we also
// write the actual interval used for calculation (based on the timestamps of
// the stats points in containerResourceUsage.CPUInterval.
func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterval time.Duration) (map[string]*containerResourceUsage, error) {
//
// containerNames is a function returning the collection of container names that the
// caller is interested in. expectMissingContainers is a flag which controls whether
// the function should fail if one of the containers listed by containerNames is
// missing on any node (useful e.g. when looking for system containers or daemons).
// If set to true, the function is more forgiving and ignores missing containers.
func getOneTimeResourceUsageOnNode(
c *client.Client,
nodeName string,
cpuInterval time.Duration,
containerNames func() []string,
expectMissingContainers bool,
) (resourceUsagePerContainer, error) {
numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
if numStats < 2 || numStats > maxNumStatsToRequest {
return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
@ -238,12 +253,15 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva
return nil, err
}
// Process container infos that are relevant to us.
containers := targetContainers()
usageMap := make(map[string]*containerResourceUsage, len(containers))
containers := containerNames()
usageMap := make(resourceUsagePerContainer, len(containers))
for _, name := range containers {
info, ok := containerInfos[name]
if !ok {
return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName)
if !expectMissingContainers {
return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName)
}
continue
}
first := info.Stats[0]
last := info.Stats[len(info.Stats)-1]
@ -252,12 +270,58 @@ func getOneTimeResourceUsageOnNode(c *client.Client, nodeName string, cpuInterva
return usageMap, nil
}
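For illustration, a minimal sketch of calling the new signature (not part of the commit: the node name is a placeholder, "/kubelet" and "/kube-proxy" are borrowed from the systemContainers list changed below, and a *client.Client named c is assumed in scope):
// Hypothetical call: sample two system containers on one node over a 15s
// CPU window; passing true ignores containers absent from that node.
usage, err := getOneTimeResourceUsageOnNode(
	c, "e2e-test-foo-minion-abcde", 15*time.Second,
	func() []string { return []string{"/kubelet", "/kube-proxy"} },
	true)
if err != nil {
	Logf("Error getting resource usage: %v", err)
} else {
	Logf("%s", formatResourceUsageStats("e2e-test-foo-minion-abcde", usage))
}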
func getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
pods, err := c.Pods("kube-system").List(labels.Everything(), fields.Everything())
if err != nil {
return resourceUsagePerContainer{}, err
}
nodes, err := c.Nodes().List(labels.Everything(), fields.Everything())
if err != nil {
return resourceUsagePerContainer{}, err
}
containerIDToNameMap := make(map[string]string)
containerIDs := make([]string, 0)
for _, pod := range pods.Items {
for _, container := range pod.Status.ContainerStatuses {
containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
containerIDs = append(containerIDs, containerID)
}
}
mutex := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(nodes.Items))
errors := make([]error, 0)
nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
for _, node := range nodes.Items {
go func(nodeName string) {
defer wg.Done()
nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 5*time.Second, func() []string { return containerIDs }, true)
mutex.Lock()
defer mutex.Unlock()
if err != nil {
errors = append(errors, err)
return
}
for k, v := range nodeUsage {
nameToUsageMap[containerIDToNameMap[k]] = v
}
}(node.Name)
}
wg.Wait()
if len(errors) != 0 {
return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
}
return nameToUsageMap, nil
}
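As a usage sketch (a hypothetical helper, assuming it sits in the same e2e package so the unexported types and Logf are visible), a one-shot dump of kube-system usage might look like:
// logKubeSystemUsageOnce is a hypothetical helper: take a single sample of
// kube-system container usage and log it. Map keys are "podName/containerName",
// as built via containerIDToNameMap above.
func logKubeSystemUsageOnce(c *client.Client) {
	usage, err := getKubeSystemContainersResourceUsage(c)
	if err != nil {
		Logf("Failed to gather kube-system usage: %v", err)
		return
	}
	for name, u := range usage {
		Logf("%s: %.3f cores, %.2f MB working set",
			name, u.CPUUsageInCores, float64(u.MemoryWorkingSetInBytes)/(1024*1024))
	}
}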
// logOneTimeResourceUsageSummary collects container resource usage for the list of
// nodes, formats and logs the stats.
func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInterval time.Duration) {
var summary []string
for _, nodeName := range nodeNames {
stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval)
stats, err := getOneTimeResourceUsageOnNode(c, nodeName, cpuInterval, targetContainers, false)
if err != nil {
summary = append(summary, fmt.Sprintf("Error getting resource usage from node %q, err: %v", nodeName, err))
} else {
@ -267,7 +331,7 @@ func logOneTimeResourceUsageSummary(c *client.Client, nodeNames []string, cpuInt
Logf("\n%s", strings.Join(summary, "\n"))
}
func formatResourceUsageStats(nodeName string, containerStats map[string]*containerResourceUsage) string {
func formatResourceUsageStats(nodeName string, containerStats resourceUsagePerContainer) string {
// Example output:
//
// Resource usage for node "e2e-test-foo-minion-abcde":
@ -287,6 +351,120 @@ func formatResourceUsageStats(nodeName string, containerStats map[string]*contai
return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
}
type int64arr []int64
func (a int64arr) Len() int { return len(a) }
func (a int64arr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a int64arr) Less(i, j int) bool { return a[i] < a[j] }
type usageDataPerContainer struct {
cpuData []float64
memUseData []int64
memWorkSetData []int64
}
func computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
if len(timeSeries) == 0 {
return make(map[int]resourceUsagePerContainer)
}
dataMap := make(map[string]*usageDataPerContainer)
for _, singleStatistic := range timeSeries {
for name, data := range singleStatistic {
if dataMap[name] == nil {
// Initialize lazily so containers that appear only in later samples
// still get an entry; allocate with zero length (and enough capacity)
// so the appends below don't leave zero-valued padding in the data.
dataMap[name] = &usageDataPerContainer{
cpuData: make([]float64, 0, len(timeSeries)),
memUseData: make([]int64, 0, len(timeSeries)),
memWorkSetData: make([]int64, 0, len(timeSeries)),
}
}
dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
}
}
for _, v := range dataMap {
sort.Float64s(v.cpuData)
sort.Sort(int64arr(v.memUseData))
sort.Sort(int64arr(v.memWorkSetData))
}
result := make(map[int]resourceUsagePerContainer)
for _, perc := range percentilesToCompute {
data := make(resourceUsagePerContainer)
for k, v := range dataMap {
percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
data[k] = &containerResourceUsage{
Name: k,
CPUUsageInCores: v.cpuData[percentileIndex],
MemoryUsageInBytes: v.memUseData[percentileIndex],
MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
}
}
result[perc] = data
}
return result
}
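As a sanity check on the nearest-rank formula above (with n sorted samples, percentile p maps to index ceil(n*p/100)-1), here is a hypothetical in-package sketch with two synthetic samples:
// Hypothetical sketch: two synthetic samples for one container. With n=2,
// p50 lands on index ceil(2*50/100)-1 = 0 (the lower sample) and p100 on
// index ceil(2*100/100)-1 = 1 (the higher one).
func exampleComputePercentiles() {
	ts := map[time.Time]resourceUsagePerContainer{
		time.Unix(0, 0):  {"kube-dns/dns": &containerResourceUsage{CPUUsageInCores: 0.1}},
		time.Unix(60, 0): {"kube-dns/dns": &containerResourceUsage{CPUUsageInCores: 0.3}},
	}
	result := computePercentiles(ts, []int{50, 100})
	Logf("p50 CPU: %.1f, p100 CPU: %.1f", // expected: 0.1 and 0.3
		result[50]["kube-dns/dns"].CPUUsageInCores,
		result[100]["kube-dns/dns"].CPUUsageInCores)
}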
type containerResourceGatherer struct {
usageTimeseries map[time.Time]resourceUsagePerContainer
stopCh chan struct{}
timer *time.Ticker
wg sync.WaitGroup
}
func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
g.wg.Add(1)
g.stopCh = make(chan struct{})
g.timer = time.NewTicker(period)
go func() {
// Signal the WaitGroup on every exit path; returning early on an error
// without calling Done would deadlock the Wait in stopAndPrintData.
defer g.wg.Done()
for {
select {
case <-g.timer.C:
now := time.Now()
data, err := getKubeSystemContainersResourceUsage(c)
if err != nil {
// Log and keep polling rather than silently stopping the gatherer.
Logf("Error while gathering resource usage data: %v", err)
continue
}
g.usageTimeseries[now] = data
case <-g.stopCh:
return
}
}
}()
}
func (g *containerResourceGatherer) stopAndPrintData(percentiles []int) {
close(g.stopCh)
g.timer.Stop()
g.wg.Wait()
if len(percentiles) == 0 {
Logf("Warning! Empty percentile list for stopAndPrintData.")
return
}
stats := computePercentiles(g.usageTimeseries, percentiles)
sortedKeys := []string{}
for name := range stats[percentiles[0]] {
sortedKeys = append(sortedKeys, name)
}
sort.Strings(sortedKeys)
for _, perc := range percentiles {
buf := &bytes.Buffer{}
w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
fmt.Fprintf(w, "container\tcpu(cores)\tmemory(MB)\n")
for _, name := range sortedKeys {
usage := stats[perc][name]
fmt.Fprintf(w, "%q\t%.3f\t%.2f\n", name, usage.CPUUsageInCores, float64(usage.MemoryWorkingSetInBytes)/(1024*1024))
}
w.Flush()
Logf("%v percentile:\n%v", perc, buf.String())
}
}
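Putting it together, a minimal lifecycle sketch (hypothetical; in the commit the Framework drives this from beforeEach/afterEach when GatherKubeSystemResourceUsageData is set):
// Hypothetical standalone driver for the gatherer.
func exampleGathererLifecycle(c *client.Client) {
	var g containerResourceGatherer
	g.startGatheringData(c, time.Minute) // poll kube-system usage once per minute
	time.Sleep(3 * time.Minute)          // placeholder for the real test workload
	g.stopAndPrintData([]int{50, 90, 99, 100})
}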
// Performs a get on a node proxy endpoint given the nodename and rest client.
func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result {
return c.Get().

View File

@ -110,7 +110,8 @@ func readLatencyMetrics(c *client.Client) (APIResponsiveness, error) {
}
ignoredResources := sets.NewString("events")
ignoredVerbs := sets.NewString("WATCHLIST", "PROXY")
// TODO: figure out why we're getting non-capitalized proxy and fix this.
ignoredVerbs := sets.NewString("WATCHLIST", "PROXY", "proxy")
for _, sample := range samples {
// Example line:

View File

@ -29,8 +29,6 @@ import (
const datapointAmount = 5
type resourceUsagePerContainer map[string]*containerResourceUsage
var systemContainers = []string{"/docker-daemon", "/kubelet", "/kube-proxy", "/system"}
// TODO: tweak these values.
@ -102,7 +100,13 @@ var _ = Describe("Resource usage of system containers", func() {
for i := 0; i < datapointAmount; i++ {
for _, node := range nodeList.Items {
resourceUsage, err := getOneTimeResourceUsageOnNode(c, node.Name, 5*time.Second)
resourceUsage, err := getOneTimeResourceUsageOnNode(c, node.Name, 5*time.Second, func() []string {
if providerIs("gce", "gke") {
return systemContainers
} else {
return []string{}
}
}, false)
expectNoError(err)
resourceUsagePerNode[node.Name] = append(resourceUsagePerNode[node.Name], resourceUsage)
}
@ -119,6 +123,9 @@ var _ = Describe("Resource usage of system containers", func() {
for container, cUsage := range usage {
Logf("%v on %v usage: %#v", container, node, cUsage)
if !allowedUsage[container].isStrictlyGreaterThan(cUsage) {
if _, ok := violating[node]; !ok {
violating[node] = make(resourceUsagePerContainer)
}
if allowedUsage[container].CPUUsageInCores < cUsage.CPUUsageInCores {
Logf("CPU is too high for %s (%v)", container, cUsage.CPUUsageInCores)
}