Kubelet Metrics Summary Api Implementation

pull/6/head
Phillip Wittrock 2016-01-22 12:14:06 -08:00
parent ad9fa30e7e
commit ba5be34574
15 changed files with 928 additions and 16 deletions

View File

@ -36,6 +36,10 @@ func (c *Fake) ContainerInfo(name string, req *cadvisorapi.ContainerInfoRequest)
return new(cadvisorapi.ContainerInfo), nil
}
func (c *Fake) ContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
return map[string]cadvisorapiv2.ContainerInfo{}, nil
}
func (c *Fake) SubcontainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
return map[string]*cadvisorapi.ContainerInfo{}, nil
}

View File

@ -137,6 +137,10 @@ func (cc *cadvisorClient) ContainerInfo(name string, req *cadvisorapi.ContainerI
return cc.GetContainerInfo(name, req)
}
func (cc *cadvisorClient) ContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
return cc.GetContainerInfoV2(name, options)
}
func (cc *cadvisorClient) VersionInfo() (*cadvisorapi.VersionInfo, error) {
return cc.GetVersionInfo()
}

View File

@ -40,6 +40,12 @@ func (c *Mock) ContainerInfo(name string, req *cadvisorapi.ContainerInfoRequest)
return args.Get(0).(*cadvisorapi.ContainerInfo), args.Error(1)
}
// ContainerInfoV2 is a mock implementation of Interface.ContainerInfoV2.
func (c *Mock) ContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
args := c.Called(name, options)
return args.Get(0).(map[string]cadvisorapiv2.ContainerInfo), args.Error(1)
}
func (c *Mock) SubcontainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
args := c.Called(name, req)
return args.Get(0).(map[string]*cadvisorapi.ContainerInfo), args.Error(1)

View File

@ -49,6 +49,10 @@ func (cu *cadvisorUnsupported) ContainerInfo(name string, req *cadvisorapi.Conta
return nil, unsupportedErr
}
func (cu *cadvisorUnsupported) ContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
return nil, unsupportedErr
}
func (cu *cadvisorUnsupported) SubcontainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error) {
return nil, unsupportedErr
}

View File

@ -27,6 +27,7 @@ type Interface interface {
Start() error
DockerContainer(name string, req *cadvisorapi.ContainerInfoRequest) (cadvisorapi.ContainerInfo, error)
ContainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
ContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error)
SubcontainerInfo(name string, req *cadvisorapi.ContainerInfoRequest) (map[string]*cadvisorapi.ContainerInfo, error)
MachineInfo() (*cadvisorapi.MachineInfo, error)

View File

@ -64,6 +64,22 @@ type labelledContainerInfo struct {
PreStopHandler *api.Handler
}
func GetContainerName(labels map[string]string) string {
return labels[kubernetesContainerNameLabel]
}
func GetPodName(labels map[string]string) string {
return labels[kubernetesPodNameLabel]
}
func GetPodUID(labels map[string]string) string {
return labels[kubernetesPodUIDLabel]
}
func GetPodNamespace(labels map[string]string) string {
return labels[kubernetesPodNamespaceLabel]
}
func newLabels(container *api.Container, pod *api.Pod, restartCount int) map[string]string {
labels := map[string]string{}
labels[kubernetesPodNameLabel] = pod.Name

View File

@ -34,6 +34,7 @@ import (
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
@ -3413,6 +3414,19 @@ func (kl *Kubelet) GetContainerInfo(podFullName string, podUID types.UID, contai
return &ci, nil
}
// GetContainerInfoV2 returns stats (from Cadvisor) for containers.
func (kl *Kubelet) GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
return kl.cadvisor.ContainerInfoV2(name, options)
}
func (kl *Kubelet) DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error) {
return kl.cadvisor.DockerImagesFsInfo()
}
func (kl *Kubelet) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
return kl.cadvisor.RootFsInfo()
}
// Returns stats (from Cadvisor) for a non-Kubernetes container.
func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
if subcontainers {
@ -3468,6 +3482,9 @@ func (kl *Kubelet) updatePodCIDR(cidr string) {
kl.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
}
}
func (kl *Kubelet) GetNodeConfig() cm.NodeConfig {
return kl.nodeConfig
}
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")

View File

