add grabbing CA metrics in e2e tests

pull/6/head
Aleksandra Malinowska 2017-08-09 15:54:16 +02:00
parent 7ef5cc23d1
commit 55682f2a55
13 changed files with 176 additions and 34 deletions

View File

@@ -251,7 +251,7 @@ func verifyRemainingCronJobsJobsPods(f *framework.Framework, clientSet clientset
func gatherMetrics(f *framework.Framework) {
By("Gathering metrics")
var summary framework.TestDataSummary
- grabber, err := metrics.NewMetricsGrabber(f.ClientSet, false, false, true, false)
+ grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, false, false, true, false, false)
if err != nil {
framework.Logf("Failed to create MetricsGrabber. Skipping metrics gathering.")
} else {

View File

@@ -308,8 +308,8 @@ func gatherTestSuiteMetrics() error {
return fmt.Errorf("error loading client: %v", err)
}
- // Grab metrics for apiserver, scheduler, controller-manager, kubelet (for non-kubemark case).
- grabber, err := metrics.NewMetricsGrabber(c, !framework.ProviderIs("kubemark"), true, true, true)
+ // Grab metrics for apiserver, scheduler, controller-manager, kubelet (for non-kubemark case) and cluster autoscaler (optionally).
+ grabber, err := metrics.NewMetricsGrabber(c, nil, !framework.ProviderIs("kubemark"), true, true, true, framework.TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
return fmt.Errorf("failed to create MetricsGrabber: %v", err)
}
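Note: NewMetricsGrabber gains two arguments in this change — an external-cluster clientset in the second position (used to reach a cluster autoscaler running outside the test cluster, as in kubemark setups) and a trailing clusterAutoscaler switch. A minimal sketch of the updated call shape, with parameter meanings taken from the metrics_grabber.go hunk further down:

grabber, err := metrics.NewMetricsGrabber(
	c,   // clientset for the cluster under test
	nil, // external clientset; nil unless CA lives in a separate (e.g. kubemark external) cluster
	!framework.ProviderIs("kubemark"),                     // kubelets
	true,                                                  // scheduler
	true,                                                  // controllers
	true,                                                  // apiServer
	framework.TestContext.IncludeClusterAutoscalerMetrics, // clusterAutoscaler
)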

View File

@@ -66,7 +66,8 @@ type Framework struct {
BaseName string
// ClientSet uses internal objects, you should use ClientSet where possible.
ClientSet clientset.Interface
+ KubemarkExternalClusterClientSet clientset.Interface
InternalClientset *internalclientset.Clientset
StagingClient *staging.Clientset
@@ -100,6 +101,9 @@ type Framework struct {
TestSummaries []TestDataSummary
kubemarkControllerCloseChannel chan struct{}
+ // Place to keep ClusterAutoscaler metrics from before test in order to compute delta.
+ clusterAutoscalerMetricsBeforeTest metrics.MetricsCollection
}
type TestDataSummary interface {
@@ -202,12 +206,13 @@ func (f *Framework) BeforeEach() {
Expect(err).NotTo(HaveOccurred())
externalClient, err := clientset.NewForConfig(externalConfig)
Expect(err).NotTo(HaveOccurred())
+ f.KubemarkExternalClusterClientSet = externalClient
f.kubemarkControllerCloseChannel = make(chan struct{})
externalInformerFactory := informers.NewSharedInformerFactory(externalClient, 0)
kubemarkInformerFactory := informers.NewSharedInformerFactory(f.ClientSet, 0)
kubemarkNodeInformer := kubemarkInformerFactory.Core().V1().Nodes()
go kubemarkNodeInformer.Informer().Run(f.kubemarkControllerCloseChannel)
- TestContext.CloudConfig.KubemarkController, err = kubemark.NewKubemarkController(externalClient, externalInformerFactory, f.ClientSet, kubemarkNodeInformer)
+ TestContext.CloudConfig.KubemarkController, err = kubemark.NewKubemarkController(f.KubemarkExternalClusterClientSet, externalInformerFactory, f.ClientSet, kubemarkNodeInformer)
Expect(err).NotTo(HaveOccurred())
externalInformerFactory.Start(f.kubemarkControllerCloseChannel)
TestContext.CloudConfig.KubemarkController.Init(f.kubemarkControllerCloseChannel)
@@ -255,6 +260,22 @@ func (f *Framework) BeforeEach() {
f.logsSizeWaitGroup.Done()
}()
}
+ gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master"
+ if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics {
+ grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics)
+ if err != nil {
+ Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err)
+ } else {
+ f.clusterAutoscalerMetricsBeforeTest, err = grabber.Grab()
+ if err != nil {
+ Logf("MetricsGrabber failed to grab CA metrics before test (skipping metrics gathering): %v", err)
+ } else {
+ Logf("Gathered ClusterAutoscaler metrics before test")
+ }
+ }
+ }
}
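Note: the block above takes a snapshot of ClusterAutoscaler metrics before the test runs, but only when teardown metrics gathering is enabled ("true" or "master") and --include-cluster-autoscaler is set; AfterEach later subtracts this snapshot from the post-test metrics so the reported histogram covers only the test itself.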
// AfterEach deletes the namespace, after reading its events.
@@ -351,16 +372,16 @@ func (f *Framework) AfterEach() {
By("Gathering metrics")
// Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics.
grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark")
- grabber, err := metrics.NewMetricsGrabber(f.ClientSet, grabMetricsFromKubelets, true, true, true)
+ grabber, err := metrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics)
if err != nil {
Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err)
} else {
received, err := grabber.Grab()
if err != nil {
Logf("MetricsGrabber failed to grab metrics (skipping metrics gathering): %v", err)
} else {
f.TestSummaries = append(f.TestSummaries, (*MetricsForE2E)(&received))
Logf("MetricsGrabber failed to grab some of the metrics: %v", err)
}
(*MetricsForE2E)(&received).computeClusterAutoscalerMetricsDelta(f.clusterAutoscalerMetricsBeforeTest)
f.TestSummaries = append(f.TestSummaries, (*MetricsForE2E)(&received))
}
}

View File

@@ -42,7 +42,7 @@ func GetKubemarkMasterComponentsResourceUsage() map[string]*KubemarkResourceUsag
// Get kubernetes component resource usage
sshResult, err := getMasterUsageByPrefix("kube")
if err != nil {
Logf("Error when trying to SSH to master machine. Skipping probe")
Logf("Error when trying to SSH to master machine. Skipping probe. %v", err)
return nil
}
scanner := bufio.NewScanner(strings.NewReader(sshResult))

View File

@@ -68,7 +68,7 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (metrics.
if c == nil {
return metrics.GrabKubeletMetricsWithoutProxy(nodeName)
}
- grabber, err := metrics.NewMetricsGrabber(c, true, false, false, false)
+ grabber, err := metrics.NewMetricsGrabber(c, nil, true, false, false, false, false)
if err != nil {
return metrics.KubeletMetrics{}, err
}

View File

@@ -11,6 +11,7 @@ go_library(
name = "go_default_library",
srcs = [
"api_server_metrics.go",
"cluster_autoscaler_metrics.go",
"controller_manager_metrics.go",
"generic_metrics.go",
"kubelet_metrics.go",

View File

@@ -0,0 +1,36 @@
+ /*
+ Copyright 2015 The Kubernetes Authors.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+ package metrics
+ type ClusterAutoscalerMetrics Metrics
+ func (m *ClusterAutoscalerMetrics) Equal(o ClusterAutoscalerMetrics) bool {
+ return (*Metrics)(m).Equal(Metrics(o))
+ }
+ func NewClusterAutoscalerMetrics() ClusterAutoscalerMetrics {
+ result := NewMetrics()
+ return ClusterAutoscalerMetrics(result)
+ }
+ func parseClusterAutoscalerMetrics(data string) (ClusterAutoscalerMetrics, error) {
+ result := NewClusterAutoscalerMetrics()
+ if err := parseMetrics(data, (*Metrics)(&result)); err != nil {
+ return ClusterAutoscalerMetrics{}, err
+ }
+ return result, nil
+ }
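Note: ClusterAutoscalerMetrics is a thin rename of the generic Metrics type, which is a map from metric name to Prometheus model.Samples, so parsed results can be inspected directly. A hypothetical sketch ("data" stands for the text-format scrape of the CA /metrics endpoint):

caMetrics, err := parseClusterAutoscalerMetrics(data)
if err != nil {
	return err
}
for _, sample := range caMetrics["function_duration_seconds"] {
	fmt.Printf("%v = %v\n", sample.Metric, sample.Value)
}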

View File

@@ -51,7 +51,7 @@ func (m *Metrics) Equal(o Metrics) bool {
func PrintSample(sample *model.Sample) string {
buf := make([]string, 0)
- // Id is a VERY special label. For 'normal' container it's usless, but it's necessary
+ // Id is a VERY special label. For 'normal' container it's useless, but it's necessary
// for 'system' containers (e.g. /docker-daemon, /kubelet, etc.). We know if that's the
// case by checking if there's a label "kubernetes_container_name" present. It's hacky
// but it works...
@@ -97,17 +97,3 @@ func parseMetrics(data string, output *Metrics) error {
}
}
}
- func (g *MetricsGrabber) getMetricsFromPod(podName string, namespace string, port int) (string, error) {
- rawOutput, err := g.client.Core().RESTClient().Get().
- Namespace(namespace).
- Resource("pods").
- SubResource("proxy").
- Name(fmt.Sprintf("%v:%v", podName, port)).
- Suffix("metrics").
- Do().Raw()
- if err != nil {
- return "", err
- }
- return string(rawOutput), nil
- }

View File

@@ -39,19 +39,22 @@ type MetricsCollection struct {
ControllerManagerMetrics ControllerManagerMetrics
KubeletMetrics map[string]KubeletMetrics
SchedulerMetrics SchedulerMetrics
+ ClusterAutoscalerMetrics ClusterAutoscalerMetrics
}
type MetricsGrabber struct {
client clientset.Interface
+ externalClient clientset.Interface
grabFromApiServer bool
grabFromControllerManager bool
grabFromKubelets bool
grabFromScheduler bool
+ grabFromClusterAutoscaler bool
masterName string
registeredMaster bool
}
- func NewMetricsGrabber(c clientset.Interface, kubelets bool, scheduler bool, controllers bool, apiServer bool) (*MetricsGrabber, error) {
+ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool) (*MetricsGrabber, error) {
registeredMaster := false
masterName := ""
nodeList, err := c.Core().Nodes().List(metav1.ListOptions{})
@@ -71,15 +74,22 @@ func NewMetricsGrabber(c clientset.Interface, kubelets bool, scheduler bool, con
if !registeredMaster {
scheduler = false
controllers = false
glog.Warningf("Master node is not registered. Grabbing metrics from Scheduler and ControllerManager is disabled.")
clusterAutoscaler = ec != nil
if clusterAutoscaler {
glog.Warningf("Master node is not registered. Grabbing metrics from Scheduler, ControllerManager is disabled.")
} else {
glog.Warningf("Master node is not registered. Grabbing metrics from Scheduler, ControllerManager and ClusterAutoscaler is disabled.")
}
}
return &MetricsGrabber{
client: c,
+ externalClient: ec,
grabFromApiServer: apiServer,
grabFromControllerManager: controllers,
grabFromKubelets: kubelets,
grabFromScheduler: scheduler,
+ grabFromClusterAutoscaler: clusterAutoscaler,
masterName: masterName,
registeredMaster: registeredMaster,
}, nil
@@ -112,18 +122,38 @@ func (g *MetricsGrabber) GrabFromScheduler() (SchedulerMetrics, error) {
if !g.registeredMaster {
return SchedulerMetrics{}, fmt.Errorf("Master's Kubelet is not registered. Skipping Scheduler's metrics gathering.")
}
output, err := g.getMetricsFromPod(fmt.Sprintf("%v-%v", "kube-scheduler", g.masterName), metav1.NamespaceSystem, ports.SchedulerPort)
output, err := g.getMetricsFromPod(g.client, fmt.Sprintf("%v-%v", "kube-scheduler", g.masterName), metav1.NamespaceSystem, ports.SchedulerPort)
if err != nil {
return SchedulerMetrics{}, err
}
return parseSchedulerMetrics(output)
}
+ func (g *MetricsGrabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error) {
+ if !g.registeredMaster && g.externalClient == nil {
+ return ClusterAutoscalerMetrics{}, fmt.Errorf("Master's Kubelet is not registered. Skipping ClusterAutoscaler's metrics gathering.")
+ }
+ var client clientset.Interface
+ var namespace string
+ if g.externalClient != nil {
+ client = g.externalClient
+ namespace = "kubemark"
+ } else {
+ client = g.client
+ namespace = metav1.NamespaceSystem
+ }
+ output, err := g.getMetricsFromPod(client, "cluster-autoscaler", namespace, 8085)
+ if err != nil {
+ return ClusterAutoscalerMetrics{}, err
+ }
+ return parseClusterAutoscalerMetrics(output)
+ }
func (g *MetricsGrabber) GrabFromControllerManager() (ControllerManagerMetrics, error) {
if !g.registeredMaster {
return ControllerManagerMetrics{}, fmt.Errorf("Master's Kubelet is not registered. Skipping ControllerManager's metrics gathering.")
}
output, err := g.getMetricsFromPod(fmt.Sprintf("%v-%v", "kube-controller-manager", g.masterName), metav1.NamespaceSystem, ports.ControllerManagerPort)
output, err := g.getMetricsFromPod(g.client, fmt.Sprintf("%v-%v", "kube-controller-manager", g.masterName), metav1.NamespaceSystem, ports.ControllerManagerPort)
if err != nil {
return ControllerManagerMetrics{}, err
}
@@ -165,6 +195,14 @@ func (g *MetricsGrabber) Grab() (MetricsCollection, error) {
result.ControllerManagerMetrics = metrics
}
}
+ if g.grabFromClusterAutoscaler {
+ metrics, err := g.GrabFromClusterAutoscaler()
+ if err != nil {
+ errs = append(errs, err)
+ } else {
+ result.ClusterAutoscalerMetrics = metrics
+ }
+ }
if g.grabFromKubelets {
result.KubeletMetrics = make(map[string]KubeletMetrics)
nodes, err := g.client.Core().Nodes().List(metav1.ListOptions{})
@@ -182,7 +220,21 @@ func (g *MetricsGrabber) Grab() (MetricsCollection, error) {
}
}
if len(errs) > 0 {
- return MetricsCollection{}, fmt.Errorf("Errors while grabbing metrics: %v", errs)
+ return result, fmt.Errorf("Errors while grabbing metrics: %v", errs)
}
return result, nil
}
+ func (g *MetricsGrabber) getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) (string, error) {
+ rawOutput, err := client.Core().RESTClient().Get().
+ Namespace(namespace).
+ Resource("pods").
+ SubResource("proxy").
+ Name(fmt.Sprintf("%v:%v", podName, port)).
+ Suffix("metrics").
+ Do().Raw()
+ if err != nil {
+ return "", err
+ }
+ return string(rawOutput), nil
+ }
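Note: two behavior changes worth flagging in this file. First, Grab() now returns the partially filled result together with the error instead of an empty MetricsCollection, which is what lets the AfterEach hunk above keep whatever was collected:

// Sketch of the new caller-side pattern:
received, err := grabber.Grab()
if err != nil {
	Logf("MetricsGrabber failed to grab some of the metrics: %v", err)
}
// received still holds everything that was scraped successfully.

Second, getMetricsFromPod moved here from generic_metrics.go and now takes the client explicitly, so the cluster-autoscaler scrape can go through the external cluster's API server; in the kubemark case the request above resolves to roughly GET /api/v1/namespaces/kubemark/pods/cluster-autoscaler:8085/proxy/metrics.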

View File

@@ -54,6 +54,10 @@ const (
// We set a higher threshold for list apicalls as they can take more time when
// the list is really big. For eg. list nodes in a 5000-node cluster.
apiListCallLatencyThreshold time.Duration = 2 * time.Second
+ // Cluster Autoscaler metrics names
+ caFunctionMetric = "cluster_autoscaler_function_duration_seconds_bucket"
+ caFunctionMetricLabel = "function"
)
type MetricsForE2E metrics.MetricsCollection
@@ -67,6 +71,10 @@ func (m *MetricsForE2E) filterMetrics() {
for _, metric := range InterestingControllerManagerMetrics {
interestingControllerManagerMetrics[metric] = (*m).ControllerManagerMetrics[metric]
}
+ interestingClusterAutoscalerMetrics := make(metrics.ClusterAutoscalerMetrics)
+ for _, metric := range InterestingClusterAutoscalerMetrics {
+ interestingClusterAutoscalerMetrics[metric] = (*m).ClusterAutoscalerMetrics[metric]
+ }
interestingKubeletMetrics := make(map[string]metrics.KubeletMetrics)
for kubelet, grabbed := range (*m).KubeletMetrics {
interestingKubeletMetrics[kubelet] = make(metrics.KubeletMetrics)
@@ -93,6 +101,12 @@ func (m *MetricsForE2E) PrintHumanReadable() string {
buf.WriteString(fmt.Sprintf("\t%v\n", metrics.PrintSample(sample)))
}
}
+ for _, interestingMetric := range InterestingClusterAutoscalerMetrics {
+ buf.WriteString(fmt.Sprintf("For %v:\n", interestingMetric))
+ for _, sample := range (*m).ClusterAutoscalerMetrics[interestingMetric] {
+ buf.WriteString(fmt.Sprintf("\t%v\n", metrics.PrintSample(sample)))
+ }
+ }
for kubelet, grabbed := range (*m).KubeletMetrics {
buf.WriteString(fmt.Sprintf("For %v:\n", kubelet))
for _, interestingMetric := range InterestingKubeletMetrics {
@@ -156,6 +170,12 @@ var InterestingKubeletMetrics = []string{
"kubelet_sync_pods_latency_microseconds",
}
+ var InterestingClusterAutoscalerMetrics = []string{
+ "function_duration_seconds",
+ "errors_total",
+ "evicted_pods_total",
+ }
// Dashboard metrics
type LatencyMetric struct {
Perc50 time.Duration `json:"Perc50"`
@@ -199,7 +219,7 @@ func (l *SchedulingLatency) PrintJSON() string {
}
type SaturationTime struct {
TimeToSaturate time.Duration `json:"timeToStaturate"`
TimeToSaturate time.Duration `json:"timeToSaturate"`
NumberOfNodes int `json:"numberOfNodes"`
NumberOfPods int `json:"numberOfPods"`
Throughput float32 `json:"throughput"`
@@ -575,3 +595,24 @@ func PrintLatencies(latencies []PodLatencyData, header string) {
Logf("10%% %s: %v", header, latencies[(len(latencies)*9)/10:])
Logf("perc50: %v, perc90: %v, perc99: %v", metrics.Perc50, metrics.Perc90, metrics.Perc99)
}
+ func (m *MetricsForE2E) computeClusterAutoscalerMetricsDelta(before metrics.MetricsCollection) {
+ if beforeSamples, found := before.ClusterAutoscalerMetrics[caFunctionMetric]; found {
+ if afterSamples, found := m.ClusterAutoscalerMetrics[caFunctionMetric]; found {
+ beforeSamplesMap := make(map[string]*model.Sample)
+ for _, bSample := range beforeSamples {
+ beforeSamplesMap[makeKey(bSample.Metric[caFunctionMetricLabel], bSample.Metric["le"])] = bSample
+ }
+ for _, aSample := range afterSamples {
+ if bSample, found := beforeSamplesMap[makeKey(aSample.Metric[caFunctionMetricLabel], aSample.Metric["le"])]; found {
+ aSample.Value = aSample.Value - bSample.Value
+ }
+ }
+ }
+ }
+ }
+ func makeKey(a, b model.LabelValue) string {
+ return string(a) + "___" + string(b)
+ }

View File

@@ -77,6 +77,8 @@ type TestContextType struct {
GatherLogsSizes bool
GatherMetricsAfterTest string
GatherSuiteMetricsAfterTest bool
+ // If set to 'true' framework will gather ClusterAutoscaler metrics when gathering them for other components.
+ IncludeClusterAutoscalerMetrics bool
// Currently supported values are 'hr' for human-readable and 'json'. It's a comma separated list.
OutputPrintType string
// NodeSchedulableTimeout is the timeout for waiting for all nodes to be schedulable.
@@ -181,6 +183,7 @@ func RegisterCommonFlags() {
flag.BoolVar(&TestContext.GatherLogsSizes, "gather-logs-sizes", false, "If set to true framework will be monitoring logs sizes on all machines running e2e tests.")
flag.StringVar(&TestContext.GatherMetricsAfterTest, "gather-metrics-at-teardown", "false", "If set to 'true' framework will gather metrics from all components after each test. If set to 'master' only master component metrics would be gathered.")
flag.BoolVar(&TestContext.GatherSuiteMetricsAfterTest, "gather-suite-metrics-at-teardown", false, "If set to true framework will gather metrics from all components after the whole test suite completes.")
+ flag.BoolVar(&TestContext.IncludeClusterAutoscalerMetrics, "include-cluster-autoscaler", false, "If set to true, framework will include Cluster Autoscaler when gathering metrics.")
flag.StringVar(&TestContext.OutputPrintType, "output-print-type", "json", "Format in which summaries should be printed: 'hr' for human readable, 'json' for JSON ones.")
flag.BoolVar(&TestContext.DumpLogsOnFailure, "dump-logs-on-failure", true, "If set to true test will dump data about the namespace in which test was running.")
flag.BoolVar(&TestContext.DisableLogDump, "disable-log-dump", false, "If set to true, logs from master and nodes won't be gathered after test run.")
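Note: the new flag defaults to false, so CA metrics only show up when a run opts in. Assuming the usual flag plumbing for the e2e test binary, that would look something like passing --gather-metrics-at-teardown=true --include-cluster-autoscaler=true in the test args.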

View File

@@ -4948,6 +4948,7 @@ func PrintSummaries(summaries []TestDataSummary, testBaseName string) {
} else {
// TODO: learn to extract test name and append it to the kind instead of timestamp.
filePath := path.Join(TestContext.ReportDir, summaries[i].SummaryKind()+"_"+testBaseName+"_"+now.Format(time.RFC3339)+".json")
Logf("Writing to %s", filePath)
if err := ioutil.WriteFile(filePath, []byte(summaries[i].PrintJSON()), 0644); err != nil {
Logf("Failed to write file %v with test performance data: %v", filePath, err)
}

View File

@@ -31,13 +31,14 @@ import (
var _ = instrumentation.SIGDescribe("MetricsGrabber", func() {
f := framework.NewDefaultFramework("metrics-grabber")
- var c clientset.Interface
+ var c, ec clientset.Interface
var grabber *metrics.MetricsGrabber
gin.BeforeEach(func() {
var err error
c = f.ClientSet
+ ec = f.KubemarkExternalClusterClientSet
framework.ExpectNoError(err)
- grabber, err = metrics.NewMetricsGrabber(c, true, true, true, true)
+ grabber, err = metrics.NewMetricsGrabber(c, ec, true, true, true, true, true)
framework.ExpectNoError(err)
})