Pass context into all Executor functions

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/4014/head
Brad Davidson 2021-09-13 15:20:03 -07:00 committed by Brad Davidson
parent 137e80cd86
commit 199424b608
4 changed files with 46 additions and 41 deletions

View File

@ -26,13 +26,13 @@ func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy
logs.InitLogs()
defer logs.FlushLogs()
if err := startKubelet(&nodeConfig.AgentConfig); err != nil {
if err := startKubelet(ctx, &nodeConfig.AgentConfig); err != nil {
return err
}
go func() {
if !config.KubeProxyDisabled(ctx, nodeConfig, proxy) {
if err := startKubeProxy(&nodeConfig.AgentConfig); err != nil {
if err := startKubeProxy(ctx, &nodeConfig.AgentConfig); err != nil {
logrus.Fatalf("Failed to start kube-proxy: %v", err)
}
}
@ -41,20 +41,20 @@ func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy
return nil
}
func startKubeProxy(cfg *daemonconfig.Agent) error {
func startKubeProxy(ctx context.Context, cfg *daemonconfig.Agent) error {
argsMap := kubeProxyArgs(cfg)
args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeProxyArgs)
logrus.Infof("Running kube-proxy %s", daemonconfig.ArgString(args))
return executor.KubeProxy(args)
return executor.KubeProxy(ctx, args)
}
func startKubelet(cfg *daemonconfig.Agent) error {
func startKubelet(ctx context.Context, cfg *daemonconfig.Agent) error {
argsMap := kubeletArgs(cfg)
args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeletArgs)
logrus.Infof("Running kubelet %s", daemonconfig.ArgString(args))
return executor.Kubelet(args)
return executor.Kubelet(ctx, args)
}
// ImageCredProvAvailable checks to see if the kubelet image credential provider bin dir and config

View File

@ -71,12 +71,12 @@ func Server(ctx context.Context, cfg *config.Control) error {
runtime.Handler = handler
if !cfg.DisableScheduler {
if err := scheduler(cfg, runtime); err != nil {
if err := scheduler(ctx, cfg, runtime); err != nil {
return err
}
}
if !cfg.DisableControllerManager {
if err := controllerManager(cfg, runtime); err != nil {
if err := controllerManager(ctx, cfg, runtime); err != nil {
return err
}
}
@ -90,7 +90,7 @@ func Server(ctx context.Context, cfg *config.Control) error {
return nil
}
func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) error {
func controllerManager(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigController,
"service-account-private-key-file": runtime.ServiceKey,
@ -121,10 +121,10 @@ func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) erro
args := config.GetArgs(argsMap, cfg.ExtraControllerArgs)
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
return executor.ControllerManager(runtime.APIServerReady, args)
return executor.ControllerManager(ctx, runtime.APIServerReady, args)
}
func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error {
func scheduler(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigScheduler,
"bind-address": localhostIP.String(),
@ -137,7 +137,7 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error {
args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs)
logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
return executor.Scheduler(runtime.APIServerReady, args)
return executor.Scheduler(ctx, runtime.APIServerReady, args)
}
func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) {
@ -323,7 +323,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c
select {
case <-ctx.Done():
return
case err := <-promise(func() error { return checkForCloudControllerPrivileges(runtime, 5*time.Second) }):
case err := <-promise(func() error { return checkForCloudControllerPrivileges(ctx, runtime, 5*time.Second) }):
if err != nil {
logrus.Infof("Waiting for cloud-controller-manager privileges to become available")
continue
@ -333,10 +333,15 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c
}
}()
return executor.CloudControllerManager(ccmRBACReady, args)
return executor.CloudControllerManager(ctx, ccmRBACReady, args)
}
func checkForCloudControllerPrivileges(runtime *config.ControlRuntime, timeout time.Duration) error {
// checkForCloudControllerPrivileges makes a SubjectAccessReview request to the apiserver
// to validate that the embedded cloud controller manager has the required privileges,
// and does not return until the requested access is granted.
// If the CCM RBAC changes, the ResourceAttributes checked for by this function should
// be modified to check for the most recently added privilege.
func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
if err != nil {
return err

View File

@ -38,7 +38,7 @@ func (Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cf
return nil
}
func (Embedded) Kubelet(args []string) error {
func (Embedded) Kubelet(ctx context.Context, args []string) error {
command := kubelet.NewKubeletCommand(context.Background())
command.SetArgs(args)
@ -48,13 +48,13 @@ func (Embedded) Kubelet(args []string) error {
logrus.Fatalf("kubelet panic: %v", err)
}
}()
logrus.Fatalf("kubelet exited: %v", command.Execute())
logrus.Fatalf("kubelet exited: %v", command.ExecuteContext(ctx))
}()
return nil
}
func (Embedded) KubeProxy(args []string) error {
func (Embedded) KubeProxy(ctx context.Context, args []string) error {
command := proxy.NewProxyCommand()
command.SetArgs(args)
@ -64,7 +64,7 @@ func (Embedded) KubeProxy(args []string) error {
logrus.Fatalf("kube-proxy panic: %v", err)
}
}()
logrus.Fatalf("kube-proxy exited: %v", command.Execute())
logrus.Fatalf("kube-proxy exited: %v", command.ExecuteContext(ctx))
}()
return nil
@ -81,14 +81,14 @@ func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args [
logrus.Fatalf("apiserver panic: %v", err)
}
}()
logrus.Fatalf("apiserver exited: %v", command.Execute())
logrus.Fatalf("apiserver exited: %v", command.ExecuteContext(ctx))
}()
startupConfig := <-app.StartupConfig
return startupConfig.Authenticator, startupConfig.Handler, nil
}
func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {
func (Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
command := sapp.NewSchedulerCommand()
command.SetArgs(args)
@ -99,13 +99,13 @@ func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {
logrus.Fatalf("scheduler panic: %v", err)
}
}()
logrus.Fatalf("scheduler exited: %v", command.Execute())
logrus.Fatalf("scheduler exited: %v", command.ExecuteContext(ctx))
}()
return nil
}
func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error {
func (Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
command := cmapp.NewControllerManagerCommand()
command.SetArgs(args)
@ -116,13 +116,13 @@ func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error
logrus.Fatalf("controller-manager panic: %v", err)
}
}()
logrus.Fatalf("controller-manager exited: %v", command.Execute())
logrus.Fatalf("controller-manager exited: %v", command.ExecuteContext(ctx))
}()
return nil
}
func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
func (Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
if err != nil {
logrus.Fatalf("unable to initialize command options: %v", err)
@ -159,7 +159,7 @@ func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []stri
logrus.Fatalf("cloud-controller-manager panic: %v", err)
}
}()
logrus.Fatalf("cloud-controller-manager exited: %v", command.Execute())
logrus.Errorf("cloud-controller-manager exited: %v", command.ExecuteContext(ctx))
}()
return nil

