Merge pull request #48224 from deads2k/controller-11-reflectormetrics

Automatic merge from submit-queue (batch tested with PRs 48224, 45431, 45946, 48775, 49396)

add reflector metrics

This adds metrics (optionally prometheus) to reflectors so that you can see when one reflector is behaving poorly and just how poorly its doing.

@eparis 

```release-note
Adds metrics for checking reflector health.
```
pull/6/head
Kubernetes Submit Queue 2017-07-25 11:06:47 -07:00 committed by GitHub
commit 9c3d0e8a96
17 changed files with 321 additions and 4 deletions

View File

@ -43,6 +43,8 @@ go_library(
"//pkg/quota/install:go_default_library",
"//pkg/registry/cachesize:go_default_library",
"//pkg/registry/rbac/rest:go_default_library",
"//pkg/util/reflector/prometheus:go_default_library",
"//pkg/util/workqueue/prometheus:go_default_library",
"//pkg/version:go_default_library",
"//plugin/pkg/admission/admit:go_default_library",
"//plugin/pkg/admission/alwayspullimages:go_default_library",

View File

@ -83,6 +83,9 @@ import (
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/bootstrap"
_ "k8s.io/kubernetes/pkg/util/reflector/prometheus" // for reflector metric registration
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
)
const etcdRetryLimit = 60

View File

@ -30,6 +30,7 @@ go_library(
"//cmd/kube-controller-manager/app:go_default_library",
"//cmd/kube-controller-manager/app/options:go_default_library",
"//pkg/client/metrics/prometheus:go_default_library",
"//pkg/util/reflector/prometheus:go_default_library",
"//pkg/util/workqueue/prometheus:go_default_library",
"//pkg/version/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
_ "k8s.io/kubernetes/pkg/util/reflector/prometheus" // for reflector metric registration
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
"k8s.io/kubernetes/pkg/version/verflag"

View File

@ -21,6 +21,7 @@ go_library(
deps = [
"//federation/cmd/federation-controller-manager/app:go_default_library",
"//federation/cmd/federation-controller-manager/app/options:go_default_library",
"//pkg/util/reflector/prometheus:go_default_library",
"//pkg/util/workqueue/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app"
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
_ "k8s.io/kubernetes/pkg/util/reflector/prometheus" // for reflector metric registration
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
"k8s.io/kubernetes/pkg/version/verflag"
)

View File

@ -26,6 +26,7 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/controller/garbagecollector/metaonly:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/reflector/prometheus:go_default_library",
"//pkg/util/workqueue/prometheus:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/golang/groupcache/lru:go_default_library",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
_ "k8s.io/kubernetes/pkg/util/reflector/prometheus" // for reflector metric registration
// install the prometheus plugin
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus"
// import known versions

View File

@ -44,6 +44,7 @@ filegroup(
"//pkg/util/parsers:all-srcs",
"//pkg/util/pointer:all-srcs",
"//pkg/util/procfs:all-srcs",
"//pkg/util/reflector/prometheus:all-srcs",
"//pkg/util/removeall:all-srcs",
"//pkg/util/resourcecontainer:all-srcs",
"//pkg/util/rlimit:all-srcs",

View File

@ -0,0 +1,31 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["prometheus.go"],
tags = ["automanaged"],
deps = [
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,127 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package prometheus sets the cache DefaultMetricsFactory to produce
// prometheus metrics. To use this package, you just have to import it.
package prometheus
import (
"k8s.io/client-go/tools/cache"
"github.com/prometheus/client_golang/prometheus"
)
const reflectorSubsystem = "reflector"
var (
listsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: reflectorSubsystem,
Name: "lists_total",
Help: "Total number of API lists done by the reflectors",
}, []string{"name"})
listsDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Subsystem: reflectorSubsystem,
Name: "list_duration_seconds",
Help: "How long an API list takes to return and decode for the reflectors",
}, []string{"name"})
itemsPerList = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Subsystem: reflectorSubsystem,
Name: "items_per_list",
Help: "How many items an API list returns to the reflectors",
}, []string{"name"})
watchesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: reflectorSubsystem,
Name: "watches_total",
Help: "Total number of API watches done by the reflectors",
}, []string{"name"})
shortWatchesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: reflectorSubsystem,
Name: "short_watches_total",
Help: "Total number of short API watches done by the reflectors",
}, []string{"name"})
watchDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Subsystem: reflectorSubsystem,
Name: "watch_duration_seconds",
Help: "How long an API watch takes to return and decode for the reflectors",
}, []string{"name"})
itemsPerWatch = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Subsystem: reflectorSubsystem,
Name: "items_per_watch",
Help: "How many items an API watch returns to the reflectors",
}, []string{"name"})
)
func init() {
prometheus.MustRegister(listsTotal)
prometheus.MustRegister(listsDuration)
prometheus.MustRegister(itemsPerList)
prometheus.MustRegister(watchesTotal)
prometheus.MustRegister(shortWatchesTotal)
prometheus.MustRegister(watchDuration)
prometheus.MustRegister(itemsPerWatch)
cache.SetReflectorMetricsProvider(prometheusMetricsProvider{})
}
type prometheusMetricsProvider struct{}
func (prometheusMetricsProvider) NewListsMetric(name string) cache.CounterMetric {
return listsTotal.WithLabelValues(name)
}
// use summary to get averages and percentiles
func (prometheusMetricsProvider) NewListDurationMetric(name string) cache.SummaryMetric {
return listsDuration.WithLabelValues(name)
}
// use summary to get averages and percentiles
func (prometheusMetricsProvider) NewItemsInListMetric(name string) cache.SummaryMetric {
return itemsPerList.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewWatchesMetric(name string) cache.CounterMetric {
return watchesTotal.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewShortWatchesMetric(name string) cache.CounterMetric {
return shortWatchesTotal.WithLabelValues(name)
}
// use summary to get averages and percentiles
func (prometheusMetricsProvider) NewWatchDurationMetric(name string) cache.SummaryMetric {
return watchDuration.WithLabelValues(name)
}
// use summary to get averages and percentiles
func (prometheusMetricsProvider) NewItemsInWatchMetric(name string) cache.SummaryMetric {
return itemsPerWatch.WithLabelValues(name)
}
func (prometheusMetricsProvider) NewLastResourceVersionMetric(name string) cache.GaugeMetric {
rv := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "last_resource_version",
Help: "last resource version seen for the reflectors",
})
prometheus.MustRegister(rv)
return rv
}

