Merge pull request #71380 from sttts/sttts-aggregator-metrics-available

aggregator: add APIService unavailability metrics
pull/564/head
Kubernetes Prow Robot 2018-12-06 00:02:55 -08:00 committed by GitHub
commit 82b0d8fd37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 148 additions and 55 deletions

View File

@ -248,7 +248,7 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
if handler != nil { if handler != nil {
handler = metrics.InstrumentHandlerFunc(verb, resource, subresource, scope, handler) handler = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, handler)
handler(w, req) handler(w, req)
return return
} }

View File

@ -252,7 +252,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
return return
} }
requestInfo, _ := request.RequestInfoFrom(ctx) requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, func() { metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, req, w, timeout) serveWatch(watcher, scope, req, w, timeout)
}) })
return return

View File

@ -64,7 +64,7 @@ func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSer
stream, ok := object.(rest.ResourceStreamer) stream, ok := object.(rest.ResourceStreamer)
if ok { if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context()) requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, func() { metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req) StreamObject(statusCode, gv, s, stream, w, req)
}) })
return return

View File

@ -144,7 +144,7 @@ func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admissi
} }
} }
requestInfo, _ := request.RequestInfoFrom(ctx) requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, func() { metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w}) handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
if err != nil { if err != nil {
scope.err(err, w, req) scope.err(err, w, req)

View File

@ -176,6 +176,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
return nil, err return nil, err
} }
group, version := a.group.GroupVersion.Group, a.group.GroupVersion.Version
fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer) fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer)
if err != nil { if err != nil {
return nil, err return nil, err
@ -571,9 +573,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if needOverride { if needOverride {
// need change the reported verb // need change the reported verb
handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler) handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
} else { } else {
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler) handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
} }
if a.enableAPIResponseCompression { if a.enableAPIResponseCompression {
@ -607,7 +609,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isSubresource { if isSubresource {
doc = "list " + subresource + " of objects of kind " + kind doc = "list " + subresource + " of objects of kind " + kind
} }
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
if a.enableAPIResponseCompression { if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler) handler = genericfilters.RestfulWithCompression(handler)
} }
@ -642,7 +644,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isSubresource { if isSubresource {
doc = "replace " + subresource + " of the specified " + kind doc = "replace " + subresource + " of the specified " + kind
} }
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulUpdateResource(updater, reqScope, admit)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulUpdateResource(updater, reqScope, admit))
route := ws.PUT(action.Path).To(handler). route := ws.PUT(action.Path).To(handler).
Doc(doc). Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -669,7 +671,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
string(types.MergePatchType), string(types.MergePatchType),
string(types.StrategicMergePatchType), string(types.StrategicMergePatchType),
} }
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulPatchResource(patcher, reqScope, admit, supportedTypes)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulPatchResource(patcher, reqScope, admit, supportedTypes))
route := ws.PATCH(action.Path).To(handler). route := ws.PATCH(action.Path).To(handler).
Doc(doc). Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -691,7 +693,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
} else { } else {
handler = restfulCreateResource(creater, reqScope, admit) handler = restfulCreateResource(creater, reqScope, admit)
} }
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler) handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
article := getArticleForNoun(kind, " ") article := getArticleForNoun(kind, " ")
doc := "create" + article + kind doc := "create" + article + kind
if isSubresource { if isSubresource {
@ -720,7 +722,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isSubresource { if isSubresource {
doc = "delete " + subresource + " of" + article + kind doc = "delete " + subresource + " of" + article + kind
} }
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
route := ws.DELETE(action.Path).To(handler). route := ws.DELETE(action.Path).To(handler).
Doc(doc). Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -743,7 +745,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isSubresource { if isSubresource {
doc = "delete collection of " + subresource + " of a " + kind doc = "delete collection of " + subresource + " of a " + kind
} }
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit))
route := ws.DELETE(action.Path).To(handler). route := ws.DELETE(action.Path).To(handler).
Doc(doc). Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -763,7 +765,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "watch changes to " + subresource + " of an object of kind " + kind doc = "watch changes to " + subresource + " of an object of kind " + kind
} }
doc += ". deprecated: use the 'watch' parameter with a list operation instead, filtered to a single item with the 'fieldSelector' parameter." doc += ". deprecated: use the 'watch' parameter with a list operation instead, filtered to a single item with the 'fieldSelector' parameter."
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
route := ws.GET(action.Path).To(handler). route := ws.GET(action.Path).To(handler).
Doc(doc). Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -783,7 +785,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "watch individual changes to a list of " + subresource + " of " + kind doc = "watch individual changes to a list of " + subresource + " of " + kind
} }
doc += ". deprecated: use the 'watch' parameter with a list operation instead." doc += ". deprecated: use the 'watch' parameter with a list operation instead."
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
route := ws.GET(action.Path).To(handler). route := ws.GET(action.Path).To(handler).
Doc(doc). Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -806,7 +808,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isSubresource { if isSubresource {
doc = "connect " + method + " requests to " + subresource + " of " + kind doc = "connect " + method + " requests to " + subresource + " of " + kind
} }
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulConnectResource(connecter, reqScope, admit, path, isSubresource)) handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, restfulConnectResource(connecter, reqScope, admit, path, isSubresource))
route := ws.Method(method).Path(action.Path). route := ws.Method(method).Path(action.Path).
To(handler). To(handler).
Doc(doc). Doc(doc).

