Merge pull request #49117 from shyamjvs/add-size-metrics-to-apiserver

Automatic merge from submit-queue

Add apiserver metric for response sizes

Fixes https://github.com/kubernetes/kubernetes/issues/47728

This should help us understand GET/LIST call latencies better. It'll also help catch differences in object sizes across kubemark and real cluster.
I'm labelling the metrics by namespace (hoping that there won't be toooo many of them).

/cc @smarterclayton @gmarek 
cc @kubernetes/sig-scalability-misc @kubernetes/sig-api-machinery-misc @lavalamp @wojtek-t
pull/6/head
Kubernetes Submit Queue 2017-07-28 11:50:00 -07:00 committed by GitHub
commit 3cd5beceb4
5 changed files with 92 additions and 38 deletions

View File

@ -53,18 +53,22 @@ type ProxyHandler struct {
}
func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
reqStart := time.Now()
proxyHandlerTraceID := rand.Int63()
var verb string
var apiResource, subresource string
var verb, apiResource, subresource, scope string
var httpCode int
reqStart := time.Now()
defer func() {
responseLength := 0
if rw, ok := w.(*metrics.ResponseWriterDelegator); ok {
responseLength = rw.ContentLength()
}
metrics.Monitor(
verb, apiResource, subresource,
verb, apiResource, subresource, scope,
net.GetHTTPClient(req),
w.Header().Get("Content-Type"),
httpCode, reqStart,
httpCode, responseLength, reqStart,
)
}()
@ -88,6 +92,10 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
verb = requestInfo.Verb
namespace, resource, subresource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Subresource, requestInfo.Parts
scope = "cluster"
if namespace != "" {
scope = "namespace"
}
ctx = request.WithNamespace(ctx, namespace)
if len(parts) < 2 {

View File

@ -567,6 +567,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
namespaced = ""
}
// This variable is calculated for the purpose of instrumentation.
namespaceScope := "cluster"
if namespaced != "" {
namespaceScope = "namespace"
}
if kubeVerb, found := toDiscoveryKubeVerb[action.Verb]; found {
if len(kubeVerb) != 0 {
kubeVerbs[kubeVerb] = struct{}{}
@ -593,7 +599,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
} else {
handler = restfulGetResource(getter, exporter, reqScope)
}
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler)
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, handler)
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
}
@ -625,7 +631,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "list " + subresource + " of objects of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
if a.enableAPIResponseCompression {
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
}
@ -660,7 +666,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "replace " + subresource + " of the specified " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulUpdateResource(updater, reqScope, a.group.Typer, admit))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulUpdateResource(updater, reqScope, a.group.Typer, admit))
route := ws.PUT(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -676,7 +682,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "partially update " + subresource + " of the specified " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulPatchResource(patcher, reqScope, admit, mapping.ObjectConvertor))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulPatchResource(patcher, reqScope, admit, mapping.ObjectConvertor))
route := ws.PATCH(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -695,7 +701,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
} else {
handler = restfulCreateResource(creater, reqScope, a.group.Typer, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler)
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, handler)
article := getArticleForNoun(kind, " ")
doc := "create" + article + kind
if hasSubresource {
@ -717,7 +723,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "delete " + subresource + " of" + article + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit))
route := ws.DELETE(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -738,7 +744,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "delete collection of " + subresource + " of a " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit))
route := ws.DELETE(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -757,7 +763,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "watch changes to " + subresource + " of an object of kind " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -776,7 +782,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if hasSubresource {
doc = "watch individual changes to a list of " + subresource + " of " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout))
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
@ -794,20 +800,20 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
// TODO: DEPRECATED in v1.2.
case "PROXY": // Proxy requests to a resource.
// Accept all methods as per http://issue.k8s.io/3996
routes = append(routes, buildProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "PATCH", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "HEAD", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "OPTIONS", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "GET", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "PUT", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "POST", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "PATCH", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "DELETE", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "HEAD", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix))
routes = append(routes, buildProxyRoute(ws, "OPTIONS", a.prefix, action.Path, kind, resource, subresource, namespaced, namespaceScope, hasSubresource, action.Params, proxyHandler, operationSuffix))
case "CONNECT":
for _, method := range connecter.ConnectMethods() {
doc := "connect " + method + " requests to " + kind
if hasSubresource {
doc = "connect " + method + " requests to " + subresource + " of " + kind
}
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulConnectResource(connecter, reqScope, admit, path, hasSubresource))
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, namespaceScope, restfulConnectResource(connecter, reqScope, admit, path, hasSubresource))
route := ws.Method(method).Path(action.Path).
To(handler).
Doc(doc).
@ -868,12 +874,17 @@ func routeFunction(handler http.Handler) restful.RouteFunction {
}
}
func buildProxyRoute(ws *restful.WebService, method string, prefix string, path string, proxyHandler http.Handler, namespaced, kind, resource, subresource string, hasSubresource bool, params []*restful.Parameter, operationSuffix string) *restful.RouteBuilder {
func buildProxyRoute(ws *restful.WebService,
method, prefix, path, kind, resource, subresource, namespaced, namespaceScope string,
hasSubresource bool,
params []*restful.Parameter,
proxyHandler http.Handler,
operationSuffix string) *restful.RouteBuilder {
doc := "proxy " + method + " requests to " + kind
if hasSubresource {
doc = "proxy " + method + " requests to " + subresource + " of " + kind
}
handler := metrics.InstrumentRouteFunc("PROXY", resource, subresource, routeFunction(proxyHandler))
handler := metrics.InstrumentRouteFunc("PROXY", resource, subresource, namespaceScope, routeFunction(proxyHandler))
proxyRoute := ws.Method(method).Path(path).To(handler).
Doc(doc).
Operation("proxy" + strings.Title(method) + namespaced + kind + strings.Title(subresource) + operationSuffix).

View File

@ -18,6 +18,7 @@ package metrics
import (
"bufio"
//"fmt"
"net"
"net/http"
"regexp"
@ -26,6 +27,7 @@ import (
"time"
utilnet "k8s.io/apimachinery/pkg/util/net"
//utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"github.com/emicklei/go-restful"
"github.com/prometheus/client_golang/prometheus"
@ -44,7 +46,7 @@ var (
requestLatencies = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "apiserver_request_latencies",
Help: "Response latency distribution in microseconds for each verb, resource and client.",
Help: "Response latency distribution in microseconds for each verb, resource and subresource.",
// Use buckets ranging from 125 ms to 8 seconds.
Buckets: prometheus.ExponentialBuckets(125000, 2.0, 7),
},
@ -53,12 +55,21 @@ var (
requestLatenciesSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "apiserver_request_latencies_summary",
Help: "Response latency summary in microseconds for each verb and resource.",
Help: "Response latency summary in microseconds for each verb, resource and subresource.",
// Make the sliding window of 1h.
MaxAge: time.Hour,
},
[]string{"verb", "resource", "subresource"},
)
responseSizes = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "apiserver_response_sizes",
Help: "Response size distribution in bytes for each verb, resource, subresource and scope (namespace/cluster).",
// Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB).
Buckets: prometheus.ExponentialBuckets(1000, 10.0, 7),
},
[]string{"verb", "resource", "subresource", "scope"},
)
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
)
@ -67,20 +78,25 @@ func Register() {
prometheus.MustRegister(requestCounter)
prometheus.MustRegister(requestLatencies)
prometheus.MustRegister(requestLatenciesSummary)
prometheus.MustRegister(responseSizes)
}
// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be
// uppercase to be backwards compatible with existing monitoring tooling.
func Monitor(verb, resource, subresource, client, contentType string, httpCode int, reqStart time.Time) {
func Monitor(verb, resource, subresource, scope, client, contentType string, httpCode, respSize int, reqStart time.Time) {
elapsed := float64((time.Since(reqStart)) / time.Microsecond)
requestCounter.WithLabelValues(verb, resource, subresource, client, contentType, codeToString(httpCode)).Inc()
requestLatencies.WithLabelValues(verb, resource, subresource).Observe(elapsed)
requestLatenciesSummary.WithLabelValues(verb, resource, subresource).Observe(elapsed)
// We are only interested in response sizes of read requests.
if verb == "GET" || verb == "LIST" {
responseSizes.WithLabelValues(verb, resource, subresource, scope).Observe(float64(respSize))
}
}
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
func MonitorRequest(request *http.Request, verb, resource, subresource, contentType string, httpCode int, reqStart time.Time) {
func MonitorRequest(request *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, reqStart time.Time) {
reportedVerb := verb
if verb == "LIST" {
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
@ -90,23 +106,25 @@ func MonitorRequest(request *http.Request, verb, resource, subresource, contentT
}
}
}
client := cleanUserAgent(utilnet.GetHTTPClient(request))
Monitor(reportedVerb, resource, subresource, client, contentType, httpCode, reqStart)
Monitor(reportedVerb, resource, subresource, scope, client, contentType, httpCode, respSize, reqStart)
}
func Reset() {
requestCounter.Reset()
requestLatencies.Reset()
requestLatenciesSummary.Reset()
responseSizes.Reset()
}
// InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps
// the go-restful RouteFunction instead of a HandlerFunc
func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.RouteFunction) restful.RouteFunction {
func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc restful.RouteFunction) restful.RouteFunction {
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
now := time.Now()
delegate := &responseWriterDelegator{ResponseWriter: response.ResponseWriter}
delegate := &ResponseWriterDelegator{ResponseWriter: response.ResponseWriter}
_, cn := response.ResponseWriter.(http.CloseNotifier)
_, fl := response.ResponseWriter.(http.Flusher)
@ -121,7 +139,7 @@ func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.R
routeFunc(request, response)
MonitorRequest(request.Request, verb, resource, subresource, rw.Header().Get("Content-Type"), delegate.status, now)
MonitorRequest(request.Request, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), now)
})
}
@ -135,7 +153,8 @@ func cleanUserAgent(ua string) string {
return ua
}
type responseWriterDelegator struct {
// ResponseWriterDelegator interface wraps http.ResponseWriter to additionally record content-length, status-code, etc.
type ResponseWriterDelegator struct {
http.ResponseWriter
status int
@ -143,13 +162,13 @@ type responseWriterDelegator struct {
wroteHeader bool
}
func (r *responseWriterDelegator) WriteHeader(code int) {
func (r *ResponseWriterDelegator) WriteHeader(code int) {
r.status = code
r.wroteHeader = true
r.ResponseWriter.WriteHeader(code)
}
func (r *responseWriterDelegator) Write(b []byte) (int, error) {
func (r *ResponseWriterDelegator) Write(b []byte) (int, error) {
if !r.wroteHeader {
r.WriteHeader(http.StatusOK)
}
@ -158,8 +177,16 @@ func (r *responseWriterDelegator) Write(b []byte) (int, error) {
return n, err
}
func (r *ResponseWriterDelegator) Status() int {
return r.status
}
func (r *ResponseWriterDelegator) ContentLength() int {
return int(r.written)
}
type fancyResponseWriterDelegator struct {
*responseWriterDelegator
*ResponseWriterDelegator
}
func (f *fancyResponseWriterDelegator) CloseNotify() <-chan bool {

View File

@ -109,7 +109,11 @@ func WithMaxInFlightLimit(
}
}
}
metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now())
scope := "cluster"
if requestInfo.Namespace != "" {
scope = "namespace"
}
metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", scope, errors.StatusTooManyRequests, 0, time.Now())
tooManyRequests(r, w)
}
}

View File

@ -60,7 +60,11 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
}
now := time.Now()
metricFn := func() {
metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", http.StatusInternalServerError, now)
scope := "cluster"
if requestInfo.Namespace != "" {
scope = "namespace"
}
metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", scope, http.StatusInternalServerError, 0, now)
}
return time.After(globalTimeout), metricFn, apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
}