Fix computation of CPU nano core usage

CRI runtimes do not supply cpu nano core usage because it is not part
of CRI stats. However, some upstream components still rely on this
metric to function. The previous fix was faulty because multiple
callers could compete to update the stats, causing inconsistent and
incoherent metrics. This change instead creates a separate call for
updating the usage and relies on the eviction manager, which runs
periodically, to trigger the updates. The caveat is that if the
eviction manager is completely turned off, no one computes the usage.
pull/564/head
Yu-Ju Hong 2019-03-01 18:22:27 -08:00
parent feb0937fa4
commit 191666d6a3
12 changed files with 172 additions and 37 deletions
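For context, the metric in question is derived from the CRI's cumulative
usageCoreNanoSeconds counter: the provider caches one sample and computes an
average rate against the next. A minimal, self-contained Go sketch of that
delta computation follows; the sample type and nanoCores helper are
illustrative stand-ins, not kubelet code.

package main

import (
    "fmt"
    "time"
)

// sample is a hypothetical stand-in for the CRI CpuUsage message: a
// monotonically increasing usage counter plus the time it was sampled.
type sample struct {
    timestampNano        int64  // sample time in nanoseconds
    usageCoreNanoSeconds uint64 // cumulative cpu usage in core-nanoseconds
}

// nanoCores returns the average usage in nanocores between two samples,
// mirroring the delta computation this change moves behind a dedicated,
// periodically invoked call.
func nanoCores(prev, cur sample) uint64 {
    elapsed := cur.timestampNano - prev.timestampNano
    if elapsed <= 0 {
        return 0 // out-of-order samples; the real code reports an error
    }
    return (cur.usageCoreNanoSeconds - prev.usageCoreNanoSeconds) *
        uint64(time.Second/time.Nanosecond) / uint64(elapsed)
}

func main() {
    prev := sample{timestampNano: 0, usageCoreNanoSeconds: 0}
    // 10s later the container has consumed 10 core-seconds of cpu,
    // i.e. it averaged one full core over the interval.
    cur := sample{
        timestampNano:        10 * int64(time.Second),
        usageCoreNanoSeconds: 10 * uint64(time.Second/time.Nanosecond),
    }
    fmt.Println(nanoCores(prev, cur)) // 1000000000 (1e9 nanocores = 1 core)
}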

View File

@ -131,6 +131,7 @@ func TestVolumeStatsCollector(t *testing.T) {
mockStatsProvider := new(statstest.StatsProvider)
mockStatsProvider.On("ListPodStats").Return(podStats, nil)
mockStatsProvider.On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil)
if err := testutil.CollectAndCompare(&volumeStatsCollector{statsProvider: mockStatsProvider}, strings.NewReader(want), metrics...); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}

View File

@ -257,8 +257,11 @@ func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Vo
return map[string]volume.Volume{}, true
}
func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
return nil, nil
}
func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }

View File

@ -43,8 +43,15 @@ type Provider interface {
//
// ListPodStats returns the stats of all the containers managed by pods.
ListPodStats() ([]statsapi.PodStats, error)
// ListPodStatsAndUpdateCPUNanoCoreUsage returns the stats of all the
// containers managed by pods and force-updates the cpu usageNanoCores.
// This is a workaround for CRI runtimes that do not integrate with
// cadvisor. See https://github.com/kubernetes/kubernetes/issues/72788
// for more details.
ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
// ListPodCPUAndMemoryStats returns the CPU and memory stats of all the containers managed by pods.
ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
// ImageFsStats returns the stats of the image filesystem.
ImageFsStats() (*statsapi.FsStats, error)

View File

@ -84,10 +84,16 @@ func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error)
if err != nil {
return nil, fmt.Errorf("failed to get imageFs stats: %v", err)
}
var podStats []statsapi.PodStats
if updateStats {
podStats, err = sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
} else {
podStats, err = sp.provider.ListPodStats()
}
if err != nil {
return nil, fmt.Errorf("failed to list pod stats: %v", err)
}
rlimit, err := sp.provider.RlimitStats()
if err != nil {
return nil, fmt.Errorf("failed to get rlimit stats: %v", err)

View File

@ -74,6 +74,7 @@ func TestSummaryProviderGetStats(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(rlimitStats, nil).

View File

@ -57,6 +57,7 @@ func TestSummaryProvider(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(nil, nil).

View File

@ -275,6 +275,29 @@ func (_m *StatsProvider) ListPodStats() ([]v1alpha1.PodStats, error) {
return r0, r1
}
// ListPodStatsAndUpdateCPUNanoCoreUsage provides a mock function with given fields:
func (_m *StatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]v1alpha1.PodStats, error) {
ret := _m.Called()
var r0 []v1alpha1.PodStats
if rf, ok := ret.Get(0).(func() []v1alpha1.PodStats); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]v1alpha1.PodStats)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ListPodCPUAndMemoryStats provides a mock function with given fields:
func (_m *StatsProvider) ListPodCPUAndMemoryStats() ([]v1alpha1.PodStats, error) {
ret := _m.Called()

View File

@ -155,6 +155,14 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
return result, nil
}
// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// For cadvisor, cpu nano core usages are pre-computed and cached, so this
// function simply calls ListPodStats.
func (p *cadvisorStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
return p.ListPodStats()
}
// ListPodCPUAndMemoryStats returns the cpu and memory stats of all the pod-managed containers.
func (p *cadvisorStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
infos, err := getCadvisorContainerInfo(p.cadvisor)

View File

@ -45,6 +45,12 @@ var (
defaultCachePeriod = 10 * time.Minute
)
// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
type cpuUsageRecord struct {
stats *runtimeapi.CpuUsage
usageNanoCores *uint64
}
// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
@ -63,8 +69,8 @@ type criStatsProvider struct {
logMetricsService LogMetricsService
// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord
mutex sync.RWMutex
}
// newCRIStatsProvider returns a containerStatsProvider implementation that
@ -82,12 +88,32 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
cpuUsageCache: make(map[string]*cpuUsageRecord),
}
}
// ListPodStats returns the stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
// Don't update CPU nano core usage.
return p.listPodStats(false)
}
// ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
// the containers and returns the stats for all the pod-managed containers.
// This is a workaround because CRI runtimes do not supply nano core usages,
// so this function calculates the difference between the current and the
// last (cached) cpu stats to derive this metric. The implementation assumes
// that a single caller periodically invokes this function to update the
// metric. If there are multiple callers, the period used to compute the cpu
// usage may vary and the usage could be incoherent (e.g., spiky). If no
// caller calls this function, the cpu usage will stay nil. Right now, the
// eviction manager is the only caller, and it calls this function every 10s.
func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
// Update CPU nano core usage.
return p.listPodStats(true)
}
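// Illustrative sketch (not part of this change): the single periodic
// caller the comment above assumes. startUsageUpdater and its stop channel
// are hypothetical; in the kubelet, the eviction manager plays this role
// with a 10s monitoring period.
func startUsageUpdater(p *criStatsProvider, stop <-chan struct{}) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-stop:
            return
        case <-ticker.C:
            // Refresh the cached usageNanoCores; concurrent readers going
            // through ListPodStats only ever see the cached value.
            if _, err := p.ListPodStatsAndUpdateCPUNanoCoreUsage(); err != nil {
                klog.Errorf("failed to update cpu nano core usage: %v", err)
            }
        }
    }
}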
func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
// Gets node root filesystem information, which will be used to populate
// the available and capacity bytes/inodes in container stats.
rootFsInfo, err := p.cadvisor.RootFsInfo()
@ -157,7 +183,7 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
}
// Fill available stats for full set of required pod stats
cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid(), updateCPUNanoCoreUsage)
p.addPodNetworkStats(ps, podSandboxID, caInfos, cs)
p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
@ -435,6 +461,7 @@ func (p *criStatsProvider) makeContainerStats(
rootFsInfo *cadvisorapiv2.FsInfo,
fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
uid string,
updateCPUNanoCoreUsage bool,
) *statsapi.ContainerStats {
result := &statsapi.ContainerStats{
Name: stats.Attributes.Metadata.Name,
@ -450,8 +477,12 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}
var usageNanoCores *uint64
if updateCPUNanoCoreUsage {
usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
} else {
usageNanoCores = p.getContainerUsageNanoCores(stats)
}
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
@ -541,27 +572,63 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
return result
}
// getContainerUsageNanoCores gets the cached usageNanoCores.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil {
return nil
}
p.mutex.RLock()
defer p.mutex.RUnlock()
cached, ok := p.cpuUsageCache[stats.Attributes.Id]
if !ok || cached.usageNanoCores == nil {
return nil
}
// return a copy of the usage
latestUsage := *cached.usageNanoCores
return &latestUsage
}
// getAndUpdateContainerUsageNanoCores computes usageNanoCores based on the given and
// the cached usageCoreNanoSeconds, updates the cache with the computed
// usageNanoCores, and returns the usageNanoCores.
func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
return nil
}
id := stats.Attributes.Id
usage, err := func() (*uint64, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
cached, ok := p.cpuUsageCache[id]
if !ok || cached.stats.UsageCoreNanoSeconds == nil {
// Cannot compute the usage now, but update the cached stats anyway
p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
return nil, nil
}
newStats := stats.Cpu
cachedStats := cached.stats
nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
if nanoSeconds <= 0 {
return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
}
usageNanoCores := (newStats.UsageCoreNanoSeconds.Value - cachedStats.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
// Update cache with new value.
usageToUpdate := usageNanoCores
p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
return &usageNanoCores, nil
}()
if err != nil {
// This should not happen. Log now to raise visibility
klog.Errorf("failed updating cpu usage nano core: %v", err)
}
return usage
}
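// A worked example of the getAndUpdateContainerUsageNanoCores formula,
// with illustrative numbers (not taken from the real tests):
//   cached:  Timestamp = 0,           UsageCoreNanoSeconds = 10000000000 (10 core-seconds)
//   current: Timestamp = 10000000000, UsageCoreNanoSeconds = 20000000000 (20 core-seconds)
// then
//   nanoSeconds    = 10000000000 - 0 = 1e10 (10 seconds)
//   usageNanoCores = (20000000000 - 10000000000) * 1e9 / 1e10 = 1e9
// i.e. the container averaged one full core over the interval.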
func (p *criStatsProvider) cleanupOutdatedCaches() {
@ -573,7 +640,7 @@ func (p *criStatsProvider) cleanupOutdatedCaches() {
delete(p.cpuUsageCache, k)
}
if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}

View File

@ -692,17 +692,17 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
tests := []struct {
desc string
cpuUsageCache map[string]*cpuUsageRecord
stats *runtimeapi.ContainerStats
expected *uint64
}{
{
desc: "should return nil if stats is nil",
cpuUsageCache: map[string]*cpuUsageRecord{},
},
{
desc: "should return nil if cpu stats is nil",
cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@ -712,7 +712,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
{
desc: "should return nil if usageCoreNanoSeconds is nil",
cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@ -725,7 +725,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
{
desc: "should return nil if cpu stats is not cached yet",
cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@ -751,11 +751,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
},
},
cpuUsageCache: map[string]*cpuUsageRecord{
"1": {
stats: &runtimeapi.CpuUsage{
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
},
@ -774,11 +776,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
},
},
cpuUsageCache: map[string]*cpuUsageRecord{
"1": {
stats: &runtimeapi.CpuUsage{
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
},
@ -788,7 +792,16 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
for _, test := range tests {
provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
// Before the update, the cached value should be nil
cached := provider.getContainerUsageNanoCores(test.stats)
assert.Nil(t, cached)
// Update the cache and get the latest value.
real := provider.getAndUpdateContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, real, test.desc)
// After the update, the cached value should be up-to-date
cached = provider.getContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, cached, test.desc)
}
}

View File

@ -88,6 +88,7 @@ type StatsProvider struct {
// containers managed by pods.
type containerStatsProvider interface {
ListPodStats() ([]statsapi.PodStats, error)
ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
ImageFsStats() (*statsapi.FsStats, error)
ImageFsDevice() (string, error)

View File

@ -682,6 +682,10 @@ func (p fakeContainerStatsProvider) ListPodStats() ([]statsapi.PodStats, error)
return nil, fmt.Errorf("not implemented")
}
func (p fakeContainerStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
return nil, fmt.Errorf("not implemented")
}
func (p fakeContainerStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
return nil, fmt.Errorf("not implemented")
}