promote informers into master.Config

review:

1. move informers into master extra config
2. move one post start hook into New()

fixes npe from master integration test
pull/8/head
yue9944882 2018-08-02 12:11:58 +08:00
parent db9545e69e
commit 6bac6fafa0
8 changed files with 37 additions and 27 deletions

View File

@ -166,13 +166,13 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
return nil, err
}
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
kubeAPIServerConfig, insecureServingOptions, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount)
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount)
if err != nil {
return nil, err
}
@ -181,7 +181,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
return nil, err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, versionedInformers, admissionPostStartHook)
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
if err != nil {
return nil, err
}
@ -194,7 +194,7 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
apiExtensionsServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, versionedInformers, serviceResolver, proxyTransport, pluginInitializer)
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
@ -215,16 +215,12 @@ func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan
}
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, sharedInformers informers.SharedInformerFactory, versionedInformers clientgoinformers.SharedInformerFactory, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete(versionedInformers).New(delegateAPIServer)
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
sharedInformers.Start(context.StopCh)
return nil
})
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
return kubeAPIServer, nil
@ -282,8 +278,6 @@ func CreateKubeAPIServerConfig(
proxyTransport *http.Transport,
) (
config *master.Config,
sharedInformers informers.SharedInformerFactory,
versionedInformers clientgoinformers.SharedInformerFactory,
insecureServingInfo *kubeserver.InsecureServingInfo,
serviceResolver aggregatorapiserver.ServiceResolver,
pluginInitializers []admission.PluginInitializer,
@ -292,6 +286,8 @@ func CreateKubeAPIServerConfig(
) {
var genericConfig *genericapiserver.Config
var storageFactory *serverstorage.DefaultStorageFactory
var sharedInformers informers.SharedInformerFactory
var versionedInformers clientgoinformers.SharedInformerFactory
genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport)
if lastErr != nil {
return
@ -403,6 +399,9 @@ func CreateKubeAPIServerConfig(
ServiceAccountIssuer: issuer,
ServiceAccountAPIAudiences: apiAudiences,
ServiceAccountMaxExpiration: maxExpiration,
InternalInformers: sharedInformers,
VersionedInformers: versionedInformers,
},
}

View File

@ -41,6 +41,7 @@ go_library(
"//pkg/apis/settings/install:go_default_library",
"//pkg/apis/storage/install:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/kubeapiserver/options:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/master/reconcilers:go_default_library",

View File

@ -66,6 +66,7 @@ import (
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/reconcilers"
@ -167,6 +168,9 @@ type ExtraConfig struct {
ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountAPIAudiences []string
ServiceAccountMaxExpiration time.Duration
VersionedInformers informers.SharedInformerFactory
InternalInformers internalinformers.SharedInformerFactory
}
type Config struct {
@ -248,9 +252,9 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
}
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (cfg *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
func (cfg *Config) Complete() CompletedConfig {
c := completedConfig{
cfg.GenericConfig.Complete(informers),
cfg.GenericConfig.Complete(cfg.ExtraConfig.VersionedInformers),
&cfg.ExtraConfig,
}
@ -367,6 +371,12 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-informers", func(context genericapiserver.PostStartHookContext) error {
if c.ExtraConfig.InternalInformers != nil {
c.ExtraConfig.InternalInformers.Start(context.StopCh)
}
return nil
})
return m, nil
}

View File

@ -41,7 +41,7 @@ import (
// TestValidOpenAPISpec verifies that the open api is added
// at the proper endpoint and the spec is valid.
func TestValidOpenAPISpec(t *testing.T) {
etcdserver, config, sharedInformers, assert := setUp(t)
etcdserver, config, assert := setUp(t)
defer etcdserver.Terminate(t)
config.GenericConfig.EnableIndex = true
@ -54,7 +54,7 @@ func TestValidOpenAPISpec(t *testing.T) {
}
config.GenericConfig.SwaggerConfig = genericapiserver.DefaultSwaggerConfig()
master, err := config.Complete(sharedInformers).New(genericapiserver.NewEmptyDelegate())
master, err := config.Complete().New(genericapiserver.NewEmptyDelegate())
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}

View File

@ -65,7 +65,7 @@ import (
)
// setUp is a convience function for setting up for (most) tests.
func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, informers.SharedInformerFactory, *assert.Assertions) {
func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
config := &Config{
@ -120,9 +120,9 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, informers.SharedI
if err != nil {
t.Fatalf("unable to create client set due to %v", err)
}
sharedInformers := informers.NewSharedInformerFactory(clientset, config.GenericConfig.LoopbackClientConfig.Timeout)
config.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, config.GenericConfig.LoopbackClientConfig.Timeout)
return server, *config, sharedInformers, assert.New(t)
return server, *config, assert.New(t)
}
type fakeLocalhost443Listener struct{}
@ -202,9 +202,9 @@ func TestCertificatesRestStorageStrategies(t *testing.T) {
}
func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
etcdserver, config, sharedInformers, assert := setUp(t)
etcdserver, config, assert := setUp(t)
master, err := config.Complete(sharedInformers).New(genericapiserver.NewEmptyDelegate())
master, err := config.Complete().New(genericapiserver.NewEmptyDelegate())
if err != nil {
t.Fatalf("Error in bringing up the master: %v", err)
}

View File

@ -113,7 +113,7 @@ func TestAggregatedAPIServer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
kubeAPIServerConfig, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}
@ -124,7 +124,7 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerClientConfig.ServerName = ""
kubeClientConfigValue.Store(kubeAPIServerClientConfig)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), admissionPostStartHook)
if err != nil {
t.Fatal(err)
}

View File

@ -111,7 +111,7 @@ func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
kubeAPIServerConfig, _, _, _, admissionPostStartHook, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}
@ -119,7 +119,7 @@ func startTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
if setup.ModifyServerConfig != nil {
setup.ModifyServerConfig(kubeAPIServerConfig)
}
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), sharedInformers, versionedInformers, admissionPostStartHook)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.NewEmptyDelegate(), admissionPostStartHook)
if err != nil {
t.Fatal(err)
}

View File

@ -177,8 +177,8 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
glog.Fatal(err)
}
sharedInformers := informers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout)
m, err = masterConfig.Complete(sharedInformers).New(genericapiserver.NewEmptyDelegate())
masterConfig.ExtraConfig.VersionedInformers = informers.NewSharedInformerFactory(clientset, masterConfig.GenericConfig.LoopbackClientConfig.Timeout)
m, err = masterConfig.Complete().New(genericapiserver.NewEmptyDelegate())
if err != nil {
closeFn()
glog.Fatalf("error in bringing up the master: %v", err)