Merge pull request #64695 from krzysied/etcd_metrics

Automatic merge from submit-queue (batch tested with PRs 64695, 65982, 65908). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Collecting etcd metrics

**What this PR does / why we need it**:
This PR adds etcd metrics to the performance test log.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
ref #64030

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
pull/8/head
Kubernetes Submit Queue 2018-07-10 08:55:03 -07:00 committed by GitHub
commit a8777c26fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 142 additions and 0 deletions

View File

@ -141,6 +141,7 @@ go_library(
"//vendor/github.com/aws/aws-sdk-go/aws/session:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/autoscaling:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/beorn7/perks/quantile:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/ginkgo/config:go_default_library",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/util/system"
"k8s.io/kubernetes/test/e2e/framework/metrics"
"github.com/beorn7/perks/quantile"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
)
@ -232,6 +233,74 @@ func (l *SchedulingMetrics) PrintJSON() string {
return PrettyPrintJSON(l)
}
// histogram holds a Prometheus-style histogram: a total sample count and a
// map from bucket upper bound to the number of samples in that bucket.
// Buckets are cumulative as scraped from /metrics until
// ConvertToRangeBuckets is called, after which each bucket holds only the
// count for its own range.
type histogram struct {
	Count   int
	Buckets map[float64]int
}

// ConvertToRangeBuckets rewrites the cumulative bucket counts (value =
// number of samples <= bound, as Prometheus exports them) into per-range
// counts (value = number of samples in that bucket alone), in place.
func (h *histogram) ConvertToRangeBuckets() {
	bounds := make([]float64, 0, len(h.Buckets))
	for bound := range h.Buckets {
		bounds = append(bounds, bound)
	}
	sort.Float64s(bounds)
	cumulativeSoFar := 0
	for _, bound := range bounds {
		h.Buckets[bound] -= cumulativeSoFar
		cumulativeSoFar += h.Buckets[bound]
	}
}
// CalculatePerc estimates the perc-th quantile (e.g. 0.5 for the median) of
// the samples represented by the per-range buckets, using the perks
// targeted-quantile estimator with 0.001 precision at the requested target.
// Each bucket's upper bound stands in for every sample it contains; empty
// buckets are skipped. Call ConvertToRangeBuckets first — cumulative
// buckets would be double-counted.
func (h *histogram) CalculatePerc(perc float64) float64 {
	estimator := quantile.NewTargeted(map[float64]float64{perc: 0.001})
	var samples quantile.Samples
	for bound, count := range h.Buckets {
		if count <= 0 {
			continue
		}
		samples = append(samples, quantile.Sample{Value: bound, Width: float64(count)})
	}
	estimator.Merge(samples)
	return estimator.Query(perc)
}
// ConvertToLatencyMetric fills m with the 50th/90th/99th percentile
// durations derived from this histogram. The buckets are first converted
// from cumulative to per-range counts; note that this mutates the
// histogram in place.
func (h *histogram) ConvertToLatencyMetric(m *LatencyMetric) {
	h.ConvertToRangeBuckets()
	targets := []struct {
		q   float64
		dst *time.Duration
	}{
		{0.5, &m.Perc50},
		{0.9, &m.Perc90},
		{0.99, &m.Perc99},
	}
	for _, target := range targets {
		*target.dst = time.Duration(h.CalculatePerc(target.q))
	}
}
// getNewHistogram returns an empty histogram with an initialized (non-nil)
// bucket map, ready to accumulate samples.
func getNewHistogram() *histogram {
	h := histogram{Buckets: map[float64]int{}}
	return &h
}
// EtcdMetrics is a test summary holding latency percentiles for the key
// etcd server operations, scraped from etcd's /metrics endpoint by
// VerifyEtcdMetrics. All durations are in milliseconds (see the
// secondToMillisecondRatio scaling at collection time).
type EtcdMetrics struct {
	BackendCommitDuration     LatencyMetric `json:"backendCommitDuration"`
	SnapshotSaveTotalDuration LatencyMetric `json:"snapshotSaveTotalDuration"`
	PeerRoundTripTime         LatencyMetric `json:"peerRoundTripTime"`
	// NOTE(review): "Furation" is a typo for "Duration" in both the field
	// name and the JSON tag; renaming would break consumers of the emitted
	// JSON summary, so it is only flagged here rather than fixed.
	WalFsyncFuration LatencyMetric `json:"walFsyncFuration"`
}
// SummaryKind returns the name identifying this summary type to the
// summary-printing machinery.
func (l *EtcdMetrics) SummaryKind() string {
	const kind = "EtcdMetrics"
	return kind
}
// PrintHumanReadable renders the metrics for human consumption; indented
// JSON is already the most readable form here, so it delegates to
// PrettyPrintJSON.
func (l *EtcdMetrics) PrintHumanReadable() string {
	rendered := PrettyPrintJSON(l)
	return rendered
}
// PrintJSON renders the metrics as a JSON document via PrettyPrintJSON.
func (l *EtcdMetrics) PrintJSON() string {
	rendered := PrettyPrintJSON(l)
	return rendered
}
type SaturationTime struct {
TimeToSaturate time.Duration `json:"timeToSaturate"`
NumberOfNodes int `json:"numberOfNodes"`
@ -552,6 +621,71 @@ func ResetSchedulerMetrics(c clientset.Interface) error {
return nil
}
// convertSampleToBucket records one Prometheus "_bucket" sample into h.
// The "le" label carries the bucket's upper bound (in seconds); it is
// scaled by ratio (e.g. to milliseconds) before being used as the map key.
// The "+Inf" bound is stored under math.MaxFloat64. A bound that fails to
// parse is silently skipped — collection is best effort.
func convertSampleToBucket(sample *model.Sample, h *histogram, ratio float64) {
	upperBound := sample.Metric["le"]
	if upperBound == "+Inf" {
		h.Buckets[math.MaxFloat64] = int(sample.Value)
		return
	}
	bound, err := strconv.ParseFloat(string(upperBound), 64)
	if err != nil {
		return
	}
	h.Buckets[bound*ratio] = int(sample.Value)
}
// VerifyEtcdMetrics collects etcd latency metrics (backend commit,
// snapshot save, peer round trip, WAL fsync) from the etcd instance on the
// master and returns them as an EtcdMetrics summary for logging.
// Etcd only listens on localhost, so its /metrics endpoint is reached by
// curling it over SSH to the master. On GKE, where master SSH is
// unsupported, it returns (nil, nil).
func VerifyEtcdMetrics(c clientset.Interface) (*EtcdMetrics, error) {
	// Etcd is only exposed on localhost level. We are using ssh method
	if TestContext.Provider == "gke" {
		// Fixed copy-paste message: this grabs etcd metrics, not scheduler metrics.
		Logf("Not grabbing etcd metrics through master SSH: unsupported for gke")
		return nil, nil
	}
	cmd := "curl http://localhost:2379/metrics"
	sshResult, err := SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
	if err != nil || sshResult.Code != 0 {
		return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
	}
	data := sshResult.Stdout

	samples, err := extractMetricSamples(data)
	if err != nil {
		return nil, err
	}

	backendCommitDurationHistogram := getNewHistogram()
	snapshotSaveTotalDurationHistogram := getNewHistogram()
	peerRoundTripTimeHistogram := getNewHistogram()
	walFsyncDurationHistogram := getNewHistogram()
	// Etcd exports durations in seconds; the summaries use milliseconds.
	secondToMillisecondRatio := float64(time.Second / time.Millisecond)
	for _, sample := range samples {
		switch sample.Metric[model.MetricNameLabel] {
		case "etcd_disk_backend_commit_duration_seconds_bucket":
			convertSampleToBucket(sample, backendCommitDurationHistogram, secondToMillisecondRatio)
		case "etcd_disk_backend_commit_duration_seconds_count":
			backendCommitDurationHistogram.Count = int(sample.Value)
		case "etcd_debugging_snap_save_total_duration_seconds_bucket":
			convertSampleToBucket(sample, snapshotSaveTotalDurationHistogram, secondToMillisecondRatio)
		case "etcd_debugging_snap_save_total_duration_seconds_count":
			// BUG FIX: this count was previously assigned to the backend
			// commit histogram, clobbering its count and leaving the
			// snapshot histogram's count at zero.
			snapshotSaveTotalDurationHistogram.Count = int(sample.Value)
		case "etcd_disk_wal_fsync_duration_seconds_bucket":
			convertSampleToBucket(sample, walFsyncDurationHistogram, secondToMillisecondRatio)
		case "etcd_disk_wal_fsync_duration_seconds_count":
			walFsyncDurationHistogram.Count = int(sample.Value)
		case "etcd_network_peer_round_trip_time_seconds_bucket":
			convertSampleToBucket(sample, peerRoundTripTimeHistogram, secondToMillisecondRatio)
		case "etcd_network_peer_round_trip_time_seconds_count":
			peerRoundTripTimeHistogram.Count = int(sample.Value)
		}
	}
	result := EtcdMetrics{}
	backendCommitDurationHistogram.ConvertToLatencyMetric(&result.BackendCommitDuration)
	snapshotSaveTotalDurationHistogram.ConvertToLatencyMetric(&result.SnapshotSaveTotalDuration)
	peerRoundTripTimeHistogram.ConvertToLatencyMetric(&result.PeerRoundTripTime)
	walFsyncDurationHistogram.ConvertToLatencyMetric(&result.WalFsyncFuration)
	return &result, nil
}
func PrettyPrintJSON(metrics interface{}) string {
output := &bytes.Buffer{}
if err := json.NewEncoder(output).Encode(metrics); err != nil {

View File

@ -441,6 +441,13 @@ var _ = SIGDescribe("Density", func() {
}
summaries = append(summaries, latency)
}
etcdMetrics, err := framework.VerifyEtcdMetrics(c)
framework.ExpectNoError(err)
if err == nil {
summaries = append(summaries, etcdMetrics)
}
summaries = append(summaries, testPhaseDurations)
framework.PrintSummaries(summaries, testCaseBaseName)