From 191666d6a3dd4a1ba77bd5fbf7696018a4559ebc Mon Sep 17 00:00:00 2001
From: Yu-Ju Hong
Date: Fri, 1 Mar 2019 18:22:27 -0800
Subject: [PATCH] Fix computing of cpu nano core usage

CRI runtimes do not supply cpu nano core usage as it is not part of CRI
stats. However, there are upstream components that still rely on such
stats to function. The previous fix was faulty because multiple callers
could compete to update the stats, causing inconsistent/incoherent
metrics. This change instead creates a separate call for updating the
usage, and relies on the eviction manager, which runs periodically, to
trigger the updates. The caveat is that if the eviction manager is
completely turned off, no one computes the usage.
---
 .../metrics/collectors/volume_stats_test.go  |   1 +
 pkg/kubelet/server/server_test.go            |   7 +-
 pkg/kubelet/server/stats/handler.go          |   9 +-
 pkg/kubelet/server/stats/summary.go          |   8 +-
 pkg/kubelet/server/stats/summary_test.go     |   1 +
 .../server/stats/summary_windows_test.go     |   1 +
 .../stats/testing/mock_stats_provider.go     |  23 ++++
 pkg/kubelet/stats/cadvisor_stats_provider.go |   8 ++
 pkg/kubelet/stats/cri_stats_provider.go      | 105 ++++++++++++++----
 pkg/kubelet/stats/cri_stats_provider_test.go |  41 ++++---
 pkg/kubelet/stats/stats_provider.go          |   1 +
 pkg/kubelet/stats/stats_provider_test.go     |   4 +
 12 files changed, 172 insertions(+), 37 deletions(-)

diff --git a/pkg/kubelet/metrics/collectors/volume_stats_test.go b/pkg/kubelet/metrics/collectors/volume_stats_test.go
index b2b6973dc8..ec9a0e0b1a 100644
--- a/pkg/kubelet/metrics/collectors/volume_stats_test.go
+++ b/pkg/kubelet/metrics/collectors/volume_stats_test.go
@@ -131,6 +131,7 @@ func TestVolumeStatsCollector(t *testing.T) {

 	mockStatsProvider := new(statstest.StatsProvider)
 	mockStatsProvider.On("ListPodStats").Return(podStats, nil)
+	mockStatsProvider.On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil)
 	if err := testutil.CollectAndCompare(&volumeStatsCollector{statsProvider: mockStatsProvider}, strings.NewReader(want), metrics...); err != nil {
 		t.Errorf("unexpected collecting result:\n%s", err)
 	}
diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go
index 70410049b3..94952991e5 100644
--- a/pkg/kubelet/server/server_test.go
+++ b/pkg/kubelet/server/server_test.go
@@ -257,8 +257,11 @@ func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Vo
 	return map[string]volume.Volume{}, true
 }

-func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error)                { return nil, nil }
-func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error)             { return nil, nil }
+func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error)    { return nil, nil }
+func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
+func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+	return nil, nil
+}
 func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
 func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error)               { return nil, nil }
 func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error)            { return nil, nil }
diff --git a/pkg/kubelet/server/stats/handler.go b/pkg/kubelet/server/stats/handler.go
index aaba690c42..a5b9b4e5d8 100644
--- a/pkg/kubelet/server/stats/handler.go
+++ b/pkg/kubelet/server/stats/handler.go
@@ -43,8 +43,15 @@ type Provider interface {
 	//
 	// ListPodStats returns the stats of all the containers managed by pods.
 	ListPodStats() ([]statsapi.PodStats, error)
-	// ListPodCPUAndMemoryStats returns the CPU and memory stats of all the containers managed by pods.
+	// ListPodCPUAndMemoryStats returns the CPU and memory stats of all the
+	// containers managed by pods.
 	ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
+	// ListPodStatsAndUpdateCPUNanoCoreUsage returns the stats of all the
+	// containers managed by pods and force-updates the cpu usageNanoCores.
+	// This is a workaround for CRI runtimes that do not integrate with
+	// cadvisor. See https://github.com/kubernetes/kubernetes/issues/72788
+	// for more details.
+	ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)

 	// ImageFsStats returns the stats of the image filesystem.
 	ImageFsStats() (*statsapi.FsStats, error)
diff --git a/pkg/kubelet/server/stats/summary.go b/pkg/kubelet/server/stats/summary.go
index 29d3b31a35..ac7c04c628 100644
--- a/pkg/kubelet/server/stats/summary.go
+++ b/pkg/kubelet/server/stats/summary.go
@@ -84,10 +84,16 @@ func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get imageFs stats: %v", err)
 	}
-	podStats, err := sp.provider.ListPodStats()
+	var podStats []statsapi.PodStats
+	if updateStats {
+		podStats, err = sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
+	} else {
+		podStats, err = sp.provider.ListPodStats()
+	}
 	if err != nil {
 		return nil, fmt.Errorf("failed to list pod stats: %v", err)
 	}
+
 	rlimit, err := sp.provider.RlimitStats()
 	if err != nil {
 		return nil, fmt.Errorf("failed to get rlimit stats: %v", err)
diff --git a/pkg/kubelet/server/stats/summary_test.go b/pkg/kubelet/server/stats/summary_test.go
index 081036855a..198afd9676 100644
--- a/pkg/kubelet/server/stats/summary_test.go
+++ b/pkg/kubelet/server/stats/summary_test.go
@@ -74,6 +74,7 @@ func TestSummaryProviderGetStats(t *testing.T) {
 		On("GetNodeConfig").Return(nodeConfig).
 		On("GetPodCgroupRoot").Return(cgroupRoot).
 		On("ListPodStats").Return(podStats, nil).
+		On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
 		On("ImageFsStats").Return(imageFsStats, nil).
 		On("RootFsStats").Return(rootFsStats, nil).
 		On("RlimitStats").Return(rlimitStats, nil).
diff --git a/pkg/kubelet/server/stats/summary_windows_test.go b/pkg/kubelet/server/stats/summary_windows_test.go
index 35846cf479..35eac777bf 100644
--- a/pkg/kubelet/server/stats/summary_windows_test.go
+++ b/pkg/kubelet/server/stats/summary_windows_test.go
@@ -57,6 +57,7 @@ func TestSummaryProvider(t *testing.T) {
 		On("GetNodeConfig").Return(nodeConfig).
 		On("GetPodCgroupRoot").Return(cgroupRoot).
 		On("ListPodStats").Return(podStats, nil).
+		On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
 		On("ImageFsStats").Return(imageFsStats, nil).
 		On("RootFsStats").Return(rootFsStats, nil).
 		On("RlimitStats").Return(nil, nil).
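The computation this patch adds to cri_stats_provider.go (further below) boils down to taking a rate over two cumulative usageCoreNanoSeconds samples. As a reading aid, here is a minimal, standalone Go sketch of that arithmetic; it is not code from this patch: the helper name nanoCores and the sample values are invented, and the kubelet reads the previous sample from its cpuUsageCache rather than taking it as an argument.

package main

import (
	"fmt"
	"time"
)

// nanoCores derives the average CPU usage in nano-cores over the window
// between two cumulative usageCoreNanoSeconds samples. Timestamps are in
// nanoseconds, as in CRI's CpuUsage message.
func nanoCores(curUsage, prevUsage uint64, curTS, prevTS int64) (uint64, error) {
	window := curTS - prevTS
	if window <= 0 {
		return 0, fmt.Errorf("zero or negative interval (%v - %v)", curTS, prevTS)
	}
	return (curUsage - prevUsage) * uint64(time.Second/time.Nanosecond) / uint64(window), nil
}

func main() {
	// One full core burned for 10s adds 10e9 core-nanoseconds, so the
	// derived rate is 1e9 nano-cores, i.e. exactly one core.
	usage, err := nanoCores(20000000000, 10000000000, int64(10*time.Second), 0)
	if err != nil {
		panic(err)
	}
	fmt.Println(usage) // 1000000000
}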
diff --git a/pkg/kubelet/server/stats/testing/mock_stats_provider.go b/pkg/kubelet/server/stats/testing/mock_stats_provider.go index 9db0482ab3..a3251b3e59 100644 --- a/pkg/kubelet/server/stats/testing/mock_stats_provider.go +++ b/pkg/kubelet/server/stats/testing/mock_stats_provider.go @@ -275,6 +275,29 @@ func (_m *StatsProvider) ListPodStats() ([]v1alpha1.PodStats, error) { return r0, r1 } +// ListPodStatsAndUpdateCPUNanoCoreUsage provides a mock function with given fields: +func (_m *StatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]v1alpha1.PodStats, error) { + ret := _m.Called() + + var r0 []v1alpha1.PodStats + if rf, ok := ret.Get(0).(func() []v1alpha1.PodStats); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]v1alpha1.PodStats) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ListPodCPUAndMemoryStats provides a mock function with given fields: func (_m *StatsProvider) ListPodCPUAndMemoryStats() ([]v1alpha1.PodStats, error) { ret := _m.Called() diff --git a/pkg/kubelet/stats/cadvisor_stats_provider.go b/pkg/kubelet/stats/cadvisor_stats_provider.go index a8abfb2664..203a5cd6ba 100644 --- a/pkg/kubelet/stats/cadvisor_stats_provider.go +++ b/pkg/kubelet/stats/cadvisor_stats_provider.go @@ -155,6 +155,14 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) { return result, nil } +// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for +// the containers and returns the stats for all the pod-managed containers. +// For cadvisor, cpu nano core usages are pre-computed and cached, so this +// function simply calls ListPodStats. +func (p *cadvisorStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) { + return p.ListPodStats() +} + // ListPodCPUAndMemoryStats returns the cpu and memory stats of all the pod-managed containers. func (p *cadvisorStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { infos, err := getCadvisorContainerInfo(p.cadvisor) diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go index b0ac86b45d..83cc892501 100644 --- a/pkg/kubelet/stats/cri_stats_provider.go +++ b/pkg/kubelet/stats/cri_stats_provider.go @@ -45,6 +45,12 @@ var ( defaultCachePeriod = 10 * time.Minute ) +// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores. +type cpuUsageRecord struct { + stats *runtimeapi.CpuUsage + usageNanoCores *uint64 +} + // criStatsProvider implements the containerStatsProvider interface by getting // the container stats from CRI. type criStatsProvider struct { @@ -63,8 +69,8 @@ type criStatsProvider struct { logMetricsService LogMetricsService // cpuUsageCache caches the cpu usage for containers. - cpuUsageCache map[string]*runtimeapi.CpuUsage - mutex sync.Mutex + cpuUsageCache map[string]*cpuUsageRecord + mutex sync.RWMutex } // newCRIStatsProvider returns a containerStatsProvider implementation that @@ -82,12 +88,32 @@ func newCRIStatsProvider( runtimeService: runtimeService, imageService: imageService, logMetricsService: logMetricsService, - cpuUsageCache: make(map[string]*runtimeapi.CpuUsage), + cpuUsageCache: make(map[string]*cpuUsageRecord), } } // ListPodStats returns the stats of all the pod-managed containers. func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) { + // Don't update CPU nano core usage. 
+	return p.listPodStats(false)
+}
+
+// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
+// the containers and returns the stats for all the pod-managed containers.
+// This is a workaround because CRI runtimes do not supply nano core usages,
+// so this function calculates the difference between the current and the
+// last (cached) cpu stats to compute this metric. The implementation assumes
+// a single caller periodically invokes this function to update the metrics. If
+// there are multiple callers, the period used to compute the cpu usage may
+// vary and the usage could be incoherent (e.g., spiky). If no caller calls
+// this function, the cpu usage will stay nil. Right now, the eviction manager
+// is the only caller, and it calls this function every 10s.
+func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+	// Update CPU nano core usage.
+	return p.listPodStats(true)
+}
+
+func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
 	// Gets node root filesystem information, which will be used to populate
 	// the available and capacity bytes/inodes in container stats.
 	rootFsInfo, err := p.cadvisor.RootFsInfo()
@@ -157,7 +183,7 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
 		}

 		// Fill available stats for full set of required pod stats
-		cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid())
+		cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid(), updateCPUNanoCoreUsage)
 		p.addPodNetworkStats(ps, podSandboxID, caInfos, cs)
 		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)

@@ -435,6 +461,7 @@ func (p *criStatsProvider) makeContainerStats(
 	rootFsInfo *cadvisorapiv2.FsInfo,
 	fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
 	uid string,
+	updateCPUNanoCoreUsage bool,
 ) *statsapi.ContainerStats {
 	result := &statsapi.ContainerStats{
 		Name: stats.Attributes.Metadata.Name,
@@ -450,8 +477,12 @@ func (p *criStatsProvider) makeContainerStats(
 	if stats.Cpu.UsageCoreNanoSeconds != nil {
 		result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
 	}
-
-	usageNanoCores := p.getContainerUsageNanoCores(stats)
+	var usageNanoCores *uint64
+	if updateCPUNanoCoreUsage {
+		usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
+	} else {
+		usageNanoCores = p.getContainerUsageNanoCores(stats)
+	}
 	if usageNanoCores != nil {
 		result.CPU.UsageNanoCores = usageNanoCores
 	}
@@ -541,27 +572,63 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
 	return result
 }

-// getContainerUsageNanoCores gets usageNanoCores based on cached usageCoreNanoSeconds.
+// getContainerUsageNanoCores gets the cached usageNanoCores.
 func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
-	if stats == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+	if stats == nil || stats.Attributes == nil {
 		return nil
 	}
-	p.mutex.Lock()
-	defer func() {
-		// Update cache with new value.
-		p.cpuUsageCache[stats.Attributes.Id] = stats.Cpu
-		p.mutex.Unlock()
-	}()
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
 	cached, ok := p.cpuUsageCache[stats.Attributes.Id]
-	if !ok || cached.UsageCoreNanoSeconds == nil {
+	if !ok || cached.usageNanoCores == nil {
 		return nil
 	}
+	// Return a copy of the usage.
+	latestUsage := *cached.usageNanoCores
+	return &latestUsage
+}

-	nanoSeconds := stats.Cpu.Timestamp - cached.Timestamp
-	usageNanoCores := (stats.Cpu.UsageCoreNanoSeconds.Value - cached.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
-	return &usageNanoCores
+// getAndUpdateContainerUsageNanoCores computes usageNanoCores based on the
+// given and the cached usageCoreNanoSeconds, updates the cache with the
+// computed usageNanoCores, and returns the usageNanoCores.
+func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
+	if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+		return nil
+	}
+	id := stats.Attributes.Id
+	usage, err := func() (*uint64, error) {
+		p.mutex.Lock()
+		defer p.mutex.Unlock()
+
+		cached, ok := p.cpuUsageCache[id]
+		if !ok || cached.stats.UsageCoreNanoSeconds == nil {
+			// Cannot compute the usage now, but update the cached stats anyway.
+			p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
+			return nil, nil
+		}
+
+		newStats := stats.Cpu
+		cachedStats := cached.stats
+		nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
+		if nanoSeconds <= 0 {
+			return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
+		}
+		usageNanoCores := (newStats.UsageCoreNanoSeconds.Value - cachedStats.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
+
+		// Update cache with new value.
+		usageToUpdate := usageNanoCores
+		p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
+
+		return &usageNanoCores, nil
+	}()
+
+	if err != nil {
+		// This should not happen. Log it to raise visibility.
+		klog.Errorf("failed updating cpu usage nano core: %v", err)
+	}
+	return usage
 }

 func (p *criStatsProvider) cleanupOutdatedCaches() {
@@ -573,7 +640,7 @@ func (p *criStatsProvider) cleanupOutdatedCaches() {
 			delete(p.cpuUsageCache, k)
 		}

-		if time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
+		if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
 			delete(p.cpuUsageCache, k)
 		}
 	}
diff --git a/pkg/kubelet/stats/cri_stats_provider_test.go b/pkg/kubelet/stats/cri_stats_provider_test.go
index 2c4de8e93f..1ba3878f8f 100644
--- a/pkg/kubelet/stats/cri_stats_provider_test.go
+++ b/pkg/kubelet/stats/cri_stats_provider_test.go
@@ -692,17 +692,17 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 	tests := []struct {
 		desc          string
-		cpuUsageCache map[string]*runtimeapi.CpuUsage
+		cpuUsageCache map[string]*cpuUsageRecord
 		stats         *runtimeapi.ContainerStats
 		expected      *uint64
 	}{
 		{
 			desc:          "should return nil if stats is nil",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 		},
 		{
 			desc:          "should return nil if cpu stats is nil",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 			stats: &runtimeapi.ContainerStats{
 				Attributes: &runtimeapi.ContainerAttributes{
 					Id: "1",
@@ -712,7 +712,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 		},
 		{
 			desc:          "should return nil if usageCoreNanoSeconds is nil",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 			stats: &runtimeapi.ContainerStats{
 				Attributes: &runtimeapi.ContainerAttributes{
 					Id: "1",
@@ -725,7 +725,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 		},
 		{
 			desc:          "should return nil if cpu stats is not cached yet",
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+			cpuUsageCache: map[string]*cpuUsageRecord{},
 			stats: &runtimeapi.ContainerStats{
 				Attributes: &runtimeapi.ContainerAttributes{
 					Id: "1",
@@ -751,11 +751,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 				},
 			},
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{
+			cpuUsageCache: map[string]*cpuUsageRecord{
 				"1": {
-					Timestamp: 0,
-					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
-						Value: 10000000000,
+					stats: &runtimeapi.CpuUsage{
+						Timestamp: 0,
+						UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+							Value: 10000000000,
+						},
 					},
 				},
 			},
@@ -774,11 +776,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
 				},
 			},
-			cpuUsageCache: map[string]*runtimeapi.CpuUsage{
+			cpuUsageCache: map[string]*cpuUsageRecord{
 				"1": {
-					Timestamp: 0,
-					UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
-						Value: 10000000000,
+					stats: &runtimeapi.CpuUsage{
+						Timestamp: 0,
+						UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+							Value: 10000000000,
+						},
 					},
 				},
 			},
@@ -788,7 +792,16 @@ func TestGetContainerUsageNanoCores(t *testing.T) {

 	for _, test := range tests {
 		provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
-		real := provider.getContainerUsageNanoCores(test.stats)
+		// Before the update, the cached value should be nil.
+		cached := provider.getContainerUsageNanoCores(test.stats)
+		assert.Nil(t, cached)
+
+		// Update the cache and get the latest value.
+ real := provider.getAndUpdateContainerUsageNanoCores(test.stats) assert.Equal(t, test.expected, real, test.desc) + + // After the update, the cached value should be up-to-date + cached = provider.getContainerUsageNanoCores(test.stats) + assert.Equal(t, test.expected, cached, test.desc) } } diff --git a/pkg/kubelet/stats/stats_provider.go b/pkg/kubelet/stats/stats_provider.go index c0d98f900f..6eb7c712c7 100644 --- a/pkg/kubelet/stats/stats_provider.go +++ b/pkg/kubelet/stats/stats_provider.go @@ -88,6 +88,7 @@ type StatsProvider struct { // containers managed by pods. type containerStatsProvider interface { ListPodStats() ([]statsapi.PodStats, error) + ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) ImageFsStats() (*statsapi.FsStats, error) ImageFsDevice() (string, error) diff --git a/pkg/kubelet/stats/stats_provider_test.go b/pkg/kubelet/stats/stats_provider_test.go index 9700a501ca..8e5d6c34ec 100644 --- a/pkg/kubelet/stats/stats_provider_test.go +++ b/pkg/kubelet/stats/stats_provider_test.go @@ -682,6 +682,10 @@ func (p fakeContainerStatsProvider) ListPodStats() ([]statsapi.PodStats, error) return nil, fmt.Errorf("not implemented") } +func (p fakeContainerStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) { + return nil, fmt.Errorf("not implemented") +} + func (p fakeContainerStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, fmt.Errorf("not implemented") }
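A closing note on the concurrency design, since it is the heart of the fix: ListPodStats readers now take only the read lock and return the last value the periodic updater stored, while ListPodStatsAndUpdateCPUNanoCoreUsage is the single writer that recomputes under the write lock, so readers can no longer interleave cache updates and skew the sampling window. Below is a minimal sketch of that split with invented names; the real cache is criStatsProvider.cpuUsageCache, keyed by container ID and holding the raw CpuUsage sample next to the computed value.

package main

import (
	"fmt"
	"sync"
)

// usageCache mirrors the reader/updater split introduced by this patch:
// readers observe the last computed value under a read lock; one periodic
// updater recomputes and stores it under the write lock.
type usageCache struct {
	mu    sync.RWMutex
	cores map[string]uint64 // container ID -> usageNanoCores
}

// get is the ListPodStats path: read-only, never recomputes.
func (c *usageCache) get(id string) (uint64, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.cores[id]
	return v, ok
}

// update is the ListPodStatsAndUpdateCPUNanoCoreUsage path: the lone
// writer storing a freshly computed value.
func (c *usageCache) update(id string, nanoCores uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cores[id] = nanoCores
}

func main() {
	c := &usageCache{cores: map[string]uint64{}}
	if _, ok := c.get("ctr-1"); !ok {
		// Matches the documented caveat: the usage stays unset until the
		// periodic updater (the eviction manager) runs.
		fmt.Println("no usage yet")
	}
	c.update("ctr-1", 1000000000) // eviction manager's periodic pass
	v, _ := c.get("ctr-1")        // summary API / metrics readers
	fmt.Println(v)                // 1000000000
}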