diff --git a/pkg/daemons/agent/agent.go b/pkg/daemons/agent/agent.go index c37d7c3e85..3ed92e3a44 100644 --- a/pkg/daemons/agent/agent.go +++ b/pkg/daemons/agent/agent.go @@ -26,13 +26,13 @@ func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy logs.InitLogs() defer logs.FlushLogs() - if err := startKubelet(&nodeConfig.AgentConfig); err != nil { + if err := startKubelet(ctx, &nodeConfig.AgentConfig); err != nil { return err } go func() { if !config.KubeProxyDisabled(ctx, nodeConfig, proxy) { - if err := startKubeProxy(&nodeConfig.AgentConfig); err != nil { + if err := startKubeProxy(ctx, &nodeConfig.AgentConfig); err != nil { logrus.Fatalf("Failed to start kube-proxy: %v", err) } } @@ -41,20 +41,20 @@ func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy return nil } -func startKubeProxy(cfg *daemonconfig.Agent) error { +func startKubeProxy(ctx context.Context, cfg *daemonconfig.Agent) error { argsMap := kubeProxyArgs(cfg) args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeProxyArgs) logrus.Infof("Running kube-proxy %s", daemonconfig.ArgString(args)) - return executor.KubeProxy(args) + return executor.KubeProxy(ctx, args) } -func startKubelet(cfg *daemonconfig.Agent) error { +func startKubelet(ctx context.Context, cfg *daemonconfig.Agent) error { argsMap := kubeletArgs(cfg) args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeletArgs) logrus.Infof("Running kubelet %s", daemonconfig.ArgString(args)) - return executor.Kubelet(args) + return executor.Kubelet(ctx, args) } // ImageCredProvAvailable checks to see if the kubelet image credential provider bin dir and config diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 7b8bfcc95a..0ca8904fdd 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -71,12 +71,12 @@ func Server(ctx context.Context, cfg *config.Control) error { runtime.Handler = handler if !cfg.DisableScheduler { - if err := scheduler(cfg, runtime); err != nil { + if err := scheduler(ctx, cfg, runtime); err != nil { return err } } if !cfg.DisableControllerManager { - if err := controllerManager(cfg, runtime); err != nil { + if err := controllerManager(ctx, cfg, runtime); err != nil { return err } } @@ -90,7 +90,7 @@ func Server(ctx context.Context, cfg *config.Control) error { return nil } -func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) error { +func controllerManager(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error { argsMap := map[string]string{ "kubeconfig": runtime.KubeConfigController, "service-account-private-key-file": runtime.ServiceKey, @@ -121,10 +121,10 @@ func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) erro args := config.GetArgs(argsMap, cfg.ExtraControllerArgs) logrus.Infof("Running kube-controller-manager %s", config.ArgString(args)) - return executor.ControllerManager(runtime.APIServerReady, args) + return executor.ControllerManager(ctx, runtime.APIServerReady, args) } -func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error { +func scheduler(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error { argsMap := map[string]string{ "kubeconfig": runtime.KubeConfigScheduler, "bind-address": localhostIP.String(), @@ -137,7 +137,7 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error { args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs) logrus.Infof("Running kube-scheduler %s", config.ArgString(args)) - return executor.Scheduler(runtime.APIServerReady, args) + return executor.Scheduler(ctx, runtime.APIServerReady, args) } func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) { @@ -323,7 +323,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c select { case <-ctx.Done(): return - case err := <-promise(func() error { return checkForCloudControllerPrivileges(runtime, 5*time.Second) }): + case err := <-promise(func() error { return checkForCloudControllerPrivileges(ctx, runtime, 5*time.Second) }): if err != nil { logrus.Infof("Waiting for cloud-controller-manager privileges to become available") continue @@ -333,10 +333,15 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c } }() - return executor.CloudControllerManager(ccmRBACReady, args) + return executor.CloudControllerManager(ctx, ccmRBACReady, args) } -func checkForCloudControllerPrivileges(runtime *config.ControlRuntime, timeout time.Duration) error { +// checkForCloudControllerPrivileges makes a SubjectAccessReview request to the apiserver +// to validate that the embedded cloud controller manager has the required privileges, +// and does not return until the requested access is granted. +// If the CCM RBAC changes, the ResourceAttributes checked for by this function should +// be modified to check for the most recently added privilege. +func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error { restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin) if err != nil { return err diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go index d919b9cdc6..fdc13c41b8 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/daemons/executor/embed.go @@ -38,7 +38,7 @@ func (Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cf return nil } -func (Embedded) Kubelet(args []string) error { +func (Embedded) Kubelet(ctx context.Context, args []string) error { command := kubelet.NewKubeletCommand(context.Background()) command.SetArgs(args) @@ -48,13 +48,13 @@ func (Embedded) Kubelet(args []string) error { logrus.Fatalf("kubelet panic: %v", err) } }() - logrus.Fatalf("kubelet exited: %v", command.Execute()) + logrus.Fatalf("kubelet exited: %v", command.ExecuteContext(ctx)) }() return nil } -func (Embedded) KubeProxy(args []string) error { +func (Embedded) KubeProxy(ctx context.Context, args []string) error { command := proxy.NewProxyCommand() command.SetArgs(args) @@ -64,7 +64,7 @@ func (Embedded) KubeProxy(args []string) error { logrus.Fatalf("kube-proxy panic: %v", err) } }() - logrus.Fatalf("kube-proxy exited: %v", command.Execute()) + logrus.Fatalf("kube-proxy exited: %v", command.ExecuteContext(ctx)) }() return nil @@ -81,14 +81,14 @@ func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args [ logrus.Fatalf("apiserver panic: %v", err) } }() - logrus.Fatalf("apiserver exited: %v", command.Execute()) + logrus.Fatalf("apiserver exited: %v", command.ExecuteContext(ctx)) }() startupConfig := <-app.StartupConfig return startupConfig.Authenticator, startupConfig.Handler, nil } -func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error { +func (Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error { command := sapp.NewSchedulerCommand() command.SetArgs(args) @@ -99,13 +99,13 @@ func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error { logrus.Fatalf("scheduler panic: %v", err) } }() - logrus.Fatalf("scheduler exited: %v", command.Execute()) + logrus.Fatalf("scheduler exited: %v", command.ExecuteContext(ctx)) }() return nil } -func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error { +func (Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error { command := cmapp.NewControllerManagerCommand() command.SetArgs(args) @@ -116,13 +116,13 @@ func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error logrus.Fatalf("controller-manager panic: %v", err) } }() - logrus.Fatalf("controller-manager exited: %v", command.Execute()) + logrus.Fatalf("controller-manager exited: %v", command.ExecuteContext(ctx)) }() return nil } -func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error { +func (Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error { ccmOptions, err := ccmopt.NewCloudControllerManagerOptions() if err != nil { logrus.Fatalf("unable to initialize command options: %v", err) @@ -159,7 +159,7 @@ func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []stri logrus.Fatalf("cloud-controller-manager panic: %v", err) } }() - logrus.Fatalf("cloud-controller-manager exited: %v", command.Execute()) + logrus.Errorf("cloud-controller-manager exited: %v", command.ExecuteContext(ctx)) }() return nil diff --git a/pkg/daemons/executor/executor.go b/pkg/daemons/executor/executor.go index 25286013b3..591dee0ff9 100644 --- a/pkg/daemons/executor/executor.go +++ b/pkg/daemons/executor/executor.go @@ -20,14 +20,14 @@ var ( type Executor interface { Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error - Kubelet(args []string) error - KubeProxy(args []string) error + Kubelet(ctx context.Context, args []string) error + KubeProxy(ctx context.Context, args []string) error APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) - Scheduler(apiReady <-chan struct{}, args []string) error - ControllerManager(apiReady <-chan struct{}, args []string) error + Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error + ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error CurrentETCDOptions() (InitialOptions, error) ETCD(ctx context.Context, args ETCDConfig) error - CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error + CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error } type ETCDConfig struct { @@ -89,24 +89,24 @@ func Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agen return executor.Bootstrap(ctx, nodeConfig, cfg) } -func Kubelet(args []string) error { - return executor.Kubelet(args) +func Kubelet(ctx context.Context, args []string) error { + return executor.Kubelet(ctx, args) } -func KubeProxy(args []string) error { - return executor.KubeProxy(args) +func KubeProxy(ctx context.Context, args []string) error { + return executor.KubeProxy(ctx, args) } func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) { return executor.APIServer(ctx, etcdReady, args) } -func Scheduler(apiReady <-chan struct{}, args []string) error { - return executor.Scheduler(apiReady, args) +func Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error { + return executor.Scheduler(ctx, apiReady, args) } -func ControllerManager(apiReady <-chan struct{}, args []string) error { - return executor.ControllerManager(apiReady, args) +func ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error { + return executor.ControllerManager(ctx, apiReady, args) } func CurrentETCDOptions() (InitialOptions, error) { @@ -117,6 +117,6 @@ func ETCD(ctx context.Context, args ETCDConfig) error { return executor.ETCD(ctx, args) } -func CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error { - return executor.CloudControllerManager(ccmRBACReady, args) +func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error { + return executor.CloudControllerManager(ctx, ccmRBACReady, args) }