Merge pull request #21759 from gmarek/refactor-gatherer

Auto commit by PR queue bot
k8s-merge-robot 2016-02-24 04:37:14 -08:00
commit 07e9bd82dc
2 changed files with 175 additions and 124 deletions

@@ -51,7 +51,7 @@ type Framework struct {
namespacesToDelete []*api.Namespace // Some tests have more than one.
NamespaceDeletionTimeout time.Duration
gatherer containerResourceGatherer
gatherer *containerResourceGatherer
// Constraints that passed to a check which is executed after data is gathered to
// see if 99% of results are within acceptable bounds. It as to be injected in the test,
// as expectations vary greatly. Constraints are groupped by the container names.
@@ -116,7 +116,12 @@ func (f *Framework) beforeEach() {
}
if testContext.GatherKubeSystemResourceUsageData {
f.gatherer.startGatheringData(c, resourceDataGatheringPeriodSeconds*time.Second)
f.gatherer, err = NewResourceUsageGatherer(c)
if err != nil {
Logf("Error while creating NewResourceUsageGatherer: %v", err)
} else {
go f.gatherer.startGatheringData()
}
}
if testContext.GatherLogsSizes {
@@ -170,7 +175,7 @@ func (f *Framework) afterEach() {
}
summaries := make([]TestDataSummary, 0)
if testContext.GatherKubeSystemResourceUsageData {
if testContext.GatherKubeSystemResourceUsageData && f.gatherer != nil {
By("Collecting resource usage data")
summaries = append(summaries, f.gatherer.stopAndSummarize([]int{90, 99}, f.addonResourceConstraints))
}
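
Taken together, the Framework changes above boil down to a three-step lifecycle: construct the gatherer (tolerating failure), start collection in a goroutine, and stop and summarize only when construction succeeded. The following stand-alone sketch illustrates just that call pattern; fakeGatherer and its methods are illustrative stand-ins, not code from this commit.

package main

import (
    "errors"
    "log"
)

// fakeGatherer is an illustrative stand-in for *containerResourceGatherer;
// it only mimics the construct / start / guarded-stop call pattern shown above.
type fakeGatherer struct{ stopCh chan struct{} }

func newFakeGatherer(fail bool) (*fakeGatherer, error) {
    if fail {
        return nil, errors.New("could not list kube-system pods")
    }
    return &fakeGatherer{stopCh: make(chan struct{})}, nil
}

// startGatheringData blocks until the stop channel is closed, like the real gatherer.
func (g *fakeGatherer) startGatheringData() { <-g.stopCh }

func (g *fakeGatherer) stopAndSummarize() { close(g.stopCh) }

func main() {
    gatherer, err := newFakeGatherer(false)
    if err != nil {
        // beforeEach only logs construction errors; the test keeps running.
        log.Printf("Error while creating gatherer: %v", err)
    } else {
        go gatherer.startGatheringData()
    }
    // afterEach guards on a non-nil gatherer before summarizing.
    if gatherer != nil {
        gatherer.stopAndSummarize()
    }
}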

@@ -34,7 +34,7 @@ import (
)
const (
resourceDataGatheringPeriodSeconds = 60
resourceDataGatheringPeriod = 60 * time.Second
)
type resourceConstraint struct {
@@ -42,12 +42,6 @@ type resourceConstraint struct {
memoryConstraint uint64
}
type containerResourceGatherer struct {
usageTimeseries map[time.Time]resourceUsagePerContainer
stopCh chan struct{}
wg sync.WaitGroup
}
type SingleContainerSummary struct {
Name string
Cpu float64
@@ -75,43 +69,184 @@ func (s *ResourceUsageSummary) PrintJSON() string {
return prettyPrintJSON(*s)
}
func (g *containerResourceGatherer) startGatheringData(c *client.Client, period time.Duration) {
g.usageTimeseries = make(map[time.Time]resourceUsagePerContainer)
g.wg.Add(1)
g.stopCh = make(chan struct{})
go func() error {
defer utilruntime.HandleCrash()
defer g.wg.Done()
for {
select {
case <-time.After(period):
now := time.Now()
data, err := g.getKubeSystemContainersResourceUsage(c)
if err != nil {
Logf("Error while getting resource usage: %v", err)
continue
func computePercentiles(timeSeries []resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
if len(timeSeries) == 0 {
return make(map[int]resourceUsagePerContainer)
}
dataMap := make(map[string]*usageDataPerContainer)
for i := range timeSeries {
for name, data := range timeSeries[i] {
if dataMap[name] == nil {
dataMap[name] = &usageDataPerContainer{
cpuData: make([]float64, len(timeSeries)),
memUseData: make([]uint64, len(timeSeries)),
memWorkSetData: make([]uint64, len(timeSeries)),
}
g.usageTimeseries[now] = data
case <-g.stopCh:
Logf("Stop channel is closed. Stopping gatherer.")
return nil
}
dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
}
}
for _, v := range dataMap {
sort.Float64s(v.cpuData)
sort.Sort(uint64arr(v.memUseData))
sort.Sort(uint64arr(v.memWorkSetData))
}
result := make(map[int]resourceUsagePerContainer)
for _, perc := range percentilesToCompute {
data := make(resourceUsagePerContainer)
for k, v := range dataMap {
percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
data[k] = &containerResourceUsage{
Name: k,
CPUUsageInCores: v.cpuData[percentileIndex],
MemoryUsageInBytes: v.memUseData[percentileIndex],
MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
}
}
}()
result[perc] = data
}
return result
}
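
computePercentiles uses the nearest-rank method: for n sorted samples and percentile p, it reads the element at index ceil(n*p/100) - 1. A small self-contained illustration of that index arithmetic, with made-up sample values:

package main

import (
    "fmt"
    "math"
    "sort"
)

func main() {
    // Made-up CPU samples for one container, in cores.
    cpuData := []float64{0.12, 0.31, 0.05, 0.44, 0.27, 0.19, 0.38, 0.22, 0.50, 0.09}
    sort.Float64s(cpuData)
    for _, perc := range []int{50, 90, 99} {
        // Same nearest-rank formula as computePercentiles.
        idx := int(math.Ceil(float64(len(cpuData)*perc)/100)) - 1
        fmt.Printf("p%d -> index %d -> %.2f cores\n", perc, idx, cpuData[idx])
    }
}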
func leftMergeData(left, right map[int]resourceUsagePerContainer) map[int]resourceUsagePerContainer {
result := make(map[int]resourceUsagePerContainer)
for percentile, data := range left {
result[percentile] = data
if _, ok := right[percentile]; !ok {
continue
}
for k, v := range right[percentile] {
result[percentile][k] = v
}
}
return result
}
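
leftMergeData keeps every percentile from left and, where a percentile also exists in right, lets right's per-container entries overwrite left's. A toy version of the same rule with simplified value types (ints instead of *containerResourceUsage):

package main

import "fmt"

// leftMerge mirrors leftMergeData on simplified types: start from left,
// then overlay right's entries for any percentile both maps share.
func leftMerge(left, right map[int]map[string]int) map[int]map[string]int {
    result := make(map[int]map[string]int)
    for percentile, data := range left {
        result[percentile] = data
        if _, ok := right[percentile]; !ok {
            continue
        }
        for k, v := range right[percentile] {
            result[percentile][k] = v
        }
    }
    return result
}

func main() {
    left := map[int]map[string]int{
        90: {"pod-a/container-1": 10, "pod-b/container-1": 20},
        99: {"pod-a/container-1": 15},
    }
    right := map[int]map[string]int{
        90: {"pod-b/container-1": 25}, // shared percentile: right's value wins for this key
    }
    fmt.Println(leftMerge(left, right))
    // p90 keeps pod-a from left and takes pod-b's 25 from right; p99 comes from left alone.
}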
type resourceGatherWorker struct {
c *client.Client
nodeName string
wg *sync.WaitGroup
containerIDToNameMap map[string]string
containerIDs []string
stopCh chan struct{}
dataSeries []resourceUsagePerContainer
}
func (w *resourceGatherWorker) singleProbe() {
data := make(resourceUsagePerContainer)
nodeUsage, err := getOneTimeResourceUsageOnNode(w.c, w.nodeName, 15*time.Second, func() []string { return w.containerIDs }, true)
if err != nil {
Logf("Error while reading data from %v: %v", w.nodeName, err)
return
}
for k, v := range nodeUsage {
data[w.containerIDToNameMap[k]] = v
}
w.dataSeries = append(w.dataSeries, data)
}
func (w *resourceGatherWorker) gather(initialSleep time.Duration) {
defer utilruntime.HandleCrash()
defer w.wg.Done()
select {
case <-time.After(initialSleep):
w.singleProbe()
for {
select {
case <-time.After(resourceDataGatheringPeriod):
w.singleProbe()
case <-w.stopCh:
return
}
}
case <-w.stopCh:
return
}
}
func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) {
delay := resourceDataGatheringPeriod / time.Duration(len(g.workers))
for i := range g.workers {
go g.workers[i].gather(delay)
}
g.workerWg.Wait()
}
type containerResourceGatherer struct {
client *client.Client
stopCh chan struct{}
workers []resourceGatherWorker
workerWg sync.WaitGroup
containerIDToNameMap map[string]string
containerIDs []string
}
func NewResourceUsageGatherer(c *client.Client) (*containerResourceGatherer, error) {
g := containerResourceGatherer{
client: c,
stopCh: make(chan struct{}),
containerIDToNameMap: make(map[string]string),
containerIDs: make([]string, 0),
}
pods, err := c.Pods("kube-system").List(api.ListOptions{})
if err != nil {
Logf("Error while listing Pods: %v", err)
return nil, err
}
for _, pod := range pods.Items {
for _, container := range pod.Status.ContainerStatuses {
containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
g.containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
g.containerIDs = append(g.containerIDs, containerID)
}
}
nodeList, err := c.Nodes().List(api.ListOptions{})
if err != nil {
Logf("Error while listing Nodes: %v", err)
return nil, err
}
g.workerWg.Add(len(nodeList.Items))
for _, node := range nodeList.Items {
g.workers = append(g.workers, resourceGatherWorker{
c: c,
nodeName: node.Name,
wg: &g.workerWg,
containerIDToNameMap: g.containerIDToNameMap,
containerIDs: g.containerIDs,
stopCh: g.stopCh,
})
}
return &g, nil
}
// startGatheringData blocks until stopAndSummarize is called.
func (g *containerResourceGatherer) startGatheringData() {
g.getKubeSystemContainersResourceUsage(g.client)
}
func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constraints map[string]resourceConstraint) *ResourceUsageSummary {
close(g.stopCh)
Logf("Closed stop channel.")
g.wg.Wait()
Logf("Closed stop channel. Waiting for %v workers", len(g.workers))
g.workerWg.Wait()
Logf("Waitgroup finished.")
if len(percentiles) == 0 {
Logf("Warning! Empty percentile list for stopAndPrintData.")
return &ResourceUsageSummary{}
}
stats := g.computePercentiles(g.usageTimeseries, percentiles)
data := make(map[int]resourceUsagePerContainer)
for i := range g.workers {
stats := computePercentiles(g.workers[i].dataSeries, percentiles)
data = leftMergeData(stats, data)
}
// Workers has been stopped. We need to gather data stored in them.
sortedKeys := []string{}
for name := range stats[percentiles[0]] {
for name := range data[percentiles[0]] {
sortedKeys = append(sortedKeys, name)
}
sort.Strings(sortedKeys)
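
Unlike the old getKubeSystemContainersResourceUsage removed in the final hunk below, which had per-node goroutines write into a single mutex-guarded map, each worker here appends only to its own dataSeries; stopAndSummarize closes the shared stop channel, waits on the WaitGroup, and only then reads and merges the per-worker results. A minimal stand-alone sketch of that close-then-wait-then-merge pattern, with plain ints standing in for resource samples:

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker owns its samples slice, so no mutex is needed while gathering.
type worker struct {
    id      int
    stopCh  chan struct{}
    wg      *sync.WaitGroup
    samples []int
}

func (w *worker) gather(period time.Duration) {
    defer w.wg.Done()
    for {
        select {
        case <-time.After(period):
            w.samples = append(w.samples, w.id) // stand-in for a real probe
        case <-w.stopCh:
            return
        }
    }
}

func main() {
    stopCh := make(chan struct{})
    var wg sync.WaitGroup
    workers := make([]worker, 3)
    for i := range workers {
        workers[i] = worker{id: i, stopCh: stopCh, wg: &wg}
    }
    wg.Add(len(workers))
    for i := range workers {
        go workers[i].gather(10 * time.Millisecond)
    }
    time.Sleep(50 * time.Millisecond)
    close(stopCh) // stop every worker at once
    wg.Wait()     // safe to read their slices only after this
    for _, w := range workers {
        fmt.Printf("worker %d collected %d samples\n", w.id, len(w.samples))
    }
}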
@@ -119,7 +254,7 @@ func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constrai
summary := make(ResourceUsageSummary)
for _, perc := range percentiles {
for _, name := range sortedKeys {
usage := stats[perc][name]
usage := data[perc][name]
summary[strconv.Itoa(perc)] = append(summary[strconv.Itoa(perc)], SingleContainerSummary{
Name: name,
Cpu: usage.CPUUsageInCores,
@@ -157,92 +292,3 @@ func (g *containerResourceGatherer) stopAndSummarize(percentiles []int, constrai
Expect(violatedConstraints).To(BeEmpty())
return &summary
}
func (g *containerResourceGatherer) computePercentiles(timeSeries map[time.Time]resourceUsagePerContainer, percentilesToCompute []int) map[int]resourceUsagePerContainer {
if len(timeSeries) == 0 {
return make(map[int]resourceUsagePerContainer)
}
dataMap := make(map[string]*usageDataPerContainer)
for _, singleStatistic := range timeSeries {
for name, data := range singleStatistic {
if dataMap[name] == nil {
dataMap[name] = &usageDataPerContainer{
cpuData: make([]float64, len(timeSeries)),
memUseData: make([]uint64, len(timeSeries)),
memWorkSetData: make([]uint64, len(timeSeries)),
}
}
dataMap[name].cpuData = append(dataMap[name].cpuData, data.CPUUsageInCores)
dataMap[name].memUseData = append(dataMap[name].memUseData, data.MemoryUsageInBytes)
dataMap[name].memWorkSetData = append(dataMap[name].memWorkSetData, data.MemoryWorkingSetInBytes)
}
}
for _, v := range dataMap {
sort.Float64s(v.cpuData)
sort.Sort(uint64arr(v.memUseData))
sort.Sort(uint64arr(v.memWorkSetData))
}
result := make(map[int]resourceUsagePerContainer)
for _, perc := range percentilesToCompute {
data := make(resourceUsagePerContainer)
for k, v := range dataMap {
percentileIndex := int(math.Ceil(float64(len(v.cpuData)*perc)/100)) - 1
data[k] = &containerResourceUsage{
Name: k,
CPUUsageInCores: v.cpuData[percentileIndex],
MemoryUsageInBytes: v.memUseData[percentileIndex],
MemoryWorkingSetInBytes: v.memWorkSetData[percentileIndex],
}
}
result[perc] = data
}
return result
}
func (g *containerResourceGatherer) getKubeSystemContainersResourceUsage(c *client.Client) (resourceUsagePerContainer, error) {
pods, err := c.Pods("kube-system").List(api.ListOptions{})
if err != nil {
return resourceUsagePerContainer{}, err
}
nodes, err := c.Nodes().List(api.ListOptions{})
if err != nil {
return resourceUsagePerContainer{}, err
}
containerIDToNameMap := make(map[string]string)
containerIDs := make([]string, 0)
for _, pod := range pods.Items {
for _, container := range pod.Status.ContainerStatuses {
containerID := strings.TrimPrefix(container.ContainerID, "docker:/")
containerIDToNameMap[containerID] = pod.Name + "/" + container.Name
containerIDs = append(containerIDs, containerID)
}
}
mutex := sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(nodes.Items))
errors := make([]error, 0)
nameToUsageMap := make(resourceUsagePerContainer, len(containerIDToNameMap))
for _, node := range nodes.Items {
go func(nodeName string) {
defer utilruntime.HandleCrash()
defer wg.Done()
nodeUsage, err := getOneTimeResourceUsageOnNode(c, nodeName, 15*time.Second, func() []string { return containerIDs }, true)
mutex.Lock()
defer mutex.Unlock()
if err != nil {
errors = append(errors, err)
return
}
for k, v := range nodeUsage {
nameToUsageMap[containerIDToNameMap[k]] = v
}
}(node.Name)
}
wg.Wait()
if len(errors) != 0 {
return resourceUsagePerContainer{}, fmt.Errorf("Errors while gathering usage data: %v", errors)
}
return nameToUsageMap, nil
}