From ac706ea1de37110639fef66b5cd5d5fb1ceaf174 Mon Sep 17 00:00:00 2001
From: gmarek
Date: Tue, 23 Feb 2016 16:45:42 +0100
Subject: [PATCH] Resource gatherer refactoring

---
 test/e2e/framework.go               |  11 +-
 test/e2e/resource_usage_gatherer.go | 288 ++++++++++++++++------
 2 files changed, 175 insertions(+), 124 deletions(-)

diff --git a/test/e2e/framework.go b/test/e2e/framework.go
index f938244a37..410f1208eb 100644
--- a/test/e2e/framework.go
+++ b/test/e2e/framework.go
@@ -51,7 +51,7 @@ type Framework struct {
 	namespacesToDelete []*api.Namespace // Some tests have more than one.
 	NamespaceDeletionTimeout time.Duration
 
-	gatherer containerResourceGatherer
+	gatherer *containerResourceGatherer
 	// Constraints that passed to a check which is executed after data is gathered to
 	// see if 99% of results are within acceptable bounds. It as to be injected in the test,
 	// as expectations vary greatly. Constraints are groupped by the container names.
@@ -116,7 +116,12 @@ func (f *Framework) beforeEach() {
 	}
 
 	if testContext.GatherKubeSystemResourceUsageData {
-		f.gatherer.startGatheringData(c, resourceDataGatheringPeriodSeconds*time.Second)
+		f.gatherer, err = NewResourceUsageGatherer(c)
+		if err != nil {
+			Logf("Error while creating NewResourceUsageGatherer: %v", err)
+		} else {
+			go f.gatherer.startGatheringData()
+		}
 	}
 
 	if testContext.GatherLogsSizes {
@@ -170,7 +175,7 @@ func (f *Framework) afterEach() {
 	}
 
 	summaries := make([]TestDataSummary, 0)
-	if testContext.GatherKubeSystemResourceUsageData {
+	if testContext.GatherKubeSystemResourceUsageData && f.gatherer != nil {
 		By("Collecting resource usage data")
 		summaries = append(summaries, f.gatherer.stopAndSummarize([]int{90, 99}, f.addonResourceConstraints))
 	}
diff --git a/test/e2e/resource_usage_gatherer.go b/test/e2e/resource_usage_gatherer.go
index b0ce209724..2bbb49026b 100644
--- a/test/e2e/resource_usage_gatherer.go
+++ b/test/e2e/resource_usage_gatherer.go
@@ -34,7 +34,7 @@ import (
 )
 
 const (
-	resourceDataGatheringPeriodSeconds = 60
+	resourceDataGatheringPeriod = 60 * time.Second
 )
 
 type resourceConstraint struct {
@@ -42,12 +42,6 @@ type resourceConstraint struct {
 	memoryConstraint uint64
 }
 
-type containerResourceGatherer struct {
-	usageTimeseries map[time.Time]resourceUsagePerContainer
-	stopCh          chan struct{}
-	wg              sync.WaitGroup
-}
-
 type SingleContainerSummary struct {
 	Name string
 	Cpu  float64
@@ -75,43 +69,184 @@ func (s *ResourceUsageSummary) PrintJSON() string {
 	return prettyPrintJSON(*s)
 }
 
-func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
-	g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
-	g.wg.Add(1)
-	g.stopCh = make(chan struct{})
-	go func() error {
-		defer utilruntime.HandleCrash()
-		defer g.wg.Done()
-		for {
-			select {
-			case <-time.After(period):
-				now := time.Now()
-				data, err := g.getKubeSystemContainersResourceUsage(c)
-				if err != nil {
-					Logf("Error while getting resource usage: %v", err)
-					continue
+func computePercentiles(timeSeries []resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
+	if len(timeSeries) == 0 {
+		return make(map[int]resourceUsagePerContainer)
+	}
+	dataMap := make(map[string]*usageDataPerContainer)
+	for i := range timeSeries {
+		for name, data := range timeSeries[i] {
+			if dataMap[name] == nil {
+				dataMap[name] = &usageDataPerContainer{
+					cpuData:        make([]float64, len(timeSeries)),
+					memUseData:     make([]uint64, len(timeSeries)),
+					memWorkSetData: make([]uint64, len(timeSeries)),
 				}
-				g.usageTimeseries[now] = data
-			case <-g.stopCh:
-				Logf("Stop channel is closed. Stopping gatherer.")
-				return nil
+			}
+			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
+			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
+			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
+		}
+	}
+	for _, v := range dataMap {
+		sort.Float64s(v.cpuData)
+		sort.Sort(uint64arr(v.memUseData))
+		sort.Sort(uint64arr(v.memWorkSetData))
+	}
+
+	result := make(map[int]resourceUsagePerContainer)
+	for _, perc := range percentilesToCompute {
+		data := make(resourceUsagePerContainer)
+		for k, v := range dataMap {
+			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
+			data[k] = &containerResourceUsage{
+				Name:                    k,
+				CPUUsageInCores:         v.cpuData[percentileIndex],
+				MemoryUsageInBytes:      v.memUseData[percentileIndex],
+				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
 			}
 		}
-	}()
+		result[perc] = data
+	}
+	return result
+}
+
+func leftMergeData(left, right map[int]resourceUsagePerContainer) map[int]resourceUsagePerContainer {
+	result := make(map[int]resourceUsagePerContainer)
+	for percentile, data := range left {
+		result[percentile] = data
+		if _, ok := right[percentile]; !ok {
+			continue
+		}
+		for k, v := range right[percentile] {
+			result[percentile][k] = v
+		}
+	}
+	return result
+}
+
+type resourceGatherWorker struct {
+	c                    *client.Client
+	nodeName             string
+	wg                   *sync.WaitGroup
+	containerIDToNameMap map[string]string
+	containerIDs         []string
+	stopCh               chan struct{}
+	dataSeries           []resourceUsagePerContainer
+}
+
+func (w *resourceGatherWorker) singleProbe() {
+	data := make(resourceUsagePerContainer)
+	nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, 15*time.Second, func() []string { return w.containerIDs }, true)
+	if err != nil {
+		Logf("Error while reading data from %v: %v", w.nodeName, err)
+		return
+	}
+	for k, v := range nodeUsage {
+		data[w.containerIDToNameMap[k]] = v
+	}
+	w.dataSeries = append(w.dataSeries, data)
+}
+
+func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
+	defer utilruntime.HandleCrash()
+	defer w.wg.Done()
+	select {
+	case <-time.After(initialSleep):
+		w.singleProbe()
+		for {
+			select {
+			case <-time.After(resourceDataGatheringPeriod):
+				w.singleProbe()
+			case <-w.stopCh:
+				return
+			}
+		}
+	case <-w.stopCh:
+		return
+	}
+}
+
+func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) {
+	delay := resourceDataGatheringPeriod / time.Duration(len(g.workers))
+	for i := range g.workers {
+		go g.workers[i].gather(delay)
+	}
+	g.workerWg.Wait()
+}
+
+type containerResourceGatherer struct {
+	client               *client.Client
+	stopCh               chan struct{}
+	workers              []resourceGatherWorker
+	workerWg             sync.WaitGroup
+	containerIDToNameMap map[string]string
+	containerIDs         []string
+}
+
+func NewResourceUsageGatherer(c *client.Client) (*containerResourceGatherer, error) {
+	g := containerResourceGatherer{
+		client:               c,
+		stopCh:               make(chan struct{}),
+		containerIDToNameMap: make(map[string]string),
+		containerIDs:         make([]string, 0),
+	}
+
+	pods, err := c.Pods("kube-system").List(api.ListOptions{})
+	if err != nil {
+		Logf("Error while listing Pods: %v", err)
+		return nil, err
+	}
+	for _, pod := range pods.Items {
+		for _, container := range pod.Status.ContainerStatuses {
+			containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
+			g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
+			g.containerIDs = append(g.containerIDs, containerID)
+		}
+	}
+	nodeList, err := c.Nodes().List(api.ListOptions{})
+	if err != nil {
+		Logf("Error while listing Nodes: %v", err)
+		return nil, err
+	}
+
+	g.workerWg.Add(len(nodeList.Items))
+	for _, node := range nodeList.Items {
+		g.workers = append(g.workers, resourceGatherWorker{
+			c:                    c,
+			nodeName:             node.Name,
+			wg:                   &g.workerWg,
+			containerIDToNameMap: g.containerIDToNameMap,
+			containerIDs:         g.containerIDs,
+			stopCh:               g.stopCh,
+		})
+	}
+	return &g, nil
+}
+
+// startGatheringData blocks until stopAndSummarize is called.
+func (g *containerResourceGatherer) startGatheringData() {
+	g.getKubeSystemContainersResourceUsage(g.client)
 }
 
 func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]resourceConstraint) *ResourceUsageSummary {
 	close(g.stopCh)
-	Logf("Closed stop channel.")
-	g.wg.Wait()
+	Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
+	g.workerWg.Wait()
 	Logf("Waitgroup finished.")
 	if len(percentiles) == 0 {
 		Logf("Warning! Empty percentile list for stopAndPrintData.")
 		return &ResourceUsageSummary{}
 	}
-	stats := g.computePercentiles(g.usageTimeseries, percentiles)
+	data := make(map[int]resourceUsagePerContainer)
+	for i := range g.workers {
+		stats := computePercentiles(g.workers[i].dataSeries, percentiles)
+		data = leftMergeData(stats, data)
+	}
+
+	// Workers has been stopped. We need to gather data stored in them.
 	sortedKeys := []string{}
-	for name := range stats[percentiles[0]] {
+	for name := range data[percentiles[0]] {
 		sortedKeys = append(sortedKeys, name)
 	}
 	sort.Strings(sortedKeys)
@@ -119,7 +254,7 @@ func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constrai
 	summary := make(ResourceUsageSummary)
 	for _, perc := range percentiles {
 		for _, name := range sortedKeys {
-			usage := stats[perc][name]
+			usage := data[perc][name]
 			summary[strconv.Itoa(perc)] = append(summary[strconv.Itoa(perc)], SingleContainerSummary{
 				Name: name,
 				Cpu:  usage.CPUUsageInCores,
@@ -157,92 +292,3 @@ func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constrai
 	Expect(violatedConstraints).To(BeEmpty())
 	return &summary
 }
-
-func (g *containerResourceGatherer) computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
-	if len(timeSeries) == 0 {
-		return make(map[int]resourceUsagePerContainer)
-	}
-	dataMap := make(map[string]*usageDataPerContainer)
-	for _, singleStatistic := range timeSeries {
-		for name, data := range singleStatistic {
-			if dataMap[name] == nil {
-				dataMap[name] = &usageDataPerContainer{
-					cpuData:        make([]float64, len(timeSeries)),
-					memUseData:     make([]uint64, len(timeSeries)),
-					memWorkSetData: make([]uint64, len(timeSeries)),
-				}
-			}
-			dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
-			dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
-			dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
-		}
-	}
-	for _, v := range dataMap {
-		sort.Float64s(v.cpuData)
-		sort.Sort(uint64arr(v.memUseData))
-		sort.Sort(uint64arr(v.memWorkSetData))
-	}
-
-	result := make(map[int]resourceUsagePerContainer)
-	for _, perc := range percentilesToCompute {
-		data := make(resourceUsagePerContainer)
-		for k, v := range dataMap {
-			percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
-			data[k] = &containerResourceUsage{
-				Name:                    k,
-				CPUUsageInCores:         v.cpuData[percentileIndex],
-				MemoryUsageInBytes:      v.memUseData[percentileIndex],
-				MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
-			}
-		}
-		result[perc] = data
-	}
-	return result
-}
-
-func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
-	pods, err := c.Pods("kube-system").List(api.ListOptions{})
-	if err != nil {
-		return resourceUsagePerContainer{}, err
-	}
-	nodes, err := c.Nodes().List(api.ListOptions{})
-	if err != nil {
-		return resourceUsagePerContainer{}, err
-	}
-	containerIDToNameMap := make(map[string]string)
-	containerIDs := make([]string, 0)
-	for _, pod := range pods.Items {
-		for _, container := range pod.Status.ContainerStatuses {
-			containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
-			containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
-			containerIDs = append(containerIDs, containerID)
-		}
-	}
-
-	mutex := sync.Mutex{}
-	wg := sync.WaitGroup{}
-	wg.Add(len(nodes.Items))
-	errors := make([]error, 0)
-	nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
-	for _, node := range nodes.Items {
-		go func(nodeName string) {
-			defer utilruntime.HandleCrash()
-			defer wg.Done()
-			nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 15*time.Second, func() []string { return containerIDs }, true)
-			mutex.Lock()
-			defer mutex.Unlock()
-			if err != nil {
-				errors = append(errors, err)
-				return
-			}
-			for k, v := range nodeUsage {
-				nameToUsageMap[containerIDToNameMap[k]] = v
-			}
-		}(node.Name)
-	}
-	wg.Wait()
-	if len(errors) != 0 {
-		return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
-	}
-	return nameToUsageMap, nil
-}