Make GenericApiServer.Run interruptable and fail on first listen

pull/6/head
Dr. Stefan Schimanski 2016-10-21 13:22:43 +02:00
parent cc84673ebe
commit d0b3981f07
9 changed files with 283 additions and 107 deletions

View File

@ -335,6 +335,6 @@ func Run(s *options.ServerRunOptions) error {
}
sharedInformers.Start(wait.NeverStop)
m.GenericAPIServer.PrepareRun().Run()
m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
return nil
}

View File

@ -60,7 +60,7 @@ func NewServerRunOptions() *genericoptions.ServerRunOptions {
return serverOptions
}
func Run(serverOptions *genericoptions.ServerRunOptions) error {
func Run(serverOptions *genericoptions.ServerRunOptions, stopCh <-chan struct{}) error {
// Set ServiceClusterIPRange
_, serviceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
serverOptions.ServiceClusterIPRange = *serviceClusterIPRange
@ -105,6 +105,6 @@ func Run(serverOptions *genericoptions.ServerRunOptions) error {
if err := s.InstallAPIGroup(&apiGroupInfo); err != nil {
return fmt.Errorf("Error in installing API: %v", err)
}
s.PrepareRun().Run()
s.PrepareRun().Run(stopCh)
return nil
}

View File

@ -19,6 +19,7 @@ package main
import (
"k8s.io/kubernetes/examples/apiserver"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
"github.com/spf13/pflag"
@ -32,7 +33,7 @@ func main() {
serverRunOptions.AddEtcdStorageFlags(pflag.CommandLine)
flag.InitFlags()
if err := apiserver.Run(serverRunOptions); err != nil {
if err := apiserver.Run(serverRunOptions, wait.NeverStop); err != nil {
glog.Fatalf("Error in bringing up the server: %v", err)
}
}

View File

@ -225,7 +225,7 @@ func Run(s *options.ServerRunOptions) error {
installExtensionsAPIs(m, restOptionsFactory)
sharedInformers.Start(wait.NeverStop)
m.PrepareRun().Run()
m.PrepareRun().Run(wait.NeverStop)
return nil
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package genericapiserver
import (
"crypto/tls"
"fmt"
"mime"
"net"
@ -44,9 +43,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver/openapi/common"
"k8s.io/kubernetes/pkg/genericapiserver/routes"
"k8s.io/kubernetes/pkg/runtime"
certutil "k8s.io/kubernetes/pkg/util/cert"
utilnet "k8s.io/kubernetes/pkg/util/net"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -115,6 +112,9 @@ type GenericAPIServer struct {
SecureServingInfo *ServingInfo
InsecureServingInfo *ServingInfo
// numerical ports, set after listening
effectiveSecurePort, effectiveInsecurePort int
// ExternalAddress is the address (hostname or IP and port) that should be used in
// external (public internet) URLs for this GenericAPIServer.
ExternalAddress string
@ -180,7 +180,6 @@ type preparedGenericAPIServer struct {
// PrepareRun does post API installation setup steps.
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// install APIs which depend on other APIs to be installed
if s.enableSwaggerSupport {
routes.Swagger{ExternalAddress: s.ExternalAddress}.Install(s.HandlerContainer)
}
@ -192,76 +191,18 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
return preparedGenericAPIServer{s}
}
func (s preparedGenericAPIServer) Run() {
// Run spawns the http servers (secure and insecure). It only returns if stopCh is closed
// or one of the ports cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) {
if s.SecureServingInfo != nil && s.Handler != nil {
secureServer := &http.Server{
Addr: s.SecureServingInfo.BindAddress,
Handler: s.Handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: &tls.Config{
// Can't use SSLv3 because of POODLE and BEAST
// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
// Can't use TLSv1.1 because of RC4 cipher usage
MinVersion: tls.VersionTLS12,
},
if err := s.serveSecurely(stopCh); err != nil {
glog.Fatal(err)
}
if len(s.SecureServingInfo.ClientCA) > 0 {
clientCAs, err := certutil.NewPool(s.SecureServingInfo.ClientCA)
if err != nil {
glog.Fatalf("Unable to load client CA file: %v", err)
}
// Populate PeerCertificates in requests, but don't reject connections without certificates
// This allows certificates to be validated by authenticators, while still allowing other auth types
secureServer.TLSConfig.ClientAuth = tls.RequestClientCert
// Specify allowed CAs for client certificates
secureServer.TLSConfig.ClientCAs = clientCAs
// "h2" NextProtos is necessary for enabling HTTP2 for go's 1.7 HTTP Server
secureServer.TLSConfig.NextProtos = []string{"h2"}
}
glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
go func() {
defer utilruntime.HandleCrash()
for {
if err := secureServer.ListenAndServeTLS(s.SecureServingInfo.ServerCert.CertFile, s.SecureServingInfo.ServerCert.KeyFile); err != nil {
glog.Errorf("Unable to listen for secure (%v); will try again.", err)
}
time.Sleep(15 * time.Second)
}
}()
}
if s.InsecureServingInfo != nil && s.InsecureHandler != nil {
insecureServer := &http.Server{
Addr: s.InsecureServingInfo.BindAddress,
Handler: s.InsecureHandler,
MaxHeaderBytes: 1 << 20,
}
glog.Infof("Serving insecurely on %s", s.InsecureServingInfo.BindAddress)
go func() {
defer utilruntime.HandleCrash()
for {
if err := insecureServer.ListenAndServe(); err != nil {
glog.Errorf("Unable to listen for insecure (%v); will try again.", err)
}
time.Sleep(15 * time.Second)
}
}()
}
// Attempt to verify the server came up for 20 seconds (100 tries * 100ms, 100ms timeout per try) per port
if s.SecureServingInfo != nil {
if err := waitForSuccessfulDial(true, "tcp", s.SecureServingInfo.BindAddress, 100*time.Millisecond, 100*time.Millisecond, 100); err != nil {
glog.Fatalf("Secure server never started: %v", err)
}
}
if s.InsecureServingInfo != nil {
if err := waitForSuccessfulDial(false, "tcp", s.InsecureServingInfo.BindAddress, 100*time.Millisecond, 100*time.Millisecond, 100); err != nil {
glog.Fatalf("Insecure server never started: %v", err)
if err := s.serveInsecurely(stopCh); err != nil {
glog.Fatal(err)
}
}
@ -272,7 +213,7 @@ func (s preparedGenericAPIServer) Run() {
glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
select {}
<-stopCh
}
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
@ -471,27 +412,3 @@ func NewDefaultAPIGroupInfo(group string) APIGroupInfo {
NegotiatedSerializer: api.Codecs,
}
}
// waitForSuccessfulDial attempts to connect to the given address, closing and returning nil on the first successful connection.
func waitForSuccessfulDial(https bool, network, address string, timeout, interval time.Duration, retries int) error {
var (
conn net.Conn
err error
)
for i := 0; i <= retries; i++ {
dialer := net.Dialer{Timeout: timeout}
if https {
conn, err = tls.DialWithDialer(&dialer, network, address, &tls.Config{InsecureSkipVerify: true})
} else {
conn, err = dialer.Dial(network, address)
}
if err != nil {
glog.V(5).Infof("Got error %#v, trying again: %#v\n", err, address)
time.Sleep(interval)
continue
}
conn.Close()
return nil
}
return err
}

View File

@ -0,0 +1,249 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package genericapiserver
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"sync"
"time"
certutil "k8s.io/kubernetes/pkg/util/cert"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"github.com/golang/glog"
"github.com/pkg/errors"
)
const (
defaultKeepAlivePeriod = 3 * time.Minute
)
// serveSecurely runs the secure http server. It fails only if certificates cannot
// be loaded or the initial listen call fails. The actual server loop (stoppable by closing
// stopCh) runs in a go routine, i.e. serveSecurely does not block.
func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
secureServer := &http.Server{
Addr: s.SecureServingInfo.BindAddress,
Handler: s.Handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: &tls.Config{
// Can't use SSLv3 because of POODLE and BEAST
// Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
// Can't use TLSv1.1 because of RC4 cipher usage
MinVersion: tls.VersionTLS12,
// enable HTTP2 for go's 1.7 HTTP Server
NextProtos: []string{"h2", "http/1.1"},
},
}
var err error
if len(s.SecureServingInfo.ServerCert.CertFile) != 0 || len(s.SecureServingInfo.ServerCert.KeyFile) != 0 {
secureServer.TLSConfig.Certificates = make([]tls.Certificate, 1)
secureServer.TLSConfig.Certificates[0], err = tls.LoadX509KeyPair(s.SecureServingInfo.ServerCert.CertFile, s.SecureServingInfo.ServerCert.KeyFile)
if err != nil {
return fmt.Errorf("unable to load server certificate: %v", err)
}
}
if len(s.SecureServingInfo.ClientCA) > 0 {
clientCAs, err := certutil.NewPool(s.SecureServingInfo.ClientCA)
if err != nil {
return fmt.Errorf("unable to load client CA file: %v", err)
}
// Populate PeerCertificates in requests, but don't reject connections without certificates
// This allows certificates to be validated by authenticators, while still allowing other auth types
secureServer.TLSConfig.ClientAuth = tls.RequestClientCert
// Specify allowed CAs for client certificates
secureServer.TLSConfig.ClientCAs = clientCAs
}
glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
s.effectiveSecurePort, err = runServer(secureServer, stopCh)
return err
}
// serveInsecurely run the insecure http server. It fails only if the initial listen
// call fails. The actual server loop (stoppable by closing stopCh) runs in a go
// routine, i.e. serveInsecurely does not block.
func (s *GenericAPIServer) serveInsecurely(stopCh <-chan struct{}) error {
insecureServer := &http.Server{
Addr: s.InsecureServingInfo.BindAddress,
Handler: s.InsecureHandler,
MaxHeaderBytes: 1 << 20,
}
glog.Infof("Serving insecurely on %s", s.InsecureServingInfo.BindAddress)
var err error
s.effectiveInsecurePort, err = runServer(insecureServer, stopCh)
return err
}
// runServer listens on the given port, then spawns a go-routine continuously serving
// until the stopCh is closed. The port is returned. This function does not block.
func runServer(server *http.Server, stopCh <-chan struct{}) (int, error) {
if len(server.Addr) == 0 {
return 0, errors.New("address cannot be empty")
}
// first listen is synchronous (fail early!)
ln, err := net.Listen("tcp", server.Addr)
if err != nil {
return 0, fmt.Errorf("failed to listen on %v: %v", server.Addr, err)
}
// get port
tcpAddr, ok := ln.Addr().(*net.TCPAddr)
if !ok {
ln.Close()
return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
}
lock := sync.Mutex{} // to avoid we close an old listener during a listen retry
go func() {
<-stopCh
lock.Lock()
defer lock.Unlock()
ln.Close()
}()
go func() {
defer utilruntime.HandleCrash()
for {
var listener net.Listener
listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
if server.TLSConfig != nil {
listener = tls.NewListener(listener, server.TLSConfig)
}
err := server.Serve(listener)
glog.Errorf("Error serving %v (%v); will try again.", server.Addr, err)
// listen again
func() {
lock.Lock()
defer lock.Unlock()
for {
time.Sleep(15 * time.Second)
ln, err = net.Listen("tcp", server.Addr)
if err == nil {
return
}
select {
case <-stopCh:
return
default:
}
glog.Errorf("Error listening on %v (%v); will try again.", server.Addr, err)
}
}()
select {
case <-stopCh:
return
default:
}
}
}()
return tcpAddr.Port, nil
}
// getNamedCertificateMap returns a map of strings to *tls.Certificate, suitable for use in
// tls.Config#NamedCertificates. Returns an error if any of the certs cannot be loaded.
// Returns nil if len(namedCertKeys) == 0
func getNamedCertificateMap(namedCertKeys []NamedCertKey) (map[string]*tls.Certificate, error) {
if len(namedCertKeys) == 0 {
return nil, nil
}
// load keys
tlsCerts := make([]tls.Certificate, len(namedCertKeys))
for i := range namedCertKeys {
var err error
nkc := &namedCertKeys[i]
tlsCerts[i], err = tls.LoadX509KeyPair(nkc.CertFile, nkc.KeyFile)
if err != nil {
return nil, err
}
}
// register certs with implicit names first, reverse order such that earlier trump over the later
tlsCertsByName := map[string]*tls.Certificate{}
for i := len(namedCertKeys) - 1; i >= 0; i-- {
nkc := &namedCertKeys[i]
if len(nkc.Names) > 0 {
continue
}
cert := &tlsCerts[i]
// read names from certificate common names and DNS names
if len(cert.Certificate) == 0 {
return nil, fmt.Errorf("no certificate found in %q", nkc.CertFile)
}
x509Cert, err := x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return nil, fmt.Errorf("parse error for certificate in %q: %v", nkc.CertFile, err)
}
if len(x509Cert.Subject.CommonName) > 0 {
tlsCertsByName[x509Cert.Subject.CommonName] = cert
}
for _, san := range x509Cert.DNSNames {
tlsCertsByName[san] = cert
}
// intentionally all IPs in the cert are ignored as SNI forbids passing IPs
// to select a cert. Before go 1.6 the tls happily passed IPs as SNI values.
}
// register certs with explicit names last, overwriting every of the implicit ones,
// again in reverse order.
for i := len(namedCertKeys) - 1; i >= 0; i-- {
nkc := &namedCertKeys[i]
if len(nkc.Names) == 0 {
continue
}
for _, name := range nkc.Names {
tlsCertsByName[name] = &tlsCerts[i]
}
}
return tlsCertsByName, nil
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by ListenAndServe and ListenAndServeTLS so
// dead TCP connections (e.g. closing laptop mid-download) eventually
// go away.
//
// Copied from Go 1.7.2 net/http/server.go
type tcpKeepAliveListener struct {
*net.TCPListener
}
func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(defaultKeepAlivePeriod)
return tc, nil
}

View File

@ -65,12 +65,13 @@ func runDiscoverySummarizer(t *testing.T) string {
return serverURL
}
func runAPIServer(t *testing.T) string {
func runAPIServer(t *testing.T, stopCh <-chan struct{}) string {
serverRunOptions := apiserver.NewServerRunOptions()
// Change the port, because otherwise it will fail if examples/apiserver/apiserver_test and this are run in parallel.
serverRunOptions.InsecurePort = 8083
// Change the ports, because otherwise it will fail if examples/apiserver/apiserver_test and this are run in parallel.
serverRunOptions.SecurePort = 6443 + 3
serverRunOptions.InsecurePort = 8080 + 3
go func() {
if err := apiserver.Run(serverRunOptions); err != nil {
if err := apiserver.Run(serverRunOptions, stopCh); err != nil {
t.Fatalf("Error in bringing up the example apiserver: %v", err)
}
}()
@ -98,7 +99,9 @@ func TestRunDiscoverySummarizer(t *testing.T) {
testResponse(t, discoveryURL, "/randomPath", http.StatusNotFound)
// Run the APIServer now to test the good case.
runAPIServer(t)
stopCh := make(chan struct{})
runAPIServer(t, stopCh)
defer close(stopCh)
// Test /api path.
// There is no server running at that URL, so we will get a 500.

View File

@ -42,11 +42,13 @@ var groupVersionForDiscovery = unversioned.GroupVersionForDiscovery{
func TestRunServer(t *testing.T) {
serverIP := fmt.Sprintf("http://localhost:%d", apiserver.InsecurePort)
stopCh := make(chan struct{})
go func() {
if err := apiserver.Run(apiserver.NewServerRunOptions()); err != nil {
if err := apiserver.Run(apiserver.NewServerRunOptions(), stopCh); err != nil {
t.Fatalf("Error in bringing up the server: %v", err)
}
}()
defer close(stopCh)
if err := waitForApiserverUp(serverIP); err != nil {
t.Fatalf("%v", err)
}
@ -58,14 +60,16 @@ func TestRunServer(t *testing.T) {
func TestRunSecureServer(t *testing.T) {
serverIP := fmt.Sprintf("https://localhost:%d", apiserver.SecurePort)
stopCh := make(chan struct{})
go func() {
options := apiserver.NewServerRunOptions()
options.InsecurePort = 0
options.SecurePort = apiserver.SecurePort
if err := apiserver.Run(options); err != nil {
if err := apiserver.Run(options, stopCh); err != nil {
t.Fatalf("Error in bringing up the server: %v", err)
}
}()
defer close(stopCh)
if err := waitForApiserverUp(serverIP); err != nil {
t.Fatalf("%v", err)
}

View File

@ -77,7 +77,8 @@ func TestLongRunningRequestRegexp(t *testing.T) {
}
}
var insecurePort = 8082
var securePort = 6443 + 2
var insecurePort = 8080 + 2
var serverIP = fmt.Sprintf("http://localhost:%v", insecurePort)
var groupVersions = []unversioned.GroupVersion{
fed_v1b1.SchemeGroupVersion,
@ -86,6 +87,7 @@ var groupVersions = []unversioned.GroupVersion{
func TestRun(t *testing.T) {
s := options.NewServerRunOptions()
s.GenericServerRunOptions.SecurePort = securePort
s.GenericServerRunOptions.InsecurePort = insecurePort
_, ipNet, _ := net.ParseCIDR("10.10.10.0/24")
s.GenericServerRunOptions.ServiceClusterIPRange = *ipNet