This PR implements AdmissionOptions.ApplyTo

ApplyTo adds the admission chain to the server configuration the method lazily initializes a generic plugin
that is appended to the list of pluginInitializers.

apiserver.Config will hold an instance of SharedInformerFactory to ensure we only have once instance.
The field will be initialized in apisever.SecureServingOptions
pull/6/head
p0lyn0mial 2017-05-01 15:07:46 +02:00
parent 4807e6cadf
commit 8cea69aa98
13 changed files with 135 additions and 37 deletions

View File

@ -110,6 +110,8 @@ func NewServerRunOptions() *ServerRunOptions {
}
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
// Set the default for admission plugins names
s.Admission.PluginNames = []string{"AlwaysAdmit"}
return &s
}

View File

@ -359,39 +359,41 @@ func BuildGenericConfig(s *options.ServerRunOptions) (*genericapiserver.Config,
genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}
genericConfig.AdmissionControl, err = BuildAdmission(s,
s.Admission.Plugins,
pluginInitializer, err := BuildAdmissionPluginInitializer(
s,
client,
sharedInformers,
genericConfig.Authorizer,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
}
err = s.Admission.ApplyTo(
genericConfig,
pluginInitializer)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to initialize admission: %v", err)
}
return genericConfig, sharedInformers, insecureServingOptions, nil
}
// BuildAdmission constructs the admission chain
func BuildAdmission(s *options.ServerRunOptions, plugins *admission.Plugins, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.Interface, error) {
admissionControlPluginNames := strings.Split(s.Admission.Control, ",")
// BuildAdmissionPluginInitializer constructs the admission plugin initializer
func BuildAdmissionPluginInitializer(s *options.ServerRunOptions, client internalclientset.Interface, sharedInformers informers.SharedInformerFactory, apiAuthorizer authorizer.Authorizer) (admission.PluginInitializer, error) {
var cloudConfig []byte
var err error
if s.CloudProvider.CloudConfigFile != "" {
var err error
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}
// TODO: use a dynamic restmapper. See https://github.com/kubernetes/kubernetes/pull/42615.
restMapper := api.Registry.RESTMapper()
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, restMapper)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.Admission.ControlConfigFile)
if err != nil {
return nil, fmt.Errorf("failed to read plugin config: %v", err)
}
return plugins.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
return pluginInitializer, nil
}
// BuildAuthenticator constructs the authenticator

View File

@ -70,6 +70,8 @@ func NewServerRunOptions() *ServerRunOptions {
}
// Overwrite the default for storage data format.
s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
// Set the default for admission plugins names
s.Admission.PluginNames = []string{"AlwaysAdmit"}
return &s
}

View File

@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
@ -185,21 +184,20 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
return fmt.Errorf("invalid Authorization Config: %v", err)
}
admissionControlPluginNames := strings.Split(s.Admission.Control, ",")
var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig, nil)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.Admission.ControlConfigFile)
if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err)
}
admissionController, err := kubeapiserveradmission.Plugins.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer)
err = s.Admission.ApplyTo(
genericConfig,
pluginInitializer,
)
if err != nil {
return fmt.Errorf("failed to initialize plugins: %v", err)
}
@ -208,7 +206,6 @@ func NonBlockingRun(s *options.ServerRunOptions, stopCh <-chan struct{}) error {
genericConfig.Version = &kubeVersion
genericConfig.Authenticator = apiAuthenticator
genericConfig.Authorizer = apiAuthorizer
genericConfig.AdmissionControl = admissionController
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, api.Scheme)
genericConfig.OpenAPIConfig.PostProcessSpec = postProcessOpenAPISpecForBackwardCompatibility
genericConfig.OpenAPIConfig.SecurityDefinitions = securityDefinitions

View File

@ -37,6 +37,8 @@ import (
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
@ -101,6 +103,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
TLSClientConfig: &tls.Config{},
})
clientset, err := kubernetes.NewForConfig(config.GenericConfig.LoopbackClientConfig)
if err != nil {
t.Fatalf("unable to create client set due to %v", err)
}
config.GenericConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.GenericConfig.LoopbackClientConfig.Timeout)
return server, *config, assert.New(t)
}

View File

