From b28966b48ae8e3e10427e0347f33a36053884e0a Mon Sep 17 00:00:00 2001 From: deads2k Date: Fri, 10 Mar 2017 13:20:07 -0500 Subject: [PATCH] rewire aggregation handling chain to be normal --- .../src/k8s.io/apiserver/pkg/server/config.go | 11 +- .../apiserver/pkg/server/genericapiserver.go | 24 ++++ .../pkg/apiserver/apiserver.go | 120 ++++-------------- .../kube-aggregator/pkg/cmd/server/start.go | 2 +- vendor/BUILD | 2 - 5 files changed, 63 insertions(+), 96 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 63dc5f04ab..7f023cf654 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -91,6 +91,7 @@ type Config struct { EnableSwaggerUI bool EnableIndex bool EnableProfiling bool + EnableDiscovery bool // Requires generic profiling enabled EnableContentionProfiling bool EnableMetrics bool @@ -203,6 +204,7 @@ func NewConfig() *Config { LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix), HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz}, EnableIndex: true, + EnableDiscovery: true, EnableProfiling: true, MaxRequestsInFlight: 400, MaxMutatingRequestsInFlight: 200, @@ -345,6 +347,10 @@ func (c *Config) Complete() completedConfig { c.FallThroughHandler = mux.NewPathRecorderMux() } + if c.FallThroughHandler == nil { + c.FallThroughHandler = mux.NewPathRecorderMux() + } + return completedConfig{c} } @@ -535,7 +541,10 @@ func installAPI(s *GenericAPIServer, c *Config, delegate http.Handler) { } } routes.Version{Version: c.Version}.Install(s.HandlerContainer) - s.HandlerContainer.Add(s.DynamicApisDiscovery()) + + if c.EnableDiscovery { + s.HandlerContainer.Add(s.DynamicApisDiscovery()) + } } func NewRequestInfoResolver(c *Config) *apirequest.RequestInfoFactory { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index a34223a7c9..f7323d29a7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -189,6 +189,30 @@ func (s *GenericAPIServer) ListedPaths() []string { return s.listedPathProvider.ListedPaths() } +var EmptyDelegate = emptyDelegate{ + requestContextMapper: apirequest.NewRequestContextMapper(), +} + +type emptyDelegate struct { + requestContextMapper apirequest.RequestContextMapper +} + +func (s emptyDelegate) UnprotectedHandler() http.Handler { + return http.NotFoundHandler() +} +func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry { + return map[string]postStartHookEntry{} +} +func (s emptyDelegate) HealthzChecks() []healthz.HealthzChecker { + return []healthz.HealthzChecker{} +} +func (s emptyDelegate) ListedPaths() []string { + return []string{} +} +func (s emptyDelegate) RequestContextMapper() apirequest.RequestContextMapper { + return s.requestContextMapper +} + func init() { // Send correct mime type for .svg files. // TODO: remove when https://github.com/golang/go/commit/21e47d831bafb59f22b1ea8098f709677ec8ce33 diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 20621c8482..4dcdda7197 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -18,7 +18,6 @@ package apiserver import ( "net/http" - "os" "time" "k8s.io/apimachinery/pkg/apimachinery/announced" @@ -28,11 +27,9 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/sets" - genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" - genericfilters "k8s.io/apiserver/pkg/server/filters" kubeinformers "k8s.io/client-go/informers" kubeclientset "k8s.io/client-go/kubernetes" v1listers "k8s.io/client-go/listers/core/v1" @@ -87,9 +84,9 @@ type Config struct { type APIAggregator struct { GenericAPIServer *genericapiserver.GenericAPIServer - contextMapper genericapirequest.RequestContextMapper + delegateHandler http.Handler - handlerChainConfig *handlerChainConfig + contextMapper genericapirequest.RequestContextMapper // proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use // this to confirm the proxy's identity @@ -109,9 +106,6 @@ type APIAggregator struct { serviceLister v1listers.ServiceLister // endpointsLister is used by the aggregator handler to determine whether or not to try to expose the group endpointsLister v1listers.EndpointsLister - - // proxyMux intercepts requests that need to be proxied to backing API servers - proxyMux *http.ServeMux } type completedConfig struct { @@ -120,6 +114,8 @@ type completedConfig struct { // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. func (c *Config) Complete() completedConfig { + // generic discovery must be disabled so that we can register our own discovery handler + c.GenericConfig.EnableDiscovery = false c.GenericConfig.Complete() version := version.Get() @@ -134,41 +130,29 @@ func (c *Config) SkipComplete() completedConfig { } // New returns a new instance of APIAggregator from the given config. -func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) { +func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, stopCh <-chan struct{}) (*APIAggregator, error) { informerFactory := informers.NewSharedInformerFactory( internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig), 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. ) kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute) - proxyMux := http.NewServeMux() - - handlerChainConfig := &handlerChainConfig{ - informers: informerFactory, - proxyMux: proxyMux, - serviceLister: kubeInformers.Core().V1().Services().Lister(), - endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(), - } - // most API servers don't need to do this, but we need a custom handler chain to handle the special /apis handling here - c.Config.GenericConfig.BuildHandlerChainsFunc = handlerChainConfig.handlerChain - - genericServer, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time + genericServer, err := c.Config.GenericConfig.SkipComplete().NewWithDelegate(delegationTarget) // completion is done in Complete, no need for a second time if err != nil { return nil, err } s := &APIAggregator{ - GenericAPIServer: genericServer, - contextMapper: c.GenericConfig.RequestContextMapper, - handlerChainConfig: handlerChainConfig, - proxyClientCert: c.ProxyClientCert, - proxyClientKey: c.ProxyClientKey, - proxyHandlers: map[string]*proxyHandler{}, - handledGroups: sets.String{}, - lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), - serviceLister: kubeInformers.Core().V1().Services().Lister(), - endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(), - proxyMux: proxyMux, + GenericAPIServer: genericServer, + delegateHandler: delegationTarget.UnprotectedHandler(), + contextMapper: c.GenericConfig.RequestContextMapper, + proxyClientCert: c.ProxyClientCert, + proxyClientKey: c.ProxyClientKey, + proxyHandlers: map[string]*proxyHandler{}, + handledGroups: sets.String{}, + lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(), + serviceLister: kubeInformers.Core().V1().Services().Lister(), + endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(), } apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs) @@ -181,6 +165,15 @@ func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) { return nil, err } + apisHandler := &apisHandler{ + codecs: Codecs, + lister: s.lister, + delegate: s.GenericAPIServer.FallThroughHandler, + serviceLister: s.serviceLister, + endpointsLister: s.endpointsLister, + } + s.GenericAPIServer.HandlerContainer.Handle("/apis", apisHandler) + apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s) s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { @@ -196,49 +189,6 @@ func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) { return s, nil } -// handlerChainConfig is the config used to build the custom handler chain for this api server -type handlerChainConfig struct { - informers informers.SharedInformerFactory - proxyMux *http.ServeMux - serviceLister v1listers.ServiceLister - endpointsLister v1listers.EndpointsLister - - // fallThroughHandler keeps track of the handler that comes after the proxy so that it can be used - // to satisfy delegation cases. It is set as a side-effect while building the handler chain. - // TODO refactor to run the proxy *after* authorization so we can simply run at the end of the handling chain - fallThroughHandler http.Handler -} - -// handlerChain is a method to build the handler chain for this API server. We need a custom handler chain so that we -// can have custom handling for `/apis`, since we're hosting aggregation differently from anyone else and we're hosting -// the endpoints differently, since we're proxying all groups except for apiregistration.k8s.io. -func (h *handlerChainConfig) handlerChain(apiHandler http.Handler, c *genericapiserver.Config) (secure, insecure http.Handler) { - // add this as a filter so that we never collide with "already registered" failures on `/apis` - handler := WithAPIs(apiHandler, Codecs, h.informers.Apiregistration().InternalVersion().APIServices(), h.serviceLister, h.endpointsLister) - - handler = genericapifilters.WithAuthorization(handler, c.RequestContextMapper, c.Authorizer) - h.fallThroughHandler = handler - - // this mux is NOT protected by authorization, but DOES have authentication information - // this is so that everyone can hit the proxy and we can properly identify the user. The backing - // API server will deal with authorization - handler = WithProxyMux(handler, h.proxyMux) - - handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer) - // audit to stdout to help with debugging as we get this started - handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, os.Stdout) - handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.SupportsBasicAuth)) - - handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true") - handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper) - handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc) - handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc) - handler = genericapifilters.WithRequestInfo(handler, genericapiserver.NewRequestInfoResolver(c), c.RequestContextMapper) - handler = genericapirequest.WithRequestContext(handler, c.RequestContextMapper) - - return handler, nil -} - // AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please. // It's a slow moving API, so its ok to run the controller on a single thread func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { @@ -258,14 +208,14 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) { // register the proxy handler proxyHandler := &proxyHandler{ contextMapper: s.contextMapper, - localDelegate: s.handlerChainConfig.fallThroughHandler, + localDelegate: s.delegateHandler, proxyClientCert: s.proxyClientCert, proxyClientKey: s.proxyClientKey, } proxyHandler.updateAPIService(apiService) s.proxyHandlers[apiService.Name] = proxyHandler - s.proxyMux.Handle(proxyPath, proxyHandler) - s.proxyMux.Handle(proxyPath+"/", proxyHandler) + s.GenericAPIServer.FallThroughHandler.Handle(proxyPath, proxyHandler) + s.GenericAPIServer.FallThroughHandler.Handle(proxyPath+"/", proxyHandler) // if we're dealing with the legacy group, we're done here if apiService.Name == legacyAPIServiceName { @@ -301,17 +251,3 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) { } proxyHandler.removeAPIService() } - -func WithProxyMux(handler http.Handler, mux *http.ServeMux) http.Handler { - if mux == nil { - return handler - } - - // register the handler at this stage against everything under slash. More specific paths that get registered will take precedence - // this effectively delegates by default unless something specific gets registered. - mux.Handle("/", handler) - - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - mux.ServeHTTP(w, req) - }) -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index d2bdf3f74a..02463179df 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -159,7 +159,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { return err } - server, err := config.Complete().New(stopCh) + server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, stopCh) if err != nil { return err } diff --git a/vendor/BUILD b/vendor/BUILD index 6bba416665..7214cff915 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -15496,13 +15496,11 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/apiserver/pkg/endpoints/filters", "//vendor:k8s.io/apiserver/pkg/endpoints/handlers/responsewriters", "//vendor:k8s.io/apiserver/pkg/endpoints/request", "//vendor:k8s.io/apiserver/pkg/registry/generic/rest", "//vendor:k8s.io/apiserver/pkg/registry/rest", "//vendor:k8s.io/apiserver/pkg/server", - "//vendor:k8s.io/apiserver/pkg/server/filters", "//vendor:k8s.io/client-go/informers", "//vendor:k8s.io/client-go/kubernetes", "//vendor:k8s.io/client-go/listers/core/v1",