Track image storage usage for docker containers

add image fs info to summary stats API.
Adding node e2e test for image stats.

Signed-off-by: Vishnu kannan <vishnuk@google.com>
pull/6/head
Vishnu kannan 2016-03-28 17:05:02 -07:00
parent 596c96da8a
commit e566948a75
19 changed files with 321 additions and 29 deletions

View File

@ -46,6 +46,16 @@ type NodeStats struct {
// Stats pertaining to total usage of filesystem resources on the rootfs used by node k8s components.
// NodeFs.Used is the total bytes used on the filesystem.
Fs *FsStats `json:"fs,omitempty"`
// Stats about the underlying container runtime.
Runtime *RuntimeStats `json:"runtime,omitempty"`
}
// Stats pertaining to the underlying container runtime.
type RuntimeStats struct {
// Stats about the underlying filesystem where container images are stored.
// This filesystem could be the same as the primary (root) filesystem.
// Usage here refers to the total number of bytes occupied by images on the filesystem.
ImageFs *FsStats `json:"imageFs,omitempty"`
}
const (

View File

@ -46,6 +46,12 @@ type ImageSpec struct {
Image string
}
// ImageStats contains statistics about all the images currently available.
type ImageStats struct {
// Total amount of storage consumed by existing images.
TotalStorageBytes uint64
}
// Runtime interface defines the interfaces that should be implemented
// by a container runtime.
// Thread safety is required from implementations of this interface.
@ -86,6 +92,8 @@ type Runtime interface {
ListImages() ([]Image, error)
// Removes the specified image.
RemoveImage(image ImageSpec) error
// Returns Image statistics.
ImageStats() (*ImageStats, error)
// TODO(vmarmol): Unify pod and containerID args.
// GetContainerLogs returns logs of a specific container. By
// default, it returns a snapshot of the container log. Set 'follow' to true to

View File

@ -353,3 +353,11 @@ func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy) error {
f.CalledFunctions = append(f.CalledFunctions, "GarbageCollect")
return f.Err
}
func (f *FakeRuntime) ImageStats() (*ImageStats, error) {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "ImageStats")
return nil, f.Err
}

View File

@ -137,3 +137,8 @@ func (r *Mock) GarbageCollect(gcPolicy ContainerGCPolicy) error {
args := r.Called(gcPolicy)
return args.Error(0)
}
func (r *Mock) ImageStats() (*ImageStats, error) {
args := r.Called()
return args.Get(0).(*ImageStats), args.Error(1)
}

View File

@ -67,6 +67,7 @@ type DockerInterface interface {
ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)
PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
ImageHistory(id string) ([]dockertypes.ImageHistory, error)
Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
Version() (*dockertypes.Version, error)
Info() (*dockertypes.Info, error)

View File

