Pass context into all Executor functions

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 199424b608)
pull/4456/head
Brad Davidson 2021-09-13 15:20:03 -07:00 committed by Brad Davidson
parent 7364fe8cc6
commit 718d095ee7
4 changed files with 46 additions and 41 deletions

View File

@ -26,13 +26,13 @@ func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy
logs.InitLogs() logs.InitLogs()
defer logs.FlushLogs() defer logs.FlushLogs()
if err := startKubelet(&nodeConfig.AgentConfig); err != nil { if err := startKubelet(ctx, &nodeConfig.AgentConfig); err != nil {
return err return err
} }
go func() { go func() {
if !config.KubeProxyDisabled(ctx, nodeConfig, proxy) { if !config.KubeProxyDisabled(ctx, nodeConfig, proxy) {
if err := startKubeProxy(&nodeConfig.AgentConfig); err != nil { if err := startKubeProxy(ctx, &nodeConfig.AgentConfig); err != nil {
logrus.Fatalf("Failed to start kube-proxy: %v", err) logrus.Fatalf("Failed to start kube-proxy: %v", err)
} }
} }
@ -41,20 +41,20 @@ func Agent(ctx context.Context, nodeConfig *daemonconfig.Node, proxy proxy.Proxy
return nil return nil
} }
func startKubeProxy(cfg *daemonconfig.Agent) error { func startKubeProxy(ctx context.Context, cfg *daemonconfig.Agent) error {
argsMap := kubeProxyArgs(cfg) argsMap := kubeProxyArgs(cfg)
args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeProxyArgs) args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeProxyArgs)
logrus.Infof("Running kube-proxy %s", daemonconfig.ArgString(args)) logrus.Infof("Running kube-proxy %s", daemonconfig.ArgString(args))
return executor.KubeProxy(args) return executor.KubeProxy(ctx, args)
} }
func startKubelet(cfg *daemonconfig.Agent) error { func startKubelet(ctx context.Context, cfg *daemonconfig.Agent) error {
argsMap := kubeletArgs(cfg) argsMap := kubeletArgs(cfg)
args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeletArgs) args := daemonconfig.GetArgs(argsMap, cfg.ExtraKubeletArgs)
logrus.Infof("Running kubelet %s", daemonconfig.ArgString(args)) logrus.Infof("Running kubelet %s", daemonconfig.ArgString(args))
return executor.Kubelet(args) return executor.Kubelet(ctx, args)
} }
// ImageCredProvAvailable checks to see if the kubelet image credential provider bin dir and config // ImageCredProvAvailable checks to see if the kubelet image credential provider bin dir and config

View File

