make kube-apiserver ServerRunOptions setdefault and Validate before use

pull/8/head
hzxuzhonghu 2018-02-27 12:22:00 +08:00
parent 200bc66938
commit 8cce8bdc85
7 changed files with 77 additions and 50 deletions

View File

@ -112,15 +112,23 @@ func NewAPIServerCommand() *cobra.Command {
for the api objects which include pods, services, replicationcontrollers, and for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`, cluster's shared state through which all other components interact.`,
Run: func(cmd *cobra.Command, args []string) { RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested() verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags()) utilflag.PrintFlags(cmd.Flags())
stopCh := server.SetupSignalHandler() // set default options
if err := Run(s, stopCh); err != nil { completedOptions, err := Complete(s)
fmt.Fprintf(os.Stderr, "%v\n", err) if err != nil {
os.Exit(1) return err
} }
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
stopCh := server.SetupSignalHandler()
return Run(completedOptions, stopCh)
}, },
} }
s.AddFlags(cmd.Flags()) s.AddFlags(cmd.Flags())
@ -129,11 +137,11 @@ cluster's shared state through which all other components interact.`,
} }
// Run runs the specified APIServer. This should never exit. // Run runs the specified APIServer. This should never exit.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error { func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version // To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get()) glog.Infof("Version: %+v", version.Get())
server, err := CreateServerChain(runOptions, stopCh) server, err := CreateServerChain(completeOptions, stopCh)
if err != nil { if err != nil {
return err return err
} }
@ -142,19 +150,19 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
} }
// CreateServerChain creates the apiservers connected via delegation. // CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) { func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions) nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport) kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// If additional API servers are added, they should be gated. // If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, runOptions) apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, completedOptions.ServerRunOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -189,7 +197,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
apiExtensionsServer.GenericAPIServer.PrepareRun() apiExtensionsServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain // aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport, pluginInitializer) aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, versionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -224,7 +232,7 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g
} }
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes. // CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) { func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
// Setup nodeTunneler if needed // Setup nodeTunneler if needed
var nodeTunneler tunneler.Tunneler var nodeTunneler tunneler.Tunneler
var proxyDialerFn utilnet.DialFunc var proxyDialerFn utilnet.DialFunc
@ -270,7 +278,7 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig( func CreateKubeAPIServerConfig(
s *options.ServerRunOptions, s completedServerRunOptions,
nodeTunneler tunneler.Tunneler, nodeTunneler tunneler.Tunneler,
proxyTransport *http.Transport, proxyTransport *http.Transport,
) ( ) (
@ -282,19 +290,8 @@ func CreateKubeAPIServerConfig(
pluginInitializers []admission.PluginInitializer, pluginInitializers []admission.PluginInitializer,
lastErr error, lastErr error,
) { ) {
// set defaults in the options before trying to create the generic config
if lastErr = defaultOptions(s); lastErr != nil {
return
}
// validate options
if errs := s.Validate(); len(errs) != 0 {
lastErr = utilerrors.NewAggregate(errs)
return
}
var genericConfig *genericapiserver.Config var genericConfig *genericapiserver.Config
genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, lastErr = BuildGenericConfig(s, proxyTransport) genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, lastErr = BuildGenericConfig(s.ServerRunOptions, proxyTransport)
if lastErr != nil { if lastErr != nil {
return return
} }
@ -322,7 +319,7 @@ func CreateKubeAPIServerConfig(
return return
} }
storageFactory, lastErr := BuildStorageFactory(s, genericConfig.MergedResourceConfig) storageFactory, lastErr := BuildStorageFactory(s.ServerRunOptions, genericConfig.MergedResourceConfig)
if lastErr != nil { if lastErr != nil {
return return
} }
@ -673,21 +670,29 @@ func BuildStorageFactory(s *options.ServerRunOptions, apiResourceConfig *servers
return storageFactory, nil return storageFactory, nil
} }
func defaultOptions(s *options.ServerRunOptions) error { // completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
type completedServerRunOptions struct {
*options.ServerRunOptions
}
// Complete set default ServerRunOptions.
// Should be called after kube-apiserver flags parsed.
func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
var options completedServerRunOptions
// set defaults // set defaults
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil { if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
return err return options, err
} }
if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing); err != nil { if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing); err != nil {
return err return options, err
} }
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil { if err != nil {
return fmt.Errorf("error determining service IP ranges: %v", err) return options, fmt.Errorf("error determining service IP ranges: %v", err)
} }
s.ServiceClusterIPRange = serviceIPRange s.ServiceClusterIPRange = serviceIPRange
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil { if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err) return options, fmt.Errorf("error creating self-signed certificates: %v", err)
} }
if len(s.GenericServerRunOptions.ExternalHost) == 0 { if len(s.GenericServerRunOptions.ExternalHost) == 0 {
@ -697,7 +702,7 @@ func defaultOptions(s *options.ServerRunOptions) error {
if hostname, err := os.Hostname(); err == nil { if hostname, err := os.Hostname(); err == nil {
s.GenericServerRunOptions.ExternalHost = hostname s.GenericServerRunOptions.ExternalHost = hostname
} else { } else {
return fmt.Errorf("error finding host name: %v", err) return options, fmt.Errorf("error finding host name: %v", err)
} }
} }
glog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost) glog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
@ -752,7 +757,7 @@ func defaultOptions(s *options.ServerRunOptions) error {
} }
s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes) s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
if err != nil { if err != nil {
return err return options, err
} }
} }
@ -769,8 +774,8 @@ func defaultOptions(s *options.ServerRunOptions) error {
} }
} }
} }
options.ServerRunOptions = s
return nil return options, nil
} }
func readCAorNil(file string) ([]byte, error) { func readCAorNil(file string) ([]byte, error) {

View File

@ -102,9 +102,13 @@ func StartTestServer(t Logger, customFlags []string, storageConfig *storagebacke
s.APIEnablement.RuntimeConfig.Set("api/all=true") s.APIEnablement.RuntimeConfig.Set("api/all=true")
fs.Parse(customFlags) fs.Parse(customFlags)
completedOptions, err := app.Complete(s)
if err != nil {
return result, fmt.Errorf("failed to set default ServerRunOptions: %v", err)
}
t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
server, err := app.CreateServerChain(s, stopCh) server, err := app.CreateServerChain(completedOptions, stopCh)
if err != nil { if err != nil {
return result, fmt.Errorf("failed to create server chain: %v", err) return result, fmt.Errorf("failed to create server chain: %v", err)

View File

@ -1,5 +1,6 @@
cluster/images/etcd-version-monitor cluster/images/etcd-version-monitor
cmd/hyperkube cmd/hyperkube
cmd/kube-apiserver/app
cmd/kube-controller-manager/app cmd/kube-controller-manager/app
cmd/kube-proxy/app cmd/kube-proxy/app
cmd/kube-scheduler/app cmd/kube-scheduler/app

View File

@ -40,27 +40,33 @@ func NewAPIServer() *APIServer {
// Start starts the apiserver, returns when apiserver is ready. // Start starts the apiserver, returns when apiserver is ready.
func (a *APIServer) Start() error { func (a *APIServer) Start() error {
config := options.NewServerRunOptions() o := options.NewServerRunOptions()
config.Etcd.StorageConfig.ServerList = []string{getEtcdClientURL()} o.Etcd.StorageConfig.ServerList = []string{getEtcdClientURL()}
// TODO: Current setup of etcd in e2e-node tests doesn't support etcd v3 // TODO: Current setup of etcd in e2e-node tests doesn't support etcd v3
// protocol. We should migrate it to use the same infrastructure as all // protocol. We should migrate it to use the same infrastructure as all
// other tests (pkg/storage/etcd/testing). // other tests (pkg/storage/etcd/testing).
config.Etcd.StorageConfig.Type = "etcd2" o.Etcd.StorageConfig.Type = "etcd2"
_, ipnet, err := net.ParseCIDR(clusterIPRange) _, ipnet, err := net.ParseCIDR(clusterIPRange)
if err != nil { if err != nil {
return err return err
} }
config.ServiceClusterIPRange = *ipnet o.ServiceClusterIPRange = *ipnet
config.AllowPrivileged = true o.AllowPrivileged = true
config.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} o.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
errCh := make(chan error) errCh := make(chan error)
go func() { go func() {
defer close(errCh) defer close(errCh)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
err := apiserver.Run(config, stopCh) completedOptions, err := apiserver.Complete(o)
if err != nil {
errCh <- fmt.Errorf("set apiserver default options error: %v", err)
return
}
err = apiserver.Run(completedOptions, stopCh)
if err != nil { if err != nil {
errCh <- fmt.Errorf("run apiserver error: %v", err) errCh <- fmt.Errorf("run apiserver error: %v", err)
return
} }
}() }()

View File

@ -735,12 +735,16 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"} kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
completedOptions, err := app.Complete(kubeAPIServerOptions)
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -110,12 +110,16 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name() kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name() kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"} kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
completedOptions, err := app.Complete(kubeAPIServerOptions)
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -61,6 +61,9 @@ func runBasicSecureAPIServer(t *testing.T, ciphers []string) (uint32, error) {
kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
if err := app.DefaultOptions(kubeAPIServerOptions); err != nil {
t.Fatal(err)
}
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions) tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
if err != nil { if err != nil {