From aa022310a476888b90eccaa5c3e7a4180b32b095 Mon Sep 17 00:00:00 2001 From: Krzysztof Siedlecki Date: Mon, 4 Jun 2018 14:17:21 +0200 Subject: [PATCH] Collecting etcd metrics --- test/e2e/framework/BUILD | 1 + test/e2e/framework/metrics_util.go | 134 +++++++++++++++++++++++++++++ test/e2e/scalability/density.go | 7 ++ 3 files changed, 142 insertions(+) diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 9f6cba3800..85afe5c11a 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -92,6 +92,7 @@ go_library( "//vendor/github.com/aws/aws-sdk-go/aws/session:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library", + "//vendor/github.com/beorn7/perks/quantile:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", "//vendor/github.com/onsi/ginkgo/config:go_default_library", diff --git a/test/e2e/framework/metrics_util.go b/test/e2e/framework/metrics_util.go index edd31c06ce..52225636b1 100644 --- a/test/e2e/framework/metrics_util.go +++ b/test/e2e/framework/metrics_util.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/util/system" "k8s.io/kubernetes/test/e2e/framework/metrics" + "github.com/beorn7/perks/quantile" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" ) @@ -228,6 +229,74 @@ func (l *SchedulingMetrics) PrintJSON() string { return PrettyPrintJSON(l) } +type histogram struct { + Count int + Buckets map[float64]int +} + +func (h *histogram) ConvertToRangeBuckets() { + keys := []float64{} + for k := range h.Buckets { + keys = append(keys, k) + } + sort.Float64s(keys) + lessValuesSum := 0 + for _, k := range keys { + h.Buckets[k] -= lessValuesSum + lessValuesSum += h.Buckets[k] + } +} + +func (h *histogram) CalculatePerc(perc float64) float64 { + targets := map[float64]float64{ + perc: 0.001, + } + q := 
quantile.NewTargeted(targets)
+	var samples quantile.Samples
+
+	for k, v := range h.Buckets {
+		if v > 0 {
+			samples = append(samples, quantile.Sample{Value: k, Width: float64(v)})
+		}
+	}
+	q.Merge(samples)
+
+	return q.Query(perc)
+}
+
+func (h *histogram) ConvertToLatencyMetric(m *LatencyMetric) {
+	h.ConvertToRangeBuckets()
+	m.Perc50 = time.Duration(h.CalculatePerc(0.5))
+	m.Perc90 = time.Duration(h.CalculatePerc(0.9))
+	m.Perc99 = time.Duration(h.CalculatePerc(0.99))
+}
+
+func getNewHistogram() *histogram {
+	return &histogram{
+		Count:   0,
+		Buckets: make(map[float64]int),
+	}
+}
+
+type EtcdMetrics struct {
+	BackendCommitDuration     LatencyMetric `json:"backendCommitDuration"`
+	SnapshotSaveTotalDuration LatencyMetric `json:"snapshotSaveTotalDuration"`
+	PeerRoundTripTime         LatencyMetric `json:"peerRoundTripTime"`
+	WalFsyncDuration          LatencyMetric `json:"walFsyncDuration"`
+}
+
+func (l *EtcdMetrics) SummaryKind() string {
+	return "EtcdMetrics"
+}
+
+func (l *EtcdMetrics) PrintHumanReadable() string {
+	return PrettyPrintJSON(l)
+}
+
+func (l *EtcdMetrics) PrintJSON() string {
+	return PrettyPrintJSON(l)
+}
+
 type SaturationTime struct {
 	TimeToSaturate time.Duration `json:"timeToSaturate"`
 	NumberOfNodes  int           `json:"numberOfNodes"`
@@ -547,6 +616,71 @@ func ResetSchedulerMetrics(c clientset.Interface) error {
 	return nil
 }
 
+func convertSampleToBucket(sample *model.Sample, h *histogram, ratio float64) {
+	if sample.Metric["le"] == "+Inf" {
+		h.Buckets[math.MaxFloat64] = int(sample.Value)
+	} else {
+		f, err := strconv.ParseFloat(string(sample.Metric["le"]), 64)
+		if err == nil {
+			h.Buckets[f*ratio] = int(sample.Value)
+		}
+	}
+}
+
+// VerifyEtcdMetrics fetches etcd latency metrics from the master and returns them.
+func VerifyEtcdMetrics(c clientset.Interface) (*EtcdMetrics, error) {
+	// Etcd is only exposed on localhost, so the metrics endpoint is reached via SSH to the master.
+	if TestContext.Provider == "gke" {
+		Logf("Not grabbing etcd metrics through master SSH: unsupported for gke")
+		return nil, nil
+	}
+
+	cmd := "curl http://localhost:2379/metrics"
+	sshResult, err := SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
+	if err != nil || sshResult.Code != 0 {
+		return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
+	}
+	data := sshResult.Stdout
+
+	samples, err := extractMetricSamples(data)
+	if err != nil {
+		return nil, err
+	}
+
+	backendCommitDurationHistogram := getNewHistogram()
+	snapshotSaveTotalDurationHistogram := getNewHistogram()
+	peerRoundTripTimeHistogram := getNewHistogram()
+	walFsyncDurationHistogram := getNewHistogram()
+	secondToMillisecondRatio := float64(time.Second / time.Millisecond)
+	for _, sample := range samples {
+		switch sample.Metric[model.MetricNameLabel] {
+		case "etcd_disk_backend_commit_duration_seconds_bucket":
+			convertSampleToBucket(sample, backendCommitDurationHistogram, secondToMillisecondRatio)
+		case "etcd_disk_backend_commit_duration_seconds_count":
+			backendCommitDurationHistogram.Count = int(sample.Value)
+		case "etcd_debugging_snap_save_total_duration_seconds_bucket":
+			convertSampleToBucket(sample, snapshotSaveTotalDurationHistogram, secondToMillisecondRatio)
+		case "etcd_debugging_snap_save_total_duration_seconds_count":
+			snapshotSaveTotalDurationHistogram.Count = int(sample.Value)
+		case "etcd_disk_wal_fsync_duration_seconds_bucket":
+			convertSampleToBucket(sample, walFsyncDurationHistogram, secondToMillisecondRatio)
+		case "etcd_disk_wal_fsync_duration_seconds_count":
+			walFsyncDurationHistogram.Count = int(sample.Value)
+		case "etcd_network_peer_round_trip_time_seconds_bucket":
+			convertSampleToBucket(sample, peerRoundTripTimeHistogram, secondToMillisecondRatio)
+		case "etcd_network_peer_round_trip_time_seconds_count":
+			peerRoundTripTimeHistogram.Count = int(sample.Value)
+		}
+	}
+
+	result := EtcdMetrics{}
+	backendCommitDurationHistogram.ConvertToLatencyMetric(&result.BackendCommitDuration)
+	snapshotSaveTotalDurationHistogram.ConvertToLatencyMetric(&result.SnapshotSaveTotalDuration)
+	peerRoundTripTimeHistogram.ConvertToLatencyMetric(&result.PeerRoundTripTime)
+	walFsyncDurationHistogram.ConvertToLatencyMetric(&result.WalFsyncDuration)
+	return &result, nil
+}
+
 func PrettyPrintJSON(metrics interface{}) string {
 	output := &bytes.Buffer{}
 	if err := json.NewEncoder(output).Encode(metrics); err != nil {
diff --git a/test/e2e/scalability/density.go b/test/e2e/scalability/density.go
index 9d7794b96b..a46cadb5e4 100644
--- a/test/e2e/scalability/density.go
+++ b/test/e2e/scalability/density.go
@@ -401,6 +401,13 @@ var _ = SIGDescribe("Density", func() {
 			latency.ThroughputSamples = scheduleThroughputs
 			summaries = append(summaries, latency)
 		}
+
+		etcdMetrics, err := framework.VerifyEtcdMetrics(c)
+		framework.ExpectNoError(err)
+		if etcdMetrics != nil {
+			summaries = append(summaries, etcdMetrics)
+		}
+
 		summaries = append(summaries, testPhaseDurations)
 		framework.PrintSummaries(summaries, testCaseBaseName)