View File

@ -25,6 +25,7 @@ go_library(
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/quota:go_default_library",
"//pkg/util/reflector/prometheus:go_default_library",
"//pkg/util/workqueue/prometheus:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota:go_default_library",
"//plugin/pkg/admission/resourcequota/apis/resourcequota/install:go_default_library",

View File

@ -34,6 +34,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/quota"
_ "k8s.io/kubernetes/pkg/util/reflector/prometheus" // for reflector metric registration
_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // for workqueue metric registration
resourcequotaapi "k8s.io/kubernetes/plugin/pkg/admission/resourcequota/apis/resourcequota"
)

View File

@ -196,6 +196,7 @@ type Cacher struct {
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
@ -212,7 +213,7 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
copier: config.Copier,
objectType: reflect.TypeOf(config.Type),
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
versioner: config.Versioner,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,

View File

@ -56,6 +56,7 @@ go_library(
"mutation_cache.go",
"mutation_detector.go",
"reflector.go",
"reflector_metrics.go",
"shared_informer.go",
"store.go",
"thread_safe_store.go",

View File

@ -48,6 +48,8 @@ import (
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// metrics tracks basic metric information about the reflector
metrics *reflectorMetrics
// The type of object we expect to place in the store.
expectedType reflect.Type
@ -99,7 +101,9 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
name: name,
// we need this to be unique per process (some names are still the same)but obvious who it belongs to
metrics: newReflectorMetrics(makeValidPromethusMetricName(fmt.Sprintf("reflector_"+name+"_%07d", rand.Intn(1000000)))),
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
@ -110,6 +114,11 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
return r
}
func makeValidPromethusMetricName(in string) string {
// this isn't perfect, but it removes our common characters
return strings.NewReplacer("/", "_", ".", "_", "-", "_").Replace(in)
}
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/", "/runtime/asm_"}
@ -231,10 +240,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
r.metrics.numberOfLists.Inc()
start := r.clock.Now()
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
r.metrics.listDuration.Observe(time.Since(start).Seconds())
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
@ -244,6 +256,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
r.metrics.numberOfItemsInList.Observe(float64(len(items)))
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
@ -282,6 +295,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
TimeoutSeconds: &timemoutseconds,
}
r.metrics.numberOfWatches.Inc()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
@ -333,6 +347,11 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, err
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
// update metrics
defer func() {
r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
r.metrics.watchDuration.Observe(time.Since(start).Seconds())
}()
loop:
for {
@ -388,8 +407,8 @@ loop:
watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
return errors.New("very short watch")
r.metrics.numberOfShortWatches.Inc()
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
return nil
@ -407,4 +426,9 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
rv, err := strconv.Atoi(v)
if err == nil {
r.metrics.lastResourceVersion.Set(float64(rv))
}
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file provides abstractions for setting the provider (e.g., prometheus)
// of metrics.
package cache
import (
"sync"
)
// GaugeMetric represents a single numerical value that can arbitrarily go up
// and down.
type GaugeMetric interface {
Set(float64)
}
// CounterMetric represents a single numerical value that only ever
// goes up.
type CounterMetric interface {
Inc()
}
// SummaryMetric captures individual observations.
type SummaryMetric interface {
Observe(float64)
}
type noopMetric struct{}
func (noopMetric) Inc() {}
func (noopMetric) Dec() {}
func (noopMetric) Observe(float64) {}
func (noopMetric) Set(float64) {}
type reflectorMetrics struct {
numberOfLists CounterMetric
listDuration SummaryMetric
numberOfItemsInList SummaryMetric
numberOfWatches CounterMetric
numberOfShortWatches CounterMetric
watchDuration SummaryMetric
numberOfItemsInWatch SummaryMetric
lastResourceVersion GaugeMetric
}
// MetricsProvider generates various metrics used by the reflector.
type MetricsProvider interface {
NewListsMetric(name string) CounterMetric
NewListDurationMetric(name string) SummaryMetric
NewItemsInListMetric(name string) SummaryMetric
NewWatchesMetric(name string) CounterMetric
NewShortWatchesMetric(name string) CounterMetric
NewWatchDurationMetric(name string) SummaryMetric
NewItemsInWatchMetric(name string) SummaryMetric
NewLastResourceVersionMetric(name string) GaugeMetric
}
type noopMetricsProvider struct{}
func (noopMetricsProvider) NewListsMetric(name string) CounterMetric { return noopMetric{} }
func (noopMetricsProvider) NewListDurationMetric(name string) SummaryMetric { return noopMetric{} }
func (noopMetricsProvider) NewItemsInListMetric(name string) SummaryMetric { return noopMetric{} }
func (noopMetricsProvider) NewWatchesMetric(name string) CounterMetric { return noopMetric{} }
func (noopMetricsProvider) NewShortWatchesMetric(name string) CounterMetric { return noopMetric{} }
func (noopMetricsProvider) NewWatchDurationMetric(name string) SummaryMetric { return noopMetric{} }
func (noopMetricsProvider) NewItemsInWatchMetric(name string) SummaryMetric { return noopMetric{} }
func (noopMetricsProvider) NewLastResourceVersionMetric(name string) GaugeMetric {
return noopMetric{}
}
var metricsFactory = struct {
metricsProvider MetricsProvider
setProviders sync.Once
}{
metricsProvider: noopMetricsProvider{},
}
func newReflectorMetrics(name string) *reflectorMetrics {
var ret *reflectorMetrics
if len(name) == 0 {
return ret
}
return &reflectorMetrics{
numberOfLists: metricsFactory.metricsProvider.NewListsMetric(name),
listDuration: metricsFactory.metricsProvider.NewListDurationMetric(name),
numberOfItemsInList: metricsFactory.metricsProvider.NewItemsInListMetric(name),
numberOfWatches: metricsFactory.metricsProvider.NewWatchesMetric(name),
numberOfShortWatches: metricsFactory.metricsProvider.NewShortWatchesMetric(name),
watchDuration: metricsFactory.metricsProvider.NewWatchDurationMetric(name),
numberOfItemsInWatch: metricsFactory.metricsProvider.NewItemsInWatchMetric(name),
lastResourceVersion: metricsFactory.metricsProvider.NewLastResourceVersionMetric(name),
}
}
// SetReflectorMetricsProvider sets the metrics provider
func SetReflectorMetricsProvider(metricsProvider MetricsProvider) {
metricsFactory.setProviders.Do(func() {
metricsFactory.metricsProvider = metricsProvider
})
}