diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 0bdb71e485..9f554210f0 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -335,6 +335,6 @@ func Run(s *options.ServerRunOptions) error { } sharedInformers.Start(wait.NeverStop) - m.GenericAPIServer.PrepareRun().Run() + m.GenericAPIServer.PrepareRun().Run(wait.NeverStop) return nil } diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index d1ff0d80b7..72d1f77f96 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -60,7 +60,7 @@ func NewServerRunOptions() *genericoptions.ServerRunOptions { return serverOptions } -func Run(serverOptions *genericoptions.ServerRunOptions) error { +func Run(serverOptions *genericoptions.ServerRunOptions, stopCh <-chan struct{}) error { // Set ServiceClusterIPRange _, serviceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24") serverOptions.ServiceClusterIPRange = *serviceClusterIPRange @@ -105,6 +105,6 @@ func Run(serverOptions *genericoptions.ServerRunOptions) error { if err := s.InstallAPIGroup(&apiGroupInfo); err != nil { return fmt.Errorf("Error in installing API: %v", err) } - s.PrepareRun().Run() + s.PrepareRun().Run(stopCh) return nil } diff --git a/examples/apiserver/server/main.go b/examples/apiserver/server/main.go index a145adba68..5c69edf708 100644 --- a/examples/apiserver/server/main.go +++ b/examples/apiserver/server/main.go @@ -19,6 +19,7 @@ package main import ( "k8s.io/kubernetes/examples/apiserver" "k8s.io/kubernetes/pkg/util/flag" + "k8s.io/kubernetes/pkg/util/wait" "github.com/golang/glog" "github.com/spf13/pflag" @@ -32,7 +33,7 @@ func main() { serverRunOptions.AddEtcdStorageFlags(pflag.CommandLine) flag.InitFlags() - if err := apiserver.Run(serverRunOptions); err != nil { + if err := apiserver.Run(serverRunOptions, wait.NeverStop); err != nil { glog.Fatalf("Error in bringing up the server: %v", err) } } diff --git a/federation/cmd/federation-apiserver/app/server.go b/federation/cmd/federation-apiserver/app/server.go index 3ee53be400..a1bd1e3bf6 100644 --- a/federation/cmd/federation-apiserver/app/server.go +++ b/federation/cmd/federation-apiserver/app/server.go @@ -225,7 +225,7 @@ func Run(s *options.ServerRunOptions) error { installExtensionsAPIs(m, restOptionsFactory) sharedInformers.Start(wait.NeverStop) - m.PrepareRun().Run() + m.PrepareRun().Run(wait.NeverStop) return nil } diff --git a/pkg/genericapiserver/genericapiserver.go b/pkg/genericapiserver/genericapiserver.go index b3d9f93f53..6e4a28cf62 100644 --- a/pkg/genericapiserver/genericapiserver.go +++ b/pkg/genericapiserver/genericapiserver.go @@ -17,7 +17,6 @@ limitations under the License. package genericapiserver import ( - "crypto/tls" "fmt" "mime" "net" @@ -44,9 +43,7 @@ import ( "k8s.io/kubernetes/pkg/genericapiserver/openapi/common" "k8s.io/kubernetes/pkg/genericapiserver/routes" "k8s.io/kubernetes/pkg/runtime" - certutil "k8s.io/kubernetes/pkg/util/cert" utilnet "k8s.io/kubernetes/pkg/util/net" - utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/sets" ) @@ -115,6 +112,9 @@ type GenericAPIServer struct { SecureServingInfo *ServingInfo InsecureServingInfo *ServingInfo + // numerical ports, set after listening + effectiveSecurePort, effectiveInsecurePort int + // ExternalAddress is the address (hostname or IP and port) that should be used in // external (public internet) URLs for this GenericAPIServer. ExternalAddress string @@ -180,7 +180,6 @@ type preparedGenericAPIServer struct { // PrepareRun does post API installation setup steps. func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { - // install APIs which depend on other APIs to be installed if s.enableSwaggerSupport { routes.Swagger{ExternalAddress: s.ExternalAddress}.Install(s.HandlerContainer) } @@ -192,76 +191,18 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { return preparedGenericAPIServer{s} } -func (s preparedGenericAPIServer) Run() { +// Run spawns the http servers (secure and insecure). It only returns if stopCh is closed +// or one of the ports cannot be listened on initially. +func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) { if s.SecureServingInfo != nil && s.Handler != nil { - secureServer := &http.Server{ - Addr: s.SecureServingInfo.BindAddress, - Handler: s.Handler, - MaxHeaderBytes: 1 << 20, - TLSConfig: &tls.Config{ - // Can't use SSLv3 because of POODLE and BEAST - // Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher - // Can't use TLSv1.1 because of RC4 cipher usage - MinVersion: tls.VersionTLS12, - }, + if err := s.serveSecurely(stopCh); err != nil { + glog.Fatal(err) } - - if len(s.SecureServingInfo.ClientCA) > 0 { - clientCAs, err := certutil.NewPool(s.SecureServingInfo.ClientCA) - if err != nil { - glog.Fatalf("Unable to load client CA file: %v", err) - } - // Populate PeerCertificates in requests, but don't reject connections without certificates - // This allows certificates to be validated by authenticators, while still allowing other auth types - secureServer.TLSConfig.ClientAuth = tls.RequestClientCert - // Specify allowed CAs for client certificates - secureServer.TLSConfig.ClientCAs = clientCAs - // "h2" NextProtos is necessary for enabling HTTP2 for go's 1.7 HTTP Server - secureServer.TLSConfig.NextProtos = []string{"h2"} - - } - - glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress) - go func() { - defer utilruntime.HandleCrash() - - for { - if err := secureServer.ListenAndServeTLS(s.SecureServingInfo.ServerCert.CertFile, s.SecureServingInfo.ServerCert.KeyFile); err != nil { - glog.Errorf("Unable to listen for secure (%v); will try again.", err) - } - time.Sleep(15 * time.Second) - } - }() } if s.InsecureServingInfo != nil && s.InsecureHandler != nil { - insecureServer := &http.Server{ - Addr: s.InsecureServingInfo.BindAddress, - Handler: s.InsecureHandler, - MaxHeaderBytes: 1 << 20, - } - glog.Infof("Serving insecurely on %s", s.InsecureServingInfo.BindAddress) - go func() { - defer utilruntime.HandleCrash() - - for { - if err := insecureServer.ListenAndServe(); err != nil { - glog.Errorf("Unable to listen for insecure (%v); will try again.", err) - } - time.Sleep(15 * time.Second) - } - }() - } - - // Attempt to verify the server came up for 20 seconds (100 tries * 100ms, 100ms timeout per try) per port - if s.SecureServingInfo != nil { - if err := waitForSuccessfulDial(true, "tcp", s.SecureServingInfo.BindAddress, 100*time.Millisecond, 100*time.Millisecond, 100); err != nil { - glog.Fatalf("Secure server never started: %v", err) - } - } - if s.InsecureServingInfo != nil { - if err := waitForSuccessfulDial(false, "tcp", s.InsecureServingInfo.BindAddress, 100*time.Millisecond, 100*time.Millisecond, 100); err != nil { - glog.Fatalf("Insecure server never started: %v", err) + if err := s.serveInsecurely(stopCh); err != nil { + glog.Fatal(err) } } @@ -272,7 +213,7 @@ func (s preparedGenericAPIServer) Run() { glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) } - select {} + <-stopCh } // installAPIResources is a private method for installing the REST storage backing each api groupversionresource @@ -471,27 +412,3 @@ func NewDefaultAPIGroupInfo(group string) APIGroupInfo { NegotiatedSerializer: api.Codecs, } } - -// waitForSuccessfulDial attempts to connect to the given address, closing and returning nil on the first successful connection. -func waitForSuccessfulDial(https bool, network, address string, timeout, interval time.Duration, retries int) error { - var ( - conn net.Conn - err error - ) - for i := 0; i <= retries; i++ { - dialer := net.Dialer{Timeout: timeout} - if https { - conn, err = tls.DialWithDialer(&dialer, network, address, &tls.Config{InsecureSkipVerify: true}) - } else { - conn, err = dialer.Dial(network, address) - } - if err != nil { - glog.V(5).Infof("Got error %#v, trying again: %#v\n", err, address) - time.Sleep(interval) - continue - } - conn.Close() - return nil - } - return err -} diff --git a/pkg/genericapiserver/serve.go b/pkg/genericapiserver/serve.go new file mode 100644 index 0000000000..686b0fb2c2 --- /dev/null +++ b/pkg/genericapiserver/serve.go @@ -0,0 +1,249 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package genericapiserver + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "net" + "net/http" + "sync" + "time" + + certutil "k8s.io/kubernetes/pkg/util/cert" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + + "github.com/golang/glog" + "github.com/pkg/errors" +) + +const ( + defaultKeepAlivePeriod = 3 * time.Minute +) + +// serveSecurely runs the secure http server. It fails only if certificates cannot +// be loaded or the initial listen call fails. The actual server loop (stoppable by closing +// stopCh) runs in a go routine, i.e. serveSecurely does not block. +func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error { + secureServer := &http.Server{ + Addr: s.SecureServingInfo.BindAddress, + Handler: s.Handler, + MaxHeaderBytes: 1 << 20, + TLSConfig: &tls.Config{ + // Can't use SSLv3 because of POODLE and BEAST + // Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher + // Can't use TLSv1.1 because of RC4 cipher usage + MinVersion: tls.VersionTLS12, + // enable HTTP2 for go's 1.7 HTTP Server + NextProtos: []string{"h2", "http/1.1"}, + }, + } + + var err error + if len(s.SecureServingInfo.ServerCert.CertFile) != 0 || len(s.SecureServingInfo.ServerCert.KeyFile) != 0 { + secureServer.TLSConfig.Certificates = make([]tls.Certificate, 1) + secureServer.TLSConfig.Certificates[0], err = tls.LoadX509KeyPair(s.SecureServingInfo.ServerCert.CertFile, s.SecureServingInfo.ServerCert.KeyFile) + if err != nil { + return fmt.Errorf("unable to load server certificate: %v", err) + } + } + + if len(s.SecureServingInfo.ClientCA) > 0 { + clientCAs, err := certutil.NewPool(s.SecureServingInfo.ClientCA) + if err != nil { + return fmt.Errorf("unable to load client CA file: %v", err) + } + // Populate PeerCertificates in requests, but don't reject connections without certificates + // This allows certificates to be validated by authenticators, while still allowing other auth types + secureServer.TLSConfig.ClientAuth = tls.RequestClientCert + // Specify allowed CAs for client certificates + secureServer.TLSConfig.ClientCAs = clientCAs + } + + glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress) + s.effectiveSecurePort, err = runServer(secureServer, stopCh) + return err +} + +// serveInsecurely run the insecure http server. It fails only if the initial listen +// call fails. The actual server loop (stoppable by closing stopCh) runs in a go +// routine, i.e. serveInsecurely does not block. +func (s *GenericAPIServer) serveInsecurely(stopCh <-chan struct{}) error { + insecureServer := &http.Server{ + Addr: s.InsecureServingInfo.BindAddress, + Handler: s.InsecureHandler, + MaxHeaderBytes: 1 << 20, + } + glog.Infof("Serving insecurely on %s", s.InsecureServingInfo.BindAddress) + var err error + s.effectiveInsecurePort, err = runServer(insecureServer, stopCh) + return err +} + +// runServer listens on the given port, then spawns a go-routine continuously serving +// until the stopCh is closed. The port is returned. This function does not block. +func runServer(server *http.Server, stopCh <-chan struct{}) (int, error) { + if len(server.Addr) == 0 { + return 0, errors.New("address cannot be empty") + } + + // first listen is synchronous (fail early!) + ln, err := net.Listen("tcp", server.Addr) + if err != nil { + return 0, fmt.Errorf("failed to listen on %v: %v", server.Addr, err) + } + + // get port + tcpAddr, ok := ln.Addr().(*net.TCPAddr) + if !ok { + ln.Close() + return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String()) + } + + lock := sync.Mutex{} // to avoid we close an old listener during a listen retry + go func() { + <-stopCh + lock.Lock() + defer lock.Unlock() + ln.Close() + }() + + go func() { + defer utilruntime.HandleCrash() + + for { + var listener net.Listener + listener = tcpKeepAliveListener{ln.(*net.TCPListener)} + if server.TLSConfig != nil { + listener = tls.NewListener(listener, server.TLSConfig) + } + + err := server.Serve(listener) + glog.Errorf("Error serving %v (%v); will try again.", server.Addr, err) + + // listen again + func() { + lock.Lock() + defer lock.Unlock() + for { + time.Sleep(15 * time.Second) + + ln, err = net.Listen("tcp", server.Addr) + if err == nil { + return + } + select { + case <-stopCh: + return + default: + } + glog.Errorf("Error listening on %v (%v); will try again.", server.Addr, err) + } + }() + + select { + case <-stopCh: + return + default: + } + } + }() + + return tcpAddr.Port, nil +} + +// getNamedCertificateMap returns a map of strings to *tls.Certificate, suitable for use in +// tls.Config#NamedCertificates. Returns an error if any of the certs cannot be loaded. +// Returns nil if len(namedCertKeys) == 0 +func getNamedCertificateMap(namedCertKeys []NamedCertKey) (map[string]*tls.Certificate, error) { + if len(namedCertKeys) == 0 { + return nil, nil + } + + // load keys + tlsCerts := make([]tls.Certificate, len(namedCertKeys)) + for i := range namedCertKeys { + var err error + nkc := &namedCertKeys[i] + tlsCerts[i], err = tls.LoadX509KeyPair(nkc.CertFile, nkc.KeyFile) + if err != nil { + return nil, err + } + } + + // register certs with implicit names first, reverse order such that earlier trump over the later + tlsCertsByName := map[string]*tls.Certificate{} + for i := len(namedCertKeys) - 1; i >= 0; i-- { + nkc := &namedCertKeys[i] + if len(nkc.Names) > 0 { + continue + } + cert := &tlsCerts[i] + + // read names from certificate common names and DNS names + if len(cert.Certificate) == 0 { + return nil, fmt.Errorf("no certificate found in %q", nkc.CertFile) + } + x509Cert, err := x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return nil, fmt.Errorf("parse error for certificate in %q: %v", nkc.CertFile, err) + } + if len(x509Cert.Subject.CommonName) > 0 { + tlsCertsByName[x509Cert.Subject.CommonName] = cert + } + for _, san := range x509Cert.DNSNames { + tlsCertsByName[san] = cert + } + // intentionally all IPs in the cert are ignored as SNI forbids passing IPs + // to select a cert. Before go 1.6 the tls happily passed IPs as SNI values. + } + + // register certs with explicit names last, overwriting every of the implicit ones, + // again in reverse order. + for i := len(namedCertKeys) - 1; i >= 0; i-- { + nkc := &namedCertKeys[i] + if len(nkc.Names) == 0 { + continue + } + for _, name := range nkc.Names { + tlsCertsByName[name] = &tlsCerts[i] + } + } + + return tlsCertsByName, nil +} + +// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted +// connections. It's used by ListenAndServe and ListenAndServeTLS so +// dead TCP connections (e.g. closing laptop mid-download) eventually +// go away. +// +// Copied from Go 1.7.2 net/http/server.go +type tcpKeepAliveListener struct { + *net.TCPListener +} + +func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { + tc, err := ln.AcceptTCP() + if err != nil { + return nil, err + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(defaultKeepAlivePeriod) + return tc, nil +} diff --git a/test/integration/discoverysummarizer/discoverysummarizer_test.go b/test/integration/discoverysummarizer/discoverysummarizer_test.go index 2ee0661821..353cd8733f 100644 --- a/test/integration/discoverysummarizer/discoverysummarizer_test.go +++ b/test/integration/discoverysummarizer/discoverysummarizer_test.go @@ -65,12 +65,13 @@ func runDiscoverySummarizer(t *testing.T) string { return serverURL } -func runAPIServer(t *testing.T) string { +func runAPIServer(t *testing.T, stopCh <-chan struct{}) string { serverRunOptions := apiserver.NewServerRunOptions() - // Change the port, because otherwise it will fail if examples/apiserver/apiserver_test and this are run in parallel. - serverRunOptions.InsecurePort = 8083 + // Change the ports, because otherwise it will fail if examples/apiserver/apiserver_test and this are run in parallel. + serverRunOptions.SecurePort = 6443 + 3 + serverRunOptions.InsecurePort = 8080 + 3 go func() { - if err := apiserver.Run(serverRunOptions); err != nil { + if err := apiserver.Run(serverRunOptions, stopCh); err != nil { t.Fatalf("Error in bringing up the example apiserver: %v", err) } }() @@ -98,7 +99,9 @@ func TestRunDiscoverySummarizer(t *testing.T) { testResponse(t, discoveryURL, "/randomPath", http.StatusNotFound) // Run the APIServer now to test the good case. - runAPIServer(t) + stopCh := make(chan struct{}) + runAPIServer(t, stopCh) + defer close(stopCh) // Test /api path. // There is no server running at that URL, so we will get a 500. diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index 8bed495c60..858d2ee390 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -42,11 +42,13 @@ var groupVersionForDiscovery = unversioned.GroupVersionForDiscovery{ func TestRunServer(t *testing.T) { serverIP := fmt.Sprintf("http://localhost:%d", apiserver.InsecurePort) + stopCh := make(chan struct{}) go func() { - if err := apiserver.Run(apiserver.NewServerRunOptions()); err != nil { + if err := apiserver.Run(apiserver.NewServerRunOptions(), stopCh); err != nil { t.Fatalf("Error in bringing up the server: %v", err) } }() + defer close(stopCh) if err := waitForApiserverUp(serverIP); err != nil { t.Fatalf("%v", err) } @@ -58,14 +60,16 @@ func TestRunServer(t *testing.T) { func TestRunSecureServer(t *testing.T) { serverIP := fmt.Sprintf("https://localhost:%d", apiserver.SecurePort) + stopCh := make(chan struct{}) go func() { options := apiserver.NewServerRunOptions() options.InsecurePort = 0 options.SecurePort = apiserver.SecurePort - if err := apiserver.Run(options); err != nil { + if err := apiserver.Run(options, stopCh); err != nil { t.Fatalf("Error in bringing up the server: %v", err) } }() + defer close(stopCh) if err := waitForApiserverUp(serverIP); err != nil { t.Fatalf("%v", err) } diff --git a/test/integration/federation/server_test.go b/test/integration/federation/server_test.go index 0bd5e56e69..7c8b12b1d1 100644 --- a/test/integration/federation/server_test.go +++ b/test/integration/federation/server_test.go @@ -77,7 +77,8 @@ func TestLongRunningRequestRegexp(t *testing.T) { } } -var insecurePort = 8082 +var securePort = 6443 + 2 +var insecurePort = 8080 + 2 var serverIP = fmt.Sprintf("http://localhost:%v", insecurePort) var groupVersions = []unversioned.GroupVersion{ fed_v1b1.SchemeGroupVersion, @@ -86,6 +87,7 @@ var groupVersions = []unversioned.GroupVersion{ func TestRun(t *testing.T) { s := options.NewServerRunOptions() + s.GenericServerRunOptions.SecurePort = securePort s.GenericServerRunOptions.InsecurePort = insecurePort _, ipNet, _ := net.ParseCIDR("10.10.10.0/24") s.GenericServerRunOptions.ServiceClusterIPRange = *ipNet