@ -55,6 +55,7 @@ type FakeDockerClient struct {
ExecInspect *dockertypes.ContainerExecInspect
execCmd []string
EnableSleep bool
ImageHistoryMap map[string][]dockertypes.ImageHistory
}
// We don't check docker version now, just set the docker version of fake docker client to 1.8.1.
@ -482,6 +483,12 @@ func (f *FakeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemov
return []dockertypes.ImageDelete{{Deleted: image}}, err
}
func (f *FakeDockerClient) InjectImages(images []dockertypes.Image) {
f.Lock()
defer f.Unlock()
f.Images = append(f.Images, images...)
}
func (f *FakeDockerClient) updateContainerStatus(id, status string) {
for i := range f.RunningContainerList {
if f.RunningContainerList[i].ID == id {
@ -528,6 +535,18 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) {
}
return false, nil
}
func (f *FakeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
f.Lock()
defer f.Unlock()
history := f.ImageHistoryMap[id]
return history, nil
}
func (f *FakeDockerClient) InjectImageHistory(data map[string][]dockertypes.ImageHistory) {
f.Lock()
defer f.Unlock()
f.ImageHistoryMap = data
}
// dockerTimestampToString converts the timestamp to string
func dockerTimestampToString(t time.Time) string {

View File

@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertools
import (
"fmt"
"github.com/golang/glog"
dockertypes "github.com/docker/engine-api/types"
runtime "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/sets"
)
// imageStatsProvider exposes stats about all images currently available.
type imageStatsProvider struct {
// Docker remote API client
c DockerInterface
}
func (isp *imageStatsProvider) ImageStats() (*runtime.ImageStats, error) {
images, err := isp.c.ListImages(dockertypes.ImageListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list docker images - %v", err)
}
// A map of all the image layers to its corresponding size.
imageMap := sets.NewString()
ret := &runtime.ImageStats{}
for _, image := range images {
// Get information about the various layers of each docker image.
history, err := isp.c.ImageHistory(image.ID)
if err != nil {
glog.V(2).Infof("failed to get history of docker image %v - %v", image, err)
continue
}
// Store size information of each layer.
for _, layer := range history {
// Skip empty layers.
if layer.Size == 0 {
glog.V(10).Infof("skipping image layer %v with size 0", layer)
continue
}
key := layer.ID
// Some of the layers are empty.
// We are hoping that these layers are unique to each image.
// Still keying with the CreatedBy field to be safe.
if key == "" || key == "<missing>" {
key = key + layer.CreatedBy
}
if !imageMap.Has(key) {
ret.TotalStorageBytes += uint64(layer.Size)
}
imageMap.Insert(key)
}
}
return ret, nil
}

View File

@ -0,0 +1,103 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertools
import (
"testing"
dockertypes "github.com/docker/engine-api/types"
"github.com/stretchr/testify/assert"
)
func TestImageStatsNoImages(t *testing.T) {
fakeDockerClient := NewFakeDockerClientWithVersion("1.2.3", "1.2")
isp := &imageStatsProvider{fakeDockerClient}
st, err := isp.ImageStats()
as := assert.New(t)
as.NoError(err)
as.Equal(st.TotalStorageBytes, uint64(0))
}
func TestImageStatsWithImages(t *testing.T) {
fakeDockerClient := NewFakeDockerClientWithVersion("1.2.3", "1.2")
fakeHistoryData := map[string][]dockertypes.ImageHistory{
"busybox": {
{
ID: "0123456",
CreatedBy: "foo",
Size: 100,
},
{
ID: "0123457",
CreatedBy: "duplicate",
Size: 200,
},
{
ID: "<missing>",
CreatedBy: "baz",
Size: 300,
},
},
"kubelet": {
{
ID: "1123456",
CreatedBy: "foo",
Size: 200,
},
{
ID: "<missing>",
CreatedBy: "1baz",
Size: 400,
},
},
"busybox-new": {
{
ID: "01234567",
CreatedBy: "foo",
Size: 100,
},
{
ID: "0123457",
CreatedBy: "duplicate",
Size: 200,
},
{
ID: "<missing>",
CreatedBy: "baz",
Size: 300,
},
},
}
fakeDockerClient.InjectImageHistory(fakeHistoryData)
fakeDockerClient.InjectImages([]dockertypes.Image{
{
ID: "busybox",
},
{
ID: "kubelet",
},
{
ID: "busybox-new",
},
})
isp := &imageStatsProvider{fakeDockerClient}
st, err := isp.ImageStats()
as := assert.New(t)
as.NoError(err)
const expectedOutput uint64 = 1300
as.Equal(expectedOutput, st.TotalStorageBytes, "expected %d, got %d", expectedOutput, st.TotalStorageBytes)
}

View File

@ -199,3 +199,12 @@ func (in instrumentedDockerInterface) AttachToContainer(id string, opts dockerty
recordError(operation, err)
return err
}
func (in instrumentedDockerInterface) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
const operation = "image_history"
defer recordOperation(operation, time.Now())
out, err := in.client.ImageHistory(id)
recordError(operation, err)
return out, err
}

View File

@ -151,6 +151,10 @@ func (d *kubeDockerClient) InspectImage(image string) (*dockertypes.ImageInspect
return &resp, nil
}
func (d *kubeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
return d.client.ImageHistory(getDefaultContext(), id)
}
func (d *kubeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
images, err := d.client.ImageList(getDefaultContext(), opts)
if err != nil {

View File

@ -163,6 +163,9 @@ type DockerManager struct {
// The api version cache of docker daemon.
versionCache *cache.VersionCache
// Provides image stats
*imageStatsProvider
}
// A subset of the pod.Manager interface extracted for testing purposes.
@ -240,6 +243,7 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota,
enableCustomMetrics: enableCustomMetrics,
configureHairpinMode: hairpinMode,
imageStatsProvider: &imageStatsProvider{client},
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
if serializeImagePulls {

View File

@ -342,8 +342,6 @@ func NewMainKubelet(
enableCustomMetrics: enableCustomMetrics,
babysitDaemons: babysitDaemons,
}
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, volumeStatsAggPeriod)
if klet.flannelExperimentalOverlay {
glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
@ -440,6 +438,9 @@ func NewMainKubelet(
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
}
// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, volumeStatsAggPeriod, klet.containerRuntime)
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, util.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0)
klet.updatePodCIDR(podCIDR)
@ -3579,11 +3580,11 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
}
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers)
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime)
}
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
}
// GetRuntime returns the current Runtime implementation in use by the kubelet. This func

