diff --git a/pkg/agent/run.go b/pkg/agent/run.go index dccee1f157..41fee59cb0 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -115,6 +115,14 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { } } + // the agent runtime is ready to host workloads when containerd is up and the airgap + // images have finished loading, as that portion of startup may block for an arbitrary + // amount of time depending on how long it takes to import whatever the user has placed + // in the images directory. + if cfg.AgentReady != nil { + close(cfg.AgentReady) + } + notifySocket := os.Getenv("NOTIFY_SOCKET") os.Unsetenv("NOTIFY_SOCKET") diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go index 489a21c653..fb6806d2ad 100644 --- a/pkg/agent/tunnel/tunnel.go +++ b/pkg/agent/tunnel/tunnel.go @@ -53,6 +53,9 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { return err } + // Do an immediate fill of proxy addresses from the server endpoint list, before going into the + // watch loop. This will fail on the first server, as the apiserver won't be started yet - but + // that's fine because the local server is already seeded into the proxy address list. endpoint, _ := client.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{}) if endpoint != nil { addresses := util.GetAddresses(endpoint) @@ -61,8 +64,9 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { } } + // Attempt to connect to supervisors, storing their cancellation function for later when we + // need to disconnect. disconnect := map[string]context.CancelFunc{} - wg := &sync.WaitGroup{} for _, address := range proxy.SupervisorAddresses() { if _, ok := disconnect[address]; !ok { @@ -70,7 +74,11 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error { } } + // Once the apiserver is up, go into a watch loop, adding and removing tunnels as endpoints come + // and go from the cluster. 
We go into a faster but noisier connect loop if the watch fails + // following a successful connection. go func() { + util.WaitForAPIServerReady(client, 30*time.Second) connect: for { time.Sleep(5 * time.Second) diff --git a/pkg/cli/cmds/agent.go b/pkg/cli/cmds/agent.go index c06d0bc767..2158b0b9a4 100644 --- a/pkg/cli/cmds/agent.go +++ b/pkg/cli/cmds/agent.go @@ -46,6 +46,7 @@ type Agent struct { Taints cli.StringSlice ImageCredProvBinDir string ImageCredProvConfig string + AgentReady chan<- struct{} AgentShared } diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index f797fd858a..e344fd80d7 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -16,6 +16,7 @@ import ( "github.com/rancher/k3s/pkg/agent/loadbalancer" "github.com/rancher/k3s/pkg/cli/cmds" "github.com/rancher/k3s/pkg/clientaccess" + "github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/datadir" "github.com/rancher/k3s/pkg/etcd" "github.com/rancher/k3s/pkg/netutil" @@ -83,8 +84,11 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont cfg.Token = cfg.ClusterSecret } + agentReady := make(chan struct{}) + serverConfig := server.Config{} serverConfig.DisableAgent = cfg.DisableAgent + serverConfig.ControlConfig.Runtime = &config.ControlRuntime{AgentReady: agentReady} serverConfig.ControlConfig.Token = cfg.Token serverConfig.ControlConfig.AgentToken = cfg.AgentToken serverConfig.ControlConfig.JoinURL = cfg.ServerURL @@ -411,6 +415,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont }() if cfg.DisableAgent { + close(agentReady) <-ctx.Done() return nil } @@ -427,6 +432,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont } agentConfig := cmds.AgentConfig + agentConfig.AgentReady = agentReady agentConfig.Debug = app.GlobalBool("debug") agentConfig.DataDir = filepath.Dir(serverConfig.ControlConfig.DataDir) agentConfig.ServerURL = url diff --git 
a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 4e4d328579..c15d33d4b6 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -199,6 +199,7 @@ type ControlRuntime struct { HTTPBootstrap bool APIServerReady <-chan struct{} + AgentReady <-chan struct{} ETCDReady <-chan struct{} ClusterControllerStart func(ctx context.Context) error LeaderElectedClusterControllerStart func(ctx context.Context) error @@ -218,6 +219,7 @@ type ControlRuntime struct { ServingKubeletKey string ServerToken string AgentToken string + APIServer http.Handler Handler http.Handler Tunnel http.Handler Authenticator authenticator.Request diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 96a5a647da..3db4c0980c 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -4,7 +4,6 @@ import ( "context" "math/rand" "net" - "net/http" "os" "path/filepath" "strconv" @@ -23,7 +22,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes" @@ -37,9 +35,7 @@ var localhostIP = net.ParseIP("127.0.0.1") func Server(ctx context.Context, cfg *config.Control) error { rand.Seed(time.Now().UTC().UnixNano()) - - runtime := &config.ControlRuntime{} - cfg.Runtime = runtime + runtime := cfg.Runtime if err := prepare(ctx, cfg, runtime); err != nil { return errors.Wrap(err, "preparing server") @@ -48,13 +44,16 @@ func Server(ctx context.Context, cfg *config.Control) error { cfg.Runtime.Tunnel = setupTunnel() proxyutil.DisableProxyHostnameCheck = true - var auth authenticator.Request - var handler http.Handler - var err error + basicAuth, err := basicAuthenticator(runtime.PasswdFile) + if err != nil { + return err + } + runtime.Authenticator = basicAuth if 
!cfg.DisableAPIServer { - auth, handler, err = apiServer(ctx, cfg, runtime) - if err != nil { + go waitForAPIServerHandlers(ctx, runtime) + + if err := apiServer(ctx, cfg, runtime); err != nil { return err } @@ -62,13 +61,6 @@ func Server(ctx context.Context, cfg *config.Control) error { return err } } - basicAuth, err := basicAuthenticator(runtime.PasswdFile) - if err != nil { - return err - } - - runtime.Authenticator = combineAuthenticators(basicAuth, auth) - runtime.Handler = handler if !cfg.DisableScheduler { if err := scheduler(cfg, runtime); err != nil { @@ -144,7 +136,7 @@ func scheduler(cfg *config.Control, runtime *config.ControlRuntime) error { return executor.Scheduler(runtime.APIServerReady, args) } -func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) (authenticator.Request, http.Handler, error) { +func apiServer(ctx context.Context, cfg *config.Control, runtime *config.ControlRuntime) error { argsMap := make(map[string]string) setupStorageBackend(argsMap, cfg) @@ -418,6 +410,15 @@ func checkForCloudControllerPrivileges(runtime *config.ControlRuntime, timeout t return nil } +func waitForAPIServerHandlers(ctx context.Context, runtime *config.ControlRuntime) { + auth, handler, err := executor.APIServerHandlers() + if err != nil { + logrus.Fatalf("Failed to get request handlers from apiserver: %v", err) + } + runtime.Authenticator = combineAuthenticators(runtime.Authenticator, auth) + runtime.APIServer = handler +} + func waitForAPIServerInBackground(ctx context.Context, runtime *config.ControlRuntime) error { restConfig, err := clientcmd.BuildConfigFromFlags("", runtime.KubeConfigAdmin) if err != nil { diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go index d89639c9a3..f4d908712e 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/daemons/executor/embed.go @@ -60,17 +60,21 @@ func (Embedded) KubeProxy(args []string) error { return nil } -func (Embedded) APIServer(ctx context.Context, 
etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) { - <-etcdReady +func (Embedded) APIServerHandlers() (authenticator.Request, http.Handler, error) { + startupConfig := <-app.StartupConfig + return startupConfig.Authenticator, startupConfig.Handler, nil +} + +func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error { command := app.NewAPIServerCommand(ctx.Done()) command.SetArgs(args) go func() { + <-etcdReady logrus.Fatalf("apiserver exited: %v", command.Execute()) }() - startupConfig := <-app.StartupConfig - return startupConfig.Authenticator, startupConfig.Handler, nil + return nil } func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error { diff --git a/pkg/daemons/executor/etcd.go b/pkg/daemons/executor/etcd.go index 0cb9dbb274..5ac204606c 100644 --- a/pkg/daemons/executor/etcd.go +++ b/pkg/daemons/executor/etcd.go @@ -28,7 +28,7 @@ func (e Embedded) ETCD(args ETCDConfig) error { } etcd, err := embed.StartEtcd(cfg) if err != nil { - return nil + return err } go func() { diff --git a/pkg/daemons/executor/executor.go b/pkg/daemons/executor/executor.go index 01a21080bc..1b4004dccc 100644 --- a/pkg/daemons/executor/executor.go +++ b/pkg/daemons/executor/executor.go @@ -22,7 +22,8 @@ type Executor interface { Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error Kubelet(args []string) error KubeProxy(args []string) error - APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) + APIServerHandlers() (authenticator.Request, http.Handler, error) + APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error Scheduler(apiReady <-chan struct{}, args []string) error ControllerManager(apiReady <-chan struct{}, args []string) error CurrentETCDOptions() (InitialOptions, error) @@ -97,7 +98,11 @@ func KubeProxy(args []string) error { return executor.KubeProxy(args) } -func 
APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) (authenticator.Request, http.Handler, error) { +func APIServerHandlers() (authenticator.Request, http.Handler, error) { + return executor.APIServerHandlers() +} + +func APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error { return executor.APIServer(ctx, etcdReady, args) } diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index df77d6ecdf..ff815fa5c3 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -186,6 +186,7 @@ func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool, func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error { // Wait for etcd to come up as a new single-node cluster, then exit go func() { + <-e.runtime.AgentReady t := time.NewTicker(5 * time.Second) defer t.Stop() for range t.C { @@ -285,8 +286,14 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e return e.newCluster(ctx, false) } - err = e.join(ctx, clientAccessInfo) - return errors.Wrap(err, "joining etcd cluster") + go func() { + <-e.runtime.AgentReady + if err := e.join(ctx, clientAccessInfo); err != nil { + logrus.Fatalf("ETCD join failed: %v", err) + } + }() + + return nil } // join attempts to add a member to an existing cluster @@ -329,9 +336,9 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er // make sure to remove the name file if a duplicate node name is used nameFile := nameFile(e.config) if err := os.Remove(nameFile); err != nil { - return err + logrus.Errorf("Failed to remove etcd name file %s: %v", nameFile, err) } - return errors.New("Failed to join etcd cluster due to duplicate node names, please use unique node name for the server") + return errors.New("duplicate node name found, please use a unique name for this node") } for _, peer := range member.PeerURLs { u, err := url.Parse(peer) @@ -352,7 +359,7 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo 
*clientaccess.Info) er } if add { - logrus.Infof("Adding %s to etcd cluster %v", e.peerURL(), cluster) + logrus.Infof("Adding member %s=%s to etcd cluster %v", e.name, e.peerURL(), cluster) if _, err = client.MemberAddAsLearner(clientCtx, []string{e.peerURL()}); err != nil { return err } @@ -438,7 +445,7 @@ func (e *ETCD) handler(next http.Handler) http.Handler { return mux } -// infoHandler returns etcd cluster information. This is used by new members when joining the custer. +// infoHandler returns etcd cluster information. This is used by new members when joining the cluster. func (e *ETCD) infoHandler() http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second) @@ -494,6 +501,10 @@ func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoi // toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable // for use by etcd. func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) { + if runtime.ClientETCDCert == "" || runtime.ClientETCDKey == "" || runtime.ETCDServerCA == "" { + return nil, errors.New("runtime is not ready yet") + } + clientCert, err := tls.LoadX509KeyPair(runtime.ClientETCDCert, runtime.ClientETCDKey) if err != nil { return nil, err @@ -527,8 +538,8 @@ func GetAdvertiseAddress(advertiseIP string) (string, error) { // newCluster returns options to set up etcd for a new cluster func (e *ETCD) newCluster(ctx context.Context, reset bool) error { return e.cluster(ctx, reset, executor.InitialOptions{ - AdvertisePeerURL: fmt.Sprintf("https://%s:2380", e.address), - Cluster: fmt.Sprintf("%s=https://%s:2380", e.name, e.address), + AdvertisePeerURL: e.peerURL(), + Cluster: fmt.Sprintf("%s=%s", e.name, e.peerURL()), State: "new", }) } @@ -621,6 +632,7 @@ func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRe // being promoted to full voting member. 
The checks only run on the cluster member that is // the etcd leader. func (e *ETCD) manageLearners(ctx context.Context) error { + <-e.runtime.AgentReady t := time.NewTicker(manageTickerTime) defer t.Stop() @@ -1310,9 +1322,6 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error) // GetAPIServerURLFromETCD will try to fetch the version.Program/apiaddresses key from etcd // when it succeed it will parse the first address in the list and return back an address func GetAPIServerURLFromETCD(ctx context.Context, cfg *config.Control) (string, error) { - if cfg.Runtime == nil { - return "", fmt.Errorf("runtime is not ready yet") - } cl, err := GetClient(ctx, cfg.Runtime, endpoint) if err != nil { return "", err diff --git a/pkg/server/router.go b/pkg/server/router.go index 33195691db..a03dd668fa 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -38,7 +38,7 @@ func router(ctx context.Context, config *Config) http.Handler { prefix := "/v1-" + version.Program authed := mux.NewRouter() authed.Use(authMiddleware(serverConfig, version.Program+":agent")) - authed.NotFoundHandler = serverConfig.Runtime.Handler + authed.NotFoundHandler = apiserver(serverConfig.Runtime) authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kube-proxy.crt").Handler(fileHandler(serverConfig.Runtime.ClientKubeProxyCert, serverConfig.Runtime.ClientKubeProxyKey)) @@ -70,6 +70,20 @@ func router(ctx context.Context, config *Config) http.Handler { return router } +func apiserver(runtime *config.ControlRuntime) http.Handler { + return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { + if runtime != nil && runtime.APIServer != nil { + runtime.APIServer.ServeHTTP(resp, req) + } else { + data := 
[]byte("apiserver not ready") + resp.Header().Set("Content-Type", "text/plain") + resp.Header().Set("Content-length", strconv.Itoa(len(data))) + resp.WriteHeader(http.StatusInternalServerError) + resp.Write(data) + } + }) +} + func cacerts(serverCA string) http.Handler { var ca []byte return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { @@ -262,7 +276,10 @@ func configHandler(server *config.Control) http.Handler { return } resp.Header().Set("content-type", "application/json") - json.NewEncoder(resp).Encode(server) + if err := json.NewEncoder(resp).Encode(server); err != nil { + logrus.Errorf("Failed to encode agent config: %v", err) + resp.WriteHeader(http.StatusInternalServerError) + } }) }