diff --git a/cmd/hyperkube/BUILD b/cmd/hyperkube/BUILD index a73441c336..0edcdf1035 100644 --- a/cmd/hyperkube/BUILD +++ b/cmd/hyperkube/BUILD @@ -65,6 +65,7 @@ go_library( "//plugin/cmd/kube-scheduler/app:go_default_library", "//plugin/cmd/kube-scheduler/app/options:go_default_library", "//vendor:github.com/spf13/pflag", + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/server/healthz", "//vendor:k8s.io/apiserver/pkg/util/flag", "//vendor:k8s.io/apiserver/pkg/util/logs", diff --git a/cmd/hyperkube/kube-aggregator.go b/cmd/hyperkube/kube-aggregator.go index ca948792ee..8d30553914 100644 --- a/cmd/hyperkube/kube-aggregator.go +++ b/cmd/hyperkube/kube-aggregator.go @@ -19,6 +19,7 @@ package main import ( "os" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kube-aggregator/pkg/cmd/server" ) @@ -39,7 +40,7 @@ func NewKubeAggregator() *Server { if err := o.Validate(args); err != nil { return err } - if err := o.RunAggregator(); err != nil { + if err := o.RunAggregator(wait.NeverStop); err != nil { return err } return nil diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 48a1df541f..f2e036e84c 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -85,26 +85,47 @@ cluster's shared state through which all other components interact.`, // Run runs the specified APIServer. This should never exit. func Run(s *options.ServerRunOptions) error { + config, sharedInformers, err := BuildMasterConfig(s) + if err != nil { + return err + } + + return RunServer(config, sharedInformers, wait.NeverStop) +} + +// RunServer uses the provided config and shared informers to run the apiserver. It does not return. +func RunServer(config *master.Config, sharedInformers informers.SharedInformerFactory, stopCh <-chan struct{}) error { + m, err := config.Complete().New() + if err != nil { + return err + } + + sharedInformers.Start(stopCh) + return m.GenericAPIServer.PrepareRun().Run(stopCh) +} + +// BuildMasterConfig creates all the resources for running the API server, but runs none of them +func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.SharedInformerFactory, error) { // set defaults if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing, s.InsecureServing); err != nil { - return err + return nil, nil, err } serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) if err != nil { - return fmt.Errorf("error determining service IP ranges: %v", err) + return nil, nil, fmt.Errorf("error determining service IP ranges: %v", err) } if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), apiServerServiceIP); err != nil { - return fmt.Errorf("error creating self-signed certificates: %v", err) + return nil, nil, fmt.Errorf("error creating self-signed certificates: %v", err) } if err := s.CloudProvider.DefaultExternalHost(s.GenericServerRunOptions); err != nil { - return fmt.Errorf("error setting the external host value: %v", err) + return nil, nil, fmt.Errorf("error setting the external host value: %v", err) } s.Authentication.ApplyAuthorization(s.Authorization) // validate options if errs := s.Validate(); len(errs) != 0 { - return utilerrors.NewAggregate(errs) + return nil, nil, utilerrors.NewAggregate(errs) } // create config from options @@ -112,22 +133,22 @@ func Run(s *options.ServerRunOptions) error { WithSerializer(api.Codecs) if err := s.GenericServerRunOptions.ApplyTo(genericConfig); err != nil { - return err + return nil, nil, err } if err := s.InsecureServing.ApplyTo(genericConfig); err != nil { - return err + return nil, nil, err } if err := s.SecureServing.ApplyTo(genericConfig); err != nil { - return err + return nil, nil, err } if err := s.Authentication.ApplyTo(genericConfig); err != nil { - return err + return nil, nil, err } if err := s.Audit.ApplyTo(genericConfig); err != nil { - return err + return nil, nil, err } if err := s.Features.ApplyTo(genericConfig); err != nil { - return err + return nil, nil, err } // Use protobufs for self-communication. @@ -155,7 +176,7 @@ func Run(s *options.ServerRunOptions) error { var installSSHKey tunneler.InstallSSHKey cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider.CloudProvider, s.CloudProvider.CloudConfigFile) if err != nil { - return fmt.Errorf("cloud provider could not be initialized: %v", err) + return nil, nil, fmt.Errorf("cloud provider could not be initialized: %v", err) } if cloud != nil { if instances, supported := cloud.Instances(); supported { @@ -163,10 +184,10 @@ func Run(s *options.ServerRunOptions) error { } } if s.KubeletConfig.Port == 0 { - return fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified") + return nil, nil, fmt.Errorf("must enable kubelet port if proxy ssh-tunneling is specified") } if s.KubeletConfig.ReadOnlyPort == 0 { - return fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified") + return nil, nil, fmt.Errorf("must enable kubelet readonly port if proxy ssh-tunneling is specified") } // Set up the nodeTunneler // TODO(cjcullen): If we want this to handle per-kubelet ports or other @@ -211,7 +232,7 @@ func Run(s *options.ServerRunOptions) error { storageGroupsToEncodingVersion, err := s.StorageSerialization.StorageGroupsToEncodingVersion() if err != nil { - return fmt.Errorf("error generating storage version map: %s", err) + return nil, nil, fmt.Errorf("error generating storage version map: %s", err) } storageFactory, err := kubeapiserver.NewStorageFactory( s.Etcd.StorageConfig, s.Etcd.DefaultStorageMediaType, api.Codecs, @@ -220,7 +241,7 @@ func Run(s *options.ServerRunOptions) error { []schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")}, master.DefaultAPIResourceConfigSource(), s.APIEnablement.RuntimeConfig) if err != nil { - return fmt.Errorf("error in initializing storage factory: %s", err) + return nil, nil, fmt.Errorf("error in initializing storage factory: %s", err) } for _, override := range s.Etcd.EtcdServersOverrides { tokens := strings.Split(override, "#") @@ -257,7 +278,7 @@ func Run(s *options.ServerRunOptions) error { // go directly to etcd to avoid recursive auth insanity storageConfig, err := storageFactory.NewConfig(api.Resource("serviceaccounts")) if err != nil { - return fmt.Errorf("unable to get serviceaccounts storage: %v", err) + return nil, nil, fmt.Errorf("unable to get serviceaccounts storage: %v", err) } authenticatorConfig.ServiceAccountTokenGetter = serviceaccountcontroller.NewGetterFromStorageInterface(storageConfig, storageFactory.ResourcePrefix(api.Resource("serviceaccounts")), storageFactory.ResourcePrefix(api.Resource("secrets"))) } @@ -266,7 +287,7 @@ func Run(s *options.ServerRunOptions) error { if err != nil { kubeAPIVersions := os.Getenv("KUBE_API_VERSIONS") if len(kubeAPIVersions) == 0 { - return fmt.Errorf("failed to create clientset: %v", err) + return nil, nil, fmt.Errorf("failed to create clientset: %v", err) } // KUBE_API_VERSIONS is used in test-update-storage-objects.sh, disabling a number of API @@ -289,24 +310,24 @@ func Run(s *options.ServerRunOptions) error { apiAuthenticator, securityDefinitions, err := authenticatorConfig.New() if err != nil { - return fmt.Errorf("invalid authentication config: %v", err) + return nil, nil, fmt.Errorf("invalid authentication config: %v", err) } authorizationConfig := s.Authorization.ToAuthorizationConfig(sharedInformers) apiAuthorizer, err := authorizationConfig.New() if err != nil { - return fmt.Errorf("invalid Authorization Config: %v", err) + return nil, nil, fmt.Errorf("invalid Authorization Config: %v", err) } admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",") pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer) admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile) if err != nil { - return fmt.Errorf("failed to read plugin config: %v", err) + return nil, nil, fmt.Errorf("failed to read plugin config: %v", err) } admissionController, err := admission.NewFromPlugins(admissionControlPluginNames, admissionConfigProvider, pluginInitializer) if err != nil { - return fmt.Errorf("failed to initialize plugins: %v", err) + return nil, nil, fmt.Errorf("failed to initialize plugins: %v", err) } proxyTransport := utilnet.SetTransportDefaults(&http.Transport{ @@ -331,16 +352,16 @@ func Run(s *options.ServerRunOptions) error { ) if err := s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); err != nil { - return err + return nil, nil, err } clientCA, err := readCAorNil(s.Authentication.ClientCert.ClientCA) if err != nil { - return err + return nil, nil, err } requestHeaderProxyCA, err := readCAorNil(s.Authentication.RequestHeader.ClientCAFile) if err != nil { - return err + return nil, nil, err } config := &master.Config{ GenericConfig: genericConfig, @@ -381,14 +402,7 @@ func Run(s *options.ServerRunOptions) error { cachesize.SetWatchCacheSizes(s.GenericServerRunOptions.WatchCacheSizes) } - m, err := config.Complete().New() - if err != nil { - return err - } - - sharedInformers.Start(wait.NeverStop) - m.GenericAPIServer.PrepareRun().Run(wait.NeverStop) - return nil + return config, sharedInformers, nil } func readCAorNil(file string) ([]byte, error) { diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index bc25d2f739..25276dd0d3 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -213,8 +213,7 @@ func Run(s *options.ServerRunOptions) error { installAutoscalingAPIs(m, genericConfig.RESTOptionsGetter) sharedInformers.Start(wait.NeverStop) - m.PrepareRun().Run(wait.NeverStop) - return nil + return m.PrepareRun().Run(wait.NeverStop) } // PostProcessSpec adds removed definitions for backward compatibility diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 4babef1bf1..3a9cec73bc 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -189,16 +189,16 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // Run spawns the http servers (secure and insecure). It only returns if stopCh is closed // or one of the ports cannot be listened on initially. -func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) { +func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { if s.SecureServingInfo != nil && s.Handler != nil { if err := s.serveSecurely(stopCh); err != nil { - glog.Fatal(err) + return err } } if s.InsecureServingInfo != nil && s.InsecureHandler != nil { if err := s.serveInsecurely(stopCh); err != nil { - glog.Fatal(err) + return err } } @@ -210,6 +210,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) { } <-stopCh + return nil } // EffectiveSecurePort returns the secure port we bound to. diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go b/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go index a828c56e63..ea615b7853 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/serving_test.go @@ -448,114 +448,121 @@ NextTest: } stopCh := make(chan struct{}) + func() { + defer close(stopCh) - // launch server - config := setUp(t) + // launch server + config := setUp(t) - v := fakeVersion() - config.Version = &v + v := fakeVersion() + config.Version = &v - config.EnableIndex = true - secureOptions := &SecureServingOptions{ - ServingOptions: ServingOptions{ - BindAddress: net.ParseIP("127.0.0.1"), - BindPort: 6443, - }, - ServerCert: GeneratableKeyCert{ - CertKey: CertKey{ - CertFile: serverCertBundleFile, - KeyFile: serverKeyFile, + config.EnableIndex = true + secureOptions := &SecureServingOptions{ + ServingOptions: ServingOptions{ + BindAddress: net.ParseIP("127.0.0.1"), + BindPort: 6443, }, - }, - SNICertKeys: namedCertKeys, - } - config.LoopbackClientConfig = &restclient.Config{} - if err := secureOptions.ApplyTo(&config); err != nil { - t.Errorf("%q - failed applying the SecureServingOptions: %v", title, err) - continue NextTest - } - config.InsecureServingInfo = nil - - s, err := config.Complete().New() - if err != nil { - t.Errorf("%q - failed creating the server: %v", title, err) - continue NextTest - } - - // patch in a 0-port to enable auto port allocation - s.SecureServingInfo.BindAddress = "127.0.0.1:0" - - // add poststart hook to know when the server is up. - startedCh := make(chan struct{}) - s.AddPostStartHook("test-notifier", func(context PostStartHookContext) error { - close(startedCh) - return nil - }) - preparedServer := s.PrepareRun() - go preparedServer.Run(stopCh) - - // load ca certificates into a pool - roots := x509.NewCertPool() - for _, caCert := range caCerts { - roots.AddCert(caCert) - } - - <-startedCh - - effectiveSecurePort := fmt.Sprintf("%d", preparedServer.EffectiveSecurePort()) - // try to dial - addr := fmt.Sprintf("localhost:%s", effectiveSecurePort) - t.Logf("Dialing %s as %q", addr, test.ServerName) - conn, err := tls.Dial("tcp", addr, &tls.Config{ - RootCAs: roots, - ServerName: test.ServerName, // used for SNI in the client HELLO packet - }) - if err != nil { - t.Errorf("%q - failed to connect: %v", title, err) - continue NextTest - } - - // check returned server certificate - sig := x509CertSignature(conn.ConnectionState().PeerCertificates[0]) - gotCertIndex, found := signatures[sig] - if !found { - t.Errorf("%q - unknown signature returned from server: %s", title, sig) - } - if gotCertIndex != test.ExpectedCertIndex { - t.Errorf("%q - expected cert index %d, got cert index %d", title, test.ExpectedCertIndex, gotCertIndex) - } - - conn.Close() - - // check that the loopback client can connect - host := "127.0.0.1" - if len(test.LoopbackClientBindAddressOverride) != 0 { - host = test.LoopbackClientBindAddressOverride - } - s.LoopbackClientConfig.Host = net.JoinHostPort(host, effectiveSecurePort) - if test.ExpectLoopbackClientError { - if err == nil { - t.Errorf("%q - expected error creating loopback client config", title) + ServerCert: GeneratableKeyCert{ + CertKey: CertKey{ + CertFile: serverCertBundleFile, + KeyFile: serverKeyFile, + }, + }, + SNICertKeys: namedCertKeys, } - continue NextTest - } - if err != nil { - t.Errorf("%q - failed creating loopback client config: %v", title, err) - continue NextTest - } - client, err := discovery.NewDiscoveryClientForConfig(s.LoopbackClientConfig) - if err != nil { - t.Errorf("%q - failed to create loopback client: %v", title, err) - continue NextTest - } - got, err := client.ServerVersion() - if err != nil { - t.Errorf("%q - failed to connect with loopback client: %v", title, err) - continue NextTest - } - if expected := &v; !reflect.DeepEqual(got, expected) { - t.Errorf("%q - loopback client didn't get correct version info: expected=%v got=%v", title, expected, got) - } + config.LoopbackClientConfig = &restclient.Config{} + if err := secureOptions.ApplyTo(&config); err != nil { + t.Errorf("%q - failed applying the SecureServingOptions: %v", title, err) + return + } + config.InsecureServingInfo = nil + + s, err := config.Complete().New() + if err != nil { + t.Errorf("%q - failed creating the server: %v", title, err) + return + } + + // patch in a 0-port to enable auto port allocation + s.SecureServingInfo.BindAddress = "127.0.0.1:0" + + // add poststart hook to know when the server is up. + startedCh := make(chan struct{}) + s.AddPostStartHook("test-notifier", func(context PostStartHookContext) error { + close(startedCh) + return nil + }) + preparedServer := s.PrepareRun() + go func() { + if err := preparedServer.Run(stopCh); err != nil { + t.Fatal(err) + } + }() + + // load ca certificates into a pool + roots := x509.NewCertPool() + for _, caCert := range caCerts { + roots.AddCert(caCert) + } + + <-startedCh + + effectiveSecurePort := fmt.Sprintf("%d", preparedServer.EffectiveSecurePort()) + // try to dial + addr := fmt.Sprintf("localhost:%s", effectiveSecurePort) + t.Logf("Dialing %s as %q", addr, test.ServerName) + conn, err := tls.Dial("tcp", addr, &tls.Config{ + RootCAs: roots, + ServerName: test.ServerName, // used for SNI in the client HELLO packet + }) + if err != nil { + t.Errorf("%q - failed to connect: %v", title, err) + return + } + + // check returned server certificate + sig := x509CertSignature(conn.ConnectionState().PeerCertificates[0]) + gotCertIndex, found := signatures[sig] + if !found { + t.Errorf("%q - unknown signature returned from server: %s", title, sig) + } + if gotCertIndex != test.ExpectedCertIndex { + t.Errorf("%q - expected cert index %d, got cert index %d", title, test.ExpectedCertIndex, gotCertIndex) + } + + conn.Close() + + // check that the loopback client can connect + host := "127.0.0.1" + if len(test.LoopbackClientBindAddressOverride) != 0 { + host = test.LoopbackClientBindAddressOverride + } + s.LoopbackClientConfig.Host = net.JoinHostPort(host, effectiveSecurePort) + if test.ExpectLoopbackClientError { + if err == nil { + t.Errorf("%q - expected error creating loopback client config", title) + } + return + } + if err != nil { + t.Errorf("%q - failed creating loopback client config: %v", title, err) + return + } + client, err := discovery.NewDiscoveryClientForConfig(s.LoopbackClientConfig) + if err != nil { + t.Errorf("%q - failed to create loopback client: %v", title, err) + return + } + got, err := client.ServerVersion() + if err != nil { + t.Errorf("%q - failed to connect with loopback client: %v", title, err) + return + } + if expected := &v; !reflect.DeepEqual(got, expected) { + t.Errorf("%q - loopback client didn't get correct version info: expected=%v got=%v", title, expected, got) + } + }() } } diff --git a/staging/src/k8s.io/kube-aggregator/main.go b/staging/src/k8s.io/kube-aggregator/main.go index b61a259548..7ab11c0d8f 100644 --- a/staging/src/k8s.io/kube-aggregator/main.go +++ b/staging/src/k8s.io/kube-aggregator/main.go @@ -21,6 +21,7 @@ import ( "os" "runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/util/logs" "k8s.io/kube-aggregator/pkg/cmd/server" @@ -40,7 +41,7 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) } - cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr) + cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr, wait.NeverStop) cmd.Flags().AddGoFlagSet(flag.CommandLine) if err := cmd.Execute(); err != nil { panic(err) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index dc59e63997..142882a89e 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -22,7 +22,6 @@ import ( "time" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" genericapifilters "k8s.io/apiserver/pkg/endpoints/filters" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -106,7 +105,7 @@ func (c *Config) SkipComplete() completedConfig { } // New returns a new instance of APIAggregator from the given config. -func (c completedConfig) New() (*APIAggregator, error) { +func (c completedConfig) New(stopCh <-chan struct{}) (*APIAggregator, error) { informerFactory := informers.NewSharedInformerFactory( internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig), 5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on. @@ -154,12 +153,12 @@ func (c completedConfig) New() (*APIAggregator, error) { apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s) s.GenericAPIServer.AddPostStartHook("start-informers", func(context genericapiserver.PostStartHookContext) error { - informerFactory.Start(wait.NeverStop) - kubeInformers.Start(wait.NeverStop) + informerFactory.Start(stopCh) + kubeInformers.Start(stopCh) return nil }) s.GenericAPIServer.AddPostStartHook("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { - apiserviceRegistrationController.Run(wait.NeverStop) + apiserviceRegistrationController.Run(stopCh) return nil }) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go index 2afc02be84..4b04b92ac4 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/cmd/server/start.go @@ -25,7 +25,6 @@ import ( "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/filters" genericoptions "k8s.io/apiserver/pkg/server/options" @@ -56,7 +55,7 @@ type AggregatorOptions struct { } // NewCommandStartAggregator provides a CLI handler for 'start master' command -func NewCommandStartAggregator(out, err io.Writer) *cobra.Command { +func NewCommandStartAggregator(out, err io.Writer, stopCh <-chan struct{}) *cobra.Command { o := NewDefaultOptions(out, err) cmd := &cobra.Command{ @@ -69,7 +68,7 @@ func NewCommandStartAggregator(out, err io.Writer) *cobra.Command { if err := o.Validate(args); err != nil { return err } - if err := o.RunAggregator(); err != nil { + if err := o.RunAggregator(stopCh); err != nil { return err } return nil @@ -110,7 +109,7 @@ func (o *AggregatorOptions) Complete() error { return nil } -func (o AggregatorOptions) RunAggregator() error { +func (o AggregatorOptions) RunAggregator(stopCh <-chan struct{}) error { // TODO have a "real" external address if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"); err != nil { return fmt.Errorf("error creating self-signed certificates: %v", err) @@ -161,11 +160,9 @@ func (o AggregatorOptions) RunAggregator() error { return err } - server, err := config.Complete().New() + server, err := config.Complete().New(stopCh) if err != nil { return err } - server.GenericAPIServer.PrepareRun().Run(wait.NeverStop) - - return nil + return server.GenericAPIServer.PrepareRun().Run(stopCh) } diff --git a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go index f4c5780e94..53bf023095 100644 --- a/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/sample-apiserver/pkg/cmd/server/start.go @@ -19,6 +19,7 @@ package server import ( "fmt" "io" + "net" "github.com/spf13/cobra" @@ -85,7 +86,7 @@ func (o *WardleServerOptions) Complete() error { func (o WardleServerOptions) Config() (*apiserver.Config, error) { // TODO have a "real" external address - if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"); err != nil { + if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", net.ParseIP("127.0.0.1")); err != nil { return nil, fmt.Errorf("error creating self-signed certificates: %v", err) } @@ -110,7 +111,5 @@ func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error { if err != nil { return err } - server.GenericAPIServer.PrepareRun().Run(stopCh) - - return nil + return server.GenericAPIServer.PrepareRun().Run(stopCh) } diff --git a/test/integration/examples/BUILD b/test/integration/examples/BUILD index 64e0a428dc..39c6b0caa2 100644 --- a/test/integration/examples/BUILD +++ b/test/integration/examples/BUILD @@ -12,13 +12,20 @@ go_test( srcs = ["apiserver_test.go"], tags = ["automanaged"], deps = [ + "//cmd/kube-apiserver/app:go_default_library", + "//cmd/kube-apiserver/app/options:go_default_library", "//test/integration/framework:go_default_library", - "//vendor:github.com/golang/glog", "//vendor:github.com/stretchr/testify/assert", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/client-go/kubernetes", "//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/tools/clientcmd", "//vendor:k8s.io/client-go/tools/clientcmd/api", + "//vendor:k8s.io/client-go/util/cert", + "//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1", + "//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset", + "//vendor:k8s.io/kube-aggregator/pkg/cmd/server", "//vendor:k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1", "//vendor:k8s.io/sample-apiserver/pkg/cmd/server", ], diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 40fc308e0e..31fa67acb6 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -17,29 +17,37 @@ limitations under the License. package apiserver import ( - "crypto/tls" + "crypto/x509" "encoding/json" "fmt" "io/ioutil" - "net/http" + "net" "os" + "path" + "strconv" + "strings" "testing" "time" - "github.com/golang/glog" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + client "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/util/cert" + apiregistrationv1alpha1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1" + aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + kubeaggregatorserver "k8s.io/kube-aggregator/pkg/cmd/server" + "k8s.io/kubernetes/cmd/kube-apiserver/app" + "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/test/integration/framework" "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1" - "k8s.io/sample-apiserver/pkg/cmd/server" + sampleserver "k8s.io/sample-apiserver/pkg/cmd/server" ) -const securePort = "6444" - var groupVersion = v1alpha1.SchemeGroupVersion var groupVersionForDiscovery = metav1.GroupVersionForDiscovery{ @@ -47,41 +55,277 @@ var groupVersionForDiscovery = metav1.GroupVersionForDiscovery{ Version: groupVersion.Version, } -func TestRunServer(t *testing.T) { - masterConfig := framework.NewIntegrationTestMasterConfig() - _, s := framework.RunAMaster(masterConfig) - defer s.Close() +func localPort() (int, error) { + l, err := net.Listen("tcp", ":0") + if err != nil { + return 0, err + } + defer l.Close() + addr := strings.Split(l.Addr().String(), ":") + port, err := strconv.Atoi(addr[len(addr)-1]) + if err != nil { + return 0, err + } + return port, nil +} - adminKubeConfig := createKubeConfig(masterConfig.GenericConfig.LoopbackClientConfig) +func TestAggregatedAPIServer(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + + kubePort, err := localPort() + if err != nil { + t.Fatal(err) + } + certDir, _ := ioutil.TempDir("", "test-integration-apiserver") + defer os.RemoveAll(certDir) + _, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24") + proxySigningKey, err := cert.NewPrivateKey() + + if err != nil { + t.Fatal(err) + } + proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) + if err != nil { + t.Fatal(err) + } + proxyCACertFile, _ := ioutil.TempFile(certDir, "proxy-ca.crt") + if err := ioutil.WriteFile(proxyCACertFile.Name(), cert.EncodeCertPEM(proxySigningCert), 0644); err != nil { + t.Fatal(err) + } + + kubeAPIServerOptions := options.NewServerRunOptions() + kubeAPIServerOptions.SecureServing.ServingOptions.BindAddress = net.ParseIP("127.0.0.1") + kubeAPIServerOptions.SecureServing.ServingOptions.BindPort = kubePort + kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir + kubeAPIServerOptions.InsecureServing.BindPort = 0 + kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURLFromEnv()} + kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange + kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} + kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} + kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"} + kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"} + kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name() + kubeAPIServerOptions.Authorization.Mode = "RBAC" + + config, sharedInformers, err := app.BuildMasterConfig(kubeAPIServerOptions) + if err != nil { + t.Fatal(err) + } + + go func() { + for { + if err := app.RunServer(config, sharedInformers, stopCh); err != nil { + t.Log(err) + } + time.Sleep(100 * time.Millisecond) + } + }() + + // just use json because everyone speaks it + config.GenericConfig.LoopbackClientConfig.ContentType = "" + config.GenericConfig.LoopbackClientConfig.AcceptContentTypes = "" + kubeClient := client.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig) + err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if _, err := kubeClient.Discovery().ServerVersion(); err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + // write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config + adminKubeConfig := createKubeConfig(config.GenericConfig.LoopbackClientConfig) kubeconfigFile, _ := ioutil.TempFile("", "") defer os.Remove(kubeconfigFile.Name()) clientcmd.WriteToFile(*adminKubeConfig, kubeconfigFile.Name()) - // Avoid default cert-dir of /var/run/kubernetes to allow this to run on darwin - certDir, _ := ioutil.TempDir("", "test-integration-apiserver") - defer os.Remove(certDir) - - stopCh := make(chan struct{}) - defer close(stopCh) - cmd := server.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh) - cmd.SetArgs([]string{ - "--secure-port", securePort, - "--requestheader-username-headers", "", + // start the wardle server to prove we can aggregate it + wardlePort, err := localPort() + if err != nil { + t.Fatal(err) + } + wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server") + defer os.RemoveAll(wardleCertDir) + wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh) + wardleCmd.SetArgs([]string{ + "--bind-address", "127.0.0.1", + "--secure-port", strconv.Itoa(wardlePort), + "--requestheader-username-headers=X-Remote-User", + "--requestheader-group-headers=X-Remote-Group", + "--requestheader-extra-headers-prefix=X-Remote-Extra-", + "--requestheader-client-ca-file=" + proxyCACertFile.Name(), + "--requestheader-allowed-names=kube-aggregator", "--authentication-kubeconfig", kubeconfigFile.Name(), "--authorization-kubeconfig", kubeconfigFile.Name(), "--etcd-servers", framework.GetEtcdURLFromEnv(), - "--cert-dir", certDir, + "--cert-dir", wardleCertDir, }) - go cmd.Execute() + go func() { + for { + if err := wardleCmd.Execute(); err != nil { + t.Log(err) + } + time.Sleep(100 * time.Millisecond) + } + }() - serverLocation := fmt.Sprintf("https://localhost:%s", securePort) - if err := waitForApiserverUp(serverLocation); err != nil { - t.Fatalf("%v", err) + wardleClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig) + wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort) + wardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt") + wardleClientConfig.CAData = nil + wardleClientConfig.ServerName = "" + wardleClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken + var wardleClient client.Interface + err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + wardleClient, err = client.NewForConfig(wardleClientConfig) + if err != nil { + // this happens if we race the API server for writing the cert + t.Log(err) + return false, nil + } + if _, err := wardleClient.Discovery().ServerVersion(); err != nil { + t.Log(err) + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatal(err) } - testAPIGroupList(t, serverLocation) - testAPIGroup(t, serverLocation) - testAPIResourceList(t, serverLocation) + // start the aggregator + aggregatorPort, err := localPort() + if err != nil { + t.Fatal(err) + } + aggregatorCertDir, _ := ioutil.TempDir("", "test-integration-aggregator") + defer os.RemoveAll(aggregatorCertDir) + proxyClientKey, err := cert.NewPrivateKey() + if err != nil { + t.Fatal(err) + } + proxyClientCert, err := cert.NewSignedCert( + cert.Config{ + CommonName: "kube-aggregator", + Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + }, + proxyClientKey, proxySigningCert, proxySigningKey, + ) + proxyClientCertFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.crt") + proxyClientKeyFile, _ := ioutil.TempFile(aggregatorCertDir, "proxy-client.key") + if err := ioutil.WriteFile(proxyClientCertFile.Name(), cert.EncodeCertPEM(proxyClientCert), 0600); err != nil { + t.Fatal(err) + } + if err := ioutil.WriteFile(proxyClientKeyFile.Name(), cert.EncodePrivateKeyPEM(proxyClientKey), 0644); err != nil { + t.Fatal(err) + } + aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh) + aggregatorCmd.SetArgs([]string{ + "--bind-address", "127.0.0.1", + "--secure-port", strconv.Itoa(aggregatorPort), + "--requestheader-username-headers", "", + "--proxy-client-cert-file", proxyClientCertFile.Name(), + "--proxy-client-key-file", proxyClientKeyFile.Name(), + "--core-kubeconfig", kubeconfigFile.Name(), + "--authentication-kubeconfig", kubeconfigFile.Name(), + "--authorization-kubeconfig", kubeconfigFile.Name(), + "--etcd-servers", framework.GetEtcdURLFromEnv(), + "--cert-dir", aggregatorCertDir, + }) + go func() { + for { + if err := aggregatorCmd.Execute(); err != nil { + t.Log(err) + } + time.Sleep(100 * time.Millisecond) + } + }() + + aggregatorClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig) + aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", aggregatorPort) + aggregatorClientConfig.CAFile = path.Join(aggregatorCertDir, "apiserver.crt") + aggregatorClientConfig.CAData = nil + aggregatorClientConfig.ServerName = "" + aggregatorClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken + var aggregatorDiscoveryClient client.Interface + err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + aggregatorDiscoveryClient, err = client.NewForConfig(aggregatorClientConfig) + if err != nil { + // this happens if we race the API server for writing the cert + return false, nil + } + if _, err := aggregatorDiscoveryClient.Discovery().ServerVersion(); err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatal(err) + } + + // now we're finally ready to test. These are what's run by defautl now + testAPIGroupList(t, wardleClient.Discovery().RESTClient()) + testAPIGroup(t, wardleClient.Discovery().RESTClient()) + testAPIResourceList(t, wardleClient.Discovery().RESTClient()) + + wardleCA, err := ioutil.ReadFile(wardleClientConfig.CAFile) + if err != nil { + t.Fatal(err) + } + aggregatorClient := aggregatorclient.NewForConfigOrDie(aggregatorClientConfig) + _, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.k8s.io"}, + Spec: apiregistrationv1alpha1.APIServiceSpec{ + Service: apiregistrationv1alpha1.ServiceReference{ + Namespace: "kube-wardle", + Name: "api", + }, + Group: "wardle.k8s.io", + Version: "v1alpha1", + CABundle: wardleCA, + Priority: 200, + }, + }) + if err != nil { + t.Fatal(err) + } + + // this is ugly, but sleep just a little bit so that the watch is probably observed. Since nothing will actually be added to discovery + // (the service is missing), we don't have an external signal. + time.Sleep(100 * time.Millisecond) + if _, err := aggregatorDiscoveryClient.Discovery().ServerResources(); err != nil { + t.Fatal(err) + } + + _, err = aggregatorClient.ApiregistrationV1alpha1().APIServices().Create(&apiregistrationv1alpha1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1."}, + Spec: apiregistrationv1alpha1.APIServiceSpec{ + Service: apiregistrationv1alpha1.ServiceReference{ + Namespace: "default", + Name: "kubernetes", + }, + Group: "", + Version: "v1", + CABundle: config.GenericConfig.LoopbackClientConfig.CAData, + Priority: 100, + }, + }) + if err != nil { + t.Fatal(err) + } + + // this is ugly, but sleep just a little bit so that the watch is probably observed. Since nothing will actually be added to discovery + // (the service is missing), we don't have an external signal. + time.Sleep(100 * time.Millisecond) + _, err = aggregatorDiscoveryClient.Discovery().ServerResources() + if err != nil && !strings.Contains(err.Error(), "lookup kubernetes.default.svc") { + t.Fatal(err) + } + + // TODO figure out how to turn on enough of services and dns to run more } func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config { @@ -124,46 +368,12 @@ func createKubeConfig(clientCfg *rest.Config) *clientcmdapi.Config { return config } -func waitForApiserverUp(serverLocation string) error { - for start := time.Now(); time.Since(start) < 10*time.Second; time.Sleep(5 * time.Second) { - glog.Errorf("Waiting for : %#v", serverLocation) - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: tr} - _, err := client.Get(serverLocation) - if err == nil { - return nil - } - } - return fmt.Errorf("waiting for apiserver timed out") +func readResponse(client rest.Interface, location string) ([]byte, error) { + return client.Get().AbsPath(location).DoRaw() } -func readResponse(serverURL string) ([]byte, error) { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client := &http.Client{Transport: tr} - response, err := client.Get(serverURL) - if err != nil { - glog.Errorf("http get err code : %#v", err) - return nil, fmt.Errorf("Error in fetching %s: %v", serverURL, err) - } - defer response.Body.Close() - glog.Errorf("http get response code : %#v", response.StatusCode) - if response.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status: %d for URL: %s, expected status: %d", response.StatusCode, serverURL, http.StatusOK) - } - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil, fmt.Errorf("Error reading response from %s: %v", serverURL, err) - } - return contents, nil -} - -func testAPIGroupList(t *testing.T, serverLocation string) { - serverURL := serverLocation + "/apis" - contents, err := readResponse(serverURL) +func testAPIGroupList(t *testing.T, client rest.Interface) { + contents, err := readResponse(client, "/apis") if err != nil { t.Fatalf("%v", err) } @@ -171,7 +381,7 @@ func testAPIGroupList(t *testing.T, serverLocation string) { var apiGroupList metav1.APIGroupList err = json.Unmarshal(contents, &apiGroupList) if err != nil { - t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err) + t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis", err) } assert.Equal(t, 1, len(apiGroupList.Groups)) assert.Equal(t, groupVersion.Group, apiGroupList.Groups[0].Name) @@ -180,9 +390,8 @@ func testAPIGroupList(t *testing.T, serverLocation string) { assert.Equal(t, groupVersionForDiscovery, apiGroupList.Groups[0].PreferredVersion) } -func testAPIGroup(t *testing.T, serverLocation string) { - serverURL := serverLocation + "/apis/wardle.k8s.io" - contents, err := readResponse(serverURL) +func testAPIGroup(t *testing.T, client rest.Interface) { + contents, err := readResponse(client, "/apis/wardle.k8s.io") if err != nil { t.Fatalf("%v", err) } @@ -190,7 +399,7 @@ func testAPIGroup(t *testing.T, serverLocation string) { var apiGroup metav1.APIGroup err = json.Unmarshal(contents, &apiGroup) if err != nil { - t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err) + t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.k8s.io", err) } assert.Equal(t, groupVersion.Group, apiGroup.Name) assert.Equal(t, 1, len(apiGroup.Versions)) @@ -199,9 +408,8 @@ func testAPIGroup(t *testing.T, serverLocation string) { assert.Equal(t, apiGroup.PreferredVersion, apiGroup.Versions[0]) } -func testAPIResourceList(t *testing.T, serverLocation string) { - serverURL := serverLocation + "/apis/wardle.k8s.io/v1alpha1" - contents, err := readResponse(serverURL) +func testAPIResourceList(t *testing.T, client rest.Interface) { + contents, err := readResponse(client, "/apis/wardle.k8s.io/v1alpha1") if err != nil { t.Fatalf("%v", err) } @@ -209,10 +417,15 @@ func testAPIResourceList(t *testing.T, serverLocation string) { var apiResourceList metav1.APIResourceList err = json.Unmarshal(contents, &apiResourceList) if err != nil { - t.Fatalf("Error in unmarshalling response from server %s: %v", serverURL, err) + t.Fatalf("Error in unmarshalling response from server %s: %v", "/apis/wardle.k8s.io/v1alpha1", err) } assert.Equal(t, groupVersion.String(), apiResourceList.GroupVersion) assert.Equal(t, 1, len(apiResourceList.APIResources)) assert.Equal(t, "flunders", apiResourceList.APIResources[0].Name) assert.True(t, apiResourceList.APIResources[0].Namespaced) } + +const ( + policyCachePollInterval = 100 * time.Millisecond + policyCachePollTimeout = 5 * time.Second +) diff --git a/vendor/BUILD b/vendor/BUILD index 252f530c8b..adfd3b2f0b 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -15811,6 +15811,7 @@ go_library( srcs = ["k8s.io/kube-aggregator/main.go"], tags = ["automanaged"], deps = [ + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/util/logs", "//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/install", "//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration/validation", @@ -16270,7 +16271,6 @@ go_library( "//vendor:github.com/spf13/cobra", "//vendor:github.com/spf13/pflag", "//vendor:k8s.io/apimachinery/pkg/util/sets", - "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/server", "//vendor:k8s.io/apiserver/pkg/server/filters", "//vendor:k8s.io/apiserver/pkg/server/options",