Record 429 and timeout errors to prometheus

Allows gathering of load being shed.
pull/6/head
Clayton Coleman 2017-07-07 00:26:13 -04:00
parent a9bf44101b
commit 2e33a2f0bc
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
7 changed files with 86 additions and 27 deletions

View File

@ -60,10 +60,12 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var httpCode int
reqStart := time.Now()
defer func() {
metrics.Monitor(&verb, &apiResource, &subresource,
metrics.Monitor(
verb, apiResource, subresource,
net.GetHTTPClient(req),
w.Header().Get("Content-Type"),
httpCode, reqStart)
httpCode, reqStart,
)
}()
ctx, ok := r.Mapper.Get(req)

View File

@ -69,11 +69,29 @@ func Register() {
prometheus.MustRegister(requestLatenciesSummary)
}
func Monitor(verb, resource, subresource *string, client, contentType string, httpCode int, reqStart time.Time) {
// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be
// uppercase to be backwards compatible with existing monitoring tooling.
func Monitor(verb, resource, subresource, client, contentType string, httpCode int, reqStart time.Time) {
elapsed := float64((time.Since(reqStart)) / time.Microsecond)
requestCounter.WithLabelValues(*verb, *resource, *subresource, client, contentType, codeToString(httpCode)).Inc()
requestLatencies.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed)
requestLatenciesSummary.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed)
requestCounter.WithLabelValues(verb, resource, subresource, client, contentType, codeToString(httpCode)).Inc()
requestLatencies.WithLabelValues(verb, resource, subresource).Observe(elapsed)
requestLatenciesSummary.WithLabelValues(verb, resource, subresource).Observe(elapsed)
}
// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record
// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling.
func MonitorRequest(request *http.Request, verb, resource, subresource, contentType string, httpCode int, reqStart time.Time) {
reportedVerb := verb
if verb == "LIST" {
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
if values := request.URL.Query()["watch"]; len(values) > 0 {
if value := strings.ToLower(values[0]); value != "0" && value != "false" {
reportedVerb = "WATCH"
}
}
}
client := cleanUserAgent(utilnet.GetHTTPClient(request))
Monitor(reportedVerb, resource, subresource, client, contentType, httpCode, reqStart)
}
func Reset() {
@ -103,11 +121,7 @@ func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.R
routeFunc(request, response)
reportedVerb := verb
if verb == "LIST" && strings.ToLower(request.QueryParameter("watch")) == "true" {
reportedVerb = "WATCH"
}
Monitor(&reportedVerb, &resource, &subresource, cleanUserAgent(utilnet.GetHTTPClient(request.Request)), rw.Header().Get("Content-Type"), delegate.status, now)
MonitorRequest(request.Request, verb, resource, subresource, rw.Header().Get("Content-Type"), delegate.status, now)
})
}

View File

@ -195,12 +195,17 @@ func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, er
// if there's no name on the request and we thought it was a get before, then the actual verb is a list or a watch
if len(requestInfo.Name) == 0 && requestInfo.Verb == "get" {
// Assumes v1.ListOptions
// Duplicates logic of Convert_Slice_string_To_bool
switch strings.ToLower(req.URL.Query().Get("watch")) {
case "false", "0", "":
// Any query value that is not 0 or false is considered true
// see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool
if values := req.URL.Query()["watch"]; len(values) > 0 {
switch strings.ToLower(values[0]) {
case "false", "0":
requestInfo.Verb = "list"
default:
requestInfo.Verb = "watch"
}
} else {
requestInfo.Verb = "list"
default:
requestInfo.Verb = "watch"
}
}
// if there's no name on the request and we thought it was a delete before, then the actual verb is deletecollection

View File

@ -48,6 +48,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library",
],

View File

@ -19,9 +19,12 @@ package filters
import (
"fmt"
"net/http"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/httplog"
@ -94,6 +97,7 @@ func WithMaxInFlightLimit(
defer func() { <-c }()
handler.ServeHTTP(w, r)
default:
metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", errors.StatusTooManyRequests, time.Now())
tooManyRequests(r, w)
}
}

View File

@ -22,11 +22,13 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
@ -39,24 +41,28 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
if longRunning == nil {
return handler
}
timeoutFunc := func(req *http.Request) (<-chan time.Time, *apierrors.StatusError) {
timeoutFunc := func(req *http.Request) (<-chan time.Time, func(), *apierrors.StatusError) {
// TODO unify this with apiserver.MaxInFlightLimit
ctx, ok := requestContextMapper.Get(req)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no context mapper
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout"))
return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout"))
}
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no request info
return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
}
if longRunning(req, requestInfo) {
return nil, nil
return nil, nil, nil
}
return time.After(globalTimeout), apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
now := time.Now()
metricFn := func() {
metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", http.StatusInternalServerError, now)
}
return time.After(globalTimeout), metricFn, apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0)
}
return WithTimeout(handler, timeoutFunc)
}
@ -68,18 +74,19 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
// provided. (If msg is empty, a suitable default message will be sent.) After
// the handler times out, writes by h to its http.ResponseWriter will return
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
// timeout will be enforced.
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, err *apierrors.StatusError)) http.Handler {
// timeout will be enforced. recordFn is a function that will be invoked whenever
// a timeout happens.
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler {
return &timeoutHandler{h, timeoutFunc}
}
type timeoutHandler struct {
handler http.Handler
timeout func(*http.Request) (<-chan time.Time, *apierrors.StatusError)
timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError)
}
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
after, err := t.timeout(r)
after, recordFn, err := t.timeout(r)
if after == nil {
t.handler.ServeHTTP(w, r)
return
@ -95,6 +102,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case <-done:
return
case <-after:
recordFn()
tw.timeout(err)
}
}

View File

@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"sync"
"testing"
"time"
@ -31,12 +32,30 @@ import (
"k8s.io/apimachinery/pkg/util/diff"
)
type recorder struct {
lock sync.Mutex
count int
}
func (r *recorder) Record() {
r.lock.Lock()
defer r.lock.Unlock()
r.count++
}
func (r *recorder) Count() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.count
}
func TestTimeout(t *testing.T) {
sendResponse := make(chan struct{}, 1)
writeErrors := make(chan error, 1)
timeout := make(chan time.Time, 1)
resp := "test response"
timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0)
record := &recorder{}
ts := httptest.NewServer(WithTimeout(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
@ -44,8 +63,8 @@ func TestTimeout(t *testing.T) {
_, err := w.Write([]byte(resp))
writeErrors <- err
}),
func(*http.Request) (<-chan time.Time, *apierrors.StatusError) {
return timeout, timeoutErr
func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) {
return timeout, record.Record, timeoutErr
}))
defer ts.Close()
@ -65,6 +84,9 @@ func TestTimeout(t *testing.T) {
if err := <-writeErrors; err != nil {
t.Errorf("got unexpected Write error on first request: %v", err)
}
if record.Count() != 0 {
t.Errorf("invoked record method: %#v", record)
}
// Times out
timeout <- time.Time{}
@ -83,6 +105,9 @@ func TestTimeout(t *testing.T) {
if !reflect.DeepEqual(status, &timeoutErr.ErrStatus) {
t.Errorf("unexpected object: %s", diff.ObjectReflectDiff(&timeoutErr.ErrStatus, status))
}
if record.Count() != 1 {
t.Errorf("did not invoke record method: %#v", record)
}
// Now try to send a response
sendResponse <- struct{}{}