Remove signal handler registration from pkg/kubelet

The goal of this change is to remove the registration of signal
handling from pkg/kubelet. We now pass in a stop channel.

If you register a signal handler in `main()` to aid in a controlled
and deliberate exit then the handler registered in `pkg/kubelet` often
wins and the process exits immediately. This means all other signal
handler registrations are currently racy if `DockerServer.Start()` is
directly or indirectly invoked.

This change also removes another signal handler registration from
`NewAPIServerCommand()`; a stop channel is now passed to this
function.
pull/8/head
Andrew McDermott 2018-05-15 13:29:05 +01:00
parent 972a74e238
commit 9cbd54018f
17 changed files with 72 additions and 42 deletions

View File

@ -31,6 +31,7 @@ go_library(
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/spf13/cobra/doc:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
],
)

View File

@ -22,6 +22,7 @@ import (
"github.com/spf13/cobra/doc"
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
ccmapp "k8s.io/kubernetes/cmd/cloud-controller-manager/app"
"k8s.io/kubernetes/cmd/genutils"
apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app"
@ -53,7 +54,7 @@ func main() {
switch module {
case "kube-apiserver":
// generate docs for kube-apiserver
apiserver := apiservapp.NewAPIServerCommand()
apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler())
doc.GenMarkdownTree(apiserver, outDir)
case "kube-controller-manager":
// generate docs for kube-controller-manager
@ -73,7 +74,7 @@ func main() {
doc.GenMarkdownTree(scheduler, outDir)
case "kubelet":
// generate docs for kubelet
kubelet := kubeletapp.NewKubeletCommand()
kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalHandler())
doc.GenMarkdownTree(kubelet, outDir)
case "kubeadm":
// resets global flags created by kubelet or other commands e.g.

View File

@ -28,6 +28,7 @@ go_library(
"//vendor/github.com/cpuguy83/go-md2man/md2man:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
],
)

View File

@ -26,6 +26,7 @@ import (
mangen "github.com/cpuguy83/go-md2man/md2man"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
ccmapp "k8s.io/kubernetes/cmd/cloud-controller-manager/app"
"k8s.io/kubernetes/cmd/genutils"
apiservapp "k8s.io/kubernetes/cmd/kube-apiserver/app"
@ -62,7 +63,7 @@ func main() {
switch module {
case "kube-apiserver":
// generate manpage for kube-apiserver
apiserver := apiservapp.NewAPIServerCommand()
apiserver := apiservapp.NewAPIServerCommand(server.SetupSignalHandler())
genMarkdown(apiserver, "", outDir)
for _, c := range apiserver.Commands() {
genMarkdown(c, "kube-apiserver", outDir)
@ -97,7 +98,7 @@ func main() {
}
case "kubelet":
// generate manpage for kubelet
kubelet := kubeletapp.NewKubeletCommand()
kubelet := kubeletapp.NewKubeletCommand(server.SetupSignalHandler())
genMarkdown(kubelet, "", outDir)
for _, c := range kubelet.Commands() {
genMarkdown(c, "kubelet", outDir)

View File

@ -29,6 +29,7 @@ go_library(
"//pkg/version/prometheus:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library",
],

View File

@ -32,6 +32,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
cloudcontrollermanager "k8s.io/kubernetes/cmd/cloud-controller-manager/app"
@ -48,7 +49,7 @@ import (
func main() {
rand.Seed(time.Now().UTC().UnixNano())
hyperkubeCommand, allCommandFns := NewHyperKubeCommand()
hyperkubeCommand, allCommandFns := NewHyperKubeCommand(server.SetupSignalHandler())
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
@ -82,12 +83,12 @@ func commandFor(basename string, defaultCommand *cobra.Command, commands []func(
return defaultCommand
}
// NewCmdRequestProject implement the OpenShift cli RequestProject command.
func NewHyperKubeCommand() (*cobra.Command, []func() *cobra.Command) {
// NewHyperKubeCommand is the entry point for hyperkube
func NewHyperKubeCommand(stopCh <-chan struct{}) (*cobra.Command, []func() *cobra.Command) {
// these have to be functions since the command is polymorphic. Cobra wants you to be top level
// command to get executed
apiserver := func() *cobra.Command {
ret := kubeapiserver.NewAPIServerCommand()
ret := kubeapiserver.NewAPIServerCommand(stopCh)
// add back some unfortunate aliases that should be removed
ret.Aliases = []string{"apiserver"}
return ret
@ -111,7 +112,7 @@ func NewHyperKubeCommand() (*cobra.Command, []func() *cobra.Command) {
return ret
}
kubectlCmd := func() *cobra.Command { return kubectl.NewDefaultKubectlCommand() }
kubelet := func() *cobra.Command { return kubelet.NewKubeletCommand() }
kubelet := func() *cobra.Command { return kubelet.NewKubeletCommand(stopCh) }
cloudController := func() *cobra.Command { return cloudcontrollermanager.NewCloudControllerManagerCommand() }
commandFns := []func() *cobra.Command{

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/client/metrics/prometheus:go_default_library",
"//pkg/version/prometheus:go_default_library",
"//vendor/github.com/spf13/pflag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library",
],

View File

@ -27,6 +27,7 @@ import (
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
@ -37,7 +38,7 @@ import (
func main() {
rand.Seed(time.Now().UTC().UnixNano())
command := app.NewAPIServerCommand()
command := app.NewAPIServerCommand(server.SetupSignalHandler())
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the

View File

@ -48,7 +48,6 @@ import (
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/server"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/filters"
serveroptions "k8s.io/apiserver/pkg/server/options"
@ -109,7 +108,7 @@ const etcdRetryLimit = 60
const etcdRetryInterval = 1 * time.Second
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
func NewAPIServerCommand(stopCh <-chan struct{}) *cobra.Command {
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
@ -132,7 +131,6 @@ cluster's shared state through which all other components interact.`,
return utilerrors.NewAggregate(errs)
}
stopCh := server.SetupSignalHandler()
return Run(completedOptions, stopCh)
},
}

View File

@ -21,6 +21,7 @@ go_library(
"//cmd/kubelet/app:go_default_library",
"//pkg/client/metrics/prometheus:go_default_library",
"//pkg/version/prometheus:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/logs:go_default_library",
],
)

View File

@ -103,7 +103,7 @@ const (
)
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc)
kubeletFlags := options.NewKubeletFlags()
@ -248,14 +248,15 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
// start the experimental docker shim, if enabled
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig); err != nil {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
glog.Fatal(err)
}
return
}
// run the kubelet
glog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps); err != nil {
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
glog.Fatal(err)
}
},
@ -399,13 +400,13 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
if err := run(s, kubeDeps); err != nil {
if err := run(s, kubeDeps, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
@ -462,7 +463,7 @@ func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName)
}
}
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
@ -717,7 +718,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
glog.Warning(err)
}
if err := RunKubelet(&s.KubeletFlags, &s.KubeletConfiguration, kubeDeps, s.RunOnce); err != nil {
if err := RunKubelet(&s.KubeletFlags, &s.KubeletConfiguration, kubeDeps, s.RunOnce, stopCh); err != nil {
return err
}
@ -738,7 +739,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
// If systemd is used, notify it that we have started
go daemon.SdNotify(false, "READY=1")
<-done
select {
case <-done:
break
case <-stopCh:
break
}
return nil
}
@ -877,7 +884,7 @@ func addChaosToClientConfig(s *options.KubeletServer, config *restclient.Config)
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool) error {
func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool, stopCh <-chan struct{}) error {
hostname := nodeutil.GetHostname(kubeFlags.HostnameOverride)
// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
@ -950,7 +957,8 @@ func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *kubeletconfiginternal.
kubeFlags.KeepTerminatedPodVolumes,
kubeFlags.NodeLabels,
kubeFlags.SeccompProfileRoot,
kubeFlags.BootstrapCheckpointPath)
kubeFlags.BootstrapCheckpointPath,
stopCh)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
@ -1034,7 +1042,8 @@ func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string) (k kubelet.Bootstrap, err error) {
bootstrapCheckpointPath string,
stopCh <-chan struct{}) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
@ -1067,7 +1076,8 @@ func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes,
nodeLabels,
seccompProfileRoot,
bootstrapCheckpointPath)
bootstrapCheckpointPath,
stopCh)
if err != nil {
return nil, err
}
@ -1130,7 +1140,7 @@ func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamic
// RunDockershim only starts the dockershim in current process. This is only used for cri validate testing purpose
// TODO(random-liu): Move this to a separate binary.
func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration) error {
func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConfiguration, stopCh <-chan struct{}) error {
r := &f.ContainerRuntimeOptions
// Initialize docker client configuration.
@ -1167,11 +1177,23 @@ func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConf
}
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
if err := server.Start(stopCh); err != nil {
return err
}
// Start the streaming server
addr := net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port)))
return http.ListenAndServe(addr, ds)
streamingServer := &http.Server{
Addr: net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))),
Handler: ds,
}
go func() {
<-stopCh
streamingServer.Shutdown(context.Background())
}()
// Start the streaming server
if err := streamingServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return err
}
return nil
}

