From d387eab8176645ab8b3e3aa104ae79802ba310ab Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Mon, 18 Sep 2017 06:41:07 +0000 Subject: [PATCH] Fix CRI container/imagefs stats. --- pkg/kubelet/apis/cri/services.go | 6 +-- .../apis/cri/testing/fake_image_service.go | 6 +-- .../apis/cri/testing/fake_runtime_service.go | 20 +++++----- pkg/kubelet/dockershim/docker_image.go | 2 +- pkg/kubelet/dockershim/docker_stats.go | 4 +- .../kuberuntime/instrumented_services.go | 12 +++--- pkg/kubelet/remote/remote_image.go | 16 ++++++-- pkg/kubelet/remote/remote_runtime.go | 37 ++++++++++++++++--- pkg/kubelet/stats/cri_stats_provider.go | 8 ++-- 9 files changed, 72 insertions(+), 39 deletions(-) diff --git a/pkg/kubelet/apis/cri/services.go b/pkg/kubelet/apis/cri/services.go index 8a2394dd39..9c8ba0899c 100644 --- a/pkg/kubelet/apis/cri/services.go +++ b/pkg/kubelet/apis/cri/services.go @@ -79,9 +79,9 @@ type PodSandboxManager interface { type ContainerStatsManager interface { // ContainerStats returns stats of the container. If the container does not // exist, the call returns an error. - ContainerStats(req *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) + ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) // ListContainerStats returns stats of all running containers. - ListContainerStats(req *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) + ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) } // RuntimeService interface should be implemented by a container runtime. @@ -111,5 +111,5 @@ type ImageManagerService interface { // RemoveImage removes the image. RemoveImage(image *runtimeapi.ImageSpec) error // ImageFsInfo returns information of the filesystem that is used to store images. - ImageFsInfo(req *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) + ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) } diff --git a/pkg/kubelet/apis/cri/testing/fake_image_service.go b/pkg/kubelet/apis/cri/testing/fake_image_service.go index 23fa9c77ea..b3e8e0ce2e 100644 --- a/pkg/kubelet/apis/cri/testing/fake_image_service.go +++ b/pkg/kubelet/apis/cri/testing/fake_image_service.go @@ -135,15 +135,13 @@ func (r *FakeImageService) RemoveImage(image *runtimeapi.ImageSpec) error { } // ImageFsInfo returns information of the filesystem that is used to store images. -func (r *FakeImageService) ImageFsInfo(req *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { +func (r *FakeImageService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { r.Lock() defer r.Unlock() r.Called = append(r.Called, "ImageFsInfo") - return &runtimeapi.ImageFsInfoResponse{ - ImageFilesystems: r.FakeFilesystemUsage, - }, nil + return r.FakeFilesystemUsage, nil } func (r *FakeImageService) AssertImagePulledWithAuth(t *testing.T, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, failMsg string) { diff --git a/pkg/kubelet/apis/cri/testing/fake_runtime_service.go b/pkg/kubelet/apis/cri/testing/fake_runtime_service.go index cdff814d2c..69a0b773bf 100644 --- a/pkg/kubelet/apis/cri/testing/fake_runtime_service.go +++ b/pkg/kubelet/apis/cri/testing/fake_runtime_service.go @@ -418,20 +418,20 @@ func (r *FakeRuntimeService) SetFakeContainerStats(containerStats []*runtimeapi. } } -func (r *FakeRuntimeService) ContainerStats(req *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { +func (r *FakeRuntimeService) ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) { r.Lock() defer r.Unlock() r.Called = append(r.Called, "ContainerStats") - s, found := r.FakeContainerStats[req.ContainerId] + s, found := r.FakeContainerStats[containerID] if !found { - return nil, fmt.Errorf("no stats for container %q", req.ContainerId) + return nil, fmt.Errorf("no stats for container %q", containerID) } - return &runtimeapi.ContainerStatsResponse{Stats: s}, nil + return s, nil } -func (r *FakeRuntimeService) ListContainerStats(req *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { +func (r *FakeRuntimeService) ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { r.Lock() defer r.Unlock() @@ -439,14 +439,14 @@ func (r *FakeRuntimeService) ListContainerStats(req *runtimeapi.ListContainerSta var result []*runtimeapi.ContainerStats for _, c := range r.Containers { - if req.Filter != nil { - if req.Filter.Id != "" && req.Filter.Id != c.Id { + if filter != nil { + if filter.Id != "" && filter.Id != c.Id { continue } - if req.Filter.PodSandboxId != "" && req.Filter.PodSandboxId != c.SandboxID { + if filter.PodSandboxId != "" && filter.PodSandboxId != c.SandboxID { continue } - if req.Filter.LabelSelector != nil && !filterInLabels(req.Filter.LabelSelector, c.GetLabels()) { + if filter.LabelSelector != nil && !filterInLabels(filter.LabelSelector, c.GetLabels()) { continue } } @@ -457,5 +457,5 @@ func (r *FakeRuntimeService) ListContainerStats(req *runtimeapi.ListContainerSta result = append(result, s) } - return &runtimeapi.ListContainerStatsResponse{Stats: result}, nil + return result, nil } diff --git a/pkg/kubelet/dockershim/docker_image.go b/pkg/kubelet/dockershim/docker_image.go index 5ad5c1baf3..7482dc420d 100644 --- a/pkg/kubelet/dockershim/docker_image.go +++ b/pkg/kubelet/dockershim/docker_image.go @@ -134,7 +134,7 @@ func getImageRef(client libdocker.Interface, image string) (string, error) { } // ImageFsInfo returns information of the filesystem that is used to store images. -func (ds *dockerService) ImageFsInfo(req *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { +func (ds *dockerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { return nil, fmt.Errorf("not implemented") } diff --git a/pkg/kubelet/dockershim/docker_stats.go b/pkg/kubelet/dockershim/docker_stats.go index e25e3579df..9d840a45fa 100644 --- a/pkg/kubelet/dockershim/docker_stats.go +++ b/pkg/kubelet/dockershim/docker_stats.go @@ -23,10 +23,10 @@ import ( ) // DockerService does not implement container stats. -func (ds *dockerService) ContainerStats(*runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { +func (ds *dockerService) ContainerStats(string) (*runtimeapi.ContainerStats, error) { return nil, fmt.Errorf("Not implemented") } -func (ds *dockerService) ListContainerStats(*runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { +func (ds *dockerService) ListContainerStats(*runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { return nil, fmt.Errorf("Not implemented") } diff --git a/pkg/kubelet/kuberuntime/instrumented_services.go b/pkg/kubelet/kuberuntime/instrumented_services.go index 65c10b1f74..a408f67e0e 100644 --- a/pkg/kubelet/kuberuntime/instrumented_services.go +++ b/pkg/kubelet/kuberuntime/instrumented_services.go @@ -212,20 +212,20 @@ func (in instrumentedRuntimeService) ListPodSandbox(filter *runtimeapi.PodSandbo return out, err } -func (in instrumentedRuntimeService) ContainerStats(req *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { +func (in instrumentedRuntimeService) ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) { const operation = "container_stats" defer recordOperation(operation, time.Now()) - out, err := in.service.ContainerStats(req) + out, err := in.service.ContainerStats(containerID) recordError(operation, err) return out, err } -func (in instrumentedRuntimeService) ListContainerStats(req *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { +func (in instrumentedRuntimeService) ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { const operation = "list_container_stats" defer recordOperation(operation, time.Now()) - out, err := in.service.ListContainerStats(req) + out, err := in.service.ListContainerStats(filter) recordError(operation, err) return out, err } @@ -284,11 +284,11 @@ func (in instrumentedImageManagerService) RemoveImage(image *runtimeapi.ImageSpe return err } -func (in instrumentedImageManagerService) ImageFsInfo(req *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { +func (in instrumentedImageManagerService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { const operation = "image_fs_info" defer recordOperation(operation, time.Now()) - fsInfo, err := in.service.ImageFsInfo(req) + fsInfo, err := in.service.ImageFsInfo() recordError(operation, err) return fsInfo, nil } diff --git a/pkg/kubelet/remote/remote_image.go b/pkg/kubelet/remote/remote_image.go index f0f0eb00fa..d685f07ace 100644 --- a/pkg/kubelet/remote/remote_image.go +++ b/pkg/kubelet/remote/remote_image.go @@ -64,7 +64,7 @@ func (r *RemoteImageService) ListImages(filter *runtimeapi.ImageFilter) ([]*runt Filter: filter, }) if err != nil { - glog.Errorf("ListImages with filter %q from image service failed: %v", filter, err) + glog.Errorf("ListImages with filter %+v from image service failed: %v", filter, err) return nil, err } @@ -135,6 +135,16 @@ func (r *RemoteImageService) RemoveImage(image *runtimeapi.ImageSpec) error { } // ImageFsInfo returns information of the filesystem that is used to store images. -func (r *RemoteImageService) ImageFsInfo(req *runtimeapi.ImageFsInfoRequest) (*runtimeapi.ImageFsInfoResponse, error) { - return nil, fmt.Errorf("not implemented") +func (r *RemoteImageService) ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error) { + // Do not set timeout, because `ImageFsInfo` takes time. + // TODO(random-liu): Should we assume runtime should cache the result, and set timeout here? + ctx, cancel := getContextWithCancel() + defer cancel() + + resp, err := r.imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{}) + if err != nil { + glog.Errorf("ImageFsInfo from image service failed: %v", err) + return nil, err + } + return resp.GetImageFilesystems(), nil } diff --git a/pkg/kubelet/remote/remote_runtime.go b/pkg/kubelet/remote/remote_runtime.go index 974c0931ff..3560e3c9b2 100644 --- a/pkg/kubelet/remote/remote_runtime.go +++ b/pkg/kubelet/remote/remote_runtime.go @@ -166,7 +166,7 @@ func (r *RemoteRuntimeService) ListPodSandbox(filter *runtimeapi.PodSandboxFilte Filter: filter, }) if err != nil { - glog.Errorf("ListPodSandbox with filter %q from runtime service failed: %v", filter, err) + glog.Errorf("ListPodSandbox with filter %+v from runtime service failed: %v", filter, err) return nil, err } @@ -259,7 +259,7 @@ func (r *RemoteRuntimeService) ListContainers(filter *runtimeapi.ContainerFilter Filter: filter, }) if err != nil { - glog.Errorf("ListContainers with filter %q from runtime service failed: %v", filter, err) + glog.Errorf("ListContainers with filter %+v from runtime service failed: %v", filter, err) return nil, err } @@ -444,10 +444,35 @@ func (r *RemoteRuntimeService) Status() (*runtimeapi.RuntimeStatus, error) { return resp.Status, nil } -func (r *RemoteRuntimeService) ContainerStats(req *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { - return nil, fmt.Errorf("Not implemented") +// ContainerStats returns the stats of the container. +func (r *RemoteRuntimeService) ContainerStats(containerID string) (*runtimeapi.ContainerStats, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{ + ContainerId: containerID, + }) + if err != nil { + glog.Errorf("ContainerStatus %q from runtime service failed: %v", containerID, err) + return nil, err + } + + return resp.GetStats(), nil } -func (r *RemoteRuntimeService) ListContainerStats(req *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { - return nil, fmt.Errorf("Not implemented") +func (r *RemoteRuntimeService) ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { + // Do not set timeout, because writable layer stats collection takes time. + // TODO(random-liu): Should we assume runtime should cache the result, and set timeout here? + ctx, cancel := getContextWithCancel() + defer cancel() + + resp, err := r.runtimeClient.ListContainerStats(ctx, &runtimeapi.ListContainerStatsRequest{ + Filter: filter, + }) + if err != nil { + glog.Errorf("ListContainerStats with filter %+v from runtime service failed: %v", filter, err) + return nil, err + } + + return resp.GetStats(), nil } diff --git a/pkg/kubelet/stats/cri_stats_provider.go b/pkg/kubelet/stats/cri_stats_provider.go index fb65b6dede..f855e0db41 100644 --- a/pkg/kubelet/stats/cri_stats_provider.go +++ b/pkg/kubelet/stats/cri_stats_provider.go @@ -102,11 +102,11 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) { // sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats. sandboxIDToPodStats := make(map[string]*statsapi.PodStats) - resp, err := p.runtimeService.ListContainerStats(&runtimeapi.ListContainerStatsRequest{}) + resp, err := p.runtimeService.ListContainerStats(&runtimeapi.ContainerStatsFilter{}) if err != nil { return nil, fmt.Errorf("failed to list all container stats: %v", err) } - for _, stats := range resp.Stats { + for _, stats := range resp { containerID := stats.Attributes.Id container, found := containerMap[containerID] if !found { @@ -140,7 +140,7 @@ func (p *criStatsProvider) ListPodStats() ([]statsapi.PodStats, error) { // ImageFsStats returns the stats of the image filesystem. func (p *criStatsProvider) ImageFsStats() (*statsapi.FsStats, error) { - resp, err := p.imageService.ImageFsInfo(&runtimeapi.ImageFsInfoRequest{}) + resp, err := p.imageService.ImageFsInfo() if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (p *criStatsProvider) ImageFsStats() (*statsapi.FsStats, error) { // return the first one. // // TODO(yguo0905): Support returning stats of multiple image filesystems. - for _, fs := range resp.ImageFilesystems { + for _, fs := range resp { s := &statsapi.FsStats{ Time: metav1.NewTime(time.Unix(0, fs.Timestamp)), UsedBytes: &fs.UsedBytes.Value,