View File

@ -20,14 +20,14 @@ var (
type Executor interface {
Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error
Kubelet(args []string) error
KubeProxy(args []string) error
Kubelet(ctx context.Context, args []string) error
KubeProxy(ctx context.Context, args []string) error
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error)
Scheduler(apiReady <-chan struct{}, args []string) error
ControllerManager(apiReady <-chan struct{}, args []string) error
Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
CurrentETCDOptions() (InitialOptions, error)
ETCD(ctx context.Context, args ETCDConfig) error
CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error
CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error
}
type ETCDConfig struct {
@ -89,24 +89,24 @@ func Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agen
return executor.Bootstrap(ctx, nodeConfig, cfg)
}
func Kubelet(args []string) error {
return executor.Kubelet(args)
func Kubelet(ctx context.Context, args []string) error {
return executor.Kubelet(ctx, args)
}
func KubeProxy(args []string) error {
return executor.KubeProxy(args)
func KubeProxy(ctx context.Context, args []string) error {
return executor.KubeProxy(ctx, args)
}
func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) {
return executor.APIServer(ctx, etcdReady, args)
}
func Scheduler(apiReady <-chan struct{}, args []string) error {
return executor.Scheduler(apiReady, args)
func Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.Scheduler(ctx, apiReady, args)
}
func ControllerManager(apiReady <-chan struct{}, args []string) error {
return executor.ControllerManager(apiReady, args)
func ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.ControllerManager(ctx, apiReady, args)
}
func CurrentETCDOptions() (InitialOptions, error) {
@ -117,6 +117,6 @@ func ETCD(ctx context.Context, args ETCDConfig) error {
return executor.ETCD(ctx, args)
}
func CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
return executor.CloudControllerManager(ccmRBACReady, args)
func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
return executor.CloudControllerManager(ctx, ccmRBACReady, args)
}