Merge pull request #73659 from feiskyer/usage-nano-cores

Kubelet: add usageNanoCores from CRI stats provider
Kubernetes Prow Robot 2019-02-08 19:26:21 -08:00 committed by GitHub
commit ee44e24cd3
2 changed files with 170 additions and 0 deletions
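The change below derives usageNanoCores as the first derivative of the cumulative usageCoreNanoSeconds counter that CRI reports: the delta between two successive samples, scaled to a per-second rate. A minimal standalone sketch of that computation (illustrative only; rateNanoCores and its signature are not part of the diff):

package main

import (
	"fmt"
	"time"
)

// rateNanoCores converts two cumulative CPU samples into an instantaneous
// usage in nanocores. cur and prev are usageCoreNanoSeconds values; curTS
// and prevTS are their Unix timestamps in nanoseconds, as CRI reports them.
func rateNanoCores(cur, prev uint64, curTS, prevTS int64) uint64 {
	elapsed := curTS - prevTS // wall-clock window in nanoseconds
	if elapsed <= 0 {
		return 0 // no valid window; avoid dividing by zero
	}
	return (cur - prev) * uint64(time.Second/time.Nanosecond) / uint64(elapsed)
}

func main() {
	// 10e9 nanosecond-cores consumed over a 1e9 ns (1 s) window
	// yields 10e9 nanocores, i.e. 10 full cores.
	fmt.Println(rateNanoCores(20000000000, 10000000000, 1000000000, 0))
}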

pkg/kubelet/stats/cri_stats_provider.go

@@ -22,6 +22,7 @@ import (
"path"
"sort"
"strings"
"sync"
"time"
cadvisorfs "github.com/google/cadvisor/fs"
@@ -38,6 +39,11 @@ import (
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
var (
// defaultCachePeriod is how long a cached cpuUsage entry is kept before it
// is considered outdated and evicted by cleanupOutdatedCaches.
defaultCachePeriod = 10 * time.Minute
)
// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
@@ -54,6 +60,10 @@ type criStatsProvider struct {
imageService internalapi.ImageManagerService
// logMetrics provides the metrics for container logs
logMetricsService LogMetricsService
// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*runtimeapi.CpuUsage
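// mutex guards concurrent access to cpuUsageCache.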
mutex sync.Mutex
}
// newCRIStatsProvider returns a containerStatsProvider implementation that
@@ -71,6 +81,7 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
cpuUsageCache: make(map[string]*runtimeapi.CpuUsage),
}
}
@@ -165,6 +176,8 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated cache entries.
p.cleanupOutdatedCaches()
result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -247,6 +260,8 @@ func (p *criStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
}
ps.Containers = append(ps.Containers, *cs)
}
// Clean up outdated cache entries.
p.cleanupOutdatedCaches()
result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
for _, s := range sandboxIDToPodStats {
@@ -450,6 +465,11 @@ func (p *criStatsProvider) makeContainerStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}
usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -506,6 +526,11 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
if stats.Cpu.UsageCoreNanoSeconds != nil {
result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
}
usageNanoCores := p.getContainerUsageNanoCores(stats)
if usageNanoCores != nil {
result.CPU.UsageNanoCores = usageNanoCores
}
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@@ -516,6 +541,44 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
return result
}
// getContainerUsageNanoCores computes the instantaneous CPU usage in
// nanocores by diffing the current usageCoreNanoSeconds sample against the
// cached one. It returns nil when no previous sample is available.
func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
if stats == nil || stats.Cpu == nil || stats.Cpu.UsageCoreNanoSeconds == nil {
return nil
}
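// Hold the lock for the whole read-then-update sequence: read the previous
// sample here, and let the deferred function replace it with the current
// sample once the rate has been computed.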
p.mutex.Lock()
defer func() {
// Update cache with new value.
p.cpuUsageCache[stats.Attributes.Id] = stats.Cpu
p.mutex.Unlock()
}()
cached, ok := p.cpuUsageCache[stats.Attributes.Id]
if !ok || cached.UsageCoreNanoSeconds == nil {
return nil
}
nanoSeconds := stats.Cpu.Timestamp - cached.Timestamp
// Guard against a zero or negative window, which would otherwise divide by
// zero below; the deferred update still refreshes the cached sample.
if nanoSeconds <= 0 {
return nil
}
// Rate = counter delta (nanosecond-cores) scaled to per-second, i.e. nanocores.
usageNanoCores := (stats.Cpu.UsageCoreNanoSeconds.Value - cached.UsageCoreNanoSeconds.Value) * uint64(time.Second/time.Nanosecond) / uint64(nanoSeconds)
return &usageNanoCores
}
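// cleanupOutdatedCaches evicts cpuUsageCache entries that have not been
// refreshed within defaultCachePeriod, e.g. for containers that no longer
// exist, so the cache does not grow without bound.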
func (p *criStatsProvider) cleanupOutdatedCaches() {
p.mutex.Lock()
defer p.mutex.Unlock()
for k, v := range p.cpuUsageCache {
if v == nil {
// Skip the timestamp check below; it would dereference a nil entry.
delete(p.cpuUsageCache, k)
continue
}
if time.Since(time.Unix(0, v.Timestamp)) > defaultCachePeriod {
delete(p.cpuUsageCache, k)
}
}
}
// removeTerminatedContainer returns the provided containers with the
// entries for terminated containers removed.
func removeTerminatedContainer(containers []*runtimeapi.Container) []*runtimeapi.Container {

pkg/kubelet/stats/cri_stats_provider_test.go

@@ -645,3 +645,110 @@ func makeFakeLogStats(seed int) *volume.Metrics {
m.InodesUsed = resource.NewQuantity(int64(seed+offsetInodeUsage), resource.BinarySI)
return m
}
func TestGetContainerUsageNanoCores(t *testing.T) {
var value0 uint64
var value1 uint64 = 10000000000
tests := []struct {
desc string
cpuUsageCache map[string]*runtimeapi.CpuUsage
stats *runtimeapi.ContainerStats
expected *uint64
}{
{
desc: "should return nil if stats is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
},
{
desc: "should return nil if cpu stats is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: nil,
},
},
{
desc: "should return nil if usageCoreNanoSeconds is nil",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: nil,
},
},
},
{
desc: "should return nil if cpu stats is not cached yet",
cpuUsageCache: map[string]*runtimeapi.CpuUsage{},
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
},
{
desc: "should return zero value if cached cpu stats is equal to current value",
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: 1,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
cpuUsageCache: map[string]*runtimeapi.CpuUsage{
"1": {
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
expected: &value0,
},
{
desc: "should return correct value if cached cpu stats is not equal to current value",
stats: &runtimeapi.ContainerStats{
Attributes: &runtimeapi.ContainerAttributes{
Id: "1",
},
Cpu: &runtimeapi.CpuUsage{
Timestamp: int64(time.Second / time.Nanosecond),
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 20000000000,
},
},
},
cpuUsageCache: map[string]*runtimeapi.CpuUsage{
"1": {
Timestamp: 0,
UsageCoreNanoSeconds: &runtimeapi.UInt64Value{
Value: 10000000000,
},
},
},
expected: &value1,
},
}
for _, test := range tests {
provider := &criStatsProvider{cpuUsageCache: test.cpuUsageCache}
got := provider.getContainerUsageNanoCores(test.stats)
assert.Equal(t, test.expected, got, test.desc)
}
}