View File

@ -40,50 +40,54 @@ type resettableCollector interface {
Reset() Reset()
} }
const (
// APIServerComponent is the "component" label value that the apiserver's own
// handlers and filters pass to the request metrics in this package.
APIServerComponent string = "apiserver"
)
var ( var (
// TODO(a-robinson): Add unit tests for the handling of these metrics once // TODO(a-robinson): Add unit tests for the handling of these metrics once
// the upstream library supports it. // the upstream library supports it.
requestCounter = prometheus.NewCounterVec( requestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "apiserver_request_count", Name: "apiserver_request_count",
Help: "Counter of apiserver requests broken out for each verb, API resource, client, and HTTP response contentType and code.", Help: "Counter of apiserver requests broken out for each verb, group, version, resource, scope, component, client, and HTTP response contentType and code.",
}, },
[]string{"verb", "resource", "subresource", "scope", "client", "contentType", "code"}, []string{"verb", "group", "version", "resource", "subresource", "scope", "component", "client", "contentType", "code"},
) )
longRunningRequestGauge = prometheus.NewGaugeVec( longRunningRequestGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "apiserver_longrunning_gauge", Name: "apiserver_longrunning_gauge",
Help: "Gauge of all active long-running apiserver requests broken out by verb, API resource, and scope. Not all requests are tracked this way.", Help: "Gauge of all active long-running apiserver requests broken out by verb, group, version, resource, scope and component. Not all requests are tracked this way.",
}, },
[]string{"verb", "resource", "subresource", "scope"}, []string{"verb", "group", "version", "resource", "subresource", "scope", "component"},
) )
requestLatencies = prometheus.NewHistogramVec( requestLatencies = prometheus.NewHistogramVec(
prometheus.HistogramOpts{ prometheus.HistogramOpts{
Name: "apiserver_request_latencies", Name: "apiserver_request_latencies",
Help: "Response latency distribution in microseconds for each verb, resource and subresource.", Help: "Response latency distribution in microseconds for each verb, group, version, resource, subresource, scope and component.",
// Use buckets ranging from 125 ms to 8 seconds. // Use buckets ranging from 125 ms to 8 seconds.
Buckets: prometheus.ExponentialBuckets(125000, 2.0, 7), Buckets: prometheus.ExponentialBuckets(125000, 2.0, 7),
}, },
[]string{"verb", "resource", "subresource", "scope"}, []string{"verb", "group", "version", "resource", "subresource", "scope", "component"},
) )
requestLatenciesSummary = prometheus.NewSummaryVec( requestLatenciesSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{ prometheus.SummaryOpts{
Name: "apiserver_request_latencies_summary", Name: "apiserver_request_latencies_summary",
Help: "Response latency summary in microseconds for each verb, resource and subresource.", Help: "Response latency summary in microseconds for each verb, group, version, resource, subresource, scope and component.",
// Make the sliding window of 5h. // Make the sliding window of 5h.
// TODO: The value for this should be based on our SLI definition (medium term). // TODO: The value for this should be based on our SLI definition (medium term).
MaxAge: 5 * time.Hour, MaxAge: 5 * time.Hour,
}, },
[]string{"verb", "resource", "subresource", "scope"}, []string{"verb", "group", "version", "resource", "subresource", "scope", "component"},
) )
responseSizes = prometheus.NewHistogramVec( responseSizes = prometheus.NewHistogramVec(
prometheus.HistogramOpts{ prometheus.HistogramOpts{
Name: "apiserver_response_sizes", Name: "apiserver_response_sizes",
Help: "Response size distribution in bytes for each verb, resource, subresource and scope (namespace/cluster).", Help: "Response size distribution in bytes for each group, version, verb, resource, subresource, scope and component.",
// Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB). // Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB).
Buckets: prometheus.ExponentialBuckets(1000, 10.0, 7), Buckets: prometheus.ExponentialBuckets(1000, 10.0, 7),
}, },
[]string{"verb", "resource", "subresource", "scope"}, []string{"verb", "group", "version", "resource", "subresource", "scope", "component"},
) )
// DroppedRequests is a number of requests dropped with 'Try again later' response. // DroppedRequests is a number of requests dropped with 'Try again later' response.
DroppedRequests = prometheus.NewCounterVec( DroppedRequests = prometheus.NewCounterVec(
@ -157,21 +161,21 @@ func UpdateInflightRequestMetrics(nonmutating, mutating int) {
// Record records a single request to the standard metrics endpoints. For use by handlers that perform their own // Record records a single request to the standard metrics endpoints. For use by handlers that perform their own
// processing. All API paths should use InstrumentRouteFunc implicitly. Use this instead of MonitorRequest if // processing. All API paths should use InstrumentRouteFunc implicitly. Use this instead of MonitorRequest if
// you already have a RequestInfo object. // you already have a RequestInfo object.
func Record(req *http.Request, requestInfo *request.RequestInfo, contentType string, code int, responseSizeInBytes int, elapsed time.Duration) { func Record(req *http.Request, requestInfo *request.RequestInfo, component, contentType string, code int, responseSizeInBytes int, elapsed time.Duration) {
if requestInfo == nil { if requestInfo == nil {
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path} requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
} }
scope := CleanScope(requestInfo) scope := CleanScope(requestInfo)
if requestInfo.IsResourceRequest { if requestInfo.IsResourceRequest {
MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, scope, contentType, code, responseSizeInBytes, elapsed) MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, contentType, code, responseSizeInBytes, elapsed)
} else { } else {
MonitorRequest(req, strings.ToUpper(requestInfo.Verb), "", requestInfo.Path, scope, contentType, code, responseSizeInBytes, elapsed) MonitorRequest(req, strings.ToUpper(requestInfo.Verb), "", "", "", requestInfo.Path, scope, component, contentType, code, responseSizeInBytes, elapsed)
} }
} }
// RecordLongRunning tracks the execution of a long running request against the API server. It provides an accurate count // RecordLongRunning tracks the execution of a long running request against the API server. It provides an accurate count
// of the total number of open long running requests. requestInfo may be nil if the caller is not in the normal request flow. // of the total number of open long running requests. requestInfo may be nil if the caller is not in the normal request flow.
func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, fn func()) { func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, component string, fn func()) {
if requestInfo == nil { if requestInfo == nil {
requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path} requestInfo = &request.RequestInfo{Verb: req.Method, Path: req.URL.Path}
} }
@ -179,9 +183,9 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, fn f
scope := CleanScope(requestInfo) scope := CleanScope(requestInfo)
reportedVerb := cleanVerb(strings.ToUpper(requestInfo.Verb), req) reportedVerb := cleanVerb(strings.ToUpper(requestInfo.Verb), req)
if requestInfo.IsResourceRequest { if requestInfo.IsResourceRequest {
g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.Resource, requestInfo.Subresource, scope) g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component)
} else { } else {
g = longRunningRequestGauge.WithLabelValues(reportedVerb, "", requestInfo.Path, scope) g = longRunningRequestGauge.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component)
} }
g.Inc() g.Inc()
defer g.Dec() defer g.Dec()
@ -190,22 +194,22 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, fn f
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record // MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling. // a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
func MonitorRequest(req *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, elapsed time.Duration) { func MonitorRequest(req *http.Request, verb, group, version, resource, subresource, scope, component, contentType string, httpCode, respSize int, elapsed time.Duration) {
reportedVerb := cleanVerb(verb, req) reportedVerb := cleanVerb(verb, req)
client := cleanUserAgent(utilnet.GetHTTPClient(req)) client := cleanUserAgent(utilnet.GetHTTPClient(req))
elapsedMicroseconds := float64(elapsed / time.Microsecond) elapsedMicroseconds := float64(elapsed / time.Microsecond)
requestCounter.WithLabelValues(reportedVerb, resource, subresource, scope, client, contentType, codeToString(httpCode)).Inc() requestCounter.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component, client, contentType, codeToString(httpCode)).Inc()
requestLatencies.WithLabelValues(reportedVerb, resource, subresource, scope).Observe(elapsedMicroseconds) requestLatencies.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(elapsedMicroseconds)
requestLatenciesSummary.WithLabelValues(reportedVerb, resource, subresource, scope).Observe(elapsedMicroseconds) requestLatenciesSummary.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(elapsedMicroseconds)
// We are only interested in response sizes of read requests. // We are only interested in response sizes of read requests.
if verb == "GET" || verb == "LIST" { if verb == "GET" || verb == "LIST" {
responseSizes.WithLabelValues(reportedVerb, resource, subresource, scope).Observe(float64(respSize)) responseSizes.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize))
} }
} }
// InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps // InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps
// the go-restful RouteFunction instead of a HandlerFunc plus some Kubernetes endpoint specific information. // the go-restful RouteFunction instead of a HandlerFunc plus some Kubernetes endpoint specific information.
func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc restful.RouteFunction) restful.RouteFunction { func InstrumentRouteFunc(verb, group, version, resource, subresource, scope, component string, routeFunc restful.RouteFunction) restful.RouteFunction {
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) { return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
now := time.Now() now := time.Now()
@ -224,12 +228,12 @@ func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc re
routeFunc(request, response) routeFunc(request, response)
MonitorRequest(request.Request, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), time.Since(now)) MonitorRequest(request.Request, verb, group, version, resource, subresource, scope, component, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), time.Since(now))
}) })
} }
// InstrumentHandlerFunc works like Prometheus' InstrumentHandlerFunc but adds some Kubernetes endpoint specific information. // InstrumentHandlerFunc works like Prometheus' InstrumentHandlerFunc but adds some Kubernetes endpoint specific information.
func InstrumentHandlerFunc(verb, resource, subresource, scope string, handler http.HandlerFunc) http.HandlerFunc { func InstrumentHandlerFunc(verb, group, version, resource, subresource, scope, component string, handler http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) { return func(w http.ResponseWriter, req *http.Request) {
now := time.Now() now := time.Now()
@ -246,7 +250,7 @@ func InstrumentHandlerFunc(verb, resource, subresource, scope string, handler ht
handler(w, req) handler(w, req)
MonitorRequest(req, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), time.Since(now)) MonitorRequest(req, verb, group, version, resource, subresource, scope, component, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), time.Since(now))
} }
} }

