Merge pull request #74933 from yujuhong/fix-cpu-nano-cores

Fix computing of cpu nano core usage
Kubernetes Prow Robot 2019-03-05 14:54:34 -08:00 committed by GitHub
commit 4d1b830578
12 changed files with 172 additions and 37 deletions


@@ -131,6 +131,7 @@ func TestVolumeStatsCollector(t *testing.T) {
mockStatsProvider := new(statstest.StatsProvider)
mockStatsProvider.On("ListPodStats").Return(podStats, nil)
mockStatsProvider.On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil)
if err := testutil.CollectAndCompare(&volumeStatsCollector{statsProvider: mockStatsProvider}, strings.NewReader(want), metrics...); err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
}


@@ -257,8 +257,11 @@ func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Vo
return map[string]volume.Volume{}, true
}
func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
+ func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+ 	return nil, nil
+ }
func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }


@@ -43,8 +43,15 @@ type Provider interface {
//
// ListPodStats returns the stats of all the containers managed by pods.
ListPodStats() ([]statsapi.PodStats, error)
+ // ListPodStatsAndUpdateCPUNanoCoreUsage returns the stats of all the
+ // containers managed by pods and force update the cpu usageNanoCores.
+ // This is a workaround for CRI runtimes that do not integrate with
+ // cadvisor. See https://github.com/kubernetes/kubernetes/issues/72788
+ // for more details.
+ ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
// ListPodCPUAndMemoryStats returns the CPU and memory stats of all the containers managed by pods.
ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
// ImageFsStats returns the stats of the image filesystem.
ImageFsStats() (*statsapi.FsStats, error)
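The two listing methods are meant to be driven differently: on-demand readers call ListPodStats, while a single periodic component calls ListPodStatsAndUpdateCPUNanoCoreUsage so the cached cpu sampling window stays stable. A minimal sketch of such a refresher, assuming a hypothetical helper name and wiring (only the Provider methods above come from this PR; the 10s period mirrors the eviction manager interval described in the cri_stats_provider comment below):

import (
	"time"

	"k8s.io/klog"
)

// refreshCPUNanoCores is a hypothetical periodic updater: exactly one such
// caller should exist, so consecutive cached cpu samples are spaced evenly.
func refreshCPUNanoCores(p Provider, period time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			// Recompute and cache usageNanoCores from the latest cpu stats delta.
			if _, err := p.ListPodStatsAndUpdateCPUNanoCoreUsage(); err != nil {
				klog.Errorf("failed to update cpu nano core usage: %v", err)
			}
		}
	}
}

On-demand readers (e.g., serving the summary API without forcing an update) would just call p.ListPodStats().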


@@ -84,10 +84,16 @@ func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error)
if err != nil {
return nil, fmt.Errorf("failed to get imageFs stats: %v", err)
}
- podStats, err := sp.provider.ListPodStats()
+ var podStats []statsapi.PodStats
+ if updateStats {
+ 	podStats, err = sp.provider.ListPodStatsAndUpdateCPUNanoCoreUsage()
+ } else {
+ 	podStats, err = sp.provider.ListPodStats()
+ }
if err != nil {
return nil, fmt.Errorf("failed to list pod stats: %v", err)
}
rlimit, err := sp.provider.RlimitStats()
if err != nil {
return nil, fmt.Errorf("failed to get rlimit stats: %v", err)


@@ -74,6 +74,7 @@ func TestSummaryProviderGetStats(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(rlimitStats, nil).


@@ -57,6 +57,7 @@ func TestSummaryProvider(t *testing.T) {
On("GetNodeConfig").Return(nodeConfig).
On("GetPodCgroupRoot").Return(cgroupRoot).
On("ListPodStats").Return(podStats, nil).
On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil).
On("ImageFsStats").Return(imageFsStats, nil).
On("RootFsStats").Return(rootFsStats, nil).
On("RlimitStats").Return(nil, nil).


@@ -275,6 +275,29 @@ func (_m *StatsProvider) ListPodStats() ([]v1alpha1.PodStats, error) {
return r0, r1
}
+ // ListPodStatsAndUpdateCPUNanoCoreUsage provides a mock function with given fields:
+ func (_m *StatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]v1alpha1.PodStats, error) {
+ 	ret := _m.Called()
+ 	var r0 []v1alpha1.PodStats
+ 	if rf, ok := ret.Get(0).(func() []v1alpha1.PodStats); ok {
+ 		r0 = rf()
+ 	} else {
+ 		if ret.Get(0) != nil {
+ 			r0 = ret.Get(0).([]v1alpha1.PodStats)
+ 		}
+ 	}
+ 	var r1 error
+ 	if rf, ok := ret.Get(1).(func() error); ok {
+ 		r1 = rf()
+ 	} else {
+ 		r1 = ret.Error(1)
+ 	}
+ 	return r0, r1
+ }
// ListPodCPUAndMemoryStats provides a mock function with given fields:
func (_m *StatsProvider) ListPodCPUAndMemoryStats() ([]v1alpha1.PodStats, error) {
ret := _m.Called()
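For reference, a short sketch of how a test might drive this generated testify mock, in the style of the volume stats collector test earlier in this PR (the statstest alias, podStats fixture, and test name are assumed, not part of the change):

func TestUpdateCPUNanoCoreUsageMock(t *testing.T) {
	podStats := []statsapi.PodStats{}
	m := new(statstest.StatsProvider)
	m.On("ListPodStatsAndUpdateCPUNanoCoreUsage").Return(podStats, nil)

	got, err := m.ListPodStatsAndUpdateCPUNanoCoreUsage()
	assert.NoError(t, err)
	assert.Equal(t, podStats, got)
	m.AssertExpectations(t) // verifies the stubbed call was actually made
}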


@@ -155,6 +155,14 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
return result, nil
}
+ // ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
+ // the containers and returns the stats for all the pod-managed containers.
+ // For cadvisor, cpu nano core usages are pre-computed and cached, so this
+ // function simply calls ListPodStats.
+ func (p *cadvisorStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+ 	return p.ListPodStats()
+ }
// ListPodCPUAndMemoryStats returns the cpu and memory stats of all the pod-managed containers.
func (p *cadvisorStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
infos, err := getCadvisorContainerInfo(p.cadvisor)


@@ -45,6 +45,12 @@ var (
defaultCachePeriod = 10 * time.Minute
)
+ // cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
+ type cpuUsageRecord struct {
+ 	stats          *runtimeapi.CpuUsage
+ 	usageNanoCores *uint64
+ }
// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
@@ -63,8 +69,8 @@ type criStatsProvider struct {
logMetricsService LogMetricsService
// cpuUsageCache caches the cpu usage for containers.
- cpuUsageCache map[string]*runtimeapi.CpuUsage
- mutex sync.Mutex
+ cpuUsageCache map[string]*cpuUsageRecord
+ mutex sync.RWMutex
}
// newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -82,12 +88,32 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
- cpuUsageCache: make(map[string]*runtimeapi.CpuUsage),
+ cpuUsageCache: make(map[string]*cpuUsageRecord),
}
}
// ListPodStats returns the stats of all the pod-managed containers.
func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
+ 	// Don't update CPU nano core usage.
+ 	return p.listPodStats(false)
+ }
+
+ // ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
+ // the containers and returns the stats for all the pod-managed containers.
+ // This is a workaround because CRI runtimes do not supply nano core usages,
+ // so this function calculates the difference between the current and the
+ // last (cached) cpu stats to compute this metric. The implementation
+ // assumes a single caller periodically invokes this function to update the
+ // metric. If there are multiple callers, the period used to compute the cpu
+ // usage may vary and the usage could be incoherent (e.g., spiky). If no
+ // caller calls this function, the cpu usage will stay nil. Right now, the
+ // eviction manager is the only caller, and it calls this function every 10s.
+ func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+ 	// Update CPU nano core usage.
+ 	return p.listPodStats(true)
+ }
+
+ func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
// Gets node root filesystem information, which will be used to populate
// the available and capacity bytes/inodes in container stats.
rootFsInfo, err := p.cadvisor.RootFsInfo()
@@ -157,7 +183,7 @@ func (p *criStatsProvider) listPodStats(updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
}
// Fill available stats for full set of required pod stats
- cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid())
+ cs := p.makeContainerStats(stats, container, &rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata().GetUid(), updateCPUNanoCoreUsage)
p.addPodNetworkStats(ps, podSandboxID, caInfos, cs)
p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
@@ -435,6 +461,7 @@ func (p *criStatsProvider) makeContainerStats(
rootFsInfo *cadvisorapiv2.FsInfo,
fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
uid string,
+ updateCPUNanoCoreUsage bool,
) *statsapi.ContainerStats {
result := &statsapi.ContainerStats{
Name: stats.Attributes.Metadata.Name,
@@ -450,8 +477,12 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}
- usageNanoCores := p.getContainerUsageNanoCores(stats)
+ var usageNanoCores *uint64
+ if updateCPUNanoCoreUsage {
+ 	usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
+ } else {
+ 	usageNanoCores = p.getContainerUsageNanoCores(stats)
+ }
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
@@ -541,27 +572,63 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
return result
}
- // getContainerUsageNanoCores gets usageNanoCores based on cached usageCoreNanoSeconds.
+ // getContainerUsageNanoCores gets the cached usageNanoCores.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
- 	if stats == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+ 	if stats == nil || stats.Attributes == nil {
		return nil
	}
- 	p.mutex.Lock()
- 	defer func() {
- 		// Update cache with new value.
- 		p.cpuUsageCache[stats.Attributes.Id] = stats.Cpu
- 		p.mutex.Unlock()
- 	}()
+ 	p.mutex.RLock()
+ 	defer p.mutex.RUnlock()
	cached, ok := p.cpuUsageCache[stats.Attributes.Id]
- 	if !ok || cached.UsageCoreNanoSeconds == nil {
+ 	if !ok || cached.usageNanoCores == nil {
		return nil
	}
- 	nanoSeconds := stats.Cpu.Timestamp - cached.Timestamp
- 	usageNanoCores := (stats.Cpu.UsageCoreNanoSeconds.Value - cached.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
- 	return &usageNanoCores
+ 	// Return a copy of the usage.
+ 	latestUsage := *cached.usageNanoCores
+ 	return &latestUsage
}

+ // getAndUpdateContainerUsageNanoCores computes usageNanoCores based on the
+ // given and the cached usageCoreNanoSeconds, updates the cache with the
+ // computed usageNanoCores, and returns the usageNanoCores.
+ func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
+ 	if stats == nil || stats.Attributes == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
+ 		return nil
+ 	}
+ 	id := stats.Attributes.Id
+ 	usage, err := func() (*uint64, error) {
+ 		p.mutex.Lock()
+ 		defer p.mutex.Unlock()
+ 		cached, ok := p.cpuUsageCache[id]
+ 		if !ok || cached.stats.UsageCoreNanoSeconds == nil {
+ 			// Cannot compute the usage now, but update the cached stats anyway.
+ 			p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
+ 			return nil, nil
+ 		}
+ 		newStats := stats.Cpu
+ 		cachedStats := cached.stats
+ 		nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
+ 		if nanoSeconds <= 0 {
+ 			return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
+ 		}
+ 		usageNanoCores := (newStats.UsageCoreNanoSeconds.Value - cachedStats.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
+ 		// Update the cache with the new value.
+ 		usageToUpdate := usageNanoCores
+ 		p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
+ 		return &usageNanoCores, nil
+ 	}()
+ 	if err != nil {
+ 		// This should not happen. Log now to raise visibility.
+ 		klog.Errorf("failed updating cpu usage nano core: %v", err)
+ 	}
+ 	return usage
+ }
@@ -573,7 +640,7 @@ func (p *criStatsProvider) cleanupOutdatedCaches() {
delete(p.cpuUsageCache, k)
}
- if time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
+ if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}
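The arithmetic behind getAndUpdateContainerUsageNanoCores is worth spelling out: CRI reports cumulative cpu time (UsageCoreNanoSeconds) together with a timestamp, and dividing the growth in cpu time by the elapsed wall-clock time, scaled by 1e9, yields nano cores. A self-contained sketch of that formula (the helper name is ours, not the kubelet's):

package main

import (
	"fmt"
	"time"
)

// usageNanoCores mirrors the delta computation above: growth in cumulative
// cpu time divided by elapsed wall-clock time, scaled to nano cores.
// All inputs are in nanoseconds, as in the CRI CpuUsage message.
func usageNanoCores(curUsage, cachedUsage uint64, curTS, cachedTS int64) uint64 {
	nanoSeconds := curTS - cachedTS
	return (curUsage - cachedUsage) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
}

func main() {
	// 10s of cpu time consumed over a 10s window is exactly one core:
	// (2e10 - 1e10) * 1e9 / 1e10 = 1e9 nano cores.
	fmt.Println(usageNanoCores(20000000000, 10000000000, 10000000000, 0))
}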


@@ -692,17 +692,17 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
tests := []struct {
desc string
- cpuUsageCache map[string]*runtimeapi.CpuUsage
+ cpuUsageCache map[string]*cpuUsageRecord
stats *runtimeapi.ContainerStats
expected *uint64
}{
{
desc: "should return nil if stats is nil",
- cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+ cpuUsageCache: map[string]*cpuUsageRecord{},
},
{
desc: "should return nil if cpu stats is nil",
- cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+ cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@@ -712,7 +712,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
{
desc: "should return nil if usageCoreNanoSeconds is nil",
- cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+ cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@@ -725,7 +725,7 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
{
desc: "should return nil if cpu stats is not cached yet",
- cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
+ cpuUsageCache: map[string]*cpuUsageRecord{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
@@ -751,11 +751,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
},
},
- cpuUsageCache: map[string]*runtimeapi.CpuUsage{
- 	"1": {
- 		Timestamp: 0,
- 		UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
- 			Value: 10000000000,
- 		},
- 	},
- },
+ cpuUsageCache: map[string]*cpuUsageRecord{
+ 	"1": {
+ 		stats: &runtimeapi.CpuUsage{
+ 			Timestamp: 0,
+ 			UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+ 				Value: 10000000000,
+ 			},
+ 		},
+ 	},
+ },
@@ -774,11 +776,13 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
},
},
},
- cpuUsageCache: map[string]*runtimeapi.CpuUsage{
- 	"1": {
- 		Timestamp: 0,
- 		UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
- 			Value: 10000000000,
- 		},
- 	},
- },
+ cpuUsageCache: map[string]*cpuUsageRecord{
+ 	"1": {
+ 		stats: &runtimeapi.CpuUsage{
+ 			Timestamp: 0,
+ 			UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
+ 				Value: 10000000000,
+ 			},
+ 		},
+ 	},
+ },
@@ -788,7 +792,16 @@ func TestGetContainerUsageNanoCores(t *testing.T) {
for _, test := range tests {
provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
- real := provider.getContainerUsageNanoCores(test.stats)
+ // Before the update, the cached value should be nil.
+ cached := provider.getContainerUsageNanoCores(test.stats)
+ assert.Nil(t, cached)
+ // Update the cache and get the latest value.
+ real := provider.getAndUpdateContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, real, test.desc)
+ // After the update, the cached value should be up-to-date.
+ cached = provider.getContainerUsageNanoCores(test.stats)
+ assert.Equal(t, test.expected, cached, test.desc)
}
}
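The reworked loop pins down the split between reading and updating: getContainerUsageNanoCores never computes anything, it only surfaces what the last update stored. The lifecycle for one container, sketched with two hypothetical samples s1 and s2 taken some interval apart:

p := &criStatsProvider{cpuUsageCache: make(map[string]*cpuUsageRecord)}

_ = p.getContainerUsageNanoCores(s1)          // nil: nothing cached for this container yet
_ = p.getAndUpdateContainerUsageNanoCores(s1) // nil, but seeds the cache with s1's cpu stats
_ = p.getAndUpdateContainerUsageNanoCores(s2) // computes the delta against s1, caches the result
_ = p.getContainerUsageNanoCores(s2)          // returns the cached value without recomputing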


@@ -88,6 +88,7 @@ type StatsProvider struct {
// containers managed by pods.
type containerStatsProvider interface {
ListPodStats() ([]statsapi.PodStats, error)
+ ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error)
ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error)
ImageFsStats() (*statsapi.FsStats, error)
ImageFsDevice() (string, error)


@@ -682,6 +682,10 @@ func (p fakeContainerStatsProvider) ListPodStats() ([]statsapi.PodStats, error)
return nil, fmt.Errorf("not implemented")
}
+ func (p fakeContainerStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
+ 	return nil, fmt.Errorf("not implemented")
+ }
func (p fakeContainerStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
return nil, fmt.Errorf("not implemented")
}