mirror of https://github.com/k3s-io/k3s
Merge pull request #54795 from sttts/sttts-audit-shutdown-sync-revert
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Revert: Shutdown http handlers before shutting down audit backend Fixes https://github.com/kubernetes/kubernetes/issues/54793pull/6/head
commit
3096a32568
|
@ -156,7 +156,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
|
||||||
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
|
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
|
||||||
if insecureServingOptions != nil {
|
if insecureServingOptions != nil {
|
||||||
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
|
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
|
||||||
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
|
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,7 +186,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
|
||||||
|
|
||||||
if insecureServingOptions != nil {
|
if insecureServingOptions != nil {
|
||||||
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
|
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
|
||||||
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
|
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -488,7 +488,6 @@ func BuildGenericConfig(s *options.ServerRunOptions, proxyTransport *http.Transp
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
|
return nil, nil, nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil
|
return genericConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@ package server
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
|
@ -46,12 +45,11 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H
|
||||||
}
|
}
|
||||||
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil)
|
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, insecureSuperuser{}, nil)
|
||||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||||
|
handler = genericfilters.WithPanicRecovery(handler)
|
||||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
|
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
|
||||||
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
|
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
|
||||||
handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
|
||||||
handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c), c.RequestContextMapper)
|
handler = genericapifilters.WithRequestInfo(handler, server.NewRequestInfoResolver(c), c.RequestContextMapper)
|
||||||
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
||||||
handler = genericfilters.WithPanicRecovery(handler)
|
|
||||||
|
|
||||||
return handler
|
return handler
|
||||||
}
|
}
|
||||||
|
@ -86,12 +84,12 @@ func (s *InsecureServingInfo) NewLoopbackClientConfig(token string) (*rest.Confi
|
||||||
|
|
||||||
// NonBlockingRun spawns the insecure http server. An error is
|
// NonBlockingRun spawns the insecure http server. An error is
|
||||||
// returned if the ports cannot be listened on.
|
// returned if the ports cannot be listened on.
|
||||||
func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error {
|
func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error {
|
||||||
// Use an internal stop channel to allow cleanup of the listeners on error.
|
// Use an internal stop channel to allow cleanup of the listeners on error.
|
||||||
internalStopCh := make(chan struct{})
|
internalStopCh := make(chan struct{})
|
||||||
|
|
||||||
if insecureServingInfo != nil && insecureHandler != nil {
|
if insecureServingInfo != nil && insecureHandler != nil {
|
||||||
if err := serveInsecurely(insecureServingInfo, insecureHandler, shutDownTimeout, internalStopCh); err != nil {
|
if err := serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh); err != nil {
|
||||||
close(internalStopCh)
|
close(internalStopCh)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -111,7 +109,7 @@ func NonBlockingRun(insecureServingInfo *InsecureServingInfo, insecureHandler ht
|
||||||
// serveInsecurely run the insecure http server. It fails only if the initial listen
|
// serveInsecurely run the insecure http server. It fails only if the initial listen
|
||||||
// call fails. The actual server loop (stoppable by closing stopCh) runs in a go
|
// call fails. The actual server loop (stoppable by closing stopCh) runs in a go
|
||||||
// routine, i.e. serveInsecurely does not block.
|
// routine, i.e. serveInsecurely does not block.
|
||||||
func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, shutDownTimeout time.Duration, stopCh <-chan struct{}) error {
|
func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler http.Handler, stopCh <-chan struct{}) error {
|
||||||
insecureServer := &http.Server{
|
insecureServer := &http.Server{
|
||||||
Addr: insecureServingInfo.BindAddress,
|
Addr: insecureServingInfo.BindAddress,
|
||||||
Handler: insecureHandler,
|
Handler: insecureHandler,
|
||||||
|
@ -119,7 +117,7 @@ func serveInsecurely(insecureServingInfo *InsecureServingInfo, insecureHandler h
|
||||||
}
|
}
|
||||||
glog.Infof("Serving insecurely on %s", insecureServingInfo.BindAddress)
|
glog.Infof("Serving insecurely on %s", insecureServingInfo.BindAddress)
|
||||||
var err error
|
var err error
|
||||||
_, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, shutDownTimeout, stopCh)
|
_, err = server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,10 +30,8 @@ go_test(
|
||||||
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
|
|
||||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
|
|
||||||
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||||
|
|
|
@ -27,7 +27,6 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/emicklei/go-restful-swagger12"
|
"github.com/emicklei/go-restful-swagger12"
|
||||||
|
@ -129,8 +128,6 @@ type Config struct {
|
||||||
|
|
||||||
// BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler.
|
// BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler.
|
||||||
BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler)
|
BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler)
|
||||||
// HandlerChainWaitGroup allows you to wait for all chain handlers exit after the server shutdown.
|
|
||||||
HandlerChainWaitGroup *sync.WaitGroup
|
|
||||||
// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
|
// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
|
||||||
// always reported
|
// always reported
|
||||||
DiscoveryAddresses discovery.Addresses
|
DiscoveryAddresses discovery.Addresses
|
||||||
|
@ -239,7 +236,6 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
||||||
ReadWritePort: 443,
|
ReadWritePort: 443,
|
||||||
RequestContextMapper: apirequest.NewRequestContextMapper(),
|
RequestContextMapper: apirequest.NewRequestContextMapper(),
|
||||||
BuildHandlerChainFunc: DefaultBuildHandlerChain,
|
BuildHandlerChainFunc: DefaultBuildHandlerChain,
|
||||||
HandlerChainWaitGroup: new(sync.WaitGroup),
|
|
||||||
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
||||||
DisabledPostStartHooks: sets.NewString(),
|
DisabledPostStartHooks: sets.NewString(),
|
||||||
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
|
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
|
||||||
|
@ -450,10 +446,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||||
Serializer: c.Serializer,
|
Serializer: c.Serializer,
|
||||||
AuditBackend: c.AuditBackend,
|
AuditBackend: c.AuditBackend,
|
||||||
delegationTarget: delegationTarget,
|
delegationTarget: delegationTarget,
|
||||||
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
|
|
||||||
|
|
||||||
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
|
||||||
shutdownTimeout: c.RequestTimeout,
|
|
||||||
|
|
||||||
SecureServingInfo: c.SecureServingInfo,
|
SecureServingInfo: c.SecureServingInfo,
|
||||||
ExternalAddress: c.ExternalAddress,
|
ExternalAddress: c.ExternalAddress,
|
||||||
|
@ -494,7 +488,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
for _, delegateCheck := range delegationTarget.HealthzChecks() {
|
||||||
skip := false
|
skip := false
|
||||||
for _, existingCheck := range c.HealthzChecks {
|
for _, existingCheck := range c.HealthzChecks {
|
||||||
|
@ -542,7 +535,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
|
||||||
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
|
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
|
||||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
|
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
|
||||||
handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
|
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
|
||||||
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
||||||
handler = genericfilters.WithPanicRecovery(handler)
|
handler = genericfilters.WithPanicRecovery(handler)
|
||||||
|
|
|
@ -37,7 +37,6 @@ go_library(
|
||||||
"longrunning.go",
|
"longrunning.go",
|
||||||
"maxinflight.go",
|
"maxinflight.go",
|
||||||
"timeout.go",
|
"timeout.go",
|
||||||
"waitgroup.go",
|
|
||||||
"wrap.go",
|
"wrap.go",
|
||||||
],
|
],
|
||||||
importpath = "k8s.io/apiserver/pkg/server/filters",
|
importpath = "k8s.io/apiserver/pkg/server/filters",
|
||||||
|
|
|
@ -1,50 +0,0 @@
|
||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package filters
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
|
||||||
)
|
|
||||||
|
|
||||||
// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown.
|
|
||||||
func WithWaitGroup(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck, wg *sync.WaitGroup) http.Handler {
|
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
||||||
ctx, ok := requestContextMapper.Get(req)
|
|
||||||
if !ok {
|
|
||||||
// if this happens, the handler chain isn't setup correctly because there is no context mapper
|
|
||||||
handler.ServeHTTP(w, req)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
|
|
||||||
if !ok {
|
|
||||||
// if this happens, the handler chain isn't setup correctly because there is no request info
|
|
||||||
handler.ServeHTTP(w, req)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if !longRunning(req, requestInfo) {
|
|
||||||
wg.Add(1)
|
|
||||||
defer wg.Done()
|
|
||||||
}
|
|
||||||
|
|
||||||
handler.ServeHTTP(w, req)
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -89,9 +89,6 @@ type GenericAPIServer struct {
|
||||||
// minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler
|
// minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler
|
||||||
minRequestTimeout time.Duration
|
minRequestTimeout time.Duration
|
||||||
|
|
||||||
// shutdownTimeout is the timeout used for server shutdown.
|
|
||||||
shutdownTimeout time.Duration
|
|
||||||
|
|
||||||
// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
|
// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
|
||||||
// to InstallLegacyAPIGroup
|
// to InstallLegacyAPIGroup
|
||||||
legacyAPIGroupPrefixes sets.String
|
legacyAPIGroupPrefixes sets.String
|
||||||
|
@ -155,9 +152,6 @@ type GenericAPIServer struct {
|
||||||
|
|
||||||
// delegationTarget is the next delegate in the chain or nil
|
// delegationTarget is the next delegate in the chain or nil
|
||||||
delegationTarget DelegationTarget
|
delegationTarget DelegationTarget
|
||||||
|
|
||||||
// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
|
|
||||||
HandlerChainWaitGroup *sync.WaitGroup
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
|
||||||
|
@ -287,28 +281,16 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
|
||||||
|
|
||||||
<-stopCh
|
<-stopCh
|
||||||
|
|
||||||
err = s.RunPreShutdownHooks()
|
return s.RunPreShutdownHooks()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all requests to finish, which is bounded by the RequestTimeout variable.
|
|
||||||
s.HandlerChainWaitGroup.Wait()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NonBlockingRun spawns the secure http server. An error is
|
// NonBlockingRun spawns the secure http server. An error is
|
||||||
// returned if the secure port cannot be listened on.
|
// returned if the secure port cannot be listened on.
|
||||||
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
|
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
|
||||||
// Use an stop channel to allow graceful shutdown without dropping audit events
|
|
||||||
// after http server shutdown.
|
|
||||||
auditStopCh := make(chan struct{})
|
|
||||||
|
|
||||||
// Start the audit backend before any request comes in. This means we must call Backend.Run
|
// Start the audit backend before any request comes in. This means we must call Backend.Run
|
||||||
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
|
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
|
||||||
if s.AuditBackend != nil {
|
if s.AuditBackend != nil {
|
||||||
if err := s.AuditBackend.Run(auditStopCh); err != nil {
|
if err := s.AuditBackend.Run(stopCh); err != nil {
|
||||||
return fmt.Errorf("failed to run the audit backend: %v", err)
|
return fmt.Errorf("failed to run the audit backend: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -329,8 +311,6 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
|
||||||
go func() {
|
go func() {
|
||||||
<-stopCh
|
<-stopCh
|
||||||
close(internalStopCh)
|
close(internalStopCh)
|
||||||
s.HandlerChainWaitGroup.Wait()
|
|
||||||
close(auditStopCh)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
s.RunPostStartHooks(stopCh)
|
s.RunPostStartHooks(stopCh)
|
||||||
|
|
|
@ -25,8 +25,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
goruntime "runtime"
|
goruntime "runtime"
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -46,11 +44,8 @@ import (
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||||
"k8s.io/apiserver/pkg/endpoints/discovery"
|
"k8s.io/apiserver/pkg/endpoints/discovery"
|
||||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
|
||||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
|
||||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||||
"k8s.io/apiserver/pkg/registry/rest"
|
"k8s.io/apiserver/pkg/registry/rest"
|
||||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
|
||||||
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
@ -514,75 +509,3 @@ func fakeVersion() version.Info {
|
||||||
Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH),
|
Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGracefulShutdown verifies server shutdown after request handler finish.
|
|
||||||
func TestGracefulShutdown(t *testing.T) {
|
|
||||||
etcdserver, config, _ := setUp(t)
|
|
||||||
defer etcdserver.Terminate(t)
|
|
||||||
|
|
||||||
var graceShutdown bool
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
|
|
||||||
handler := genericfilters.WithWaitGroup(apiHandler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
|
|
||||||
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
|
|
||||||
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
|
|
||||||
return handler
|
|
||||||
}
|
|
||||||
|
|
||||||
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
||||||
wg.Done()
|
|
||||||
time.Sleep(2 * time.Second)
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
graceShutdown = true
|
|
||||||
})
|
|
||||||
|
|
||||||
s, err := config.Complete(nil).New("test", EmptyDelegate)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Error in bringing up the server: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Handler.NonGoRestfulMux.Handle("/test", handler)
|
|
||||||
|
|
||||||
insecureServer := &http.Server{
|
|
||||||
Addr: "0.0.0.0:0",
|
|
||||||
Handler: s.Handler,
|
|
||||||
}
|
|
||||||
stopCh := make(chan struct{})
|
|
||||||
serverPort, err := RunServer(insecureServer, "tcp", 10*time.Second, stopCh)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("RunServer err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
graceCh := make(chan struct{})
|
|
||||||
// mock a client request
|
|
||||||
go func() {
|
|
||||||
resp, err := http.Get("http://127.0.0.1:" + strconv.Itoa(serverPort) + "/test")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Unexpected http error: %v", err)
|
|
||||||
}
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
t.Errorf("Unexpected http status code: %v", resp.StatusCode)
|
|
||||||
}
|
|
||||||
close(graceCh)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// close stopCh after request sent to server to guarantee request handler is running.
|
|
||||||
wg.Wait()
|
|
||||||
close(stopCh)
|
|
||||||
// wait for wait group handler finish
|
|
||||||
s.HandlerChainWaitGroup.Wait()
|
|
||||||
|
|
||||||
// check server all handlers finished.
|
|
||||||
if !graceShutdown {
|
|
||||||
t.Errorf("server shutdown not gracefully.")
|
|
||||||
}
|
|
||||||
// check client to make sure receive response.
|
|
||||||
select {
|
|
||||||
case <-graceCh:
|
|
||||||
t.Logf("server shutdown gracefully.")
|
|
||||||
case <-time.After(30 * time.Second):
|
|
||||||
t.Errorf("Timed out waiting for response.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -85,13 +84,13 @@ func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
|
||||||
|
|
||||||
glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
|
glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
|
||||||
var err error
|
var err error
|
||||||
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, s.shutdownTimeout, stopCh)
|
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunServer listens on the given port, then spawns a go-routine continuously serving
|
// RunServer listens on the given port, then spawns a go-routine continuously serving
|
||||||
// until the stopCh is closed. The port is returned. This function does not block.
|
// until the stopCh is closed. The port is returned. This function does not block.
|
||||||
func RunServer(server *http.Server, network string, shutDownTimeout time.Duration, stopCh <-chan struct{}) (int, error) {
|
func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) {
|
||||||
if len(server.Addr) == 0 {
|
if len(server.Addr) == 0 {
|
||||||
return 0, errors.New("address cannot be empty")
|
return 0, errors.New("address cannot be empty")
|
||||||
}
|
}
|
||||||
|
@ -112,12 +111,10 @@ func RunServer(server *http.Server, network string, shutDownTimeout time.Duratio
|
||||||
return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
|
return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown server gracefully.
|
// Stop the server by closing the listener
|
||||||
go func() {
|
go func() {
|
||||||
<-stopCh
|
<-stopCh
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
|
ln.Close()
|
||||||
server.Shutdown(ctx)
|
|
||||||
cancel()
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
|
Loading…
Reference in New Issue