@ -54,6 +54,7 @@ import (
genericfilters "k8s.io/apiserver/pkg/server/filters"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"
certutil "k8s.io/client-go/util/cert"
@ -109,6 +110,8 @@ type Config struct {
// Will default to a value based on secure serving info and available ipv4 IPs.
ExternalAddress string
// SharedInformerFactory provides shared informers for resources
SharedInformerFactory informers.SharedInformerFactory
//===========================================================================
// Fields you probably don't care about changing
//===========================================================================
@ -405,6 +408,16 @@ func (c completedConfig) New(delegationTarget DelegationTarget) (*GenericAPIServ
s.postStartHooks[k] = v
}
genericApiServerHookName := "generic-apiserver-start-informers"
if c.SharedInformerFactory != nil && !s.isHookRegistered(genericApiServerHookName) {
err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
c.SharedInformerFactory.Start(context.StopCh)
return nil
})
if err != nil {
return nil, err
}
}
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {

View File

@ -28,6 +28,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
)
@ -38,6 +40,11 @@ func TestNewWithDelegate(t *testing.T) {
delegateConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
delegateConfig.LoopbackClientConfig = &rest.Config{}
delegateConfig.SwaggerConfig = DefaultSwaggerConfig()
clientset := fake.NewSimpleClientset()
if clientset == nil {
t.Fatal("unable to create fake client set")
}
delegateConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, delegateConfig.LoopbackClientConfig.Timeout)
delegateHealthzCalled := false
delegateConfig.HealthzChecks = append(delegateConfig.HealthzChecks, healthz.NamedCheck("delegate-health", func(r *http.Request) error {
@ -66,6 +73,7 @@ func TestNewWithDelegate(t *testing.T) {
wrappingConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
wrappingConfig.LoopbackClientConfig = &rest.Config{}
wrappingConfig.SwaggerConfig = DefaultSwaggerConfig()
wrappingConfig.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, wrappingConfig.LoopbackClientConfig.Timeout)
wrappingHealthzCalled := false
wrappingConfig.HealthzChecks = append(wrappingConfig.HealthzChecks, healthz.NamedCheck("wrapping-health", func(r *http.Request) error {
@ -102,6 +110,7 @@ func TestNewWithDelegate(t *testing.T) {
"/healthz/delegate-health",
"/healthz/ping",
"/healthz/poststarthook/delegate-post-start-hook",
"/healthz/poststarthook/generic-apiserver-start-informers",
"/healthz/poststarthook/wrapping-post-start-hook",
"/healthz/wrapping-health",
"/swaggerapi/"
@ -110,6 +119,7 @@ func TestNewWithDelegate(t *testing.T) {
checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok
[-]wrapping-health failed: reason withheld
[-]delegate-health failed: reason withheld
[+]poststarthook/generic-apiserver-start-informers ok
[+]poststarthook/delegate-post-start-hook ok
[+]poststarthook/wrapping-post-start-hook ok
healthz check failed

View File

@ -48,6 +48,8 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
)
@ -90,6 +92,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
config.LegacyAPIGroupPrefixes = sets.NewString("/api")
config.LoopbackClientConfig = &restclient.Config{}
clientset := fake.NewSimpleClientset()
if clientset == nil {
t.Fatal("unable to create fake client set")
}
config.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, config.LoopbackClientConfig.Timeout)
// TODO restore this test, but right now, eliminate our cycle
// config.OpenAPIConfig = DefaultOpenAPIConfig(testGetOpenAPIDefinitions, runtime.NewScheme())
// config.OpenAPIConfig.Info = &spec.Info{
@ -300,17 +308,13 @@ func TestPrepareRun(t *testing.T) {
defer etcdserver.Terminate(t)
assert.NotNil(config.SwaggerConfig)
// assert.NotNil(config.OpenAPIConfig)
server := httptest.NewServer(s.Handler.GoRestfulContainer.ServeMux)
defer server.Close()
done := make(chan struct{})
s.PrepareRun()
// openapi is installed in PrepareRun
// resp, err := http.Get(server.URL + "/swagger.json")
// assert.NoError(err)
// assert.Equal(http.StatusOK, resp.StatusCode)
s.RunPostStartHooks(done)
// swagger is installed in PrepareRun
resp, err := http.Get(server.URL + "/swaggerapi/")

View File

@ -106,6 +106,14 @@ func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
}
}
// isHookRegistered checks whether a given hook is registered
func (s *GenericAPIServer) isHookRegistered(name string) bool {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
_, exists := s.postStartHooks[name]
return exists
}
func runPostStartHook(name string, entry postStartHookEntry, context PostStartHookContext) {
var err error
func() {
@ -117,7 +125,6 @@ func runPostStartHook(name string, entry postStartHookEntry, context PostStartHo
if err != nil {
glog.Fatalf("PostStartHook %q failed: %v", name, err)
}
close(entry.done)
}

View File

@ -17,33 +17,70 @@ limitations under the License.
package options
import (
"fmt"
"strings"
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
)
// AdmissionOptions holds the admission options
type AdmissionOptions struct {
Control string
ControlConfigFile string
Plugins *admission.Plugins
PluginNames []string
ConfigFile string
Plugins *admission.Plugins
}
// NewAdmissionOptions creates a new instance of AdmissionOptions
func NewAdmissionOptions(plugins *admission.Plugins) *AdmissionOptions {
return &AdmissionOptions{
Plugins: plugins,
Control: "AlwaysAdmit",
Plugins: plugins,
PluginNames: []string{},
}
}
// AddFlags adds flags related to admission for a specific APIServer to the specified FlagSet
func (a *AdmissionOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.Control, "admission-control", a.Control, ""+
fs.StringSliceVar(&a.PluginNames, "admission-control", a.PluginNames, ""+
"Ordered list of plug-ins to do admission control of resources into cluster. "+
"Comma-delimited list of: "+strings.Join(a.Plugins.Registered(), ", ")+".")
fs.StringVar(&a.ControlConfigFile, "admission-control-config-file", a.ControlConfigFile,
fs.StringVar(&a.ConfigFile, "admission-control-config-file", a.ConfigFile,
"File with admission control configuration.")
}
// ApplyTo adds the admission chain to the server configuration
// the method lazily initializes a generic plugin that is appended to the list of pluginInitializers
// note this method uses:
// genericconfig.LoopbackClientConfig
// genericconfig.SharedInformerFactory
// genericconfig.Authorizer
func (a *AdmissionOptions) ApplyTo(serverCfg *server.Config, pluginInitializers ...admission.PluginInitializer) error {
pluginsConfigProvider, err := admission.ReadAdmissionConfiguration(a.PluginNames, a.ConfigFile)
if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err)
}
clientset, err := kubernetes.NewForConfig(serverCfg.LoopbackClientConfig)
if err != nil {
return err
}
genericInitializer, err := initializer.New(clientset, serverCfg.SharedInformerFactory, serverCfg.Authorizer)
if err != nil {
return err
}
initializersChain := admission.PluginInitializers{}
pluginInitializers = append(pluginInitializers, genericInitializer)
initializersChain = append(initializersChain, pluginInitializers...)
admissionChain, err := a.Plugins.NewFromPlugins(a.PluginNames, pluginsConfigProvider, initializersChain)
if err != nil {
return err
}
serverCfg.AdmissionControl = admissionChain
return nil
}

View File

@ -32,6 +32,8 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/server"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
certutil "k8s.io/client-go/util/cert"
)
@ -167,6 +169,13 @@ func (s *SecureServingOptions) ApplyTo(c *server.Config) error {
c.SecureServingInfo.SNICerts[server.LoopbackClientServerNameOverride] = &tlsCert
}
// create shared informers
clientset, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return err
}
c.SharedInformerFactory = informers.NewSharedInformerFactory(clientset, c.LoopbackClientConfig.Timeout)
return nil
}

View File

@ -104,7 +104,6 @@ func startServer(t *testing.T, runOptions *options.ServerRunOptions, stopChan <-
}
runOptions.InsecureServing.BindPort = port
err = app.NonBlockingRun(runOptions, stopChan)
if err != nil {
t.Logf("Error starting the %s: %v", apiNoun, err)

View File

@ -47,6 +47,8 @@ import (
"k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
extinformers "k8s.io/client-go/informers"
extclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
@ -245,7 +247,13 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
masterConfig.GenericConfig.LoopbackClientConfig.BearerToken = privilegedLoopbackToken
m, err := masterConfig.Complete().New(genericapiserver.EmptyDelegate)
clientset, err := extclient.NewForConfig(masterConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
glog.Fatal(err)
}
masterConfig.GenericConfig.SharedInformerFactory = extinformers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout)
m, err = masterConfig.Complete().New(genericapiserver.EmptyDelegate)
if err != nil {
closeFn()
glog.Fatalf("error in bringing up the master: %v", err)