View File

@ -1695,3 +1695,8 @@ func (r *Runtime) GetPodStatus(uid types.UID, name, namespace string) (*kubecont
return podStatus, nil
}
// FIXME: I need to be implemented.
func (r *Runtime) ImageStats() (*kubecontainer.ImageStats, error) {
return &kubecontainer.ImageStats{}, nil
}

View File

@ -67,6 +67,7 @@ type Server struct {
host HostInterface
restfulCont containerInterface
resourceAnalyzer stats.ResourceAnalyzer
runtime kubecontainer.Runtime
}
type TLSOptions struct {
@ -104,9 +105,17 @@ func (a *filteringContainer) RegisteredHandlePaths() []string {
}
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, tlsOptions *TLSOptions, auth AuthInterface, enableDebuggingHandlers bool) {
func ListenAndServeKubeletServer(
host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer,
address net.IP,
port uint,
tlsOptions *TLSOptions,
auth AuthInterface,
enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime)
s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
@ -121,9 +130,9 @@ func ListenAndServeKubeletServer(host HostInterface, resourceAnalyzer stats.Reso
}
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) {
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := NewServer(host, resourceAnalyzer, nil, false)
s := NewServer(host, resourceAnalyzer, nil, false, runtime)
server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -169,12 +178,18 @@ type HostInterface interface {
}
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, auth AuthInterface, enableDebuggingHandlers bool) Server {
func NewServer(
host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface,
enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) Server {
server := Server{
host: host,
resourceAnalyzer: resourceAnalyzer,
auth: auth,
restfulCont: &filteringContainer{Container: restful.NewContainer()},
runtime: runtime,
}
if auth != nil {
server.InstallAuthFilter()

View File

@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
@ -208,9 +209,10 @@ func newServerTest() *serverTestFramework {
}
server := NewServer(
fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &kubecontainertesting.FakeRuntime{}),
fw.fakeAuth,
true)
true,
&kubecontainertesting.Mock{})
fw.serverUnderTest = &server
// TODO: Close() this when fix #19254
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)

View File

