From 8cea69aa9812d6627ebdfa4f8b9c1d7624a8f3f5 Mon Sep 17 00:00:00 2001 From: p0lyn0mial Date: Mon, 1 May 2017 15:07:46 +0200 Subject: [PATCH] This PR implements AdmissionOptions.ApplyTo ApplyTo adds the admission chain to the server configuration the method lazily initializes a generic plugin that is appended to the list of pluginInitializers. apiserver.Config will hold an instance of SharedInformerFactory to ensure we only have once instance. The field will be initialized in apisever.SecureServingOptions --- cmd/kube-apiserver/app/options/options.go | 2 + cmd/kube-apiserver/app/server.go | 26 +++++----- .../app/options/options.go | 2 + .../cmd/federation-apiserver/app/server.go | 15 +++--- pkg/master/master_test.go | 8 +++ .../src/k8s.io/apiserver/pkg/server/config.go | 13 +++++ .../apiserver/pkg/server/config_test.go | 10 ++++ .../pkg/server/genericapiserver_test.go | 16 +++--- .../src/k8s.io/apiserver/pkg/server/hooks.go | 9 +++- .../apiserver/pkg/server/options/admission.go | 51 ++++++++++++++++--- .../apiserver/pkg/server/options/serving.go | 9 ++++ test/integration/federation/framework/api.go | 1 - test/integration/framework/master_utils.go | 10 +++- 13 files changed, 135 insertions(+), 37 deletions(-) diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index c3719901b4..a2f606114e 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -110,6 +110,8 @@ func NewServerRunOptions() *ServerRunOptions { } // Overwrite the default for storage data format. s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" + // Set the default for admission plugins names + s.Admission.PluginNames = []string{"AlwaysAdmit"} return &s } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index f98ab300ae..ece70bb421 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -359,39 +359,41 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config, genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName) } - genericConfig.AdmissionControl, err = BuildAdmission(s, - s.Admission.Plugins, + pluginInitializer, err := BuildAdmissionPluginInitializer( + s, client, sharedInformers, genericConfig.Authorizer, ) if err != nil { - return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) + return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err) } + err = s.Admission.ApplyTo( + genericConfig, + pluginInitializer) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err) + } return genericConfig, sharedInformers, insecureServingOptions, nil } -// BuildAdmission constructs the admission chain -func BuildAdmission(s *options.ServerRunOptions, plugins *admission.Plugins, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.Interface, error) { - admissionControlPluginNames := strings.Split(s.Admission.Control, ",") +// BuildAdmissionPluginInitializer constructs the admission plugin initializer +func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) { var cloudConfig []byte - var err error if s.CloudProvider.CloudConfigFile != "" { + var err error cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile) if err != nil { glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err) } } + // TODO: use a dynamic restmapper. See https://github.com/kubernetes/kubernetes/pull/42615. restMapper := api.Registry.RESTMapper() pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper) - admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.Admission.ControlConfigFile) - if err != nil { - return nil, fmt.Errorf("failed to read plugin config: %v", err) - } - return plugins.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer) + return pluginInitializer, nil } // BuildAuthenticator constructs the authenticator diff --git a/federation/cmd/federation-apiserver/app/options/options.go b/federation/cmd/federation-apiserver/app/options/options.go index a2f49d491d..b4d9292e50 100644 --- a/federation/cmd/federation-apiserver/app/options/options.go +++ b/federation/cmd/federation-apiserver/app/options/options.go @@ -70,6 +70,8 @@ func NewServerRunOptions() *ServerRunOptions { } // Overwrite the default for storage data format. s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" + // Set the default for admission plugins names + s.Admission.PluginNames = []string{"AlwaysAdmit"} return &s } diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index 05a8d3e2fe..fbe481cce7 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apiserver/pkg/admission" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/filters" serverstorage "k8s.io/apiserver/pkg/server/storage" @@ -185,21 +184,20 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error { return fmt.Errorf("invalid Authorization Config: %v", err) } - admissionControlPluginNames := strings.Split(s.Admission.Control, ",") var cloudConfig []byte - if s.CloudProvider.CloudConfigFile != "" { cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile) if err != nil { glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err) } } + pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil) - admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.Admission.ControlConfigFile) - if err != nil { - return fmt.Errorf("failed to read plugin config: %v", err) - } - admissionController, err := kubeapiserveradmission.Plugins.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer) + + err = s.Admission.ApplyTo( + genericConfig, + pluginInitializer, + ) if err != nil { return fmt.Errorf("failed to initialize plugins: %v", err) } @@ -208,7 +206,6 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error { genericConfig.Version = &kubeVersion genericConfig.Authenticator = apiAuthenticator genericConfig.Authorizer = apiAuthorizer - genericConfig.AdmissionControl = admissionController genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, api.Scheme) genericConfig.OpenAPIConfig.PostProcessSpec = postProcessOpenAPISpecForBackwardCompatibility genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 57f46dd1fc..b978589019 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -37,6 +37,8 @@ import ( "k8s.io/apiserver/pkg/server/options" serverstorage "k8s.io/apiserver/pkg/server/storage" etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" @@ -101,6 +103,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion TLSClientConfig: &tls.Config{}, }) + clientset, err := kubernetes.NewForConfig(config.GenericConfig.LoopbackClientConfig) + if err != nil { + t.Fatalf("unable to create client set due to %v", err) + } + config.GenericConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.GenericConfig.LoopbackClientConfig.Timeout) + return server, *config, assert.New(t) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index a1e2db3b1f..e949a93e91 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -54,6 +54,7 @@ import ( genericfilters "k8s.io/apiserver/pkg/server/filters" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" + "k8s.io/client-go/informers" restclient "k8s.io/client-go/rest" certutil "k8s.io/client-go/util/cert" @@ -109,6 +110,8 @@ type Config struct { // Will default to a value based on secure serving info and available ipv4 IPs. ExternalAddress string + // SharedInformerFactory provides shared informers for resources + SharedInformerFactory informers.SharedInformerFactory //=========================================================================== // Fields you probably don't care about changing //=========================================================================== @@ -405,6 +408,16 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ s.postStartHooks[k] = v } + genericApiServerHookName := "generic-apiserver-start-informers" + if c.SharedInformerFactory != nil && !s.isHookRegistered(genericApiServerHookName) { + err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error { + c.SharedInformerFactory.Start(context.StopCh) + return nil + }) + if err != nil { + return nil, err + } + } for _, delegateCheck := range delegationTarget.HealthzChecks() { skip := false for _, existingCheck := range c.HealthzChecks { diff --git a/staging/src/k8s.io/apiserver/pkg/server/config_test.go b/staging/src/k8s.io/apiserver/pkg/server/config_test.go index 0ce592c2af..8597ec541e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config_test.go @@ -28,6 +28,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/healthz" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" ) @@ -38,6 +40,11 @@ func TestNewWithDelegate(t *testing.T) { delegateConfig.LegacyAPIGroupPrefixes = sets.NewString("/api") delegateConfig.LoopbackClientConfig = &rest.Config{} delegateConfig.SwaggerConfig = DefaultSwaggerConfig() + clientset := fake.NewSimpleClientset() + if clientset == nil { + t.Fatal("unable to create fake client set") + } + delegateConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, delegateConfig.LoopbackClientConfig.Timeout) delegateHealthzCalled := false delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error { @@ -66,6 +73,7 @@ func TestNewWithDelegate(t *testing.T) { wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api") wrappingConfig.LoopbackClientConfig = &rest.Config{} wrappingConfig.SwaggerConfig = DefaultSwaggerConfig() + wrappingConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, wrappingConfig.LoopbackClientConfig.Timeout) wrappingHealthzCalled := false wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error { @@ -102,6 +110,7 @@ func TestNewWithDelegate(t *testing.T) { "/healthz/delegate-health", "/healthz/ping", "/healthz/poststarthook/delegate-post-start-hook", + "/healthz/poststarthook/generic-apiserver-start-informers", "/healthz/poststarthook/wrapping-post-start-hook", "/healthz/wrapping-health", "/swaggerapi/" @@ -110,6 +119,7 @@ func TestNewWithDelegate(t *testing.T) { checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok [-]wrapping-health failed: reason withheld [-]delegate-health failed: reason withheld +[+]poststarthook/generic-apiserver-start-informers ok [+]poststarthook/delegate-post-start-hook ok [+]poststarthook/wrapping-post-start-hook ok healthz check failed diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go index 6dfb340861..c1f355f0ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver_test.go @@ -48,6 +48,8 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" ) @@ -90,6 +92,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion config.LegacyAPIGroupPrefixes = sets.NewString("/api") config.LoopbackClientConfig = &restclient.Config{} + clientset := fake.NewSimpleClientset() + if clientset == nil { + t.Fatal("unable to create fake client set") + } + config.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.LoopbackClientConfig.Timeout) + // TODO restore this test, but right now, eliminate our cycle // config.OpenAPIConfig = DefaultOpenAPIConfig(testGetOpenAPIDefinitions, runtime.NewScheme()) // config.OpenAPIConfig.Info = &spec.Info{ @@ -300,17 +308,13 @@ func TestPrepareRun(t *testing.T) { defer etcdserver.Terminate(t) assert.NotNil(config.SwaggerConfig) - // assert.NotNil(config.OpenAPIConfig) server := httptest.NewServer(s.Handler.GoRestfulContainer.ServeMux) defer server.Close() + done := make(chan struct{}) s.PrepareRun() - - // openapi is installed in PrepareRun - // resp, err := http.Get(server.URL + "/swagger.json") - // assert.NoError(err) - // assert.Equal(http.StatusOK, resp.StatusCode) + s.RunPostStartHooks(done) // swagger is installed in PrepareRun resp, err := http.Get(server.URL + "/swaggerapi/") diff --git a/staging/src/k8s.io/apiserver/pkg/server/hooks.go b/staging/src/k8s.io/apiserver/pkg/server/hooks.go index 1d6eb8cc86..5fbae16ae7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/hooks.go +++ b/staging/src/k8s.io/apiserver/pkg/server/hooks.go @@ -106,6 +106,14 @@ func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) { } } +// isHookRegistered checks whether a given hook is registered +func (s *GenericAPIServer) isHookRegistered(name string) bool { + s.postStartHookLock.Lock() + defer s.postStartHookLock.Unlock() + _, exists := s.postStartHooks[name] + return exists +} + func runPostStartHook(name string, entry postStartHookEntry, context PostStartHookContext) { var err error func() { @@ -117,7 +125,6 @@ func runPostStartHook(name string, entry postStartHookEntry, context PostStartHo if err != nil { glog.Fatalf("PostStartHook %q failed: %v", name, err) } - close(entry.done) } diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go index 6f1774a72b..6316c065f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/admission.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/admission.go @@ -17,33 +17,70 @@ limitations under the License. package options import ( + "fmt" "strings" "github.com/spf13/pflag" "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/admission/initializer" + "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/kubernetes" ) // AdmissionOptions holds the admission options type AdmissionOptions struct { - Control string - ControlConfigFile string - Plugins *admission.Plugins + PluginNames []string + ConfigFile string + Plugins *admission.Plugins } // NewAdmissionOptions creates a new instance of AdmissionOptions func NewAdmissionOptions(plugins *admission.Plugins) *AdmissionOptions { return &AdmissionOptions{ - Plugins: plugins, - Control: "AlwaysAdmit", + Plugins: plugins, + PluginNames: []string{}, } } // AddFlags adds flags related to admission for a specific APIServer to the specified FlagSet func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) { - fs.StringVar(&a.Control, "admission-control", a.Control, ""+ + fs.StringSliceVar(&a.PluginNames, "admission-control", a.PluginNames, ""+ "Ordered list of plug-ins to do admission control of resources into cluster. "+ "Comma-delimited list of: "+strings.Join(a.Plugins.Registered(), ", ")+".") - fs.StringVar(&a.ControlConfigFile, "admission-control-config-file", a.ControlConfigFile, + fs.StringVar(&a.ConfigFile, "admission-control-config-file", a.ConfigFile, "File with admission control configuration.") } + +// ApplyTo adds the admission chain to the server configuration +// the method lazily initializes a generic plugin that is appended to the list of pluginInitializers +// note this method uses: +// genericconfig.LoopbackClientConfig +// genericconfig.SharedInformerFactory +// genericconfig.Authorizer +func (a *AdmissionOptions) ApplyTo(serverCfg *server.Config, pluginInitializers ...admission.PluginInitializer) error { + pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(a.PluginNames, a.ConfigFile) + if err != nil { + return fmt.Errorf("failed to read plugin config: %v", err) + } + + clientset, err := kubernetes.NewForConfig(serverCfg.LoopbackClientConfig) + if err != nil { + return err + } + genericInitializer, err := initializer.New(clientset, serverCfg.SharedInformerFactory, serverCfg.Authorizer) + if err != nil { + return err + } + initializersChain := admission.PluginInitializers{} + pluginInitializers = append(pluginInitializers, genericInitializer) + initializersChain = append(initializersChain, pluginInitializers...) + + admissionChain, err := a.Plugins.NewFromPlugins(a.PluginNames, pluginsConfigProvider, initializersChain) + if err != nil { + return err + } + + serverCfg.AdmissionControl = admissionChain + return nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/serving.go b/staging/src/k8s.io/apiserver/pkg/server/options/serving.go index 7b9ea97847..a53252f72e 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/serving.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/serving.go @@ -32,6 +32,8 @@ import ( utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/server" utilflag "k8s.io/apiserver/pkg/util/flag" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" certutil "k8s.io/client-go/util/cert" ) @@ -167,6 +169,13 @@ func (s *SecureServingOptions) ApplyTo(c *server.Config) error { c.SecureServingInfo.SNICerts[server.LoopbackClientServerNameOverride] = &tlsCert } + // create shared informers + clientset, err := kubernetes.NewForConfig(c.LoopbackClientConfig) + if err != nil { + return err + } + c.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, c.LoopbackClientConfig.Timeout) + return nil } diff --git a/test/integration/federation/framework/api.go b/test/integration/federation/framework/api.go index f12f2d8f26..07df7f09a7 100644 --- a/test/integration/federation/framework/api.go +++ b/test/integration/federation/framework/api.go @@ -104,7 +104,6 @@ func startServer(t *testing.T, runOptions *options.ServerRunOptions, stopChan <- } runOptions.InsecureServing.BindPort = port - err = app.NonBlockingRun(runOptions, stopChan) if err != nil { t.Logf("Error starting the %s: %v", apiNoun, err) diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 88cc19856b..499a842493 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -47,6 +47,8 @@ import ( "k8s.io/apiserver/pkg/server/options" serverstorage "k8s.io/apiserver/pkg/server/storage" "k8s.io/apiserver/pkg/storage/storagebackend" + extinformers "k8s.io/client-go/informers" + extclient "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api" @@ -245,7 +247,13 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv masterConfig.GenericConfig.LoopbackClientConfig.BearerToken = privilegedLoopbackToken - m, err := masterConfig.Complete().New(genericapiserver.EmptyDelegate) + clientset, err := extclient.NewForConfig(masterConfig.GenericConfig.LoopbackClientConfig) + if err != nil { + glog.Fatal(err) + } + masterConfig.GenericConfig.SharedInformerFactory = extinformers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout) + + m, err = masterConfig.Complete().New(genericapiserver.EmptyDelegate) if err != nil { closeFn() glog.Fatalf("error in bringing up the master: %v", err)