@ -63,12 +63,12 @@ func Server(ctx context.Context, cfg *config.Control) error {
} }
if !cfg.DisableScheduler { if !cfg.DisableScheduler {
if err := scheduler(cfg, runtime); err != nil { if err := scheduler(ctx, cfg, runtime); err != nil {
return err return err
} }
} }
if !cfg.DisableControllerManager { if !cfg.DisableControllerManager {
if err := controllerManager(cfg, runtime); err != nil { if err := controllerManager(ctx, cfg, runtime); err != nil {
return err return err
} }
} }
@ -82,7 +82,7 @@ func Server(ctx context.Context, cfg *config.Control) error {
return nil return nil
} }
func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) error { func controllerManager(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{ argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigController, "kubeconfig": runtime.KubeConfigController,
"service-account-private-key-file": runtime.ServiceKey, "service-account-private-key-file": runtime.ServiceKey,
@ -115,10 +115,10 @@ func controllerManager(cfg *config.Control, runtime *config.ControlRuntime) erro
args := config.GetArgs(argsMap, cfg.ExtraControllerArgs) args := config.GetArgs(argsMap, cfg.ExtraControllerArgs)
logrus.Infof("Running kube-controller-manager %s", config.ArgString(args)) logrus.Infof("Running kube-controller-manager %s", config.ArgString(args))
return executor.ControllerManager(runtime.APIServerReady, args) return executor.ControllerManager(ctx, runtime.APIServerReady, args)
} }
func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error { func scheduler(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
argsMap := map[string]string{ argsMap := map[string]string{
"kubeconfig": runtime.KubeConfigScheduler, "kubeconfig": runtime.KubeConfigScheduler,
"port": "10251", "port": "10251",
@ -133,7 +133,7 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error {
args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs) args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs)
logrus.Infof("Running kube-scheduler %s", config.ArgString(args)) logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
return executor.Scheduler(runtime.APIServerReady, args) return executor.Scheduler(ctx, runtime.APIServerReady, args)
} }
func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error { func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error {
@ -319,7 +319,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case err := <-promise(func() error { return checkForCloudControllerPrivileges(runtime, 5*time.Second) }): case err := <-promise(func() error { return checkForCloudControllerPrivileges(ctx, runtime, 5*time.Second) }):
if err != nil { if err != nil {
logrus.Infof("Waiting for cloud-controller-manager privileges to become available") logrus.Infof("Waiting for cloud-controller-manager privileges to become available")
continue continue
@ -329,10 +329,15 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control, runtime *c
} }
}() }()
return executor.CloudControllerManager(ccmRBACReady, args) return executor.CloudControllerManager(ctx, ccmRBACReady, args)
} }
func checkForCloudControllerPrivileges(runtime *config.ControlRuntime, timeout time.Duration) error { // checkForCloudControllerPrivileges makes a SubjectAccessReview request to the apiserver
// to validate that the embedded cloud controller manager has the required privileges,
// and does not return until the requested access is granted.
// If the CCM RBAC changes, the ResourceAttributes checked for by this function should
// be modified to check for the most recently added privilege.
func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin) restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin)
if err != nil { if err != nil {
return err return err

View File

@ -50,7 +50,7 @@ func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node,
return nil return nil
} }
func (*Embedded) Kubelet(args []string) error { func (*Embedded) Kubelet(ctx context.Context, args []string) error {
command := kubelet.NewKubeletCommand(context.Background()) command := kubelet.NewKubeletCommand(context.Background())
command.SetArgs(args) command.SetArgs(args)
@ -60,13 +60,13 @@ func (*Embedded) Kubelet(args []string) error {
logrus.Fatalf("kubelet panic: %v", err) logrus.Fatalf("kubelet panic: %v", err)
} }
}() }()
logrus.Fatalf("kubelet exited: %v", command.Execute()) logrus.Fatalf("kubelet exited: %v", command.ExecuteContext(ctx))
}() }()
return nil return nil
} }
func (*Embedded) KubeProxy(args []string) error { func (*Embedded) KubeProxy(ctx context.Context, args []string) error {
command := proxy.NewProxyCommand() command := proxy.NewProxyCommand()
command.SetArgs(args) command.SetArgs(args)
@ -76,7 +76,7 @@ func (*Embedded) KubeProxy(args []string) error {
logrus.Fatalf("kube-proxy panic: %v", err) logrus.Fatalf("kube-proxy panic: %v", err)
} }
}() }()
logrus.Fatalf("kube-proxy exited: %v", command.Execute()) logrus.Fatalf("kube-proxy exited: %v", command.ExecuteContext(ctx))
}() }()
return nil return nil
@ -98,13 +98,13 @@ func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args
logrus.Fatalf("apiserver panic: %v", err) logrus.Fatalf("apiserver panic: %v", err)
} }
}() }()
logrus.Fatalf("apiserver exited: %v", command.Execute()) logrus.Fatalf("apiserver exited: %v", command.ExecuteContext(ctx))
}() }()
return nil return nil
} }
func (e *Embedded) Scheduler(apiReady <-chan struct{}, args []string) error { func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
command := sapp.NewSchedulerCommand() command := sapp.NewSchedulerCommand()
command.SetArgs(args) command.SetArgs(args)
@ -127,13 +127,13 @@ func (e *Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {
logrus.Fatalf("scheduler panic: %v", err) logrus.Fatalf("scheduler panic: %v", err)
} }
}() }()
logrus.Fatalf("scheduler exited: %v", command.Execute()) logrus.Fatalf("scheduler exited: %v", command.ExecuteContext(ctx))
}() }()
return nil return nil
} }
func (*Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error { func (*Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
command := cmapp.NewControllerManagerCommand() command := cmapp.NewControllerManagerCommand()
command.SetArgs(args) command.SetArgs(args)
@ -144,13 +144,13 @@ func (*Embedded) ControllerManager(apiReady <-chan struct{}, args []string) erro
logrus.Fatalf("controller-manager panic: %v", err) logrus.Fatalf("controller-manager panic: %v", err)
} }
}() }()
logrus.Fatalf("controller-manager exited: %v", command.Execute()) logrus.Fatalf("controller-manager exited: %v", command.ExecuteContext(ctx))
}() }()
return nil return nil
} }
func (*Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error { func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions() ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
if err != nil { if err != nil {
logrus.Fatalf("unable to initialize command options: %v", err) logrus.Fatalf("unable to initialize command options: %v", err)
@ -187,7 +187,7 @@ func (*Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []str
logrus.Fatalf("cloud-controller-manager panic: %v", err) logrus.Fatalf("cloud-controller-manager panic: %v", err)
} }
}() }()
logrus.Fatalf("cloud-controller-manager exited: %v", command.Execute()) logrus.Errorf("cloud-controller-manager exited: %v", command.ExecuteContext(ctx))
}() }()
return nil return nil