@ -33,6 +33,7 @@ import (
restful "github.com/emicklei/go-restful"
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/kubernetes/pkg/api"
@ -45,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
@ -140,6 +142,7 @@ type AuthInterface interface {
// For testablitiy.
type HostInterface interface {
GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error)
GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error)
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetPods() []*api.Pod
@ -154,7 +157,11 @@ type HostInterface interface {
StreamingConnectionIdleTimeout() time.Duration
ResyncInterval() time.Duration
GetHostname() string
GetNode() (*api.Node, error)
GetNodeConfig() cm.NodeConfig
LatestLoopEntryTime() time.Time
DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
RootFsInfo() (cadvisorapiv2.FsInfo, error)
}
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.

View File

@ -34,10 +34,12 @@ import (
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/auth/user"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
@ -129,6 +131,22 @@ func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
return fk.streamingConnectionIdleTimeoutFunc()
}
// Unused functions
func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
return nil, nil
}
func (_ *fakeKubelet) DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error) {
return cadvisorapiv2.FsInfo{}, fmt.Errorf("Unsupported Operation DockerImagesFsInfo")
}
func (_ *fakeKubelet) RootFsInfo() (cadvisorapiv2.FsInfo, error) {
return cadvisorapiv2.FsInfo{}, fmt.Errorf("Unsupport Operation RootFsInfo")
}
func (_ *fakeKubelet) GetNode() (*api.Node, error) { return nil, nil }
func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
type fakeAuth struct {
authenticateFunc func(*http.Request) (user.Info, bool, error)
attributesFunc func(user.Info, *http.Request) authorizer.Attributes

View File

@ -27,8 +27,10 @@ import (
"github.com/emicklei/go-restful"
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
)
@ -36,16 +38,22 @@ import (
// Host methods required by stats handlers.
type StatsProvider interface {
GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error)
GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetNode() (*api.Node, error)
GetNodeConfig() cm.NodeConfig
DockerImagesFsInfo() (cadvisorapiv2.FsInfo, error)
RootFsInfo() (cadvisorapiv2.FsInfo, error)
}
type handler struct {
provider StatsProvider
provider StatsProvider
summaryProvider SummaryProvider
}
func CreateHandlers(provider StatsProvider) *restful.WebService {
h := &handler{provider}
h := &handler{provider, NewSummaryProvider(provider)}
ws := &restful.WebService{}
ws.Path("/stats/").
@ -137,11 +145,12 @@ func (h *handler) handleStats(request *restful.Request, response *restful.Respon
// Handles stats summary requests to /stats/summary
func (h *handler) handleSummary(request *restful.Request, response *restful.Response) {
summary := Summary{}
// TODO(timstclair): Fill in summary from cAdvisor v2 endpoint.
writeResponse(response, summary)
summary, err := h.summaryProvider.Get()
if err != nil {
handleError(response, err)
} else {
writeResponse(response, summary)
}
}
// Handles non-kubernetes container stats requests to /stats/container/

View File

@ -0,0 +1,347 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"fmt"
"time"
"github.com/golang/glog"
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/leaky"
)
type SummaryProvider interface {
// Get provides a new Summary using the latest results from cadvisor
Get() (*Summary, error)
}
type summaryProviderImpl struct {
provider StatsProvider
}
var _ SummaryProvider = &summaryProviderImpl{}
// NewSummaryProvider returns a new SummaryProvider
func NewSummaryProvider(statsProvider StatsProvider) SummaryProvider {
return &summaryProviderImpl{statsProvider}
}
// Get implements the SummaryProvider interface
// Query cadvisor for the latest resource metrics and build into a summary
func (sp *summaryProviderImpl) Get() (*Summary, error) {
options := cadvisorapiv2.RequestOptions{
IdType: cadvisorapiv2.TypeName,
Count: 2, // 2 samples are needed to compute "instantaneous" CPU
Recursive: true,
}
infos, err := sp.provider.GetContainerInfoV2("/", options)
if err != nil {
return nil, err
}
node, err := sp.provider.GetNode()
if err != nil {
return nil, err
}
nodeConfig := sp.provider.GetNodeConfig()
rootFsInfo, err := sp.provider.RootFsInfo()
if err != nil {
return nil, err
}
imageFsInfo, err := sp.provider.DockerImagesFsInfo()
if err != nil {
return nil, err
}
sb := &summaryBuilder{node, nodeConfig, rootFsInfo, imageFsInfo, infos}
return sb.build()
}
// summaryBuilder aggregates the datastructures provided by cadvisor into a Summary result
type summaryBuilder struct {
node *api.Node
nodeConfig cm.NodeConfig
rootFsInfo cadvisorapiv2.FsInfo
imageFsInfo cadvisorapiv2.FsInfo
infos map[string]cadvisorapiv2.ContainerInfo
}
// build returns a Summary from aggregating the input data
func (sb *summaryBuilder) build() (*Summary, error) {
rootInfo, found := sb.infos["/"]
if !found {
return nil, fmt.Errorf("Missing stats for root container")
}
cstat, found := sb.latestContainerStats(&rootInfo)
if !found {
return nil, fmt.Errorf("Missing stats for root container")
}
rootStats := sb.containerInfoV2ToStats("", &rootInfo)
nodeStats := NodeStats{
NodeName: sb.node.Name,
CPU: rootStats.CPU,
Memory: rootStats.Memory,
Network: sb.containerInfoV2ToNetworkStats(&rootInfo),
Fs: &FsStats{
AvailableBytes: &sb.rootFsInfo.Available,
CapacityBytes: &sb.rootFsInfo.Capacity,
UsedBytes: &sb.rootFsInfo.Usage},
StartTime: rootStats.StartTime,
}
systemContainers := map[string]string{
SystemContainerKubelet: sb.nodeConfig.KubeletContainerName,
SystemContainerRuntime: sb.nodeConfig.DockerDaemonContainerName, // TODO: add support for other runtimes
SystemContainerMisc: sb.nodeConfig.SystemContainerName,
}
for sys, name := range systemContainers {
if info, ok := sb.infos[name]; ok {
nodeStats.SystemContainers = append(nodeStats.SystemContainers, sb.containerInfoV2ToStats(sys, &info))
}
}
summary := Summary{
Time: unversioned.NewTime(cstat.Timestamp),
Node: nodeStats,
Pods: sb.buildSummaryPods(),
}
return &summary, nil
}
// containerInfoV2FsStats populates the container fs stats
func (sb *summaryBuilder) containerInfoV2FsStats(
info *cadvisorapiv2.ContainerInfo,
cs *ContainerStats) {
// The container logs live on the node rootfs device
cs.Logs = &FsStats{
AvailableBytes: &sb.rootFsInfo.Available,
CapacityBytes: &sb.rootFsInfo.Capacity,
}
// The container rootFs lives on the imageFs devices (which may not be the node root fs)
cs.Rootfs = &FsStats{
AvailableBytes: &sb.imageFsInfo.Available,
CapacityBytes: &sb.imageFsInfo.Capacity,
}
lcs, found := sb.latestContainerStats(info)
if !found {
return
}
cfs := lcs.Filesystem
if cfs != nil && cfs.BaseUsageBytes != nil {
cs.Rootfs.UsedBytes = cfs.BaseUsageBytes
if cfs.TotalUsageBytes != nil {
logsUsage := *cfs.TotalUsageBytes - *cfs.BaseUsageBytes
cs.Logs.UsedBytes = &logsUsage
}
}
}
// latestContainerStats returns the latest container stats from cadvisor, or nil if none exist
func (sb *summaryBuilder) latestContainerStats(info *cadvisorapiv2.ContainerInfo) (*cadvisorapiv2.ContainerStats, bool) {
stats := info.Stats
if len(stats) < 1 {
return nil, false
}
latest := stats[len(stats)-1]
if latest == nil {
return nil, false
}
return latest, true
}
// buildSummaryPods aggregates and returns the container stats in cinfos by the Pod managing the container.
// Containers not managed by a Pod are omitted.
func (sb *summaryBuilder) buildSummaryPods() []PodStats {
// Map each container to a pod and update the PodStats with container data
podToStats := map[PodReference]*PodStats{}
for _, cinfo := range sb.infos {
// Build the Pod key if this container is managed by a Pod
if !sb.isPodManagedContainer(&cinfo) {
continue
}
ref := sb.buildPodRef(&cinfo)
// Lookup the PodStats for the pod using the PodRef. If none exists, initialize a new entry.
stats, found := podToStats[ref]
if !found {
stats = &PodStats{PodRef: ref}
podToStats[ref] = stats
}
// Update the PodStats entry with the stats from the container by adding it to stats.Containers
containerName := dockertools.GetContainerName(cinfo.Spec.Labels)
if containerName == leaky.PodInfraContainerName {
// Special case for infrastructure container which is hidden from the user and has network stats
stats.Network = sb.containerInfoV2ToNetworkStats(&cinfo)
stats.StartTime = unversioned.NewTime(cinfo.Spec.CreationTime)
} else {
stats.Containers = append(stats.Containers, sb.containerInfoV2ToStats(containerName, &cinfo))
}
}
// Add each PodStats to the result
result := make([]PodStats, 0, len(podToStats))
for _, stats := range podToStats {
result = append(result, *stats)
}
return result
}
// buildPodRef returns a PodReference that identifies the Pod managing cinfo
func (sb *summaryBuilder) buildPodRef(cinfo *cadvisorapiv2.ContainerInfo) PodReference {
podName := dockertools.GetPodName(cinfo.Spec.Labels)
podNamespace := dockertools.GetPodNamespace(cinfo.Spec.Labels)
podUID := dockertools.GetPodUID(cinfo.Spec.Labels)
return PodReference{Name: podName, Namespace: podNamespace, UID: podUID}
}
// isPodManagedContainer returns true if the cinfo container is managed by a Pod
func (sb *summaryBuilder) isPodManagedContainer(cinfo *cadvisorapiv2.ContainerInfo) bool {
podName := dockertools.GetPodName(cinfo.Spec.Labels)
podNamespace := dockertools.GetPodNamespace(cinfo.Spec.Labels)
managed := podName != "" && podNamespace != ""
if !managed && podName != podNamespace {
glog.Warningf(
"Expect container to have either both podName (%s) and podNamespace (%s) labels, or neither.",
podName, podNamespace)
}
return managed
}
func (sb *summaryBuilder) containerInfoV2ToStats(
name string,
info *cadvisorapiv2.ContainerInfo) ContainerStats {
stats := ContainerStats{
Name: name,
StartTime: unversioned.NewTime(info.Spec.CreationTime),
}
cstat, found := sb.latestContainerStats(info)
if !found {
return stats
}
if info.Spec.HasCpu {
cpuStats := CPUStats{}
if cstat.CpuInst != nil {
cpuStats.UsageNanoCores = &cstat.CpuInst.Usage.Total
}
if cstat.Cpu != nil {
cpuStats.UsageCoreNanoSeconds = &cstat.Cpu.Usage.Total
}
stats.CPU = &cpuStats
}
if info.Spec.HasMemory {
pageFaults := cstat.Memory.ContainerData.Pgfault
majorPageFaults := cstat.Memory.ContainerData.Pgmajfault
stats.Memory = &MemoryStats{
UsageBytes: &cstat.Memory.Usage,
WorkingSetBytes: &cstat.Memory.WorkingSet,
PageFaults: &pageFaults,
MajorPageFaults: &majorPageFaults,
}
}
sb.containerInfoV2FsStats(info, &stats)
stats.UserDefinedMetrics = sb.containerInfoV2ToUserDefinedMetrics(info)
return stats
}
func (sb *summaryBuilder) containerInfoV2ToNetworkStats(info *cadvisorapiv2.ContainerInfo) *NetworkStats {
if !info.Spec.HasNetwork {
return nil
}
cstat, found := sb.latestContainerStats(info)
if !found {
return nil
}
var (
rxBytes uint64
rxErrors uint64
txBytes uint64
txErrors uint64
)
// TODO(stclair): check for overflow
for _, inter := range cstat.Network.Interfaces {
rxBytes += inter.RxBytes
rxErrors += inter.RxErrors
txBytes += inter.TxBytes
txErrors += inter.TxErrors
}
return &NetworkStats{
RxBytes: &rxBytes,
RxErrors: &rxErrors,
TxBytes: &txBytes,
TxErrors: &txErrors,
}
}
func (sb *summaryBuilder) containerInfoV2ToUserDefinedMetrics(info *cadvisorapiv2.ContainerInfo) []UserDefinedMetric {
type specVal struct {
ref UserDefinedMetricDescriptor
valType cadvisorapiv1.DataType
time time.Time
value float64
}
udmMap := map[string]*specVal{}
for _, spec := range info.Spec.CustomMetrics {
udmMap[spec.Name] = &specVal{
ref: UserDefinedMetricDescriptor{
Name: spec.Name,
Type: UserDefinedMetricType(spec.Type),
Units: spec.Units,
},
valType: spec.Format,
}
}
for _, stat := range info.Stats {
for name, values := range stat.CustomMetrics {
specVal, ok := udmMap[name]
if !ok {
glog.Warningf("spec for custom metric %q is missing from cAdvisor output. Spec: %+v, Metrics: %+v", name, info.Spec, stat.CustomMetrics)
continue
}
for _, value := range values {
// Pick the most recent value
if value.Timestamp.Before(specVal.time) {
continue
}
specVal.time = value.Timestamp
specVal.value = value.FloatValue
if specVal.valType == cadvisorapiv1.IntType {
specVal.value = float64(value.IntValue)
}
}
}
}
var udm []UserDefinedMetric
for _, specVal := range udmMap {
udm = append(udm, UserDefinedMetric{
UserDefinedMetricDescriptor: specVal.ref,
Value: specVal.value,
})
}
return udm
}

View File

@ -0,0 +1,353 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"testing"
"time"
"github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/info/v2"
fuzz "github.com/google/gofuzz"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/leaky"
)
const (
// Offsets from seed value in generated container stats.
offsetCPUUsageCores = iota
offsetCPUUsageCoreSeconds
offsetMemPageFaults
offsetMemMajorPageFaults
offsetMemUsageBytes
offsetMemWorkingSetBytes
offsetNetRxBytes
offsetNetRxErrors
offsetNetTxBytes
offsetNetTxErrors
)
func TestBuildSummary(t *testing.T) {
node := api.Node{}
node.Name = "FooNode"
nodeConfig := cm.NodeConfig{
DockerDaemonContainerName: "/docker-daemon",
SystemContainerName: "/system",
KubeletContainerName: "/kubelet",
}
const (
namespace0 = "test0"
namespace2 = "test2"
)
const (
seedRoot = 0
seedRuntime = 100
seedKubelet = 200
seedMisc = 300
seedPod0Infra = 1000
seedPod0Container0 = 2000
seedPod0Container1 = 2001
seedPod1Infra = 3000
seedPod1Container = 4000
seedPod2Infra = 5000
seedPod2Container = 6000
)
const (
pName0 = "pod0"
pName1 = "pod1"
pName2 = "pod0" // ensure pName2 conflicts with pName0, but is in a different namespace
)
const (
cName00 = "c0"
cName01 = "c1"
cName10 = "c0" // ensure cName10 conflicts with cName02, but is in a different pod
cName20 = "c1" // ensure cName20 conflicts with cName01, but is in a different pod + namespace
)
prf0 := PodReference{Name: pName0, Namespace: namespace0, UID: "UID" + pName0}
prf1 := PodReference{Name: pName1, Namespace: namespace0, UID: "UID" + pName1}
prf2 := PodReference{Name: pName2, Namespace: namespace2, UID: "UID" + pName2}
infos := map[string]v2.ContainerInfo{
"/": summaryTestContainerInfo(seedRoot, "", "", ""),
"/docker-daemon": summaryTestContainerInfo(seedRuntime, "", "", ""),
"/kubelet": summaryTestContainerInfo(seedKubelet, "", "", ""),
"/system": summaryTestContainerInfo(seedMisc, "", "", ""),
// Pod0 - Namespace0
"/pod0-i": summaryTestContainerInfo(seedPod0Infra, pName0, namespace0, leaky.PodInfraContainerName),
"/pod0-c0": summaryTestContainerInfo(seedPod0Container0, pName0, namespace0, cName00),
"/pod0-c2": summaryTestContainerInfo(seedPod0Container1, pName0, namespace0, cName01),
// Pod1 - Namespace0
"/pod1-i": summaryTestContainerInfo(seedPod1Infra, pName1, namespace0, leaky.PodInfraContainerName),
"/pod1-c0": summaryTestContainerInfo(seedPod1Container, pName1, namespace0, cName10),
// Pod2 - Namespace2
"/pod2-i": summaryTestContainerInfo(seedPod2Infra, pName2, namespace2, leaky.PodInfraContainerName),
"/pod2-c0": summaryTestContainerInfo(seedPod2Container, pName2, namespace2, cName20),
}
rootfs := v2.FsInfo{}
imagefs := v2.FsInfo{}
sb := &summaryBuilder{&node, nodeConfig, rootfs, imagefs, infos}
summary, err := sb.build()
assert.NoError(t, err)
nodeStats := summary.Node
assert.Equal(t, "FooNode", nodeStats.NodeName)
checkCPUStats(t, "Node", seedRoot, nodeStats.CPU)
checkMemoryStats(t, "Node", seedRoot, nodeStats.Memory)
checkNetworkStats(t, "Node", seedRoot, nodeStats.Network)
systemSeeds := map[string]int{
SystemContainerRuntime: seedRuntime,
SystemContainerKubelet: seedKubelet,
SystemContainerMisc: seedMisc,
}
for _, sys := range nodeStats.SystemContainers {
name := sys.Name
seed, found := systemSeeds[name]
if !found {
t.Errorf("Unknown SystemContainer: %q", name)
}
checkCPUStats(t, name, seed, sys.CPU)
checkMemoryStats(t, name, seed, sys.Memory)
}
assert.Equal(t, 3, len(summary.Pods))
indexPods := make(map[PodReference]PodStats, len(summary.Pods))
for _, pod := range summary.Pods {
indexPods[pod.PodRef] = pod
}
// Validate Pod0 Results
ps, found := indexPods[prf0]
assert.True(t, found)
assert.Len(t, ps.Containers, 2)
indexCon := make(map[string]ContainerStats, len(ps.Containers))
for _, con := range ps.Containers {
indexCon[con.Name] = con
}
con := indexCon[cName00]
checkCPUStats(t, "container", seedPod0Container0, con.CPU)
checkMemoryStats(t, "container", seedPod0Container0, con.Memory)
con = indexCon[cName01]
checkCPUStats(t, "container", seedPod0Container1, con.CPU)
checkMemoryStats(t, "container", seedPod0Container1, con.Memory)
checkNetworkStats(t, "Pod", seedPod0Infra, ps.Network)
// Validate Pod1 Results
ps, found = indexPods[prf1]
assert.True(t, found)
assert.Len(t, ps.Containers, 1)
con = ps.Containers[0]
assert.Equal(t, cName10, con.Name)
checkCPUStats(t, "container", seedPod1Container, con.CPU)
checkMemoryStats(t, "container", seedPod1Container, con.Memory)
checkNetworkStats(t, "Pod", seedPod1Infra, ps.Network)
// Validate Pod2 Results
ps, found = indexPods[prf2]
assert.True(t, found)
assert.Len(t, ps.Containers, 1)
con = ps.Containers[0]
assert.Equal(t, cName20, con.Name)
checkCPUStats(t, "container", seedPod2Container, con.CPU)
checkMemoryStats(t, "container", seedPod2Container, con.Memory)
checkNetworkStats(t, "Pod", seedPod2Infra, ps.Network)
}
func generateCustomMetricSpec() []v1.MetricSpec {
f := fuzz.New().NilChance(0).Funcs(
func(e *v1.MetricSpec, c fuzz.Continue) {
c.Fuzz(&e.Name)
switch c.Intn(3) {
case 0:
e.Type = v1.MetricGauge
case 1:
e.Type = v1.MetricCumulative
case 2:
e.Type = v1.MetricDelta
}
switch c.Intn(2) {
case 0:
e.Format = v1.IntType
case 1:
e.Format = v1.FloatType
}
c.Fuzz(&e.Units)
})
var ret []v1.MetricSpec
f.Fuzz(&ret)
return ret
}
func generateCustomMetrics(spec []v1.MetricSpec) map[string][]v1.MetricVal {
ret := map[string][]v1.MetricVal{}
for _, metricSpec := range spec {
f := fuzz.New().NilChance(0).Funcs(
func(e *v1.MetricVal, c fuzz.Continue) {
switch metricSpec.Format {
case v1.IntType:
c.Fuzz(&e.IntValue)
case v1.FloatType:
c.Fuzz(&e.FloatValue)
}
})
var metrics []v1.MetricVal
f.Fuzz(&metrics)
ret[metricSpec.Name] = metrics
}
return ret
}
func summaryTestContainerInfo(seed int, podName string, podNamespace string, containerName string) v2.ContainerInfo {
labels := map[string]string{}
if podName != "" {
labels = map[string]string{
"io.kubernetes.pod.name": podName,
"io.kubernetes.pod.uid": "UID" + podName,
"io.kubernetes.pod.namespace": podNamespace,
"io.kubernetes.container.name": containerName,
}
}
spec := v2.ContainerSpec{
HasCpu: true,
HasMemory: true,
HasNetwork: true,
Labels: labels,
CustomMetrics: generateCustomMetricSpec(),
}
stats := v2.ContainerStats{
Cpu: &v1.CpuStats{},
CpuInst: &v2.CpuInstStats{},
Memory: &v1.MemoryStats{
Usage: uint64(seed + offsetMemUsageBytes),
WorkingSet: uint64(seed + offsetMemWorkingSetBytes),
ContainerData: v1.MemoryStatsMemoryData{
Pgfault: uint64(seed + offsetMemPageFaults),
Pgmajfault: uint64(seed + offsetMemMajorPageFaults),
},
},
Network: &v2.NetworkStats{
Interfaces: []v1.InterfaceStats{{
RxBytes: uint64(seed + offsetNetRxBytes),
RxErrors: uint64(seed + offsetNetRxErrors),
TxBytes: uint64(seed + offsetNetTxBytes),
TxErrors: uint64(seed + offsetNetTxErrors),
}},
},
CustomMetrics: generateCustomMetrics(spec.CustomMetrics),
}
stats.Cpu.Usage.Total = uint64(seed + offsetCPUUsageCoreSeconds)
stats.CpuInst.Usage.Total = uint64(seed + offsetCPUUsageCores)
return v2.ContainerInfo{
Spec: spec,
Stats: []*v2.ContainerStats{&stats},
}
}
func checkNetworkStats(t *testing.T, label string, seed int, stats *NetworkStats) {
assert.EqualValues(t, seed+offsetNetRxBytes, *stats.RxBytes, label+".Net.RxBytes")
assert.EqualValues(t, seed+offsetNetRxErrors, *stats.RxErrors, label+".Net.RxErrors")
assert.EqualValues(t, seed+offsetNetTxBytes, *stats.TxBytes, label+".Net.TxBytes")
assert.EqualValues(t, seed+offsetNetTxErrors, *stats.TxErrors, label+".Net.TxErrors")
}
func checkCPUStats(t *testing.T, label string, seed int, stats *CPUStats) {
assert.EqualValues(t, seed+offsetCPUUsageCores, *stats.UsageNanoCores, label+".CPU.UsageCores")
assert.EqualValues(t, seed+offsetCPUUsageCoreSeconds, *stats.UsageCoreNanoSeconds, label+".CPU.UsageCoreSeconds")
}
func checkMemoryStats(t *testing.T, label string, seed int, stats *MemoryStats) {
assert.EqualValues(t, seed+offsetMemUsageBytes, *stats.UsageBytes, label+".Mem.UsageBytes")
assert.EqualValues(t, seed+offsetMemWorkingSetBytes, *stats.WorkingSetBytes, label+".Mem.WorkingSetBytes")
assert.EqualValues(t, seed+offsetMemPageFaults, *stats.PageFaults, label+".Mem.PageFaults")
assert.EqualValues(t, seed+offsetMemMajorPageFaults, *stats.MajorPageFaults, label+".Mem.MajorPageFaults")
}
func TestCustomMetrics(t *testing.T) {
spec := []v1.MetricSpec{
{
Name: "qos",
Type: v1.MetricGauge,
Format: v1.IntType,
Units: "per second",
},
{
Name: "cpuLoad",
Type: v1.MetricCumulative,
Format: v1.FloatType,
Units: "count",
},
}
metrics := map[string][]v1.MetricVal{
"qos": {
{
Timestamp: time.Now(),
IntValue: 10,
},
{
Timestamp: time.Now().Add(time.Minute),
IntValue: 100,
},
},
"cpuLoad": {
{
Timestamp: time.Now(),
FloatValue: 1.2,
},
{
Timestamp: time.Now().Add(time.Minute),
FloatValue: 2.1,
},
},
}
cInfo := v2.ContainerInfo{
Spec: v2.ContainerSpec{
CustomMetrics: spec,
},
Stats: []*v2.ContainerStats{
{
CustomMetrics: metrics,
},
},
}
sb := &summaryBuilder{}
assert.Contains(t, sb.containerInfoV2ToUserDefinedMetrics(&cInfo),
UserDefinedMetric{
UserDefinedMetricDescriptor: UserDefinedMetricDescriptor{
Name: "qos",
Type: MetricGauge,
Units: "per second",
},
Value: 100,
},
UserDefinedMetric{
UserDefinedMetricDescriptor: UserDefinedMetricDescriptor{
Name: "cpuLoad",
Type: MetricCumulative,
Units: "count",
},
Value: 2.1,
})
}

View File

@ -91,7 +91,7 @@ type ContainerStats struct {
// Stats pertaining to container logs usage of filesystem resources.
// Logs.UsedBytes is the number of bytes used for the container logs.
Logs *FsStats `json:"logs,omitempty"`
// User defined metrics are arbitrary metrics exposed by containers in pods.
// User defined metrics that are exposed by containers in the pod. Typically, we expect only one container in the pod to be exposing user defined metrics. In the event of multiple containers exposing metrics, they will be combined here.
UserDefinedMetrics []UserDefinedMetric `json:"userDefinedMetrics,omitmepty" patchStrategy:"merge" patchMergeKey:"name"`
}

View File

@ -15,7 +15,8 @@ limitations under the License.
*/
// To run tests in this suite
// `$ ginkgo -- --node-name node-e2e-test-1 --api-server-address <serveraddress> --logtostderr`
// Local: `$ ginkgo -- --logtostderr -v 2`
// Remote: `$ ginkgo -- --node-name <hostname> --api-server-address=<hostname:api_port> --kubelet-address=<hostname=kubelet_port> --logtostderr -v 2`
package e2e_node
import (
@ -26,9 +27,9 @@ import (
"testing"
)
var kubeletAddress = flag.String("kubelet-address", "localhost:10250", "Host and port of the kubelet")
var apiServerAddress = flag.String("api-server-address", "localhost:8080", "Host and port of the api server")
var nodeName = flag.String("node-name", "", "Name of the node")
var kubeletAddress = flag.String("kubelet-address", "http://127.0.0.1:10255", "Host and port of the kubelet")
var apiServerAddress = flag.String("api-server-address", "http://127.0.0.1:8080", "Host and port of the api server")
var nodeName = flag.String("node-name", "127.0.0.1", "Name of the node")
func TestE2eNode(t *testing.T) {
flag.Parse()

View File

@ -18,13 +18,18 @@ package e2e_node
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"io/ioutil"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
)
var _ = Describe("Kubelet", func() {
@ -49,10 +54,9 @@ var _ = Describe("Kubelet", func() {
RestartPolicy: api.RestartPolicyNever,
Containers: []api.Container{
{
Image: "gcr.io/google_containers/busybox",
Name: "busybox",
Command: []string{"echo", "'Hello World'"},
ImagePullPolicy: api.PullIfNotPresent,
Image: "gcr.io/google_containers/busybox",
Name: "busybox",
Command: []string{"echo", "'Hello World'"},
},
},
},
@ -84,4 +88,125 @@ var _ = Describe("Kubelet", func() {
})
})
})
Describe("metrics api", func() {
statsPrefix := "stats-busybox-"
podNames := []string{}
podCount := 2
for i := 0; i < podCount; i++ {
podNames = append(podNames, fmt.Sprintf("%s%v", statsPrefix, i))
}
BeforeEach(func() {
for _, podName := range podNames {
createPod(cl, podName, []api.Container{
{
Image: "gcr.io/google_containers/busybox",
Command: []string{"sh", "-c", "echo 'Hello World' | tee ~/file | tee -a ~/file | tee /test-empty-dir | sleep 60"},
Name: podName + containerSuffix,
},
})
}
// Sleep long enough for cadvisor to see the pod and calculate all of its metrics
time.Sleep(60 * time.Second)
})
Context("when querying /stats/summary", func() {
It("it should report resource usage through the stats api", func() {
resp, err := http.Get(*kubeletAddress + "/stats/summary")
now := time.Now()
Expect(err).To(BeNil(), fmt.Sprintf("Failed to get /stats/summary"))
summary := stats.Summary{}
contentsBytes, err := ioutil.ReadAll(resp.Body)
Expect(err).To(BeNil(), fmt.Sprintf("Failed to read /stats/summary: %+v", resp))
contents := string(contentsBytes)
decoder := json.NewDecoder(strings.NewReader(contents))
err = decoder.Decode(&summary)
Expect(err).To(BeNil(), fmt.Sprintf("Failed to parse /stats/summary to go struct: %+v", resp))
// Verify Misc Stats
Expect(summary.Time.Time).To(BeTemporally("~", now, 20*time.Second))
// Verify Node Stats are present
Expect(summary.Node.NodeName).To(Equal(*nodeName))
Expect(summary.Node.CPU.UsageCoreNanoSeconds).NotTo(BeZero())
Expect(summary.Node.Memory.UsageBytes).NotTo(BeZero())
Expect(summary.Node.Memory.WorkingSetBytes).NotTo(BeZero())
Expect(summary.Node.Fs.UsedBytes).NotTo(BeZero())
Expect(summary.Node.Fs.CapacityBytes).NotTo(BeZero())
Expect(summary.Node.Fs.AvailableBytes).NotTo(BeZero())
sysContainers := map[string]stats.ContainerStats{}
sysContainersList := []string{}
for _, container := range summary.Node.SystemContainers {
sysContainers[container.Name] = container
sysContainersList = append(sysContainersList, container.Name)
Expect(container.CPU.UsageCoreNanoSeconds).NotTo(BeZero())
// TODO: Test Network
Expect(container.Memory.UsageBytes).NotTo(BeZero())
Expect(container.Memory.WorkingSetBytes).NotTo(BeZero())
Expect(container.Rootfs.CapacityBytes).NotTo(BeZero())
Expect(container.Rootfs.AvailableBytes).NotTo(BeZero())
Expect(container.Logs.CapacityBytes).NotTo(BeZero())
Expect(container.Logs.AvailableBytes).NotTo(BeZero())
}
Expect(sysContainersList).To(ConsistOf("kubelet", "runtime"))
// Verify Pods Stats are present
podsList := []string{}
for _, pod := range summary.Pods {
if !strings.HasPrefix(pod.PodRef.Name, statsPrefix) {
// Ignore pods created outside this test
continue
}
// TODO: Test network
podsList = append(podsList, pod.PodRef.Name)
Expect(pod.Containers).To(HaveLen(1))
container := pod.Containers[0]
Expect(container.Name).To(Equal(pod.PodRef.Name + containerSuffix))
Expect(container.CPU.UsageCoreNanoSeconds).NotTo(BeZero())
Expect(container.Memory.UsageBytes).NotTo(BeZero())
Expect(container.Memory.WorkingSetBytes).NotTo(BeZero())
Expect(container.Rootfs.CapacityBytes).NotTo(BeZero())
Expect(container.Rootfs.AvailableBytes).NotTo(BeZero())
Expect(*container.Rootfs.UsedBytes).NotTo(BeZero(), contents)
Expect(container.Logs.CapacityBytes).NotTo(BeZero())
Expect(container.Logs.AvailableBytes).NotTo(BeZero())
Expect(*container.Logs.UsedBytes).NotTo(BeZero(), contents)
}
Expect(podsList).To(ConsistOf(podNames))
})
})
AfterEach(func() {
for _, podName := range podNames {
err := cl.Pods(api.NamespaceDefault).Delete(podName, &api.DeleteOptions{})
Expect(err).To(BeNil(), fmt.Sprintf("Error deleting Pod %v", podName))
}
})
})
})
const (
containerSuffix = "-c"
)
func createPod(cl *client.Client, podName string, containers []api.Container) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
// Force the Pod to schedule to the node without a scheduler running
NodeName: *nodeName,
// Don't restart the Pod since it is expected to exit
RestartPolicy: api.RestartPolicyNever,
Containers: containers,
},
}
_, err := cl.Pods(api.NamespaceDefault).Create(pod)
Expect(err).To(BeNil(), fmt.Sprintf("Error creating Pod %v", err))
}