diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go
index b6eb43fd7e..17316ffce8 100644
--- a/pkg/kubelet/stats/cri_stats_provider.go
+++ b/pkg/kubelet/stats/cri_stats_provider.go
@@ -22,6 +22,7 @@ import (
 	"path"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	cadvisorfs "github.com/google/cadvisor/fs"
@@ -38,6 +39,11 @@ import (
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
 )
 
+var (
+	// defaultCachePeriod is the default cache period for each cpuUsage.
+	defaultCachePeriod = 10 * time.Minute
+)
+
 // criStatsProvider implements the containerStatsProvider interface by getting
 // the container stats from CRI.
 type criStatsProvider struct {
@@ -54,6 +60,10 @@ type criStatsProvider struct {
 	imageService internalapi.ImageManagerService
 	// logMetrics provides the metrics for container logs
 	logMetricsService LogMetricsService
+
+	// cpuUsageCache caches the cpu usage for containers.
+	cpuUsageCache map[string]*runtimeapi.CpuUsage
+	mutex         sync.Mutex
 }
 
 // newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -71,6 +81,7 @@ func newCRIStatsProvider(
 		runtimeService:    runtimeService,
 		imageService:      imageService,
 		logMetricsService: logMetricsService,
+		cpuUsageCache:     make(map[string]*runtimeapi.CpuUsage),
 	}
 }
 
@@ -165,6 +176,8 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
 		}
 		ps.Containers = append(ps.Containers, *cs)
 	}
+	// cleanup outdated caches.
+	p.cleanupOutdatedCaches()
 
 	result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
 	for _, s := range sandboxIDToPodStats {
@@ -247,6 +260,8 @@ func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
 		}
 		ps.Containers = append(ps.Containers, *cs)
 	}
+	// cleanup outdated caches.
+	p.cleanupOutdatedCaches()
 
 	result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
 	for _, s := range sandboxIDToPodStats {
@@ -450,6 +465,11 @@ func (p *criStatsProvider) makeContainerStats(
 		if stats.Cpu.UsageCoreNanoSeconds != nil {
 			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
 		}
+
+		usageNanoCores := p.getContainerUsageNanoCores(stats)
+		if usageNanoCores != nil {
+			result.CPU.UsageNanoCores = usageNanoCores
+		}
 	}
 	if stats.Memory != nil {
 		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -506,6 +526,11 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
 		if stats.Cpu.UsageCoreNanoSeconds != nil {
 			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
 		}
+
+		usageNanoCores := p.getContainerUsageNanoCores(stats)
+		if usageNanoCores != nil {
+			result.CPU.UsageNanoCores = usageNanoCores
+		}
 	}
 	if stats.Memory != nil {
 		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -516,6 +541,44 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
 	return result
 }
 
+// getContainerUsageNanoCores gets usageNanoCores based on cached usageCoreNanoSeconds.
+func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
+	if stats == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+		return nil
+	}
+
+	p.mutex.Lock()
+	defer func() {
+		// Update cache with new value.
+		p.cpuUsageCache[stats.Attributes.Id] = stats.Cpu
+		p.mutex.Unlock()
+	}()
+
+	cached, ok := p.cpuUsageCache[stats.Attributes.Id]
+	if !ok || cached.UsageCoreNanoSeconds == nil {
+		return nil
+	}
+
+	nanoSeconds := stats.Cpu.Timestamp - cached.Timestamp
+	usageNanoCores := (stats.Cpu.UsageCoreNanoSeconds.Value - cached.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
+	return &usageNanoCores
+}
+
+func (p *criStatsProvider) cleanupOutdatedCaches() {
+	p.mutex.Lock()
+	defer p.mutex.Unlock()
+
+	for k, v := range p.cpuUsageCache {
+		if v == nil {
+			delete(p.cpuUsageCache, k)
+			continue
+		}
+		if time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
+			delete(p.cpuUsageCache, k)
+		}
+	}
+}
+
 // removeTerminatedContainer returns the specified container but with
 // the stats of the terminated containers removed.
 func removeTerminatedContainer(containers []*runtimeapi.Container) []*runtimeapi.Container {
diff --git a/pkg/kubelet/stats/cri_stats_provider_test.go b/pkg/kubelet/stats/cri_stats_provider_test.go
index 3e73625a2f..f420f37905 100644
--- a/pkg/kubelet/stats/cri_stats_provider_test.go
+++ b/pkg/kubelet/stats/cri_stats_provider_test.go
@@ -645,3 +645,110 @@ func makeFakeLogStats(seed int) *volume.Metrics {
 	m.InodesUsed = resource.NewQuantity(int64(seed+offsetInodeUsage), resource.BinarySI)
 	return m
 }
+
+func TestGetContainerUsageNanoCores(t *testing.T) {
+	var value0 uint64
+	var value1 uint64 = 10000000000
+
+	tests := []struct {
+		desc          string
+		cpuUsageCache map[string]*runtimeapi.CpuUsage
+		stats         *runtimeapi.ContainerStats
+		expected      *uint64
+	}{
+		{
+			desc:          "should return nil if stats is nil",
+			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+		},
+		{
+			desc:          "should return nil if cpu stats is nil",
+			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			stats: &runtimeapi.ContainerStats{
+				Attributes: &runtimeapi.ContainerAttributes{
+					Id: "1",
+				},
+				Cpu: nil,
+			},
+		},
+		{
+			desc:          "should return nil if usageCoreNanoSeconds is nil",
+			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			stats: &runtimeapi.ContainerStats{
+				Attributes: &runtimeapi.ContainerAttributes{
+					Id: "1",
+				},
+				Cpu: &runtimeapi.CpuUsage{
+					Timestamp:            1,
+					UsageCoreNanoSeconds: nil,
+				},
+			},
+		},
+		{
+			desc:          "should return nil if cpu stats is not cached yet",
+			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			stats: &runtimeapi.ContainerStats{
+				Attributes: &runtimeapi.ContainerAttributes{
+					Id: "1",
+				},
+				Cpu: &runtimeapi.CpuUsage{
+					Timestamp: 1,
+					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+						Value: 10000000000,
+					},
+				},
+			},
+		},
+		{
+			desc: "should return zero value if cached cpu stats is equal to current value",
+			stats: &runtimeapi.ContainerStats{
+				Attributes: &runtimeapi.ContainerAttributes{
+					Id: "1",
+				},
+				Cpu: &runtimeapi.CpuUsage{
+					Timestamp: 1,
+					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+						Value: 10000000000,
+					},
+				},
+			},
+			cpuUsageCache: map[string]*runtimeapi.CpuUsage{
+				"1": {
+					Timestamp: 0,
+					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+						Value: 10000000000,
+					},
+				},
+			},
+			expected: &value0,
+		},
+		{
+			desc: "should return correct value if cached cpu stats is not equal to current value",
+			stats: &runtimeapi.ContainerStats{
+				Attributes: &runtimeapi.ContainerAttributes{
+					Id: "1",
+				},
+				Cpu: &runtimeapi.CpuUsage{
+					Timestamp: int64(time.Second / time.Nanosecond),
+					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+						Value: 20000000000,
+					},
+				},
+			},
+			cpuUsageCache: map[string]*runtimeapi.CpuUsage{
+				"1": {
+					Timestamp: 0,
+					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+						Value: 10000000000,
+					},
+				},
+			},
+			expected: &value1,
+		},
+	}
+
+	for _, test := range tests {
+		provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
+		real := provider.getContainerUsageNanoCores(test.stats)
+		assert.Equal(t, test.expected, real, test.desc)
+	}
+}
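
For orientation only, not part of the patch: a minimal, self-contained sketch of the rate calculation that getContainerUsageNanoCores performs, turning two cumulative usageCoreNanoSeconds samples into an instantaneous nano-cores figure. The cpuSample type and function names below are illustrative assumptions, not identifiers from the Kubernetes code.

// Illustrative sketch of the nano-cores computation (assumed helper names).
package main

import (
	"fmt"
	"time"
)

// cpuSample mirrors the two fields the patch reads from runtimeapi.CpuUsage.
type cpuSample struct {
	timestamp            int64  // sample time in nanoseconds since the epoch
	usageCoreNanoSeconds uint64 // cumulative CPU time consumed, in nanoseconds
}

// usageNanoCores returns the average CPU usage between two samples in
// nano-cores, using the same arithmetic as the patch: delta CPU time,
// scaled to one second, divided by the elapsed wall-clock nanoseconds.
func usageNanoCores(cached, current cpuSample) uint64 {
	elapsed := current.timestamp - cached.timestamp
	return (current.usageCoreNanoSeconds - cached.usageCoreNanoSeconds) *
		uint64(time.Second/time.Nanosecond) / uint64(elapsed)
}

func main() {
	cached := cpuSample{timestamp: 0, usageCoreNanoSeconds: 10000000000}
	current := cpuSample{
		timestamp:            int64(time.Second / time.Nanosecond), // one second later
		usageCoreNanoSeconds: 20000000000,                          // ten CPU-seconds consumed
	}
	// Ten CPU-seconds over a one-second window is ten full cores,
	// i.e. 10000000000 nano-cores, matching the last test case above.
	fmt.Println(usageNanoCores(cached, current))
}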