mirror of https://github.com/k3s-io/k3s
rewire aggregation handling chain to be normal
parent
e38c575ae6
commit
b28966b48a
|
@ -91,6 +91,7 @@ type Config struct {
|
|||
EnableSwaggerUI bool
|
||||
EnableIndex bool
|
||||
EnableProfiling bool
|
||||
EnableDiscovery bool
|
||||
// Requires generic profiling enabled
|
||||
EnableContentionProfiling bool
|
||||
EnableMetrics bool
|
||||
|
@ -203,6 +204,7 @@ func NewConfig() *Config {
|
|||
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
|
||||
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
|
||||
EnableIndex: true,
|
||||
EnableDiscovery: true,
|
||||
EnableProfiling: true,
|
||||
MaxRequestsInFlight: 400,
|
||||
MaxMutatingRequestsInFlight: 200,
|
||||
|
@ -345,6 +347,10 @@ func (c *Config) Complete() completedConfig {
|
|||
c.FallThroughHandler = mux.NewPathRecorderMux()
|
||||
}
|
||||
|
||||
if c.FallThroughHandler == nil {
|
||||
c.FallThroughHandler = mux.NewPathRecorderMux()
|
||||
}
|
||||
|
||||
return completedConfig{c}
|
||||
}
|
||||
|
||||
|
@ -535,7 +541,10 @@ func installAPI(s *GenericAPIServer, c *Config, delegate http.Handler) {
|
|||
}
|
||||
}
|
||||
routes.Version{Version: c.Version}.Install(s.HandlerContainer)
|
||||
s.HandlerContainer.Add(s.DynamicApisDiscovery())
|
||||
|
||||
if c.EnableDiscovery {
|
||||
s.HandlerContainer.Add(s.DynamicApisDiscovery())
|
||||
}
|
||||
}
|
||||
|
||||
func NewRequestInfoResolver(c *Config) *apirequest.RequestInfoFactory {
|
||||
|
|
|
@ -189,6 +189,30 @@ func (s *GenericAPIServer) ListedPaths() []string {
|
|||
return s.listedPathProvider.ListedPaths()
|
||||
}
|
||||
|
||||
var EmptyDelegate = emptyDelegate{
|
||||
requestContextMapper: apirequest.NewRequestContextMapper(),
|
||||
}
|
||||
|
||||
type emptyDelegate struct {
|
||||
requestContextMapper apirequest.RequestContextMapper
|
||||
}
|
||||
|
||||
func (s emptyDelegate) UnprotectedHandler() http.Handler {
|
||||
return http.NotFoundHandler()
|
||||
}
|
||||
func (s emptyDelegate) PostStartHooks() map[string]postStartHookEntry {
|
||||
return map[string]postStartHookEntry{}
|
||||
}
|
||||
func (s emptyDelegate) HealthzChecks() []healthz.HealthzChecker {
|
||||
return []healthz.HealthzChecker{}
|
||||
}
|
||||
func (s emptyDelegate) ListedPaths() []string {
|
||||
return []string{}
|
||||
}
|
||||
func (s emptyDelegate) RequestContextMapper() apirequest.RequestContextMapper {
|
||||
return s.requestContextMapper
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Send correct mime type for .svg files.
|
||||
// TODO: remove when https://github.com/golang/go/commit/21e47d831bafb59f22b1ea8098f709677ec8ce33
|
||||
|
|
|
@ -18,7 +18,6 @@ package apiserver
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apimachinery/announced"
|
||||
|
@ -28,11 +27,9 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
genericfilters "k8s.io/apiserver/pkg/server/filters"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
kubeclientset "k8s.io/client-go/kubernetes"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
|
@ -87,9 +84,9 @@ type Config struct {
|
|||
type APIAggregator struct {
|
||||
GenericAPIServer *genericapiserver.GenericAPIServer
|
||||
|
||||
contextMapper genericapirequest.RequestContextMapper
|
||||
delegateHandler http.Handler
|
||||
|
||||
handlerChainConfig *handlerChainConfig
|
||||
contextMapper genericapirequest.RequestContextMapper
|
||||
|
||||
// proxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
|
||||
// this to confirm the proxy's identity
|
||||
|
@ -109,9 +106,6 @@ type APIAggregator struct {
|
|||
serviceLister v1listers.ServiceLister
|
||||
// endpointsLister is used by the aggregator handler to determine whether or not to try to expose the group
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
|
||||
// proxyMux intercepts requests that need to be proxied to backing API servers
|
||||
proxyMux *http.ServeMux
|
||||
}
|
||||
|
||||
type completedConfig struct {
|
||||
|
@ -120,6 +114,8 @@ type completedConfig struct {
|
|||
|
||||
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
|
||||
func (c *Config) Complete() completedConfig {
|
||||
// generic discovery must be disabled so that we can register our own discovery handler
|
||||
c.GenericConfig.EnableDiscovery = false
|
||||
c.GenericConfig.Complete()
|
||||
|
||||
version := version.Get()
|
||||
|
@ -134,41 +130,29 @@ func (c *Config) SkipComplete() completedConfig {
|
|||
}
|
||||
|
||||
// New returns a new instance of APIAggregator from the given config.
|
||||
func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) {
|
||||
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget, stopCh <-chan struct{}) (*APIAggregator, error) {
|
||||
informerFactory := informers.NewSharedInformerFactory(
|
||||
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
|
||||
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
|
||||
)
|
||||
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
|
||||
|
||||
proxyMux := http.NewServeMux()
|
||||
|
||||
handlerChainConfig := &handlerChainConfig{
|
||||
informers: informerFactory,
|
||||
proxyMux: proxyMux,
|
||||
serviceLister: kubeInformers.Core().V1().Services().Lister(),
|
||||
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
|
||||
}
|
||||
// most API servers don't need to do this, but we need a custom handler chain to handle the special /apis handling here
|
||||
c.Config.GenericConfig.BuildHandlerChainsFunc = handlerChainConfig.handlerChain
|
||||
|
||||
genericServer, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
|
||||
genericServer, err := c.Config.GenericConfig.SkipComplete().NewWithDelegate(delegationTarget) // completion is done in Complete, no need for a second time
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &APIAggregator{
|
||||
GenericAPIServer: genericServer,
|
||||
contextMapper: c.GenericConfig.RequestContextMapper,
|
||||
handlerChainConfig: handlerChainConfig,
|
||||
proxyClientCert: c.ProxyClientCert,
|
||||
proxyClientKey: c.ProxyClientKey,
|
||||
proxyHandlers: map[string]*proxyHandler{},
|
||||
handledGroups: sets.String{},
|
||||
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
|
||||
serviceLister: kubeInformers.Core().V1().Services().Lister(),
|
||||
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
|
||||
proxyMux: proxyMux,
|
||||
GenericAPIServer: genericServer,
|
||||
delegateHandler: delegationTarget.UnprotectedHandler(),
|
||||
contextMapper: c.GenericConfig.RequestContextMapper,
|
||||
proxyClientCert: c.ProxyClientCert,
|
||||
proxyClientKey: c.ProxyClientKey,
|
||||
proxyHandlers: map[string]*proxyHandler{},
|
||||
handledGroups: sets.String{},
|
||||
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
|
||||
serviceLister: kubeInformers.Core().V1().Services().Lister(),
|
||||
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
|
||||
}
|
||||
|
||||
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs)
|
||||
|
@ -181,6 +165,15 @@ func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
apisHandler := &apisHandler{
|
||||
codecs: Codecs,
|
||||
lister: s.lister,
|
||||
delegate: s.GenericAPIServer.FallThroughHandler,
|
||||
serviceLister: s.serviceLister,
|
||||
endpointsLister: s.endpointsLister,
|
||||
}
|
||||
s.GenericAPIServer.HandlerContainer.Handle("/apis", apisHandler)
|
||||
|
||||
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s)
|
||||
|
||||
s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
|
@ -196,49 +189,6 @@ func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
// handlerChainConfig is the config used to build the custom handler chain for this api server
|
||||
type handlerChainConfig struct {
|
||||
informers informers.SharedInformerFactory
|
||||
proxyMux *http.ServeMux
|
||||
serviceLister v1listers.ServiceLister
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
|
||||
// fallThroughHandler keeps track of the handler that comes after the proxy so that it can be used
|
||||
// to satisfy delegation cases. It is set as a side-effect while building the handler chain.
|
||||
// TODO refactor to run the proxy *after* authorization so we can simply run at the end of the handling chain
|
||||
fallThroughHandler http.Handler
|
||||
}
|
||||
|
||||
// handlerChain is a method to build the handler chain for this API server. We need a custom handler chain so that we
|
||||
// can have custom handling for `/apis`, since we're hosting aggregation differently from anyone else and we're hosting
|
||||
// the endpoints differently, since we're proxying all groups except for apiregistration.k8s.io.
|
||||
func (h *handlerChainConfig) handlerChain(apiHandler http.Handler, c *genericapiserver.Config) (secure, insecure http.Handler) {
|
||||
// add this as a filter so that we never collide with "already registered" failures on `/apis`
|
||||
handler := WithAPIs(apiHandler, Codecs, h.informers.Apiregistration().InternalVersion().APIServices(), h.serviceLister, h.endpointsLister)
|
||||
|
||||
handler = genericapifilters.WithAuthorization(handler, c.RequestContextMapper, c.Authorizer)
|
||||
h.fallThroughHandler = handler
|
||||
|
||||
// this mux is NOT protected by authorization, but DOES have authentication information
|
||||
// this is so that everyone can hit the proxy and we can properly identify the user. The backing
|
||||
// API server will deal with authorization
|
||||
handler = WithProxyMux(handler, h.proxyMux)
|
||||
|
||||
handler = genericapifilters.WithImpersonation(handler, c.RequestContextMapper, c.Authorizer)
|
||||
// audit to stdout to help with debugging as we get this started
|
||||
handler = genericapifilters.WithAudit(handler, c.RequestContextMapper, os.Stdout)
|
||||
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, genericapifilters.Unauthorized(c.SupportsBasicAuth))
|
||||
|
||||
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
|
||||
handler = genericfilters.WithPanicRecovery(handler, c.RequestContextMapper)
|
||||
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc)
|
||||
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.RequestContextMapper, c.LongRunningFunc)
|
||||
handler = genericapifilters.WithRequestInfo(handler, genericapiserver.NewRequestInfoResolver(c), c.RequestContextMapper)
|
||||
handler = genericapirequest.WithRequestContext(handler, c.RequestContextMapper)
|
||||
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.
|
||||
// It's a slow moving API, so its ok to run the controller on a single thread
|
||||
func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
|
||||
|
@ -258,14 +208,14 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService) {
|
|||
// register the proxy handler
|
||||
proxyHandler := &proxyHandler{
|
||||
contextMapper: s.contextMapper,
|
||||
localDelegate: s.handlerChainConfig.fallThroughHandler,
|
||||
localDelegate: s.delegateHandler,
|
||||
proxyClientCert: s.proxyClientCert,
|
||||
proxyClientKey: s.proxyClientKey,
|
||||
}
|
||||
proxyHandler.updateAPIService(apiService)
|
||||
s.proxyHandlers[apiService.Name] = proxyHandler
|
||||
s.proxyMux.Handle(proxyPath, proxyHandler)
|
||||
s.proxyMux.Handle(proxyPath+"/", proxyHandler)
|
||||
s.GenericAPIServer.FallThroughHandler.Handle(proxyPath, proxyHandler)
|
||||
s.GenericAPIServer.FallThroughHandler.Handle(proxyPath+"/", proxyHandler)
|
||||
|
||||
// if we're dealing with the legacy group, we're done here
|
||||
if apiService.Name == legacyAPIServiceName {
|
||||
|
@ -301,17 +251,3 @@ func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
|
|||
}
|
||||
proxyHandler.removeAPIService()
|
||||
}
|
||||
|
||||
func WithProxyMux(handler http.Handler, mux *http.ServeMux) http.Handler {
|
||||
if mux == nil {
|
||||
return handler
|
||||
}
|
||||
|
||||
// register the handler at this stage against everything under slash. More specific paths that get registered will take precedence
|
||||
// this effectively delegates by default unless something specific gets registered.
|
||||
mux.Handle("/", handler)
|
||||
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
mux.ServeHTTP(w, req)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -159,7 +159,7 @@ func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error {
|
|||
return err
|
||||
}
|
||||
|
||||
server, err := config.Complete().New(stopCh)
|
||||
server, err := config.Complete().NewWithDelegate(genericapiserver.EmptyDelegate, stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -15496,13 +15496,11 @@ go_library(
|
|||
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||
"//vendor:k8s.io/apiserver/pkg/endpoints/filters",
|
||||
"//vendor:k8s.io/apiserver/pkg/endpoints/handlers/responsewriters",
|
||||
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
|
||||
"//vendor:k8s.io/apiserver/pkg/registry/generic/rest",
|
||||
"//vendor:k8s.io/apiserver/pkg/registry/rest",
|
||||
"//vendor:k8s.io/apiserver/pkg/server",
|
||||
"//vendor:k8s.io/apiserver/pkg/server/filters",
|
||||
"//vendor:k8s.io/client-go/informers",
|
||||
"//vendor:k8s.io/client-go/kubernetes",
|
||||
"//vendor:k8s.io/client-go/listers/core/v1",
|
||||
|
|
Loading…
Reference in New Issue