From 11b25366bc7bfe2ad273c8bf9c332fd9d233bffc Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 10 Aug 2017 10:34:25 +0200 Subject: [PATCH] apiservers: add synchronous shutdown mechanism on SIGTERM+INT --- cmd/hyperkube/BUILD | 2 + cmd/hyperkube/federation-apiserver.go | 6 +- .../federation-controller-manager.go | 2 +- cmd/hyperkube/hyperkube.go | 32 +++- cmd/hyperkube/hyperkube_test.go | 138 +++++++++++++++--- cmd/hyperkube/kube-aggregator.go | 6 +- cmd/hyperkube/kube-apiserver.go | 6 +- cmd/hyperkube/kube-controller-manager.go | 2 +- cmd/hyperkube/kube-proxy.go | 2 +- cmd/hyperkube/kube-scheduler.go | 2 +- cmd/hyperkube/kubectl.go | 2 +- cmd/hyperkube/kubelet.go | 2 +- cmd/hyperkube/server.go | 3 +- cmd/kube-apiserver/BUILD | 2 +- cmd/kube-apiserver/apiserver.go | 9 +- .../src/k8s.io/apiextensions-apiserver/BUILD | 3 +- .../k8s.io/apiextensions-apiserver/main.go | 9 +- .../src/k8s.io/apiserver/pkg/audit/types.go | 4 + .../src/k8s.io/apiserver/pkg/audit/union.go | 6 + .../k8s.io/apiserver/pkg/audit/union_test.go | 4 + staging/src/k8s.io/apiserver/pkg/server/BUILD | 9 +- .../apiserver/pkg/server/genericapiserver.go | 5 + .../src/k8s.io/apiserver/pkg/server/signal.go | 43 ++++++ .../apiserver/pkg/server/signal_posix.go | 26 ++++ .../apiserver/pkg/server/signal_windows.go | 23 +++ .../apiserver/plugin/pkg/audit/log/backend.go | 4 + .../plugin/pkg/audit/webhook/webhook.go | 8 + staging/src/k8s.io/kube-aggregator/BUILD | 3 +- staging/src/k8s.io/kube-aggregator/main.go | 9 +- .../cmd/go-to-protobuf/protobuf/tags.go | 2 +- .../cmd/informer-gen/generators/tags.go | 2 +- .../cmd/lister-gen/generators/tags.go | 2 +- staging/src/k8s.io/sample-apiserver/BUILD | 3 +- staging/src/k8s.io/sample-apiserver/main.go | 9 +- 34 files changed, 324 insertions(+), 66 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/signal.go create mode 100644 staging/src/k8s.io/apiserver/pkg/server/signal_posix.go create mode 100644 staging/src/k8s.io/apiserver/pkg/server/signal_windows.go diff --git a/cmd/hyperkube/BUILD b/cmd/hyperkube/BUILD index 09030869be..5f507e5781 100644 --- a/cmd/hyperkube/BUILD +++ b/cmd/hyperkube/BUILD @@ -21,6 +21,7 @@ go_test( deps = [ "//vendor/github.com/spf13/cobra:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], ) @@ -62,6 +63,7 @@ go_library( "//plugin/cmd/kube-scheduler/app/options:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library", diff --git a/cmd/hyperkube/federation-apiserver.go b/cmd/hyperkube/federation-apiserver.go index 48064d17b6..97dff04139 100644 --- a/cmd/hyperkube/federation-apiserver.go +++ b/cmd/hyperkube/federation-apiserver.go @@ -17,7 +17,6 @@ limitations under the License. package main import ( - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/federation/cmd/federation-apiserver/app" "k8s.io/kubernetes/federation/cmd/federation-apiserver/app/options" ) @@ -30,9 +29,10 @@ func NewFederationAPIServer() *Server { hks := Server{ SimpleUsage: "federation-apiserver", Long: "The API entrypoint for the federation control plane", - Run: func(_ *Server, args []string) error { - return app.Run(s, wait.NeverStop) + Run: func(_ *Server, args []string, stopCh <-chan struct{}) error { + return app.Run(s, stopCh) }, + RespectsStopCh: true, } s.AddFlags(hks.Flags()) return &hks diff --git a/cmd/hyperkube/federation-controller-manager.go b/cmd/hyperkube/federation-controller-manager.go index e445f87747..b702a1e34c 100644 --- a/cmd/hyperkube/federation-controller-manager.go +++ b/cmd/hyperkube/federation-controller-manager.go @@ -29,7 +29,7 @@ func NewFederationCMServer() *Server { hks := Server{ SimpleUsage: "federation-controller-manager", Long: "Controller manager for federation control plane. Manages federation service endpoints and controllers", - Run: func(_ *Server, args []string) error { + Run: func(_ *Server, args []string, stopCh <-chan struct{}) error { return app.Run(s) }, } diff --git a/cmd/hyperkube/hyperkube.go b/cmd/hyperkube/hyperkube.go index 4c3118240b..556711d5d8 100644 --- a/cmd/hyperkube/hyperkube.go +++ b/cmd/hyperkube/hyperkube.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,12 +25,14 @@ import ( "os" "path" + "github.com/spf13/pflag" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/server" utilflag "k8s.io/apiserver/pkg/util/flag" "k8s.io/apiserver/pkg/util/logs" utiltemplate "k8s.io/kubernetes/pkg/util/template" "k8s.io/kubernetes/pkg/version/verflag" - - "github.com/spf13/pflag" ) // HyperKube represents a single binary that can morph/manage into multiple @@ -115,7 +117,7 @@ func (hk *HyperKube) Printf(format string, i ...interface{}) { } // Run the server. This will pick the appropriate server and run it. -func (hk *HyperKube) Run(args []string) error { +func (hk *HyperKube) Run(args []string, stopCh <-chan struct{}) error { // If we are called directly, parse all flags up to the first real // argument. That should be the server to run. command := args[0] @@ -174,7 +176,22 @@ func (hk *HyperKube) Run(args []string) error { logs.InitLogs() defer logs.FlushLogs() - err = s.Run(s, s.Flags().Args()) + if !s.RespectsStopCh { + // For commands that do not respect the stopCh, we run them in a go + // routine and leave them running when stopCh is closed. + errCh := make(chan error) + go func() { + errCh <- s.Run(s, s.Flags().Args(), wait.NeverStop) + }() + select { + case <-stopCh: + return errors.New("interrupted") // This error text is ignored. + case err = <-errCh: + // fall-through + } + } else { + err = s.Run(s, s.Flags().Args(), stopCh) + } if err != nil { hk.Println("Error:", err) } @@ -184,11 +201,10 @@ func (hk *HyperKube) Run(args []string) error { // RunToExit will run the hyperkube and then call os.Exit with an appropriate exit code. func (hk *HyperKube) RunToExit(args []string) { - err := hk.Run(args) - if err != nil { + stopCh := server.SetupSignalHandler() + if err := hk.Run(args, stopCh); err != nil { os.Exit(1) } - os.Exit(0) } // Usage will write out a summary for all servers that this binary supports. diff --git a/cmd/hyperkube/hyperkube_test.go b/cmd/hyperkube/hyperkube_test.go index ae0289e667..5a4459b2a9 100644 --- a/cmd/hyperkube/hyperkube_test.go +++ b/cmd/hyperkube/hyperkube_test.go @@ -22,9 +22,12 @@ import ( "fmt" "strings" "testing" + "time" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" + + "k8s.io/apimachinery/pkg/util/wait" ) type result struct { @@ -36,7 +39,7 @@ func testServer(n string) *Server { return &Server{ SimpleUsage: n, Long: fmt.Sprintf("A simple server named %s", n), - Run: func(s *Server, args []string) error { + Run: func(s *Server, args []string, stopCh <-chan struct{}) error { s.hk.Printf("%s Run\n", s.Name()) return nil }, @@ -46,12 +49,47 @@ func testServerError(n string) *Server { return &Server{ SimpleUsage: n, Long: fmt.Sprintf("A simple server named %s that returns an error", n), - Run: func(s *Server, args []string) error { + Run: func(s *Server, args []string, stopCh <-chan struct{}) error { s.hk.Printf("%s Run\n", s.Name()) return errors.New("server returning error") }, } } +func testStopChRespectingServer(n string) *Server { + return &Server{ + SimpleUsage: n, + Long: fmt.Sprintf("A simple server named %s", n), + Run: func(s *Server, args []string, stopCh <-chan struct{}) error { + s.hk.Printf("%s Run\n", s.Name()) + <-stopCh + return nil + }, + RespectsStopCh: true, + } +} +func testStopChIgnoringServer(n string) *Server { + return &Server{ + SimpleUsage: n, + Long: fmt.Sprintf("A simple server named %s", n), + Run: func(s *Server, args []string, stopCh <-chan struct{}) error { + <-wait.NeverStop // this leaks obviously, but we don't care about one go routine more or less in test + return nil + }, + RespectsStopCh: false, + } +} +func testStopChRespectingServerWithError(n string) *Server { + return &Server{ + SimpleUsage: n, + Long: fmt.Sprintf("A simple server named %s", n), + Run: func(s *Server, args []string, stopCh <-chan struct{}) error { + s.hk.Printf("%s Run\n", s.Name()) + <-stopCh + return errors.New("server returning error") + }, + RespectsStopCh: true, + } +} const defaultCobraMessage = "default message from cobra command" const defaultCobraSubMessage = "default sub-message from cobra command" @@ -91,7 +129,7 @@ func testCobraCommand(n string) *Server { s := &Server{ SimpleUsage: n, Long: fmt.Sprintf("A server named %s which uses a cobra command", n), - Run: func(s *Server, args []string) error { + Run: func(s *Server, args []string, stopCh <-chan struct{}) error { cobraServer = s cmd.SetOutput(s.hk.Out()) cmd.SetArgs(args) @@ -102,7 +140,7 @@ func testCobraCommand(n string) *Server { return s } -func runFull(t *testing.T, args string) *result { +func runFull(t *testing.T, args string, stopCh <-chan struct{}) *result { buf := new(bytes.Buffer) hk := HyperKube{ Name: "hyperkube", @@ -114,11 +152,14 @@ func runFull(t *testing.T, args string) *result { hk.AddServer(testServer("test2")) hk.AddServer(testServer("test3")) hk.AddServer(testServerError("test-error")) + hk.AddServer(testStopChIgnoringServer("test-stop-ch-ignoring")) + hk.AddServer(testStopChRespectingServer("test-stop-ch-respecting")) + hk.AddServer(testStopChRespectingServerWithError("test-error-stop-ch-respecting")) hk.AddServer(testCobraCommand("test-cobra-command")) a := strings.Split(args, " ") t.Logf("Running full with args: %q", a) - err := hk.Run(a) + err := hk.Run(a, stopCh) r := &result{err, buf.String()} t.Logf("Result err: %v, output: %q", r.err, r.output) @@ -127,37 +168,37 @@ func runFull(t *testing.T, args string) *result { } func TestRun(t *testing.T) { - x := runFull(t, "hyperkube test1") + x := runFull(t, "hyperkube test1", wait.NeverStop) assert.Contains(t, x.output, "test1 Run") assert.NoError(t, x.err) } func TestLinkRun(t *testing.T) { - x := runFull(t, "test1") + x := runFull(t, "test1", wait.NeverStop) assert.Contains(t, x.output, "test1 Run") assert.NoError(t, x.err) } func TestTopNoArgs(t *testing.T) { - x := runFull(t, "hyperkube") + x := runFull(t, "hyperkube", wait.NeverStop) assert.EqualError(t, x.err, "no server specified") } func TestBadServer(t *testing.T) { - x := runFull(t, "hyperkube bad-server") + x := runFull(t, "hyperkube bad-server", wait.NeverStop) assert.EqualError(t, x.err, "Server not found: bad-server") assert.Contains(t, x.output, "Usage") } func TestTopHelp(t *testing.T) { - x := runFull(t, "hyperkube --help") + x := runFull(t, "hyperkube --help", wait.NeverStop) assert.NoError(t, x.err) assert.Contains(t, x.output, "all-in-one") assert.Contains(t, x.output, "A simple server named test1") } func TestTopFlags(t *testing.T) { - x := runFull(t, "hyperkube --help test1") + x := runFull(t, "hyperkube --help test1", wait.NeverStop) assert.NoError(t, x.err) assert.Contains(t, x.output, "all-in-one") assert.Contains(t, x.output, "A simple server named test1") @@ -165,14 +206,14 @@ func TestTopFlags(t *testing.T) { } func TestTopFlagsBad(t *testing.T) { - x := runFull(t, "hyperkube --bad-flag") + x := runFull(t, "hyperkube --bad-flag", wait.NeverStop) assert.EqualError(t, x.err, "unknown flag: --bad-flag") assert.Contains(t, x.output, "all-in-one") assert.Contains(t, x.output, "A simple server named test1") } func TestServerHelp(t *testing.T) { - x := runFull(t, "hyperkube test1 --help") + x := runFull(t, "hyperkube test1 --help", wait.NeverStop) assert.NoError(t, x.err) assert.Contains(t, x.output, "A simple server named test1") assert.Contains(t, x.output, "-h, --help") @@ -181,7 +222,7 @@ func TestServerHelp(t *testing.T) { } func TestServerFlagsBad(t *testing.T) { - x := runFull(t, "hyperkube test1 --bad-flag") + x := runFull(t, "hyperkube test1 --bad-flag", wait.NeverStop) assert.EqualError(t, x.err, "unknown flag: --bad-flag") assert.Contains(t, x.output, "A simple server named test1") assert.Contains(t, x.output, "-h, --help") @@ -190,36 +231,91 @@ func TestServerFlagsBad(t *testing.T) { } func TestServerError(t *testing.T) { - x := runFull(t, "hyperkube test-error") + x := runFull(t, "hyperkube test-error", wait.NeverStop) assert.Contains(t, x.output, "test-error Run") assert.EqualError(t, x.err, "server returning error") } +func TestStopChIgnoringServer(t *testing.T) { + stopCh := make(chan struct{}) + returnedCh := make(chan struct{}) + var x *result + go func() { + defer close(returnedCh) + x = runFull(t, "hyperkube test-stop-ch-ignoring", stopCh) + }() + close(stopCh) + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("%q never returned after stopCh was closed", "hyperkube test-stop-ch-ignoring") + case <-returnedCh: + } + // we cannot be sure that the server had a chance to output anything + // assert.Contains(t, x.output, "test-error-stop-ch-ignoring Run") + assert.EqualError(t, x.err, "interrupted") +} + +func TestStopChRespectingServer(t *testing.T) { + stopCh := make(chan struct{}) + returnedCh := make(chan struct{}) + var x *result + go func() { + defer close(returnedCh) + x = runFull(t, "hyperkube test-stop-ch-respecting", stopCh) + }() + close(stopCh) + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("%q never returned after stopCh was closed", "hyperkube test-stop-ch-respecting") + case <-returnedCh: + } + assert.Contains(t, x.output, "test-stop-ch-respecting Run") + assert.Nil(t, x.err) +} + +func TestStopChRespectingServerWithError(t *testing.T) { + stopCh := make(chan struct{}) + returnedCh := make(chan struct{}) + var x *result + go func() { + defer close(returnedCh) + x = runFull(t, "hyperkube test-error-stop-ch-respecting", stopCh) + }() + close(stopCh) + select { + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("%q never returned after stopCh was closed", "hyperkube test-error-stop-ch-respecting") + case <-returnedCh: + } + assert.Contains(t, x.output, "test-error-stop-ch-respecting Run") + assert.EqualError(t, x.err, "server returning error") +} + func TestCobraCommandHelp(t *testing.T) { - x := runFull(t, "hyperkube test-cobra-command --help") + x := runFull(t, "hyperkube test-cobra-command --help", wait.NeverStop) assert.NoError(t, x.err) assert.Contains(t, x.output, "A server named test-cobra-command which uses a cobra command") assert.Contains(t, x.output, cobraMessageDesc) } func TestCobraCommandDefaultMessage(t *testing.T) { - x := runFull(t, "hyperkube test-cobra-command") + x := runFull(t, "hyperkube test-cobra-command", wait.NeverStop) assert.Contains(t, x.output, fmt.Sprintf("msg: %s", defaultCobraMessage)) } func TestCobraCommandMessage(t *testing.T) { - x := runFull(t, "hyperkube test-cobra-command --msg foobar") + x := runFull(t, "hyperkube test-cobra-command --msg foobar", wait.NeverStop) assert.Contains(t, x.output, "msg: foobar") } func TestCobraSubCommandHelp(t *testing.T) { - x := runFull(t, "hyperkube test-cobra-command subcommand --help") + x := runFull(t, "hyperkube test-cobra-command subcommand --help", wait.NeverStop) assert.NoError(t, x.err) assert.Contains(t, x.output, cobraSubMessageDesc) } func TestCobraSubCommandDefaultMessage(t *testing.T) { - x := runFull(t, "hyperkube test-cobra-command subcommand") + x := runFull(t, "hyperkube test-cobra-command subcommand", wait.NeverStop) assert.Contains(t, x.output, fmt.Sprintf("submsg: %s", defaultCobraSubMessage)) } func TestCobraSubCommandMessage(t *testing.T) { - x := runFull(t, "hyperkube test-cobra-command subcommand --submsg foobar") + x := runFull(t, "hyperkube test-cobra-command subcommand --submsg foobar", wait.NeverStop) assert.Contains(t, x.output, "submsg: foobar") } diff --git a/cmd/hyperkube/kube-aggregator.go b/cmd/hyperkube/kube-aggregator.go index 8d30553914..6f1b022d39 100644 --- a/cmd/hyperkube/kube-aggregator.go +++ b/cmd/hyperkube/kube-aggregator.go @@ -19,7 +19,6 @@ package main import ( "os" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kube-aggregator/pkg/cmd/server" ) @@ -33,18 +32,19 @@ func NewKubeAggregator() *Server { AlternativeName: "kube-aggregator", SimpleUsage: "aggregator", Long: "Aggregator for Kubernetes-style API servers: dynamic registration, discovery summarization, secure proxy.", - Run: func(_ *Server, args []string) error { + Run: func(_ *Server, args []string, stopCh <-chan struct{}) error { if err := o.Complete(); err != nil { return err } if err := o.Validate(args); err != nil { return err } - if err := o.RunAggregator(wait.NeverStop); err != nil { + if err := o.RunAggregator(stopCh); err != nil { return err } return nil }, + RespectsStopCh: true, } o.AddFlags(hks.Flags()) diff --git a/cmd/hyperkube/kube-apiserver.go b/cmd/hyperkube/kube-apiserver.go index e356009c2a..160ce22ac5 100644 --- a/cmd/hyperkube/kube-apiserver.go +++ b/cmd/hyperkube/kube-apiserver.go @@ -17,7 +17,6 @@ limitations under the License. package main import ( - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" ) @@ -32,9 +31,10 @@ func NewKubeAPIServer() *Server { AlternativeName: "kube-apiserver", SimpleUsage: "apiserver", Long: "The main API entrypoint and interface to the storage system. The API server is also the focal point for all authorization decisions.", - Run: func(_ *Server, args []string) error { - return app.Run(s, wait.NeverStop) + Run: func(_ *Server, args []string, stopCh <-chan struct{}) error { + return app.Run(s, stopCh) }, + RespectsStopCh: true, } s.AddFlags(hks.Flags()) return &hks diff --git a/cmd/hyperkube/kube-controller-manager.go b/cmd/hyperkube/kube-controller-manager.go index 35061e9a83..c69cb99084 100644 --- a/cmd/hyperkube/kube-controller-manager.go +++ b/cmd/hyperkube/kube-controller-manager.go @@ -31,7 +31,7 @@ func NewKubeControllerManager() *Server { AlternativeName: "kube-controller-manager", SimpleUsage: "controller-manager", Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.", - Run: func(_ *Server, args []string) error { + Run: func(_ *Server, args []string, stopCh <-chan struct{}) error { return app.Run(s) }, } diff --git a/cmd/hyperkube/kube-proxy.go b/cmd/hyperkube/kube-proxy.go index 76bc2dc927..cd5b5d45d4 100644 --- a/cmd/hyperkube/kube-proxy.go +++ b/cmd/hyperkube/kube-proxy.go @@ -45,7 +45,7 @@ func NewKubeProxy() *Server { // refactored to use cobra throughout. command.Flags().AddGoFlagSet(flag.CommandLine) - hks.Run = func(_ *Server, args []string) error { + hks.Run = func(_ *Server, args []string, stopCh <-chan struct{}) error { command.SetArgs(args) return command.Execute() } diff --git a/cmd/hyperkube/kube-scheduler.go b/cmd/hyperkube/kube-scheduler.go index fd00103ac8..5e5abb2c63 100644 --- a/cmd/hyperkube/kube-scheduler.go +++ b/cmd/hyperkube/kube-scheduler.go @@ -31,7 +31,7 @@ func NewScheduler() *Server { AlternativeName: "kube-scheduler", SimpleUsage: "scheduler", Long: "Implements a Kubernetes scheduler. This will assign pods to kubelets based on capacity and constraints.", - Run: func(_ *Server, _ []string) error { + Run: func(_ *Server, _ []string, stopCh <-chan struct{}) error { return app.Run(s) }, } diff --git a/cmd/hyperkube/kubectl.go b/cmd/hyperkube/kubectl.go index 6e01818b94..239d585a77 100644 --- a/cmd/hyperkube/kubectl.go +++ b/cmd/hyperkube/kubectl.go @@ -33,7 +33,7 @@ func NewKubectlServer() *Server { name: "kubectl", SimpleUsage: "Kubernetes command line client", Long: "Kubernetes command line client", - Run: func(s *Server, args []string) error { + Run: func(s *Server, args []string, stopCh <-chan struct{}) error { cmd.SetArgs(args) return cmd.Execute() }, diff --git a/cmd/hyperkube/kubelet.go b/cmd/hyperkube/kubelet.go index 1b2b65edeb..36c3286a12 100644 --- a/cmd/hyperkube/kubelet.go +++ b/cmd/hyperkube/kubelet.go @@ -38,7 +38,7 @@ func NewKubelet() (*Server, error) { queries Docker to see what is currently running. It synchronizes the configuration data, with the running set of containers by starting or stopping Docker containers.`, - Run: func(_ *Server, _ []string) error { + Run: func(_ *Server, _ []string, stopCh <-chan struct{}) error { return app.Run(s, nil) }, } diff --git a/cmd/hyperkube/server.go b/cmd/hyperkube/server.go index 02e912014d..ff1b260d6c 100644 --- a/cmd/hyperkube/server.go +++ b/cmd/hyperkube/server.go @@ -26,7 +26,7 @@ import ( "github.com/spf13/pflag" ) -type serverRunFunc func(s *Server, args []string) error +type serverRunFunc func(s *Server, args []string, stopCh <-chan struct{}) error // Server describes a server that this binary can morph into. type Server struct { @@ -34,6 +34,7 @@ type Server struct { Long string // Longer free form description of the server Run serverRunFunc // Run the server. This is not expected to return. AlternativeName string + RespectsStopCh bool flags *pflag.FlagSet // Flags for the command (and all dependents) name string diff --git a/cmd/kube-apiserver/BUILD b/cmd/kube-apiserver/BUILD index 6a942cadc9..f76a4cf353 100644 --- a/cmd/kube-apiserver/BUILD +++ b/cmd/kube-apiserver/BUILD @@ -29,7 +29,7 @@ go_library( "//pkg/version/prometheus:go_default_library", "//pkg/version/verflag:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library", ], diff --git a/cmd/kube-apiserver/apiserver.go b/cmd/kube-apiserver/apiserver.go index 8c369f6f9d..5ed248f155 100644 --- a/cmd/kube-apiserver/apiserver.go +++ b/cmd/kube-apiserver/apiserver.go @@ -24,7 +24,9 @@ import ( "os" "time" - "k8s.io/apimachinery/pkg/util/wait" + "github.com/spf13/pflag" + + "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/util/flag" "k8s.io/apiserver/pkg/util/logs" "k8s.io/kubernetes/cmd/kube-apiserver/app" @@ -32,8 +34,6 @@ import ( _ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration _ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration "k8s.io/kubernetes/pkg/version/verflag" - - "github.com/spf13/pflag" ) func main() { @@ -48,7 +48,8 @@ func main() { verflag.PrintAndExitIfRequested() - if err := app.Run(s, wait.NeverStop); err != nil { + stopCh := server.SetupSignalHandler() + if err := app.Run(s, stopCh); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } diff --git a/staging/src/k8s.io/apiextensions-apiserver/BUILD b/staging/src/k8s.io/apiextensions-apiserver/BUILD index d7a5b5dd32..b52477d47a 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/BUILD +++ b/staging/src/k8s.io/apiextensions-apiserver/BUILD @@ -15,8 +15,9 @@ go_library( name = "go_default_library", srcs = ["main.go"], deps = [ + "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiextensions-apiserver/main.go b/staging/src/k8s.io/apiextensions-apiserver/main.go index 5244c496f9..7723e65135 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/main.go +++ b/staging/src/k8s.io/apiextensions-apiserver/main.go @@ -21,8 +21,10 @@ import ( "os" "runtime" + "github.com/golang/glog" + "k8s.io/apiextensions-apiserver/pkg/cmd/server" - "k8s.io/apimachinery/pkg/util/wait" + genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/util/logs" ) @@ -34,9 +36,10 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) } - cmd := server.NewCommandStartCustomResourceDefinitionsServer(os.Stdout, os.Stderr, wait.NeverStop) + stopCh := genericapiserver.SetupSignalHandler() + cmd := server.NewCommandStartCustomResourceDefinitionsServer(os.Stdout, os.Stderr, stopCh) cmd.Flags().AddGoFlagSet(flag.CommandLine) if err := cmd.Execute(); err != nil { - panic(err) + glog.Fatal(err) } } diff --git a/staging/src/k8s.io/apiserver/pkg/audit/types.go b/staging/src/k8s.io/apiserver/pkg/audit/types.go index a7c10cf03b..0b27b0536b 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/types.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/types.go @@ -34,4 +34,8 @@ type Backend interface { // Run will initialize the backend. It must not block, but may run go routines in the background. If // stopCh is closed, it is supposed to stop them. Run will be called before the first call to ProcessEvents. Run(stopCh <-chan struct{}) error + + // Shutdown will synchronously shut down the backend while making sure that all pending + // events are delivered. + Shutdown() } diff --git a/staging/src/k8s.io/apiserver/pkg/audit/union.go b/staging/src/k8s.io/apiserver/pkg/audit/union.go index ba969cec98..856b5c125d 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/union.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/union.go @@ -49,3 +49,9 @@ func (u union) Run(stopCh <-chan struct{}) error { } return errors.AggregateGoroutines(funcs...) } + +func (u union) Shutdown() { + for _, backend := range u.backends { + backend.Shutdown() + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/audit/union_test.go b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go index c016f3d07c..70d33b03f1 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/union_test.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/union_test.go @@ -36,6 +36,10 @@ func (f *fakeBackend) Run(stopCh <-chan struct{}) error { return nil } +func (u *fakeBackend) Shutdown() { + // nothing to do here +} + func TestUnion(t *testing.T) { backends := []Backend{ new(fakeBackend), diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index 7e3433955f..e4ae0d1783 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -52,7 +52,14 @@ go_library( "hooks.go", "plugins.go", "serve.go", - ], + "signal.go", + "signal_posix.go", + ] + select({ + "@io_bazel_rules_go//go/platform:windows_amd64": [ + "signal_windows.go", + ], + "//conditions:default": [], + }), deps = [ "//vendor/github.com/coreos/go-systemd/daemon:go_default_library", "//vendor/github.com/emicklei/go-restful:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 806f5dc9e6..4195979279 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -246,6 +246,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { } <-stopCh + + if s.GenericAPIServer.AuditBackend != nil { + s.GenericAPIServer.AuditBackend.Shutdown() + } + return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/server/signal.go b/staging/src/k8s.io/apiserver/pkg/server/signal.go new file mode 100644 index 0000000000..1cd8cefaa2 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/signal.go @@ -0,0 +1,43 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "os" + "os/signal" +) + +var onlyOneSignalHandler = make(chan struct{}) + +// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned +// which is closed on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +func SetupSignalHandler() (stopCh <-chan struct{}) { + close(onlyOneSignalHandler) // panics when called twice + + stop := make(chan struct{}) + c := make(chan os.Signal, 2) + signal.Notify(c, shutdownSignals...) + go func() { + <-c + close(stop) + <-c + os.Exit(1) // second signal. Exit directly. + }() + + return stop +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/signal_posix.go b/staging/src/k8s.io/apiserver/pkg/server/signal_posix.go new file mode 100644 index 0000000000..11b3bba65f --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/signal_posix.go @@ -0,0 +1,26 @@ +// +build !windows + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "os" + "syscall" +) + +var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} diff --git a/staging/src/k8s.io/apiserver/pkg/server/signal_windows.go b/staging/src/k8s.io/apiserver/pkg/server/signal_windows.go new file mode 100644 index 0000000000..e7645a2088 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/signal_windows.go @@ -0,0 +1,23 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "os" +) + +var shutdownSignals = []os.Signal{os.Interrupt} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go index 2794e76eb5..ff1c128812 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/log/backend.go @@ -85,3 +85,7 @@ func (b *backend) logEvent(ev *auditinternal.Event) { func (b *backend) Run(stopCh <-chan struct{}) error { return nil } + +func (u *backend) Shutdown() { + // nothing to do here +} diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go index e20afd2e02..52e21855ec 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/webhook/webhook.go @@ -117,6 +117,10 @@ func (b *blockingBackend) Run(stopCh <-chan struct{}) error { return nil } +func (b *blockingBackend) Shutdown() { + // nothing to do here +} + func (b *blockingBackend) ProcessEvents(ev ...*auditinternal.Event) { if err := b.processEvents(ev...); err != nil { audit.HandlePluginError(pluginName, err, ev...) @@ -203,6 +207,10 @@ func (b *batchBackend) Run(stopCh <-chan struct{}) error { return nil } +func (b *batchBackend) Shutdown() { + // TODO: send out batched events +} + // sendBatchEvents attempts to batch some number of events to the backend. It POSTs events // in a goroutine and logging any error encountered during the POST. // diff --git a/staging/src/k8s.io/kube-aggregator/BUILD b/staging/src/k8s.io/kube-aggregator/BUILD index 52ca37eb68..9e4bad016d 100644 --- a/staging/src/k8s.io/kube-aggregator/BUILD +++ b/staging/src/k8s.io/kube-aggregator/BUILD @@ -23,7 +23,8 @@ go_library( name = "go_default_library", srcs = ["main.go"], deps = [ - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/install:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/validation:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/main.go b/staging/src/k8s.io/kube-aggregator/main.go index f43e8b88a1..7c1e88664f 100644 --- a/staging/src/k8s.io/kube-aggregator/main.go +++ b/staging/src/k8s.io/kube-aggregator/main.go @@ -21,7 +21,9 @@ import ( "os" "runtime" - "k8s.io/apimachinery/pkg/util/wait" + "github.com/golang/glog" + + genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/util/logs" "k8s.io/kube-aggregator/pkg/cmd/server" @@ -41,9 +43,10 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) } - cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr, wait.NeverStop) + stopCh := genericapiserver.SetupSignalHandler() + cmd := server.NewCommandStartAggregator(os.Stdout, os.Stderr, stopCh) cmd.Flags().AddGoFlagSet(flag.CommandLine) if err := cmd.Execute(); err != nil { - panic(err) + glog.Fatal(err) } } diff --git a/staging/src/k8s.io/kube-gen/cmd/go-to-protobuf/protobuf/tags.go b/staging/src/k8s.io/kube-gen/cmd/go-to-protobuf/protobuf/tags.go index 62f9002aed..2dff5b9229 100644 --- a/staging/src/k8s.io/kube-gen/cmd/go-to-protobuf/protobuf/tags.go +++ b/staging/src/k8s.io/kube-gen/cmd/go-to-protobuf/protobuf/tags.go @@ -27,7 +27,7 @@ import ( func extractBoolTagOrDie(key string, lines []string) bool { val, err := types.ExtractSingleBoolCommentTag("+", key, false, lines) if err != nil { - glog.Fatalf(err.Error()) + glog.Fatal(err) } return val } diff --git a/staging/src/k8s.io/kube-gen/cmd/informer-gen/generators/tags.go b/staging/src/k8s.io/kube-gen/cmd/informer-gen/generators/tags.go index 34aa77231f..afa2878152 100644 --- a/staging/src/k8s.io/kube-gen/cmd/informer-gen/generators/tags.go +++ b/staging/src/k8s.io/kube-gen/cmd/informer-gen/generators/tags.go @@ -27,7 +27,7 @@ import ( func extractBoolTagOrDie(key string, lines []string) bool { val, err := types.ExtractSingleBoolCommentTag("+", key, false, lines) if err != nil { - glog.Fatalf(err.Error()) + glog.Fatal(err) } return val } diff --git a/staging/src/k8s.io/kube-gen/cmd/lister-gen/generators/tags.go b/staging/src/k8s.io/kube-gen/cmd/lister-gen/generators/tags.go index 34aa77231f..afa2878152 100644 --- a/staging/src/k8s.io/kube-gen/cmd/lister-gen/generators/tags.go +++ b/staging/src/k8s.io/kube-gen/cmd/lister-gen/generators/tags.go @@ -27,7 +27,7 @@ import ( func extractBoolTagOrDie(key string, lines []string) bool { val, err := types.ExtractSingleBoolCommentTag("+", key, false, lines) if err != nil { - glog.Fatalf(err.Error()) + glog.Fatal(err) } return val } diff --git a/staging/src/k8s.io/sample-apiserver/BUILD b/staging/src/k8s.io/sample-apiserver/BUILD index 5e679ec979..1364b5658e 100644 --- a/staging/src/k8s.io/sample-apiserver/BUILD +++ b/staging/src/k8s.io/sample-apiserver/BUILD @@ -15,7 +15,8 @@ go_library( name = "go_default_library", srcs = ["main.go"], deps = [ - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library", "//vendor/k8s.io/sample-apiserver/pkg/cmd/server:go_default_library", ], diff --git a/staging/src/k8s.io/sample-apiserver/main.go b/staging/src/k8s.io/sample-apiserver/main.go index 08585c66d4..2c3c5a10d2 100644 --- a/staging/src/k8s.io/sample-apiserver/main.go +++ b/staging/src/k8s.io/sample-apiserver/main.go @@ -21,7 +21,9 @@ import ( "os" "runtime" - "k8s.io/apimachinery/pkg/util/wait" + "github.com/golang/glog" + + genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/util/logs" "k8s.io/sample-apiserver/pkg/cmd/server" ) @@ -34,9 +36,10 @@ func main() { runtime.GOMAXPROCS(runtime.NumCPU()) } - cmd := server.NewCommandStartWardleServer(os.Stdout, os.Stderr, wait.NeverStop) + stopCh := genericapiserver.SetupSignalHandler() + cmd := server.NewCommandStartWardleServer(os.Stdout, os.Stderr, stopCh) cmd.Flags().AddGoFlagSet(flag.CommandLine) if err := cmd.Execute(); err != nil { - panic(err) + glog.Fatal(err) } }