From f6a89df3fb719f4db565c7dade63575ccbdb3031 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Mon, 30 Oct 2017 15:26:51 +0100 Subject: [PATCH] Revert "audit backend run shutdown gracefully after http handler finish" This reverts commit f42686081bff88e44b339562c4927775f4439671. --- cmd/kube-apiserver/app/server.go | 5 +- pkg/kubeapiserver/server/insecure_handler.go | 12 ++- staging/src/k8s.io/apiserver/pkg/server/BUILD | 2 - .../src/k8s.io/apiserver/pkg/server/config.go | 8 -- .../k8s.io/apiserver/pkg/server/filters/BUILD | 1 - .../apiserver/pkg/server/filters/waitgroup.go | 50 ------------ .../apiserver/pkg/server/genericapiserver.go | 24 +----- .../pkg/server/genericapiserver_test.go | 77 ------------------- .../src/k8s.io/apiserver/pkg/server/serve.go | 11 +-- 9 files changed, 13 insertions(+), 177 deletions(-) delete mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 5729463671..d83b368f5d 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -156,7 +156,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc if len(os.Getenv("KUBE_API_VERSIONS")) > 0 { if insecureServingOptions != nil { insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig) - if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil { + if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil { return nil, err } } @@ -186,7 +186,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc if insecureServingOptions != nil { insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig) - if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil { + if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil { return nil, err } } @@ -488,7 +488,6 @@ func BuildGenericConfig(s *options.ServerRunOptions, proxyTransport *http.Transp if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) } - return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil } diff --git a/pkg/kubeapiserver/server/insecure_handler.go b/pkg/kubeapiserver/server/insecure_handler.go index cc7c0a79de..8186a0ac7a 100644 --- a/pkg/kubeapiserver/server/insecure_handler.go +++ b/pkg/kubeapiserver/server/insecure_handler.go @@ -19,7 +19,6 @@ package server import ( "net" "net/http" - "time" "github.com/golang/glog" @@ -46,12 +45,11 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H } handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") + handler = genericfilters.WithPanicRecovery(handler) handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout) handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) - handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c), c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) - handler = genericfilters.WithPanicRecovery(handler) return handler } @@ -86,12 +84,12 @@ func (s *InsecureServingInfo) NewLoopbackClientConfig(token string) (*rest.Confi // NonBlockingRun spawns the insecure http server. An error is // returned if the ports cannot be listened on. -func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error { +func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error { // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) if insecureServingInfo != nil && insecureHandler != nil { - if err := serveInsecurely(insecureServingInfo, insecureHandler, shutDownTimeout, internalStopCh); err != nil { + if err := serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh); err != nil { close(internalStopCh) return err } @@ -111,7 +109,7 @@ func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler ht // serveInsecurely run the insecure http server. It fails only if the initial listen // call fails. The actual server loop (stoppable by closing stopCh) runs in a go // routine, i.e. serveInsecurely does not block. -func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error { +func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error { insecureServer := &http.Server{ Addr: insecureServingInfo.BindAddress, Handler: insecureHandler, @@ -119,7 +117,7 @@ func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler h } glog.Infof("Serving insecurely on %s", insecureServingInfo.BindAddress) var err error - _, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, shutDownTimeout, stopCh) + _, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh) return err } diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index fff7bbc7d9..3d56d3a266 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -30,10 +30,8 @@ go_test( "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library", - "//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", - "//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 378cd17e0d..e1c81c7d4a 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -27,7 +27,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/emicklei/go-restful-swagger12" @@ -129,8 +128,6 @@ type Config struct { // BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler. BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler) - // HandlerChainWaitGroup allows you to wait for all chain handlers exit after the server shutdown. - HandlerChainWaitGroup *sync.WaitGroup // DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is // always reported DiscoveryAddresses discovery.Addresses @@ -239,7 +236,6 @@ func NewConfig(codecs serializer.CodecFactory) *Config { ReadWritePort: 443, RequestContextMapper: apirequest.NewRequestContextMapper(), BuildHandlerChainFunc: DefaultBuildHandlerChain, - HandlerChainWaitGroup: new(sync.WaitGroup), LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix), DisabledPostStartHooks: sets.NewString(), HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz}, @@ -450,10 +446,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G Serializer: c.Serializer, AuditBackend: c.AuditBackend, delegationTarget: delegationTarget, - HandlerChainWaitGroup: c.HandlerChainWaitGroup, minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second, - shutdownTimeout: c.RequestTimeout, SecureServingInfo: c.SecureServingInfo, ExternalAddress: c.ExternalAddress, @@ -494,7 +488,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G return nil, err } } - for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { @@ -542,7 +535,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler) handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout) - handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper) handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) handler = genericfilters.WithPanicRecovery(handler) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index af31791276..a7e94dca6d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -37,7 +37,6 @@ go_library( "longrunning.go", "maxinflight.go", "timeout.go", - "waitgroup.go", "wrap.go", ], importpath = "k8s.io/apiserver/pkg/server/filters", diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go b/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go deleted file mode 100644 index d68e3dc945..0000000000 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/waitgroup.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package filters - -import ( - "net/http" - "sync" - - apirequest "k8s.io/apiserver/pkg/endpoints/request" -) - -// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown. -func WithWaitGroup(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck, wg *sync.WaitGroup) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - ctx, ok := requestContextMapper.Get(req) - if !ok { - // if this happens, the handler chain isn't setup correctly because there is no context mapper - handler.ServeHTTP(w, req) - return - } - - requestInfo, ok := apirequest.RequestInfoFrom(ctx) - if !ok { - // if this happens, the handler chain isn't setup correctly because there is no request info - handler.ServeHTTP(w, req) - return - } - - if !longRunning(req, requestInfo) { - wg.Add(1) - defer wg.Done() - } - - handler.ServeHTTP(w, req) - }) -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index e74cec8d53..6dc65a20c8 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -89,9 +89,6 @@ type GenericAPIServer struct { // minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler minRequestTimeout time.Duration - // shutdownTimeout is the timeout used for server shutdown. - shutdownTimeout time.Duration - // legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests // to InstallLegacyAPIGroup legacyAPIGroupPrefixes sets.String @@ -155,9 +152,6 @@ type GenericAPIServer struct { // delegationTarget is the next delegate in the chain or nil delegationTarget DelegationTarget - - // HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown. - HandlerChainWaitGroup *sync.WaitGroup } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -287,28 +281,16 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { <-stopCh - err = s.RunPreShutdownHooks() - if err != nil { - return err - } - - // Wait for all requests to finish, which is bounded by the RequestTimeout variable. - s.HandlerChainWaitGroup.Wait() - - return nil + return s.RunPreShutdownHooks() } // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { - // Use an stop channel to allow graceful shutdown without dropping audit events - // after http server shutdown. - auditStopCh := make(chan struct{}) - // Start the audit backend before any request comes in. This means we must call Backend.Run // before http server start serving. Otherwise the Backend.ProcessEvents call might block. if s.AuditBackend != nil { - if err := s.AuditBackend.Run(auditStopCh); err != nil { + if err := s.AuditBackend.Run(stopCh); err != nil { return fmt.Errorf("failed to run the audit backend: %v", err) } } @@ -329,8 +311,6 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error { go func() { <-stopCh close(internalStopCh) - s.HandlerChainWaitGroup.Wait() - close(auditStopCh) }() s.RunPostStartHooks(stopCh) diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index f4293fc48f..54a6540fda 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -25,8 +25,6 @@ import ( "net/http" "net/http/httptest" goruntime "runtime" - "strconv" - "sync" "testing" "time" @@ -46,11 +44,8 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/endpoints/discovery" - genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" - apirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - genericfilters "k8s.io/apiserver/pkg/server/filters" etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -514,75 +509,3 @@ func fakeVersion() version.Info { Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH), } } - -// TestGracefulShutdown verifies server shutdown after request handler finish. -func TestGracefulShutdown(t *testing.T) { - etcdserver, config, _ := setUp(t) - defer etcdserver.Terminate(t) - - var graceShutdown bool - wg := sync.WaitGroup{} - wg.Add(1) - - config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler { - handler := genericfilters.WithWaitGroup(apiHandler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup) - handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper) - handler = apirequest.WithRequestContext(handler, c.RequestContextMapper) - return handler - } - - handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - wg.Done() - time.Sleep(2 * time.Second) - w.WriteHeader(http.StatusOK) - graceShutdown = true - }) - - s, err := config.Complete(nil).New("test", EmptyDelegate) - if err != nil { - t.Fatalf("Error in bringing up the server: %v", err) - } - - s.Handler.NonGoRestfulMux.Handle("/test", handler) - - insecureServer := &http.Server{ - Addr: "0.0.0.0:0", - Handler: s.Handler, - } - stopCh := make(chan struct{}) - serverPort, err := RunServer(insecureServer, "tcp", 10*time.Second, stopCh) - if err != nil { - t.Errorf("RunServer err: %v", err) - } - - graceCh := make(chan struct{}) - // mock a client request - go func() { - resp, err := http.Get("http://127.0.0.1:" + strconv.Itoa(serverPort) + "/test") - if err != nil { - t.Errorf("Unexpected http error: %v", err) - } - if resp.StatusCode != http.StatusOK { - t.Errorf("Unexpected http status code: %v", resp.StatusCode) - } - close(graceCh) - }() - - // close stopCh after request sent to server to guarantee request handler is running. - wg.Wait() - close(stopCh) - // wait for wait group handler finish - s.HandlerChainWaitGroup.Wait() - - // check server all handlers finished. - if !graceShutdown { - t.Errorf("server shutdown not gracefully.") - } - // check client to make sure receive response. - select { - case <-graceCh: - t.Logf("server shutdown gracefully.") - case <-time.After(30 * time.Second): - t.Errorf("Timed out waiting for response.") - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/serve.go b/staging/src/k8s.io/apiserver/pkg/server/serve.go index ce4a1f7279..90f4078c75 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/serve.go +++ b/staging/src/k8s.io/apiserver/pkg/server/serve.go @@ -17,7 +17,6 @@ limitations under the License. package server import ( - "context" "crypto/tls" "crypto/x509" "fmt" @@ -85,13 +84,13 @@ func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error { glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress) var err error - s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, s.shutdownTimeout, stopCh) + s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh) return err } // RunServer listens on the given port, then spawns a go-routine continuously serving // until the stopCh is closed. The port is returned. This function does not block. -func RunServer(server *http.Server, network string, shutDownTimeout time.Duration, stopCh <-chan struct{}) (int, error) { +func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) { if len(server.Addr) == 0 { return 0, errors.New("address cannot be empty") } @@ -112,12 +111,10 @@ func RunServer(server *http.Server, network string, shutDownTimeout time.Duratio return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String()) } - // Shutdown server gracefully. + // Stop the server by closing the listener go func() { <-stopCh - ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout) - server.Shutdown(ctx) - cancel() + ln.Close() }() go func() {