apiserver: sync with https server shutdown to flush existing connections

pull/564/head
Dr. Stefan Schimanski 2019-01-16 15:58:53 +01:00
parent 8743a0e3c6
commit 5b47f99164
7 changed files with 34 additions and 19 deletions

View File

@ -149,7 +149,8 @@ func Run(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}) error
if c.SecureServing != nil { if c.SecureServing != nil {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { // TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err return err
} }
} }

View File

@ -170,7 +170,8 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
if c.SecureServing != nil { if c.SecureServing != nil {
unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...) unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication) handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil { // TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
return err return err
} }
} }

View File

@ -214,7 +214,8 @@ func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
} }
if cc.SecureServing != nil { if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer) handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
if err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil { // TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
// fail early for secure handlers, removing the old error loop from above // fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start healthz server: %v", err) return fmt.Errorf("failed to start healthz server: %v", err)
} }

View File

@ -50,7 +50,9 @@ func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTime
} else { } else {
klog.Infof("Serving insecurely on %s", s.Listener.Addr()) klog.Infof("Serving insecurely on %s", s.Listener.Addr())
} }
return RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh) _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
// NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here
return err
} }
func (s *DeprecatedInsecureServingInfo) NewLoopbackClientConfig() (*rest.Config, error) { func (s *DeprecatedInsecureServingInfo) NewLoopbackClientConfig() (*rest.Config, error) {

View File

@ -296,9 +296,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
// Use an internal stop channel to allow cleanup of the listeners on error. // Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{}) internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil { if s.SecureServingInfo != nil && s.Handler != nil {
if err := s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh); err != nil { var err error
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh) close(internalStopCh)
return err return err
} }
@ -310,6 +312,9 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
go func() { go func() {
<-stopCh <-stopCh
close(internalStopCh) close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh
}
s.HandlerChainWaitGroup.Wait() s.HandlerChainWaitGroup.Wait()
close(auditStopCh) close(auditStopCh)
}() }()

View File

@ -548,9 +548,9 @@ func TestGracefulShutdown(t *testing.T) {
// get port // get port
serverPort := ln.Addr().(*net.TCPAddr).Port serverPort := ln.Addr().(*net.TCPAddr).Port
err = RunServer(insecureServer, ln, 10*time.Second, stopCh) stoppedCh, err := RunServer(insecureServer, ln, 10*time.Second, stopCh)
if err != nil { if err != nil {
t.Errorf("RunServer err: %v", err) t.Fatalf("RunServer err: %v", err)
} }
graceCh := make(chan struct{}) graceCh := make(chan struct{})
@ -577,6 +577,7 @@ func TestGracefulShutdown(t *testing.T) {
// wait for wait group handler finish // wait for wait group handler finish
s.HandlerChainWaitGroup.Wait() s.HandlerChainWaitGroup.Wait()
<-stoppedCh
// check server all handlers finished. // check server all handlers finished.
if !graceShutdown { if !graceShutdown {

View File

@ -37,12 +37,12 @@ const (
defaultKeepAlivePeriod = 3 * time.Minute defaultKeepAlivePeriod = 3 * time.Minute
) )
// serveSecurely runs the secure http server. It fails only if certificates cannot // Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// be loaded or the initial listen call fails. The actual server loop (stoppable by closing // The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
// stopCh) runs in a go routine, i.e. serveSecurely does not block. // It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) error { func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
if s.Listener == nil { if s.Listener == nil {
return fmt.Errorf("listener must not be nil") return nil, fmt.Errorf("listener must not be nil")
} }
secureServer := &http.Server{ secureServer := &http.Server{
@ -110,7 +110,7 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur
// apply settings to the server // apply settings to the server
if err := http2.ConfigureServer(secureServer, http2Options); err != nil { if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
return fmt.Errorf("error configuring http2: %v", err) return nil, fmt.Errorf("error configuring http2: %v", err)
} }
klog.Infof("Serving securely on %s", secureServer.Addr) klog.Infof("Serving securely on %s", secureServer.Addr)
@ -118,21 +118,25 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur
} }
// RunServer listens on the given port if listener is not given, // RunServer listens on the given port if listener is not given,
// then spawns a go-routine continuously serving // then spawns a go-routine continuously serving until the stopCh is closed.
// until the stopCh is closed. This function does not block. // It returns a stoppedCh that is closed when all non-hijacked active requests
// have been processed.
// This function does not block
// TODO: make private when insecure serving is gone from the kube-apiserver // TODO: make private when insecure serving is gone from the kube-apiserver
func RunServer( func RunServer(
server *http.Server, server *http.Server,
ln net.Listener, ln net.Listener,
shutDownTimeout time.Duration, shutDownTimeout time.Duration,
stopCh <-chan struct{}, stopCh <-chan struct{},
) error { ) (<-chan struct{}, error) {
if ln == nil { if ln == nil {
return fmt.Errorf("listener must not be nil") return nil, fmt.Errorf("listener must not be nil")
} }
// Shutdown server gracefully. // Shutdown server gracefully.
stoppedCh := make(chan struct{})
go func() { go func() {
defer close(stoppedCh)
<-stopCh <-stopCh
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout) ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
server.Shutdown(ctx) server.Shutdown(ctx)
@ -159,7 +163,7 @@ func RunServer(
} }
}() }()
return nil return stoppedCh, nil
} }
type NamedTLSCert struct { type NamedTLSCert struct {