@ -16,7 +16,11 @@ limitations under the License.
package stats
import "time"
import (
"time"
"k8s.io/kubernetes/pkg/kubelet/container"
)
// ResourceAnalyzer provides statistics on node resource consumption
type ResourceAnalyzer interface {
@ -35,9 +39,9 @@ type resourceAnalyzer struct {
var _ ResourceAnalyzer = &resourceAnalyzer{}
// NewResourceAnalyzer returns a new ResourceAnalyzer
func NewResourceAnalyzer(statsProvider StatsProvider, calVolumeFrequency time.Duration) ResourceAnalyzer {
func NewResourceAnalyzer(statsProvider StatsProvider, calVolumeFrequency time.Duration, runtime container.Runtime) ResourceAnalyzer {
fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency)
summaryProvider := NewSummaryProvider(statsProvider, fsAnalyzer)
summaryProvider := NewSummaryProvider(statsProvider, fsAnalyzer, runtime)
return &resourceAnalyzer{fsAnalyzer, summaryProvider}
}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/leaky"
"k8s.io/kubernetes/pkg/kubelet/network"
@ -45,15 +46,16 @@ type SummaryProvider interface {
type summaryProviderImpl struct {
provider StatsProvider
fsResourceAnalyzer fsResourceAnalyzerInterface
runtime container.Runtime
}
var _ SummaryProvider = &summaryProviderImpl{}
// NewSummaryProvider returns a new SummaryProvider
func NewSummaryProvider(statsProvider StatsProvider, fsResourceAnalyzer fsResourceAnalyzerInterface) SummaryProvider {
func NewSummaryProvider(statsProvider StatsProvider, fsResourceAnalyzer fsResourceAnalyzerInterface, cruntime container.Runtime) SummaryProvider {
stackBuff := []byte{}
runtime.Stack(stackBuff, false)
return &summaryProviderImpl{statsProvider, fsResourceAnalyzer}
return &summaryProviderImpl{statsProvider, fsResourceAnalyzer, cruntime}
}
// Get implements the SummaryProvider interface
@ -83,8 +85,11 @@ func (sp *summaryProviderImpl) Get() (*stats.Summary, error) {
if err != nil {
return nil, err
}
sb := &summaryBuilder{sp.fsResourceAnalyzer, node, nodeConfig, rootFsInfo, imageFsInfo, infos}
imageStats, err := sp.runtime.ImageStats()
if err != nil || imageStats == nil {
return nil, err
}
sb := &summaryBuilder{sp.fsResourceAnalyzer, node, nodeConfig, rootFsInfo, imageFsInfo, *imageStats, infos}
return sb.build()
}
@ -95,6 +100,7 @@ type summaryBuilder struct {
nodeConfig cm.NodeConfig
rootFsInfo cadvisorapiv2.FsInfo
imageFsInfo cadvisorapiv2.FsInfo
imageStats container.ImageStats
infos map[string]cadvisorapiv2.ContainerInfo
}
@ -116,6 +122,13 @@ func (sb *summaryBuilder) build() (*stats.Summary, error) {
CapacityBytes: &sb.rootFsInfo.Capacity,
UsedBytes: &sb.rootFsInfo.Usage},
StartTime: rootStats.StartTime,
Runtime: &stats.RuntimeStats{
ImageFs: &stats.FsStats{
AvailableBytes: &sb.imageFsInfo.Available,
CapacityBytes: &sb.imageFsInfo.Capacity,
UsedBytes: &sb.imageStats.TotalStorageBytes,
},
},
}
systemContainers := map[string]string{
@ -152,7 +165,6 @@ func (sb *summaryBuilder) containerInfoV2FsStats(
AvailableBytes: &sb.imageFsInfo.Available,
CapacityBytes: &sb.imageFsInfo.Capacity,
}
lcs, found := sb.latestContainerStats(info)
if !found {
return

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
kubestats "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/leaky"
)
@ -127,7 +128,7 @@ func TestBuildSummary(t *testing.T) {
}
sb := &summaryBuilder{
newFsResourceAnalyzer(&MockStatsProvider{}, time.Minute*5), &node, nodeConfig, rootfs, imagefs, infos}
newFsResourceAnalyzer(&MockStatsProvider{}, time.Minute*5), &node, nodeConfig, rootfs, imagefs, container.ImageStats{}, infos}
summary, err := sb.build()
assert.NoError(t, err)

View File

@ -203,6 +203,16 @@ var _ = Describe("Kubelet", func() {
Expect(summary.Node.Fs.UsedBytes).NotTo(BeNil())
Expect(*summary.Node.Fs.UsedBytes).NotTo(BeZero())
By("Having container runtime's image storage information")
Expect(summary.Node.Runtime).NotTo(BeNil())
Expect(summary.Node.Runtime.ImageFs).NotTo(BeNil())
Expect(summary.Node.Runtime.ImageFs.AvailableBytes).NotTo(BeNil())
Expect(*summary.Node.Runtime.ImageFs.AvailableBytes).NotTo(BeZero())
Expect(summary.Node.Runtime.ImageFs.CapacityBytes).NotTo(BeNil())
Expect(*summary.Node.Runtime.ImageFs.CapacityBytes).NotTo(BeZero())
Expect(summary.Node.Runtime.ImageFs.UsedBytes).NotTo(BeNil())
Expect(*summary.Node.Runtime.ImageFs.UsedBytes).NotTo(BeZero())
By("Having resources for kubelet and runtime system containers")
sysContainers := map[string]stats.ContainerStats{}
sysContainersList := []string{}