View File

@ -26,6 +26,7 @@ import (
"os"
"time"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/util/logs"
"k8s.io/kubernetes/cmd/kubelet/app"
_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
@ -35,7 +36,7 @@ import (
func main() {
rand.Seed(time.Now().UTC().UnixNano())
command := app.NewKubeletCommand()
command := app.NewKubeletCommand(server.SetupSignalHandler())
logs.InitLogs()
defer logs.FlushLogs()

View File

@ -13,7 +13,6 @@ go_library(
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/util:go_default_library",
"//pkg/util/interrupt:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],

View File

@ -21,11 +21,9 @@ import (
"github.com/golang/glog"
"google.golang.org/grpc"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/util/interrupt"
)
// maxMsgSize use 8MB as the default message size limit.
@ -51,7 +49,7 @@ func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer {
}
// Start starts the dockershim grpc server.
func (s *DockerServer) Start() error {
func (s *DockerServer) Start(stopCh <-chan struct{}) error {
// Start the internal service.
if err := s.service.Start(); err != nil {
glog.Errorf("Unable to start docker service")
@ -71,13 +69,14 @@ func (s *DockerServer) Start() error {
runtimeapi.RegisterRuntimeServiceServer(s.server, s.service)
runtimeapi.RegisterImageServiceServer(s.server, s.service)
go func() {
// Use interrupt handler to make sure the server to be stopped properly.
h := interrupt.New(nil, s.Stop)
err := h.Run(func() error { return s.server.Serve(l) })
if err != nil {
if err := s.server.Serve(l); err != nil {
glog.Errorf("Failed to serve connections: %v", err)
}
}()
go func() {
<-stopCh
s.Stop()
}()
return nil
}

View File

@ -343,7 +343,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
seccompProfileRoot string,
bootstrapCheckpointPath string) (*Kubelet, error) {
bootstrapCheckpointPath string,
stopCh <-chan struct{}) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
@ -619,9 +620,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
remoteRuntimeEndpoint,
remoteImageEndpoint)
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
if err := server.Start(stopCh); err != nil {
return nil, err
}

View File

@ -46,6 +46,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@ -20,6 +20,7 @@ import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
kubeletapp "k8s.io/kubernetes/cmd/kubelet/app"
"k8s.io/kubernetes/cmd/kubelet/app/options"
@ -89,7 +90,7 @@ func NewHollowKubelet(
// Starts this HollowKubelet and blocks.
func (hk *HollowKubelet) Run() {
if err := kubeletapp.RunKubelet(hk.KubeletFlags, hk.KubeletConfiguration, hk.KubeletDeps, false); err != nil {
if err := kubeletapp.RunKubelet(hk.KubeletFlags, hk.KubeletConfiguration, hk.KubeletDeps, false, wait.NeverStop); err != nil {
glog.Fatalf("Failed to run HollowKubelet: %v. Exiting.", err)
}
select {}