diff --git a/pkg/kubelet/metrics/collectors/volume_stats_test.go b/pkg/kubelet/metrics/collectors/volume_stats_test.go
index b2b6973dc8..ec9a0e0b1a 100644
--- a/pkg/kubelet/metrics/collectors/volume_stats_test.go
+++ b/pkg/kubelet/metrics/collectors/volume_stats_test.go
@@ -131,6 +131,7 @@ func TestVolumeStatsCollector(t *testing.T) {
 
 	mockStatsProvider := new(statstest.StatsProvider)
 	mockStatsProvider.On("ListPodStats").Return(podStats, nil)
+	mockStatsProvider.On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil)
 	if err := testutil.CollectAndCompare(&volumeStatsCollector{statsProvider: mockStatsProvider}, strings.NewReader(want), metrics...); err != nil {
 		t.Errorf("unexpected collecting result:\n%s", err)
 	}
diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go
index 70410049b3..94952991e5 100644
--- a/pkg/kubelet/server/server_test.go
+++ b/pkg/kubelet/server/server_test.go
@@ -257,8 +257,11 @@ func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Vo
 	return map[string]volume.Volume{}, true
 }
 
-func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error)                 { return nil, nil }
-func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error)              { return nil, nil }
+func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error)    { return nil, nil }
+func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
+func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+	return nil, nil
+}
 func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)  { return nil, nil }
 func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error)                { return nil, nil }
 func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error)             { return nil, nil }
diff --git a/pkg/kubelet/server/stats/handler.go b/pkg/kubelet/server/stats/handler.go
index aaba690c42..a5b9b4e5d8 100644
--- a/pkg/kubelet/server/stats/handler.go
+++ b/pkg/kubelet/server/stats/handler.go
@@ -43,7 +43,13 @@ type Provider interface {
 	//
 	// ListPodStats returns the stats of all the containers managed by pods.
 	ListPodStats() ([]statsapi.PodStats, error)
+	// ListPodStatsAndUpdateCPUNanoCoreUsage returns the stats of all the
+	// containers managed by pods and force-updates the cpu usageNanoCores.
+	// This is a workaround for CRI runtimes that do not integrate with
+	// cadvisor. See https://github.com/kubernetes/kubernetes/issues/72788
+	// for more details.
+	ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
 	// ListPodCPUAndMemoryStats returns the CPU and memory stats of all the containers managed by pods.
 	ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
 	// ImageFsStats returns the stats of the image filesystem.
 	ImageFsStats() (*statsapi.FsStats, error)
diff --git a/pkg/kubelet/server/stats/summary.go b/pkg/kubelet/server/stats/summary.go
index 29d3b31a35..ac7c04c628 100644
--- a/pkg/kubelet/server/stats/summary.go
+++ b/pkg/kubelet/server/stats/summary.go
@@ -84,10 +84,16 @@ func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get imageFs stats: %v", err)
 	}
-	podStats, err := sp.provider.ListPodStats()
+	var podStats []statsapi.PodStats
+	if updateStats {
+		podStats, err = sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
+	} else {
+		podStats, err = sp.provider.ListPodStats()
+	}
 	if err != nil {
 		return nil, fmt.Errorf("failed to list pod stats: %v", err)
 	}
+
 	rlimit, err := sp.provider.RlimitStats()
 	if err != nil {
 		return nil, fmt.Errorf("failed to get rlimit stats: %v", err)
diff --git a/pkg/kubelet/server/stats/summary_test.go b/pkg/kubelet/server/stats/summary_test.go
index 081036855a..198afd9676 100644
--- a/pkg/kubelet/server/stats/summary_test.go
+++ b/pkg/kubelet/server/stats/summary_test.go
@@ -74,6 +74,7 @@ func TestSummaryProviderGetStats(t *testing.T) {
 		On("GetNodeConfig").Return(nodeConfig).
 		On("GetPodCgroupRoot").Return(cgroupRoot).
 		On("ListPodStats").Return(podStats, nil).
+		On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
 		On("ImageFsStats").Return(imageFsStats, nil).
 		On("RootFsStats").Return(rootFsStats, nil).
 		On("RlimitStats").Return(rlimitStats, nil).
diff --git a/pkg/kubelet/server/stats/summary_windows_test.go b/pkg/kubelet/server/stats/summary_windows_test.go
index 35846cf479..35eac777bf 100644
--- a/pkg/kubelet/server/stats/summary_windows_test.go
+++ b/pkg/kubelet/server/stats/summary_windows_test.go
@@ -57,6 +57,7 @@ func TestSummaryProvider(t *testing.T) {
 		On("GetNodeConfig").Return(nodeConfig).
 		On("GetPodCgroupRoot").Return(cgroupRoot).
 		On("ListPodStats").Return(podStats, nil).
+		On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
 		On("ImageFsStats").Return(imageFsStats, nil).
 		On("RootFsStats").Return(rootFsStats, nil).
 		On("RlimitStats").Return(nil, nil).
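
A note on the Get(updateStats bool) split above: the design intent (spelled out in the cri_stats_provider.go comments further down) is that exactly one periodic caller passes updateStats=true — today the eviction manager, on its ~10s housekeeping interval — while on-demand readers such as the /stats/summary handler pass false and only observe the cached usageNanoCores. A minimal sketch of that calling pattern, with hypothetical caller names; only Get and its flag come from this patch:

	package summaryexample

	import "time"

	// Summary stands in for statsapi.Summary.
	type Summary struct{}

	// summaryProvider mirrors the Get signature changed by this patch.
	type summaryProvider interface {
		Get(updateStats bool) (*Summary, error)
	}

	// runEvictionLoop is a hypothetical stand-in for the eviction manager's
	// housekeeping loop: the single periodic caller that forces the CRI
	// provider to recompute usageNanoCores on a fixed interval.
	func runEvictionLoop(sp summaryProvider, stop <-chan struct{}) {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				sp.Get(true) // force-update cpu nano core usage
			case <-stop:
				return
			}
		}
	}

	// handleSummaryRequest is a hypothetical stand-in for an on-demand reader
	// (e.g. the /stats/summary endpoint): it must pass false, otherwise the
	// sampling interval behind usageNanoCores becomes caller-dependent.
	func handleSummaryRequest(sp summaryProvider) (*Summary, error) {
		return sp.Get(false)
	}

The switch to sync.RWMutex in cri_stats_provider.go below follows from this pattern: many concurrent readers of the cached value, a single periodic writer.
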
diff --git a/pkg/kubelet/server/stats/testing/mock_stats_provider.go b/pkg/kubelet/server/stats/testing/mock_stats_provider.go
index 9db0482ab3..a3251b3e59 100644
--- a/pkg/kubelet/server/stats/testing/mock_stats_provider.go
+++ b/pkg/kubelet/server/stats/testing/mock_stats_provider.go
@@ -275,6 +275,29 @@ func (_m *StatsProvider) ListPodStats() ([]v1alpha1.PodStats, error) {
 	return r0, r1
 }
 
+// ListPodStatsAndUpdateCPUNanoCoreUsage provides a mock function with given fields:
+func (_m *StatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]v1alpha1.PodStats, error) {
+	ret := _m.Called()
+
+	var r0 []v1alpha1.PodStats
+	if rf, ok := ret.Get(0).(func() []v1alpha1.PodStats); ok {
+		r0 = rf()
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]v1alpha1.PodStats)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func() error); ok {
+		r1 = rf()
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
 // ListPodCPUAndMemoryStats provides a mock function with given fields:
 func (_m *StatsProvider) ListPodCPUAndMemoryStats() ([]v1alpha1.PodStats, error) {
 	ret := _m.Called()
diff --git a/pkg/kubelet/stats/cadvisor_stats_provider.go b/pkg/kubelet/stats/cadvisor_stats_provider.go
index a8abfb2664..203a5cd6ba 100644
--- a/pkg/kubelet/stats/cadvisor_stats_provider.go
+++ b/pkg/kubelet/stats/cadvisor_stats_provider.go
@@ -155,6 +155,14 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
 	return result, nil
 }
 
+// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
+// the containers and returns the stats for all the pod-managed containers.
+// For cadvisor, cpu nano core usages are pre-computed and cached, so this
+// function simply calls ListPodStats.
+func (p *cadvisorStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+	return p.ListPodStats()
+}
+
 // ListPodCPUAndMemoryStats returns the cpu and memory stats of all the pod-managed containers.
 func (p *cadvisorStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
 	infos, err := getCadvisorContainerInfo(p.cadvisor)
diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go
index b0ac86b45d..83cc892501 100644
--- a/pkg/kubelet/stats/cri_stats_provider.go
+++ b/pkg/kubelet/stats/cri_stats_provider.go
@@ -45,6 +45,12 @@ var (
 	defaultCachePeriod = 10 * time.Minute
 )
 
+// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
+type cpuUsageRecord struct {
+	stats          *runtimeapi.CpuUsage
+	usageNanoCores *uint64
+}
+
 // criStatsProvider implements the containerStatsProvider interface by getting
 // the container stats from CRI.
 type criStatsProvider struct {
@@ -63,8 +69,8 @@ type criStatsProvider struct {
 	logMetricsService LogMetricsService
 
 	// cpuUsageCache caches the cpu usage for containers.
-	cpuUsageCache map[string]*runtimeapi.CpuUsage
-	mutex         sync.Mutex
+	cpuUsageCache map[string]*cpuUsageRecord
+	mutex         sync.RWMutex
 }
 
 // newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -82,12 +88,32 @@ func newCRIStatsProvider(
 		runtimeService:    runtimeService,
 		imageService:      imageService,
 		logMetricsService: logMetricsService,
-		cpuUsageCache:     make(map[string]*runtimeapi.CpuUsage),
+		cpuUsageCache:     make(map[string]*cpuUsageRecord),
 	}
 }
 
 // ListPodStats returns the stats of all the pod-managed containers.
 func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
+	// Don't update CPU nano core usage.
+	return p.listPodStats(false)
+}
+
+// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
+// the containers and returns the stats for all the pod-managed containers.
+// This is a workaround because CRI runtimes do not supply nano core usages,
+// so this function calculates the difference between the current and the
+// last (cached) cpu stats to compute this metric. The implementation assumes
+// a single caller that periodically invokes this function to update the
+// metric. If there are multiple callers, the period used to compute the cpu
+// usage may vary and the usage could be incoherent (e.g., spiky). If no
+// caller calls this function, the cpu usage will stay nil. Right now, the
+// eviction manager is the only caller, and it calls this function every 10s.
+func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+	// Update CPU nano core usage.
+	return p.listPodStats(true)
+}
+
+func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
 	// Gets node root filesystem information, which will be used to populate
 	// the available and capacity bytes/inodes in container stats.
 	rootFsInfo, err := p.cadvisor.RootFsInfo()
@@ -157,7 +183,7 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
 		}
 
 		// Fill available stats for full set of required pod stats
-		cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid())
+		cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid(), updateCPUNanoCoreUsage)
 		p.addPodNetworkStats(ps, podSandboxID, caInfos, cs)
 		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
 
@@ -435,6 +461,7 @@ func (p *criStatsProvider) makeContainerStats(
 	rootFsInfo *cadvisorapiv2.FsInfo,
 	fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
 	uid string,
+	updateCPUNanoCoreUsage bool,
 ) *statsapi.ContainerStats {
 	result := &statsapi.ContainerStats{
 		Name: stats.Attributes.Metadata.Name,
@@ -450,8 +477,12 @@ func (p *criStatsProvider) makeContainerStats(
 		if stats.Cpu.UsageCoreNanoSeconds != nil {
 			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
 		}
-
-		usageNanoCores := p.getContainerUsageNanoCores(stats)
+		var usageNanoCores *uint64
+		if updateCPUNanoCoreUsage {
+			usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
+		} else {
+			usageNanoCores = p.getContainerUsageNanoCores(stats)
+		}
 		if usageNanoCores != nil {
 			result.CPU.UsageNanoCores = usageNanoCores
 		}
@@ -541,27 +572,63 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
 	return result
 }
 
-// getContainerUsageNanoCores gets usageNanoCores based on cached usageCoreNanoSeconds.
+// getContainerUsageNanoCores gets the cached usageNanoCores.
 func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
-	if stats == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+	if stats == nil || stats.Attributes == nil {
 		return nil
 	}
 
-	p.mutex.Lock()
-	defer func() {
-		// Update cache with new value.
-		p.cpuUsageCache[stats.Attributes.Id] = stats.Cpu
-		p.mutex.Unlock()
-	}()
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
 
 	cached, ok := p.cpuUsageCache[stats.Attributes.Id]
-	if !ok || cached.UsageCoreNanoSeconds == nil {
+	if !ok || cached.usageNanoCores == nil {
 		return nil
 	}
+	// return a copy of the usage
+	latestUsage := *cached.usageNanoCores
+	return &latestUsage
+}
 
-	nanoSeconds := stats.Cpu.Timestamp - cached.Timestamp
-	usageNanoCores := (stats.Cpu.UsageCoreNanoSeconds.Value - cached.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
-	return &usageNanoCores
+// getAndUpdateContainerUsageNanoCores computes usageNanoCores based on the
+// given and the cached usageCoreNanoSeconds, updates the cache with the
+// computed usageNanoCores, and returns the usageNanoCores.
+func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
+	if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+		return nil
+	}
+	id := stats.Attributes.Id
+	usage, err := func() (*uint64, error) {
+		p.mutex.Lock()
+		defer p.mutex.Unlock()
+
+		cached, ok := p.cpuUsageCache[id]
+		if !ok || cached.stats.UsageCoreNanoSeconds == nil {
+			// Cannot compute the usage now, but update the cached stats anyway
+			p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
+			return nil, nil
+		}
+
+		newStats := stats.Cpu
+		cachedStats := cached.stats
+		nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
+		if nanoSeconds <= 0 {
+			return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
+		}
+		usageNanoCores := (newStats.UsageCoreNanoSeconds.Value - cachedStats.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
+
+		// Update cache with new value.
+		usageToUpdate := usageNanoCores
+		p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
+
+		return &usageNanoCores, nil
+	}()
+
+	if err != nil {
+		// This should not happen. Log it to raise visibility.
+		klog.Errorf("failed updating cpu usage nano core: %v", err)
+	}
+	return usage
 }
 
 func (p *criStatsProvider) cleanupOutdatedCaches() {
@@ -573,7 +640,7 @@
 			delete(p.cpuUsageCache, k)
 		}
 
-		if time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
+		if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
 			delete(p.cpuUsageCache, k)
 		}
 	}
diff --git a/pkg/kubelet/stats/cri_stats_provider_test.go b/pkg/kubelet/stats/cri_stats_provider_test.go
index 2c4de8e93f..1ba3878f8f 100644
--- a/pkg/kubelet/stats/cri_stats_provider_test.go
+++ b/pkg/kubelet/stats/cri_stats_provider_test.go
@@ -692,17 +692,17 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 	tests := []struct {
 		desc          string
-		cpuUsageCache map[string]*runtimeapi.CpuUsage
+		cpuUsageCache map[string]*cpuUsageRecord
 		stats         *runtimeapi.ContainerStats
 		expected      *uint64
 	}{
 		{
 			desc:          "should return nil if stats is nil",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 		},
 		{
 			desc:          "should return nil if cpu stats is nil",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 			stats: &runtimeapi.ContainerStats{
 				Attributes: &runtimeapi.ContainerAttributes{
 					Id: "1",
@@ -712,7 +712,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 		},
 		{
 			desc:          "should return nil if usageCoreNanoSeconds is nil",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 			stats: &runtimeapi.ContainerStats{
 				Attributes: &runtimeapi.ContainerAttributes{
 					Id: "1",
@@ -725,7 +725,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 		},
 		{
 			desc:          "should return nil if cpu stats is not cached yet",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 			stats: &runtimeapi.ContainerStats{
 				Attributes: &runtimeapi.ContainerAttributes{
 					Id: "1",
@@ -751,11 +751,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 				},
 			},
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{
+			cpuUsageCache: map[string]*cpuUsageRecord{
 				"1": {
-					Timestamp: 0,
-					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
-						Value: 10000000000,
+					stats: &runtimeapi.CpuUsage{
+						Timestamp: 0,
+						UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+							Value: 10000000000,
+						},
 					},
 				},
 			},
@@ -774,11 +776,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 				},
 			},
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{
+			cpuUsageCache: map[string]*cpuUsageRecord{
 				"1": {
-					Timestamp: 0,
-					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
-						Value: 10000000000,
+					stats: &runtimeapi.CpuUsage{
+						Timestamp: 0,
+						UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+							Value: 10000000000,
+						},
 					},
 				},
 			},
@@ -788,7 +792,16 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 
 	for _, test := range tests {
 		provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
-		real := provider.getContainerUsageNanoCores(test.stats)
+		// Before the update, the cached value should be nil
+		cached := provider.getContainerUsageNanoCores(test.stats)
+		assert.Nil(t, cached)
+
+		// Update the cache and get the latest value.
+		real := provider.getAndUpdateContainerUsageNanoCores(test.stats)
 		assert.Equal(t, test.expected, real, test.desc)
+
+		// After the update, the cached value should be up-to-date
+		cached = provider.getContainerUsageNanoCores(test.stats)
+		assert.Equal(t, test.expected, cached, test.desc)
 	}
 }
diff --git a/pkg/kubelet/stats/stats_provider.go b/pkg/kubelet/stats/stats_provider.go
index c0d98f900f..6eb7c712c7 100644
--- a/pkg/kubelet/stats/stats_provider.go
+++ b/pkg/kubelet/stats/stats_provider.go
@@ -88,6 +88,7 @@ type StatsProvider struct {
 // containers managed by pods.
 type containerStatsProvider interface {
 	ListPodStats() ([]statsapi.PodStats, error)
+	ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
 	ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
 	ImageFsStats() (*statsapi.FsStats, error)
 	ImageFsDevice() (string, error)
diff --git a/pkg/kubelet/stats/stats_provider_test.go b/pkg/kubelet/stats/stats_provider_test.go
index 9700a501ca..8e5d6c34ec 100644
--- a/pkg/kubelet/stats/stats_provider_test.go
+++ b/pkg/kubelet/stats/stats_provider_test.go
@@ -682,6 +682,10 @@ func (p fakeContainerStatsProvider) ListPodStats() ([]statsapi.PodStats, error)
 	return nil, fmt.Errorf("not implemented")
 }
 
+func (p fakeContainerStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+	return nil, fmt.Errorf("not implemented")
+}
+
 func (p fakeContainerStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
 	return nil, fmt.Errorf("not implemented")
 }
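
The heart of the change is the rate computation in getAndUpdateContainerUsageNanoCores: CRI only reports the cumulative usageCoreNanoSeconds counter, so the kubelet derives usageNanoCores as the delta of cumulative cpu-nanoseconds scaled to one second of wall time. A standalone sketch of that arithmetic with made-up sample values (only the formula and the interval guard mirror the patch):

	package main

	import (
		"fmt"
		"time"
	)

	// nanoCores mirrors the computation in getAndUpdateContainerUsageNanoCores:
	// usage counters are cumulative cpu-nanoseconds, timestamps are unix
	// nanoseconds, and the result is "nano-cores" (cpu-nanoseconds consumed
	// per second of wall time).
	func nanoCores(curUsage, cachedUsage uint64, curTS, cachedTS int64) (uint64, error) {
		interval := curTS - cachedTS
		if interval <= 0 {
			return 0, fmt.Errorf("zero or negative interval (%v - %v)", curTS, cachedTS)
		}
		return (curUsage - cachedUsage) * uint64(time.Second/time.Nanosecond) / uint64(interval), nil
	}

	func main() {
		// 10s of cumulative cpu time at t=0, 20s at t=10s: the container burned
		// one full core over the interval, i.e. 1e9 nano-cores.
		usage, err := nanoCores(20e9, 10e9, int64(10*time.Second), 0)
		if err != nil {
			panic(err)
		}
		fmt.Println(usage) // 1000000000
	}

Note that, like the patch, the sketch does not guard against the cumulative counter moving backwards; because the values are unsigned, such a reset would wrap around rather than produce a negative rate.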