View File

@ -176,7 +176,7 @@ func WithMaxInFlightLimit(
} }
} }
} }
metrics.Record(r, requestInfo, "", http.StatusTooManyRequests, 0, 0) metrics.Record(r, requestInfo, metrics.APIServerComponent, "", http.StatusTooManyRequests, 0, 0)
tooManyRequests(r, w) tooManyRequests(r, w)
} }
} }

View File

@ -58,7 +58,7 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning apir
postTimeoutFn := func() { postTimeoutFn := func() {
cancel() cancel()
metrics.Record(req, requestInfo, "", http.StatusGatewayTimeout, 0, 0) metrics.Record(req, requestInfo, metrics.APIServerComponent, "", http.StatusGatewayTimeout, 0, 0)
} }
return req, time.After(timeout), postTimeoutFn, apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within %s", timeout), 0) return req, time.After(timeout), postTimeoutFn, apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within %s", timeout), 0)
} }

View File

@ -55,6 +55,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library",

View File

@ -30,6 +30,7 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
genericfeatures "k8s.io/apiserver/pkg/features" genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -38,6 +39,8 @@ import (
apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration" apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
) )
// aggregatorComponent is the component name that proxyError passes to
// endpointmetrics.Record when it records a failed proxy request.
const aggregatorComponent string = "aggregator"
// proxyHandler provides a http.Handler which will proxy traffic to locations // proxyHandler provides a http.Handler which will proxy traffic to locations
// specified by items implementing Redirector. // specified by items implementing Redirector.
type proxyHandler struct { type proxyHandler struct {
@ -60,6 +63,8 @@ type proxyHandlingInfo struct {
// local indicates that this APIService is locally satisfied // local indicates that this APIService is locally satisfied
local bool local bool
// name is the name of the APIService
name string
// restConfig holds the information for building a roundtripper // restConfig holds the information for building a roundtripper
restConfig *restclient.Config restConfig *restclient.Config
// transportBuildingError is an error produced while building the transport. If this // transportBuildingError is an error produced while building the transport. If this
@ -75,6 +80,19 @@ type proxyHandlingInfo struct {
serviceAvailable bool serviceAvailable bool
} }
// proxyError writes msg and the HTTP status code to w via http.Error and then
// records the failed request in the endpoint metrics under the aggregator
// component.
//
// If the request context carries no RequestInfo, a warning is logged and no
// metric is recorded. The metric is recorded with an empty content type, a
// response size of 0 and an elapsed time of 0.
func proxyError(w http.ResponseWriter, req *http.Request, msg string, code int) {
	// The parameter is named msg rather than error so that it does not shadow
	// the predeclared error type inside this function.
	http.Error(w, msg, code)

	info, ok := genericapirequest.RequestInfoFrom(req.Context())
	if !ok {
		klog.Warning("no RequestInfo found in the context")
		return
	}
	// TODO: record long-running request differently? The long-running check func does not necessarily match the one of the aggregated apiserver
	endpointmetrics.Record(req, info, aggregatorComponent, "", code, 0, 0)
}
func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
value := r.handlingInfo.Load() value := r.handlingInfo.Load()
if value == nil { if value == nil {
@ -92,18 +110,18 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
if !handlingInfo.serviceAvailable { if !handlingInfo.serviceAvailable {
http.Error(w, "service unavailable", http.StatusServiceUnavailable) proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return return
} }
if handlingInfo.transportBuildingError != nil { if handlingInfo.transportBuildingError != nil {
http.Error(w, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError) proxyError(w, req, handlingInfo.transportBuildingError.Error(), http.StatusInternalServerError)
return return
} }
user, ok := genericapirequest.UserFrom(req.Context()) user, ok := genericapirequest.UserFrom(req.Context())
if !ok { if !ok {
http.Error(w, "missing user", http.StatusInternalServerError) proxyError(w, req, "missing user", http.StatusInternalServerError)
return return
} }
@ -113,7 +131,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName) rloc, err := r.serviceResolver.ResolveEndpoint(handlingInfo.serviceNamespace, handlingInfo.serviceName)
if err != nil { if err != nil {
klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err) klog.Errorf("error resolving %s/%s: %v", handlingInfo.serviceNamespace, handlingInfo.serviceName, err)
http.Error(w, "service unavailable", http.StatusServiceUnavailable) proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
return return
} }
location.Host = rloc.Host location.Host = rloc.Host
@ -126,14 +144,14 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newReq.URL = location newReq.URL = location
if handlingInfo.proxyRoundTripper == nil { if handlingInfo.proxyRoundTripper == nil {
http.Error(w, "", http.StatusNotFound) proxyError(w, req, "", http.StatusNotFound)
return return
} }
// we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers // we need to wrap the roundtripper in another roundtripper which will apply the front proxy headers
proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req) proxyRoundTripper, upgrade, err := maybeWrapForConnectionUpgrades(handlingInfo.restConfig, handlingInfo.proxyRoundTripper, req)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) proxyError(w, req, err.Error(), http.StatusInternalServerError)
return return
} }
proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper) proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
@ -195,6 +213,7 @@ func (r *proxyHandler) updateAPIService(apiService *apiregistrationapi.APIServic
} }
newInfo := proxyHandlingInfo{ newInfo := proxyHandlingInfo{
name: apiService.Name,
restConfig: &restclient.Config{ restConfig: &restclient.Config{
TLSClientConfig: restclient.TLSClientConfig{ TLSClientConfig: restclient.TLSClientConfig{
Insecure: apiService.Spec.InsecureSkipTLSVerify, Insecure: apiService.Spec.InsecureSkipTLSVerify,

View File

@ -8,7 +8,10 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["available_controller.go"], srcs = [
"available_controller.go",
"metrics.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/status", importmap = "k8s.io/kubernetes/vendor/k8s.io/kube-aggregator/pkg/controllers/status",
importpath = "k8s.io/kube-aggregator/pkg/controllers/status", importpath = "k8s.io/kube-aggregator/pkg/controllers/status",
deps = [ deps = [
@ -16,7 +19,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@ -30,6 +32,7 @@ go_library(
"//staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/controllers:go_default_library", "//staging/src/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/klog:go_default_library",
], ],
) )

View File

@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -152,9 +151,8 @@ func (c *AvailableConditionController) sync(key string) error {
apiService := originalAPIService.DeepCopy() apiService := originalAPIService.DeepCopy()
availableCondition := apiregistration.APIServiceCondition{ availableCondition := apiregistration.APIServiceCondition{
Type: apiregistration.Available, Type: apiregistration.Available,
Status: apiregistration.ConditionTrue, Status: apiregistration.ConditionTrue,
LastTransitionTime: metav1.Now(),
} }
// local API services are always considered available // local API services are always considered available
@ -283,7 +281,30 @@ func updateAPIServiceStatus(client apiregistrationclient.APIServicesGetter, orig
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) { if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
return newAPIService, nil return newAPIService, nil
} }
return client.APIServices().UpdateStatus(newAPIService)
newAPIService, err := client.APIServices().UpdateStatus(newAPIService)
if err != nil {
return nil, err
}
// update metrics
wasAvailable := apiregistration.IsAPIServiceConditionTrue(originalAPIService, apiregistration.Available)
isAvailable := apiregistration.IsAPIServiceConditionTrue(newAPIService, apiregistration.Available)
if isAvailable != wasAvailable {
if isAvailable {
unavailableGauge.WithLabelValues(newAPIService.Name).Set(0.0)
} else {
unavailableGauge.WithLabelValues(newAPIService.Name).Set(1.0)
reason := "UnknownReason"
if newCondition := apiregistration.GetAPIServiceConditionByType(newAPIService, apiregistration.Available); newCondition != nil {
reason = newCondition.Reason
}
unavailableCounter.WithLabelValues(newAPIService.Name, reason).Inc()
}
}
return newAPIService, nil
} }
func (c *AvailableConditionController) Run(threadiness int, stopCh <-chan struct{}) { func (c *AvailableConditionController) Run(threadiness int, stopCh <-chan struct{}) {

View File

@ -0,0 +1,43 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"github.com/prometheus/client_golang/prometheus"
)
// unavailableCounter is a counter vector for APIServices that are marked as
// unavailable, partitioned by the APIService name and the reason reported
// for the unavailability.
var unavailableCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "aggregator_unavailable_apiservice_count",
		Help: "Counter of APIServices which are marked as unavailable broken down by APIService name and reason.",
	},
	[]string{"name", "reason"},
)

// unavailableGauge is a gauge vector for APIServices that are marked as
// unavailable, partitioned by the APIService name.
//
// NOTE(review): the metric name says "apiserver" while the help text and the
// sibling counter say "apiservice". Renaming would change the exported metric
// name for existing consumers, so it is left untouched here.
var unavailableGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "aggregator_unavailable_apiserver_gauge",
		Help: "Gauge of APIServices which are marked as unavailable broken down by APIService name.",
	},
	[]string{"name"},
)
// init registers the APIService unavailability collectors with the default
// Prometheus registerer. MustRegister panics if a collector cannot be
// registered, so a registration conflict surfaces at program start.
func init() {
	prometheus.MustRegister(unavailableCounter, unavailableGauge)
}