From 037f51ae891050c2187394dc7a824300572dd013 Mon Sep 17 00:00:00 2001 From: deads2k Date: Wed, 1 Mar 2017 08:27:16 -0500 Subject: [PATCH] get fresh ports on startup failure for integration test --- test/integration/examples/apiserver_test.go | 171 +++++++++++--------- 1 file changed, 98 insertions(+), 73 deletions(-) diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index f0dec1955d..8c2621b5ac 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -26,6 +26,7 @@ import ( "path" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -73,10 +74,6 @@ func TestAggregatedAPIServer(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - kubePort, err := localPort() - if err != nil { - t.Fatal(err) - } certDir, _ := ioutil.TempDir("", "test-integration-apiserver") defer os.RemoveAll(certDir) _, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24") @@ -105,28 +102,36 @@ func TestAggregatedAPIServer(t *testing.T) { t.Fatal(err) } - kubeAPIServerOptions := options.NewServerRunOptions() - kubeAPIServerOptions.SecureServing.ServingOptions.BindAddress = net.ParseIP("127.0.0.1") - kubeAPIServerOptions.SecureServing.ServingOptions.BindPort = kubePort - kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir - kubeAPIServerOptions.InsecureServing.BindPort = 0 - kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURLFromEnv()} - kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange - kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} - kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} - kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"} - kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"} - kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name() - kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name() - kubeAPIServerOptions.Authorization.Mode = "RBAC" - - config, sharedInformers, err := app.BuildMasterConfig(kubeAPIServerOptions) - if err != nil { - t.Fatal(err) - } - + kubeClientConfigValue := atomic.Value{} go func() { for { + // always get a fresh port in case something claimed the old one + kubePort, err := localPort() + if err != nil { + t.Fatal(err) + } + + kubeAPIServerOptions := options.NewServerRunOptions() + kubeAPIServerOptions.SecureServing.ServingOptions.BindAddress = net.ParseIP("127.0.0.1") + kubeAPIServerOptions.SecureServing.ServingOptions.BindPort = kubePort + kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir + kubeAPIServerOptions.InsecureServing.BindPort = 0 + kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURLFromEnv()} + kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange + kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} + kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} + kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"} + kubeAPIServerOptions.Authentication.RequestHeader.AllowedNames = []string{"kube-aggregator"} + kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name() + kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name() + kubeAPIServerOptions.Authorization.Mode = "RBAC" + + config, sharedInformers, err := app.BuildMasterConfig(kubeAPIServerOptions) + if err != nil { + t.Fatal(err) + } + kubeClientConfigValue.Store(config.GenericConfig.LoopbackClientConfig) + if err := app.RunServer(config, sharedInformers, stopCh); err != nil { t.Log(err) } @@ -135,10 +140,20 @@ func TestAggregatedAPIServer(t *testing.T) { }() // just use json because everyone speaks it - config.GenericConfig.LoopbackClientConfig.ContentType = "" - config.GenericConfig.LoopbackClientConfig.AcceptContentTypes = "" - kubeClient := client.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig) err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + obj := kubeClientConfigValue.Load() + if obj == nil { + return false, nil + } + kubeClientConfig := kubeClientConfigValue.Load().(*rest.Config) + kubeClientConfig.ContentType = "" + kubeClientConfig.AcceptContentTypes = "" + kubeClient, err := client.NewForConfig(kubeClientConfig) + if err != nil { + // this happens because we race the API server start + t.Log(err) + return false, nil + } if _, err := kubeClient.Discovery().ServerVersion(); err != nil { return false, nil } @@ -148,35 +163,41 @@ func TestAggregatedAPIServer(t *testing.T) { t.Fatal(err) } + // after this point we won't be mutating, so the race detector will be fine + kubeClientConfig := kubeClientConfigValue.Load().(*rest.Config) + // write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config - adminKubeConfig := createKubeConfig(config.GenericConfig.LoopbackClientConfig) + adminKubeConfig := createKubeConfig(kubeClientConfig) kubeconfigFile, _ := ioutil.TempFile("", "") defer os.Remove(kubeconfigFile.Name()) clientcmd.WriteToFile(*adminKubeConfig, kubeconfigFile.Name()) - - // start the wardle server to prove we can aggregate it - wardlePort, err := localPort() - if err != nil { - t.Fatal(err) - } wardleCertDir, _ := ioutil.TempDir("", "test-integration-wardle-server") defer os.RemoveAll(wardleCertDir) - wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh) - wardleCmd.SetArgs([]string{ - "--bind-address", "127.0.0.1", - "--secure-port", strconv.Itoa(wardlePort), - "--requestheader-username-headers=X-Remote-User", - "--requestheader-group-headers=X-Remote-Group", - "--requestheader-extra-headers-prefix=X-Remote-Extra-", - "--requestheader-client-ca-file=" + proxyCACertFile.Name(), - "--requestheader-allowed-names=kube-aggregator", - "--authentication-kubeconfig", kubeconfigFile.Name(), - "--authorization-kubeconfig", kubeconfigFile.Name(), - "--etcd-servers", framework.GetEtcdURLFromEnv(), - "--cert-dir", wardleCertDir, - }) + wardlePort := new(int32) + + // start the wardle server to prove we can aggregate it go func() { for { + // always get a fresh port in case something claimed the old one + wardlePortInt, err := localPort() + if err != nil { + t.Fatal(err) + } + atomic.StoreInt32(wardlePort, int32(wardlePortInt)) + wardleCmd := sampleserver.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh) + wardleCmd.SetArgs([]string{ + "--bind-address", "127.0.0.1", + "--secure-port", strconv.Itoa(wardlePortInt), + "--requestheader-username-headers=X-Remote-User", + "--requestheader-group-headers=X-Remote-Group", + "--requestheader-extra-headers-prefix=X-Remote-Extra-", + "--requestheader-client-ca-file=" + proxyCACertFile.Name(), + "--requestheader-allowed-names=kube-aggregator", + "--authentication-kubeconfig", kubeconfigFile.Name(), + "--authorization-kubeconfig", kubeconfigFile.Name(), + "--etcd-servers", framework.GetEtcdURLFromEnv(), + "--cert-dir", wardleCertDir, + }) if err := wardleCmd.Execute(); err != nil { t.Log(err) } @@ -184,17 +205,17 @@ func TestAggregatedAPIServer(t *testing.T) { } }() - wardleClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig) - wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort) + wardleClientConfig := rest.AnonymousClientConfig(kubeClientConfig) wardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt") wardleClientConfig.CAData = nil wardleClientConfig.ServerName = "" - wardleClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken + wardleClientConfig.BearerToken = kubeClientConfig.BearerToken var wardleClient client.Interface err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + wardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", atomic.LoadInt32(wardlePort)) wardleClient, err = client.NewForConfig(wardleClientConfig) if err != nil { - // this happens if we race the API server for writing the cert + // this happens because we race the API server start t.Log(err) return false, nil } @@ -209,10 +230,6 @@ func TestAggregatedAPIServer(t *testing.T) { } // start the aggregator - aggregatorPort, err := localPort() - if err != nil { - t.Fatal(err) - } aggregatorCertDir, _ := ioutil.TempDir("", "test-integration-aggregator") defer os.RemoveAll(aggregatorCertDir) proxyClientKey, err := cert.NewPrivateKey() @@ -234,21 +251,29 @@ func TestAggregatedAPIServer(t *testing.T) { if err := ioutil.WriteFile(proxyClientKeyFile.Name(), cert.EncodePrivateKeyPEM(proxyClientKey), 0644); err != nil { t.Fatal(err) } - aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh) - aggregatorCmd.SetArgs([]string{ - "--bind-address", "127.0.0.1", - "--secure-port", strconv.Itoa(aggregatorPort), - "--requestheader-username-headers", "", - "--proxy-client-cert-file", proxyClientCertFile.Name(), - "--proxy-client-key-file", proxyClientKeyFile.Name(), - "--core-kubeconfig", kubeconfigFile.Name(), - "--authentication-kubeconfig", kubeconfigFile.Name(), - "--authorization-kubeconfig", kubeconfigFile.Name(), - "--etcd-servers", framework.GetEtcdURLFromEnv(), - "--cert-dir", aggregatorCertDir, - }) + aggregatorPort := new(int32) + go func() { for { + // always get a fresh port in case something claimed the old one + aggregatorPortInt, err := localPort() + if err != nil { + t.Fatal(err) + } + atomic.StoreInt32(aggregatorPort, int32(aggregatorPortInt)) + aggregatorCmd := kubeaggregatorserver.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh) + aggregatorCmd.SetArgs([]string{ + "--bind-address", "127.0.0.1", + "--secure-port", strconv.Itoa(aggregatorPortInt), + "--requestheader-username-headers", "", + "--proxy-client-cert-file", proxyClientCertFile.Name(), + "--proxy-client-key-file", proxyClientKeyFile.Name(), + "--core-kubeconfig", kubeconfigFile.Name(), + "--authentication-kubeconfig", kubeconfigFile.Name(), + "--authorization-kubeconfig", kubeconfigFile.Name(), + "--etcd-servers", framework.GetEtcdURLFromEnv(), + "--cert-dir", aggregatorCertDir, + }) if err := aggregatorCmd.Execute(); err != nil { t.Log(err) } @@ -256,14 +281,14 @@ func TestAggregatedAPIServer(t *testing.T) { } }() - aggregatorClientConfig := rest.AnonymousClientConfig(config.GenericConfig.LoopbackClientConfig) - aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", aggregatorPort) + aggregatorClientConfig := rest.AnonymousClientConfig(kubeClientConfig) aggregatorClientConfig.CAFile = path.Join(aggregatorCertDir, "apiserver.crt") aggregatorClientConfig.CAData = nil aggregatorClientConfig.ServerName = "" - aggregatorClientConfig.BearerToken = config.GenericConfig.LoopbackClientConfig.BearerToken + aggregatorClientConfig.BearerToken = kubeClientConfig.BearerToken var aggregatorDiscoveryClient client.Interface err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) { + aggregatorClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", atomic.LoadInt32(aggregatorPort)) aggregatorDiscoveryClient, err = client.NewForConfig(aggregatorClientConfig) if err != nil { // this happens if we race the API server for writing the cert @@ -321,7 +346,7 @@ func TestAggregatedAPIServer(t *testing.T) { }, Group: "", Version: "v1", - CABundle: config.GenericConfig.LoopbackClientConfig.CAData, + CABundle: kubeClientConfig.CAData, Priority: 100, }, })