View File

@ -20,15 +20,15 @@ var (
type Executor interface { type Executor interface {
Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error
Kubelet(args []string) error Kubelet(ctx context.Context, args []string) error
KubeProxy(args []string) error KubeProxy(ctx context.Context, args []string) error
APIServerHandlers() (authenticator.Request, http.Handler, error) APIServerHandlers() (authenticator.Request, http.Handler, error)
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error
Scheduler(apiReady <-chan struct{}, args []string) error Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
ControllerManager(apiReady <-chan struct{}, args []string) error ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
CurrentETCDOptions() (InitialOptions, error) CurrentETCDOptions() (InitialOptions, error)
ETCD(ctx context.Context, args ETCDConfig) error ETCD(ctx context.Context, args ETCDConfig) error
CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error
} }
type ETCDConfig struct { type ETCDConfig struct {
@ -90,12 +90,12 @@ func Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agen
return executor.Bootstrap(ctx, nodeConfig, cfg) return executor.Bootstrap(ctx, nodeConfig, cfg)
} }
func Kubelet(args []string) error { func Kubelet(ctx context.Context, args []string) error {
return executor.Kubelet(args) return executor.Kubelet(ctx, args)
} }
func KubeProxy(args []string) error { func KubeProxy(ctx context.Context, args []string) error {
return executor.KubeProxy(args) return executor.KubeProxy(ctx, args)
} }
func APIServerHandlers() (authenticator.Request, http.Handler, error) { func APIServerHandlers() (authenticator.Request, http.Handler, error) {
@ -106,12 +106,12 @@ func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) er
return executor.APIServer(ctx, etcdReady, args) return executor.APIServer(ctx, etcdReady, args)
} }
func Scheduler(apiReady <-chan struct{}, args []string) error { func Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.Scheduler(apiReady, args) return executor.Scheduler(ctx, apiReady, args)
} }
func ControllerManager(apiReady <-chan struct{}, args []string) error { func ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error {
return executor.ControllerManager(apiReady, args) return executor.ControllerManager(ctx, apiReady, args)
} }
func CurrentETCDOptions() (InitialOptions, error) { func CurrentETCDOptions() (InitialOptions, error) {
@ -122,6 +122,6 @@ func ETCD(ctx context.Context, args ETCDConfig) error {
return executor.ETCD(ctx, args) return executor.ETCD(ctx, args)
} }
func CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error { func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
return executor.CloudControllerManager(ccmRBACReady, args) return executor.CloudControllerManager(ctx, ccmRBACReady, args)
} }