Merge pull request #61048 from sttts/sttts-cancel-context

Automatic merge from submit-queue (batch tested with PRs 61400, 61048). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

apiserver: cancel context on timeout in WithTimeoutForNonLongRunningRequests

Requests that block time out after 60sec, but keep the handler body running. This can lead to exhaustion of clients or other leaks. This PR adds a cancel func to the context of the request and calls it on timeout.

Note: we still do our own timeout handling as we don't trust the context to really cancel every blocking call we do.

This might explain why we see so many handler backtraces like https://gist.github.com/sttts/0ce972dc8a7911e4ca9eea7bf1ded5fa when an etcd node goes down with a hard poweroff. But it does not explain why we see oc to block for 15 minutes.
pull/8/head
Kubernetes Submit Queue 2018-04-06 14:47:08 -07:00 committed by GitHub
commit 8d28c5102b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 16 additions and 6 deletions

View File

@ -18,6 +18,7 @@ package filters
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net"
@ -54,14 +55,23 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
if longRunning(req, requestInfo) {
return nil, nil, nil
}
metricFn := func() {
ctx, cancel := context.WithCancel(ctx)
if err := requestContextMapper.Update(req, ctx); err != nil {
return time.After(timeout), func() {}, apierrors.NewInternalError(fmt.Errorf("failed to update context during timeout"))
}
postTimeoutFn := func() {
cancel()
metrics.Record(req, requestInfo, "", http.StatusGatewayTimeout, 0, 0)
}
return time.After(timeout), metricFn, apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within %s", timeout), 0)
return time.After(timeout), postTimeoutFn, apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within %s", timeout), 0)
}
return WithTimeout(handler, timeoutFunc)
}
type timeoutFunc = func(*http.Request) (timeout <-chan time.Time, postTimeoutFunc func(), err *apierrors.StatusError)
// WithTimeout returns an http.Handler that runs h with a timeout
// determined by timeoutFunc. The new http.Handler calls h.ServeHTTP to handle
// each request, but if a call runs for longer than its time limit, the
@ -71,17 +81,17 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
// timeout will be enforced. recordFn is a function that will be invoked whenever
// a timeout happens.
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler {
func WithTimeout(h http.Handler, timeoutFunc timeoutFunc) http.Handler {
return &timeoutHandler{h, timeoutFunc}
}
type timeoutHandler struct {
handler http.Handler
timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError)
timeout timeoutFunc
}
func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
after, recordFn, err := t.timeout(r)
after, postTimeoutFn, err := t.timeout(r)
if after == nil {
t.handler.ServeHTTP(w, r)
return
@ -97,7 +107,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case <-done:
return
case <-after:
recordFn()
postTimeoutFn()
tw.timeout(err)
}
}