Include pod logs in the pod ephemeral storage.

Signed-off-by: Lantao Liu <lantaol@google.com>
pull/564/head
Lantao Liu 2019-02-22 14:32:37 -08:00
parent f14c6c95d6
commit 0ac651bfc3
9 changed files with 199 additions and 86 deletions

View File

@ -711,7 +711,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.runtimeCache,
runtimeService,
imageService,
stats.NewLogMetricsService())
stats.NewLogMetricsService(),
kubecontainer.RealOS{})
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})

View File

@ -173,11 +173,11 @@ func buildContainerLogsPath(containerName string, restartCount int) string {
// BuildContainerLogsDirectory builds absolute log directory path for a container in pod.
func BuildContainerLogsDirectory(podNamespace, podName string, podUID types.UID, containerName string) string {
return filepath.Join(buildPodLogsDirectory(podNamespace, podName, podUID), containerName)
return filepath.Join(BuildPodLogsDirectory(podNamespace, podName, podUID), containerName)
}
// buildPodLogsDirectory builds absolute log directory path for a pod sandbox.
func buildPodLogsDirectory(podNamespace, podName string, podUID types.UID) string {
// BuildPodLogsDirectory builds absolute log directory path for a pod sandbox.
func BuildPodLogsDirectory(podNamespace, podName string, podUID types.UID) string {
return filepath.Join(podLogsRootDirectory, strings.Join([]string{podNamespace, podName,
string(podUID)}, logPathDelimiter))
}

View File

@ -103,7 +103,7 @@ func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *v1.Pod, attemp
podSandboxConfig.Hostname = hostname
}
logDir := buildPodLogsDirectory(pod.Namespace, pod.Name, pod.UID)
logDir := BuildPodLogsDirectory(pod.Namespace, pod.Name, pod.UID)
podSandboxConfig.LogDirectory = logDir
portMappings := []*runtimeapi.PortMapping{}

View File

@ -89,6 +89,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/github.com/golang/mock/gomock:go_default_library",
"//vendor/github.com/google/cadvisor/fs:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/google/cadvisor/info/v2:go_default_library",

View File

@ -135,7 +135,7 @@ func (p *cadvisorStatsProvider) ListPodStats() ([]statsapi.PodStats, error) {
copy(ephemeralStats, vstats.EphemeralVolumes)
podStats.VolumeStats = append(vstats.EphemeralVolumes, vstats.PersistentVolumes...)
}
podStats.EphemeralStorage = calcEphemeralStorage(podStats.Containers, ephemeralStats, &rootFsInfo)
podStats.EphemeralStorage = calcEphemeralStorage(podStats.Containers, ephemeralStats, &rootFsInfo, nil, false)
// Lookup the pod-level cgroup's CPU and memory stats
podInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
if podInfo != nil {
@ -225,53 +225,6 @@ func (p *cadvisorStatsProvider) ListPodCPUAndMemoryStats() ([]statsapi.PodStats,
return result, nil
}
func calcEphemeralStorage(containers []statsapi.ContainerStats, volumes []statsapi.VolumeStats, rootFsInfo *cadvisorapiv2.FsInfo) *statsapi.FsStats {
result := &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
AvailableBytes: &rootFsInfo.Available,
CapacityBytes: &rootFsInfo.Capacity,
InodesFree: rootFsInfo.InodesFree,
Inodes: rootFsInfo.Inodes,
}
for _, container := range containers {
addContainerUsage(result, &container)
}
for _, volume := range volumes {
result.UsedBytes = addUsage(result.UsedBytes, volume.FsStats.UsedBytes)
result.InodesUsed = addUsage(result.InodesUsed, volume.InodesUsed)
result.Time = maxUpdateTime(&result.Time, &volume.FsStats.Time)
}
return result
}
func addContainerUsage(stat *statsapi.FsStats, container *statsapi.ContainerStats) {
if rootFs := container.Rootfs; rootFs != nil {
stat.Time = maxUpdateTime(&stat.Time, &rootFs.Time)
stat.InodesUsed = addUsage(stat.InodesUsed, rootFs.InodesUsed)
stat.UsedBytes = addUsage(stat.UsedBytes, rootFs.UsedBytes)
if logs := container.Logs; logs != nil {
stat.UsedBytes = addUsage(stat.UsedBytes, logs.UsedBytes)
stat.Time = maxUpdateTime(&stat.Time, &logs.Time)
}
}
}
func maxUpdateTime(first, second *metav1.Time) metav1.Time {
if first.Before(second) {
return *second
}
return *first
}
func addUsage(first, second *uint64) *uint64 {
if first == nil {
return second
} else if second == nil {
return first
}
total := *first + *second
return &total
}
// ImageFsStats returns the stats of the filesystem for storing images.
func (p *cadvisorStatsProvider) ImageFsStats() (*statsapi.FsStats, error) {
imageFsInfo, err := p.cadvisor.ImagesFsInfo()

View File

@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"path"
"path/filepath"
"sort"
"strings"
"sync"
@ -34,6 +35,7 @@ import (
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -66,6 +68,8 @@ type criStatsProvider struct {
imageService internalapi.ImageManagerService
// logMetrics provides the metrics for container logs
logMetricsService LogMetricsService
// osInterface is the interface for syscalls.
osInterface kubecontainer.OSInterface
// cpuUsageCache caches the cpu usage for containers.
cpuUsageCache map[string]*cpuUsageRecord
@ -80,6 +84,7 @@ func newCRIStatsProvider(
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
logMetricsService LogMetricsService,
osInterface kubecontainer.OSInterface,
) containerStatsProvider {
return &criStatsProvider{
cadvisor: cadvisor,
@ -87,6 +92,7 @@ func newCRIStatsProvider(
runtimeService: runtimeService,
imageService: imageService,
logMetricsService: logMetricsService,
osInterface: osInterface,
cpuUsageCache: make(map[string]*cpuUsageRecord),
}
}
@ -384,15 +390,27 @@ func buildPodStats(podSandbox *runtimeapi.PodSandbox) *statsapi.PodStats {
}
}
func (p *criStatsProvider) makePodStorageStats(s *statsapi.PodStats, rootFsInfo *cadvisorapiv2.FsInfo) *statsapi.PodStats {
func (p *criStatsProvider) makePodStorageStats(s *statsapi.PodStats, rootFsInfo *cadvisorapiv2.FsInfo) {
podNs := s.PodRef.Namespace
podName := s.PodRef.Name
podUID := types.UID(s.PodRef.UID)
if vstats, found := p.resourceAnalyzer.GetPodVolumeStats(podUID); found {
vstats, found := p.resourceAnalyzer.GetPodVolumeStats(podUID)
if !found {
return
}
podLogDir := kuberuntime.BuildPodLogsDirectory(podNs, podName, podUID)
logStats, err := p.getPodLogStats(podLogDir, rootFsInfo)
if err != nil {
klog.Errorf("Unable to fetch pod log stats for path %s: %v ", podLogDir, err)
// If people do in-place upgrade, there might be pods still using
// the old log path. For those pods, no pod log stats is returned.
// We should continue generating other stats in that case.
// calcEphemeralStorage tolerants logStats == nil.
}
ephemeralStats := make([]statsapi.VolumeStats, len(vstats.EphemeralVolumes))
copy(ephemeralStats, vstats.EphemeralVolumes)
s.VolumeStats = append(vstats.EphemeralVolumes, vstats.PersistentVolumes...)
s.EphemeralStorage = calcEphemeralStorage(s.Containers, ephemeralStats, rootFsInfo)
}
return s
s.EphemeralStorage = calcEphemeralStorage(s.Containers, ephemeralStats, rootFsInfo, logStats, true)
}
func (p *criStatsProvider) addPodNetworkStats(
@ -504,8 +522,8 @@ func (p *criStatsProvider) makeContainerStats(
}
} else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = Uint64Ptr(0)
result.CPU.UsageNanoCores = Uint64Ptr(0)
result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
result.CPU.UsageNanoCores = uint64Ptr(0)
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@ -514,7 +532,7 @@ func (p *criStatsProvider) makeContainerStats(
}
} else {
result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.Memory.WorkingSetBytes = Uint64Ptr(0)
result.Memory.WorkingSetBytes = uint64Ptr(0)
}
if stats.WritableLayer != nil {
result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp))
@ -546,10 +564,15 @@ func (p *criStatsProvider) makeContainerStats(
// NOTE: This doesn't support the old pod log path, `/var/log/pods/UID`. For containers
// using old log path, empty log stats are returned. This is fine, because we don't
// officially support in-place upgrade anyway.
containerLogPath := kuberuntime.BuildContainerLogsDirectory(meta.GetNamespace(),
var (
containerLogPath = kuberuntime.BuildContainerLogsDirectory(meta.GetNamespace(),
meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName())
// TODO(random-liu): Collect log stats for logs under the pod log directory.
result.Logs = p.getContainerLogStats(containerLogPath, rootFsInfo)
err error
)
result.Logs, err = p.getPathFsStats(containerLogPath, rootFsInfo)
if err != nil {
klog.Errorf("Unable to fetch container log stats for path %s: %v ", containerLogPath, err)
}
return result
}
@ -577,8 +600,8 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
}
} else {
result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.CPU.UsageCoreNanoSeconds = Uint64Ptr(0)
result.CPU.UsageNanoCores = Uint64Ptr(0)
result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
result.CPU.UsageNanoCores = uint64Ptr(0)
}
if stats.Memory != nil {
result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
@ -587,7 +610,7 @@ func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
}
} else {
result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
result.Memory.WorkingSetBytes = Uint64Ptr(0)
result.Memory.WorkingSetBytes = uint64Ptr(0)
}
return result
@ -740,13 +763,11 @@ func getCRICadvisorStats(infos map[string]cadvisorapiv2.ContainerInfo) map[strin
return stats
}
// TODO Cache the metrics in container log manager
func (p *criStatsProvider) getContainerLogStats(path string, rootFsInfo *cadvisorapiv2.FsInfo) *statsapi.FsStats {
func (p *criStatsProvider) getPathFsStats(path string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
m := p.logMetricsService.createLogMetricsProvider(path)
logMetrics, err := m.GetMetrics()
if err != nil {
klog.Errorf("Unable to fetch container log stats for path %s: %v ", path, err)
return nil
return nil, err
}
result := &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
@ -759,5 +780,40 @@ func (p *criStatsProvider) getContainerLogStats(path string, rootFsInfo *cadviso
result.UsedBytes = &usedbytes
inodesUsed := uint64(logMetrics.InodesUsed.Value())
result.InodesUsed = &inodesUsed
return result
result.Time = maxUpdateTime(&result.Time, &logMetrics.Time)
return result, nil
}
// getPodLogStats gets stats for logs under the pod log directory. Container logs usually exist
// under the container log directory. However, for some container runtimes, e.g. kata, gvisor,
// they may want to keep some pod level logs, in that case they can put those logs directly under
// the pod log directory. And kubelet will take those logs into account as part of pod ephemeral
// storage.
func (p *criStatsProvider) getPodLogStats(path string, rootFsInfo *cadvisorapiv2.FsInfo) (*statsapi.FsStats, error) {
files, err := p.osInterface.ReadDir(path)
if err != nil {
return nil, err
}
result := &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
AvailableBytes: &rootFsInfo.Available,
CapacityBytes: &rootFsInfo.Capacity,
InodesFree: rootFsInfo.InodesFree,
Inodes: rootFsInfo.Inodes,
}
for _, f := range files {
if f.IsDir() {
continue
}
// Only include *files* under pod log directory.
fpath := filepath.Join(path, f.Name())
fstats, err := p.getPathFsStats(fpath, rootFsInfo)
if err != nil {
return nil, fmt.Errorf("failed to get fsstats for %q: %v", fpath, err)
}
result.UsedBytes = addUsage(result.UsedBytes, fstats.UsedBytes)
result.InodesUsed = addUsage(result.InodesUsed, fstats.InodesUsed)
result.Time = maxUpdateTime(&result.Time, &fstats.Time)
}
return result, nil
}

View File

@ -18,10 +18,13 @@ package stats
import (
"math/rand"
"os"
"path/filepath"
"runtime"
"testing"
"time"
gomock "github.com/golang/mock/gomock"
cadvisorfs "github.com/google/cadvisor/fs"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
@ -110,6 +113,11 @@ func TestCRIListPodStats(t *testing.T) {
container5 = makeFakeContainer(sandbox3, cName5, 0, true)
containerStats5 = makeFakeContainerStats(container5, imageFsMountpoint)
containerLogStats5 = makeFakeLogStats(5000)
podLogName0 = "pod-log-0"
podLogName1 = "pod-log-1"
podLogStats0 = makeFakeLogStats(5000)
podLogStats1 = makeFakeLogStats(6000)
)
var (
@ -171,9 +179,31 @@ func TestCRIListPodStats(t *testing.T) {
kuberuntime.BuildContainerLogsDirectory("sandbox1-ns", "sandbox1-name", types.UID("sandbox1-uid"), cName2): containerLogStats2,
kuberuntime.BuildContainerLogsDirectory("sandbox2-ns", "sandbox2-name", types.UID("sandbox2-uid"), cName3): containerLogStats4,
kuberuntime.BuildContainerLogsDirectory("sandbox3-ns", "sandbox3-name", types.UID("sandbox3-uid"), cName5): containerLogStats5,
filepath.Join(kuberuntime.BuildPodLogsDirectory("sandbox0-ns", "sandbox0-name", types.UID("sandbox0-uid")), podLogName0): podLogStats0,
filepath.Join(kuberuntime.BuildPodLogsDirectory("sandbox1-ns", "sandbox1-name", types.UID("sandbox1-uid")), podLogName1): podLogStats1,
}
fakeLogStatsProvider := NewFakeLogMetricsService(fakeLogStats)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
fakeOS := &kubecontainertest.FakeOS{}
fakeOS.ReadDirFn = func(path string) ([]os.FileInfo, error) {
var fileInfos []os.FileInfo
mockFI := kubecontainertest.NewMockFileInfo(ctrl)
switch path {
case kuberuntime.BuildPodLogsDirectory("sandbox0-ns", "sandbox0-name", types.UID("sandbox0-uid")):
mockFI.EXPECT().Name().Return(podLogName0)
case kuberuntime.BuildPodLogsDirectory("sandbox1-ns", "sandbox1-name", types.UID("sandbox1-uid")):
mockFI.EXPECT().Name().Return(podLogName1)
default:
return nil, nil
}
mockFI.EXPECT().IsDir().Return(false)
fileInfos = append(fileInfos, mockFI)
return fileInfos, nil
}
provider := NewCRIStatsProvider(
mockCadvisor,
resourceAnalyzer,
@ -182,6 +212,7 @@ func TestCRIListPodStats(t *testing.T) {
fakeRuntimeService,
fakeImageService,
fakeLogStatsProvider,
fakeOS,
)
stats, err := provider.ListPodStats()
@ -199,7 +230,7 @@ func TestCRIListPodStats(t *testing.T) {
assert.Equal(2, len(p0.Containers))
checkEphemeralStorageStats(assert, p0, ephemeralVolumes, []*runtimeapi.ContainerStats{containerStats0, containerStats1},
[]*volume.Metrics{containerLogStats0, containerLogStats1})
[]*volume.Metrics{containerLogStats0, containerLogStats1}, podLogStats0)
containerStatsMap := make(map[string]statsapi.ContainerStats)
for _, s := range p0.Containers {
@ -223,7 +254,8 @@ func TestCRIListPodStats(t *testing.T) {
assert.Equal(sandbox1.CreatedAt, p1.StartTime.UnixNano())
assert.Equal(1, len(p1.Containers))
checkEphemeralStorageStats(assert, p1, ephemeralVolumes, []*runtimeapi.ContainerStats{containerStats2}, []*volume.Metrics{containerLogStats2})
checkEphemeralStorageStats(assert, p1, ephemeralVolumes, []*runtimeapi.ContainerStats{containerStats2},
[]*volume.Metrics{containerLogStats2}, podLogStats1)
c2 := p1.Containers[0]
assert.Equal(cName2, c2.Name)
assert.Equal(container2.CreatedAt, c2.StartTime.UnixNano())
@ -237,7 +269,8 @@ func TestCRIListPodStats(t *testing.T) {
assert.Equal(sandbox2.CreatedAt, p2.StartTime.UnixNano())
assert.Equal(1, len(p2.Containers))
checkEphemeralStorageStats(assert, p2, ephemeralVolumes, []*runtimeapi.ContainerStats{containerStats4}, []*volume.Metrics{containerLogStats4})
checkEphemeralStorageStats(assert, p2, ephemeralVolumes, []*runtimeapi.ContainerStats{containerStats4},
[]*volume.Metrics{containerLogStats4}, nil)
c3 := p2.Containers[0]
assert.Equal(cName3, c3.Name)
@ -352,6 +385,7 @@ func TestCRIListPodCPUAndMemoryStats(t *testing.T) {
fakeRuntimeService,
nil,
nil,
&kubecontainertest.FakeOS{},
)
stats, err := provider.ListPodCPUAndMemoryStats()
@ -471,6 +505,7 @@ func TestCRIImagesFsStats(t *testing.T) {
fakeRuntimeService,
fakeImageService,
fakeLogStatsProvider,
&kubecontainertest.FakeOS{},
)
stats, err := provider.ImageFsStats()
@ -637,7 +672,8 @@ func checkEphemeralStorageStats(assert *assert.Assertions,
actual statsapi.PodStats,
volumes []statsapi.VolumeStats,
containers []*runtimeapi.ContainerStats,
containerLogStats []*volume.Metrics) {
containerLogStats []*volume.Metrics,
podLogStats *volume.Metrics) {
var totalUsed, inodesUsed uint64
for _, container := range containers {
totalUsed = totalUsed + container.WritableLayer.UsedBytes.Value
@ -651,6 +687,12 @@ func checkEphemeralStorageStats(assert *assert.Assertions,
for _, logStats := range containerLogStats {
totalUsed = totalUsed + uint64(logStats.Used.Value())
inodesUsed = inodesUsed + uint64(logStats.InodesUsed.Value())
}
if podLogStats != nil {
totalUsed = totalUsed + uint64(podLogStats.Used.Value())
inodesUsed = inodesUsed + uint64(podLogStats.InodesUsed.Value())
}
assert.Equal(int(totalUsed), int(*actual.EphemeralStorage.UsedBytes))

View File

@ -330,6 +330,64 @@ func getUint64Value(value *uint64) uint64 {
return *value
}
func Uint64Ptr(i uint64) *uint64 {
func uint64Ptr(i uint64) *uint64 {
return &i
}
func calcEphemeralStorage(containers []statsapi.ContainerStats, volumes []statsapi.VolumeStats, rootFsInfo *cadvisorapiv2.FsInfo,
podLogStats *statsapi.FsStats, isCRIStatsProvider bool) *statsapi.FsStats {
result := &statsapi.FsStats{
Time: metav1.NewTime(rootFsInfo.Timestamp),
AvailableBytes: &rootFsInfo.Available,
CapacityBytes: &rootFsInfo.Capacity,
InodesFree: rootFsInfo.InodesFree,
Inodes: rootFsInfo.Inodes,
}
for _, container := range containers {
addContainerUsage(result, &container, isCRIStatsProvider)
}
for _, volume := range volumes {
result.UsedBytes = addUsage(result.UsedBytes, volume.FsStats.UsedBytes)
result.InodesUsed = addUsage(result.InodesUsed, volume.InodesUsed)
result.Time = maxUpdateTime(&result.Time, &volume.FsStats.Time)
}
if podLogStats != nil {
result.UsedBytes = addUsage(result.UsedBytes, podLogStats.UsedBytes)
result.InodesUsed = addUsage(result.InodesUsed, podLogStats.InodesUsed)
result.Time = maxUpdateTime(&result.Time, &podLogStats.Time)
}
return result
}
func addContainerUsage(stat *statsapi.FsStats, container *statsapi.ContainerStats, isCRIStatsProvider bool) {
if rootFs := container.Rootfs; rootFs != nil {
stat.Time = maxUpdateTime(&stat.Time, &rootFs.Time)
stat.InodesUsed = addUsage(stat.InodesUsed, rootFs.InodesUsed)
stat.UsedBytes = addUsage(stat.UsedBytes, rootFs.UsedBytes)
if logs := container.Logs; logs != nil {
stat.UsedBytes = addUsage(stat.UsedBytes, logs.UsedBytes)
// We have accurate container log inode usage for CRI stats provider.
if isCRIStatsProvider {
stat.InodesUsed = addUsage(stat.InodesUsed, logs.InodesUsed)
}
stat.Time = maxUpdateTime(&stat.Time, &logs.Time)
}
}
}
func maxUpdateTime(first, second *metav1.Time) metav1.Time {
if first.Before(second) {
return *second
}
return *first
}
func addUsage(first, second *uint64) *uint64 {
if first == nil {
return second
} else if second == nil {
return first
}
total := *first + *second
return &total
}

View File

@ -42,8 +42,10 @@ func NewCRIStatsProvider(
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
logMetricsService LogMetricsService,
osInterface kubecontainer.OSInterface,
) *StatsProvider {
return newStatsProvider(cadvisor, podManager, runtimeCache, newCRIStatsProvider(cadvisor, resourceAnalyzer, runtimeService, imageService, logMetricsService))
return newStatsProvider(cadvisor, podManager, runtimeCache, newCRIStatsProvider(cadvisor, resourceAnalyzer,
runtimeService, imageService, logMetricsService, osInterface))
}
// NewCadvisorStatsProvider returns a containerStatsProvider that provides both