diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go index d856440a83..33f69b2a26 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go @@ -60,10 +60,12 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { var httpCode int reqStart := time.Now() defer func() { - metrics.Monitor(&verb, &apiResource, &subresource, + metrics.Monitor( + verb, apiResource, subresource, net.GetHTTPClient(req), w.Header().Get("Content-Type"), - httpCode, reqStart) + httpCode, reqStart, + ) }() ctx, ok := r.Mapper.Get(req) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index 2b5a3e0fd4..d26d94d24d 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -69,11 +69,29 @@ func Register() { prometheus.MustRegister(requestLatenciesSummary) } -func Monitor(verb, resource, subresource *string, client, contentType string, httpCode int, reqStart time.Time) { +// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be +// uppercase to be backwards compatible with existing monitoring tooling. +func Monitor(verb, resource, subresource, client, contentType string, httpCode int, reqStart time.Time) { elapsed := float64((time.Since(reqStart)) / time.Microsecond) - requestCounter.WithLabelValues(*verb, *resource, *subresource, client, contentType, codeToString(httpCode)).Inc() - requestLatencies.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed) - requestLatenciesSummary.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed) + requestCounter.WithLabelValues(verb, resource, subresource, client, contentType, codeToString(httpCode)).Inc() + requestLatencies.WithLabelValues(verb, resource, subresource).Observe(elapsed) + requestLatenciesSummary.WithLabelValues(verb, resource, subresource).Observe(elapsed) +} + +// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record +// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling. +func MonitorRequest(request *http.Request, verb, resource, subresource, contentType string, httpCode int, reqStart time.Time) { + reportedVerb := verb + if verb == "LIST" { + // see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool + if values := request.URL.Query()["watch"]; len(values) > 0 { + if value := strings.ToLower(values[0]); value != "0" && value != "false" { + reportedVerb = "WATCH" + } + } + } + client := cleanUserAgent(utilnet.GetHTTPClient(request)) + Monitor(reportedVerb, resource, subresource, client, contentType, httpCode, reqStart) } func Reset() { @@ -103,11 +121,7 @@ func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.R routeFunc(request, response) - reportedVerb := verb - if verb == "LIST" && strings.ToLower(request.QueryParameter("watch")) == "true" { - reportedVerb = "WATCH" - } - Monitor(&reportedVerb, &resource, &subresource, cleanUserAgent(utilnet.GetHTTPClient(request.Request)), rw.Header().Get("Content-Type"), delegate.status, now) + MonitorRequest(request.Request, verb, resource, subresource, rw.Header().Get("Content-Type"), delegate.status, now) }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go b/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go index 2d19c8176f..a10e3df326 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go @@ -195,12 +195,17 @@ func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, er // if there's no name on the request and we thought it was a get before, then the actual verb is a list or a watch if len(requestInfo.Name) == 0 && requestInfo.Verb == "get" { // Assumes v1.ListOptions - // Duplicates logic of Convert_Slice_string_To_bool - switch strings.ToLower(req.URL.Query().Get("watch")) { - case "false", "0", "": + // Any query value that is not 0 or false is considered true + // see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool + if values := req.URL.Query()["watch"]; len(values) > 0 { + switch strings.ToLower(values[0]) { + case "false", "0": + requestInfo.Verb = "list" + default: + requestInfo.Verb = "watch" + } + } else { requestInfo.Verb = "list" - default: - requestInfo.Verb = "watch" } } // if there's no name on the request and we thought it was a delete before, then the actual verb is deletecollection diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index 6efce820f4..77e148ed7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -48,6 +48,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library", ], diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index 0100e05f8f..6a1f4da078 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -19,9 +19,12 @@ package filters import ( "fmt" "net/http" + "strings" + "time" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/httplog" @@ -94,6 +97,7 @@ func WithMaxInFlightLimit( defer func() { <-c }() handler.ServeHTTP(w, r) default: + metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now()) tooManyRequests(r, w) } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go index 19894b470b..341670bc9c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go @@ -22,11 +22,13 @@ import ( "fmt" "net" "net/http" + "strings" "sync" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" ) @@ -39,24 +41,28 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa if longRunning == nil { return handler } - timeoutFunc := func(req *http.Request) (<-chan time.Time, *apierrors.StatusError) { + timeoutFunc := func(req *http.Request) (<-chan time.Time, func(), *apierrors.StatusError) { // TODO unify this with apiserver.MaxInFlightLimit ctx, ok := requestContextMapper.Get(req) if !ok { // if this happens, the handler chain isn't setup correctly because there is no context mapper - return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout")) + return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout")) } requestInfo, ok := apirequest.RequestInfoFrom(ctx) if !ok { // if this happens, the handler chain isn't setup correctly because there is no request info - return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout")) + return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout")) } if longRunning(req, requestInfo) { - return nil, nil + return nil, nil, nil } - return time.After(globalTimeout), apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0) + now := time.Now() + metricFn := func() { + metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", http.StatusInternalServerError, now) + } + return time.After(globalTimeout), metricFn, apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0) } return WithTimeout(handler, timeoutFunc) } @@ -68,18 +74,19 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa // provided. (If msg is empty, a suitable default message will be sent.) After // the handler times out, writes by h to its http.ResponseWriter will return // http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no -// timeout will be enforced. -func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, err *apierrors.StatusError)) http.Handler { +// timeout will be enforced. recordFn is a function that will be invoked whenever +// a timeout happens. +func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler { return &timeoutHandler{h, timeoutFunc} } type timeoutHandler struct { handler http.Handler - timeout func(*http.Request) (<-chan time.Time, *apierrors.StatusError) + timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) } func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - after, err := t.timeout(r) + after, recordFn, err := t.timeout(r) if after == nil { t.handler.ServeHTTP(w, r) return @@ -95,6 +102,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case <-done: return case <-after: + recordFn() tw.timeout(err) } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go index 991427231a..6283a6cc21 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "sync" "testing" "time" @@ -31,12 +32,30 @@ import ( "k8s.io/apimachinery/pkg/util/diff" ) +type recorder struct { + lock sync.Mutex + count int +} + +func (r *recorder) Record() { + r.lock.Lock() + defer r.lock.Unlock() + r.count++ +} + +func (r *recorder) Count() int { + r.lock.Lock() + defer r.lock.Unlock() + return r.count +} + func TestTimeout(t *testing.T) { sendResponse := make(chan struct{}, 1) writeErrors := make(chan error, 1) timeout := make(chan time.Time, 1) resp := "test response" timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0) + record := &recorder{} ts := httptest.NewServer(WithTimeout(http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { @@ -44,8 +63,8 @@ func TestTimeout(t *testing.T) { _, err := w.Write([]byte(resp)) writeErrors <- err }), - func(*http.Request) (<-chan time.Time, *apierrors.StatusError) { - return timeout, timeoutErr + func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) { + return timeout, record.Record, timeoutErr })) defer ts.Close() @@ -65,6 +84,9 @@ func TestTimeout(t *testing.T) { if err := <-writeErrors; err != nil { t.Errorf("got unexpected Write error on first request: %v", err) } + if record.Count() != 0 { + t.Errorf("invoked record method: %#v", record) + } // Times out timeout <- time.Time{} @@ -83,6 +105,9 @@ func TestTimeout(t *testing.T) { if !reflect.DeepEqual(status, &timeoutErr.ErrStatus) { t.Errorf("unexpected object: %s", diff.ObjectReflectDiff(&timeoutErr.ErrStatus, status)) } + if record.Count() != 1 { + t.Errorf("did not invoke record method: %#v", record) + } // Now try to send a response sendResponse <- struct{}{}