From 0ae20eb7a35da9edd0e75b673070b1c01d3ec11a Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 11 Nov 2019 22:18:26 +0000 Subject: [PATCH] Support both http and db based bootstrap --- pkg/cluster/cluster.go | 47 ++++++++++--- pkg/cluster/dqlite.go | 21 +++--- pkg/cluster/encrypt.go | 81 ++++++++++++++++++++++ pkg/cluster/join.go | 52 +++++++++++--- pkg/cluster/nocluster.go | 4 ++ pkg/cluster/storage.go | 50 +++++++++++++ pkg/daemons/config/types.go | 9 +-- pkg/daemons/control/server.go | 23 ++---- pkg/dqlite/controller/client/controller.go | 17 +++-- pkg/server/router.go | 4 +- 10 files changed, 251 insertions(+), 57 deletions(-) create mode 100644 pkg/cluster/encrypt.go create mode 100644 pkg/cluster/storage.go diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 73eeac70ab..580d40d75d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -2,46 +2,73 @@ package cluster import ( "context" + "strings" "github.com/rancher/k3s/pkg/clientaccess" "github.com/rancher/k3s/pkg/daemons/config" + "github.com/rancher/kine/pkg/client" + "github.com/rancher/kine/pkg/endpoint" ) type Cluster struct { - token string clientAccessInfo *clientaccess.Info config *config.Control runtime *config.ControlRuntime db interface{} + runJoin bool + storageStarted bool + etcdConfig endpoint.ETCDConfig + joining bool + saveBootstrap bool + storageClient client.Client } func (c *Cluster) Start(ctx context.Context) error { - join, err := c.shouldJoin() - if err != nil { + if err := c.startClusterAndHTTPS(ctx); err != nil { return err } - if join { - if err := c.join(); err != nil { + if c.runJoin { + if err := c.postJoin(ctx); err != nil { return err } } - if err := c.startClusterAndHTTPS(ctx); err != nil { + if err := c.testClusterDB(ctx); err != nil { return err } - if join { - if err := c.postJoin(ctx); err != nil { + if c.saveBootstrap { + if err := c.save(ctx); err != nil { return err } } - if err := c.testClusterDB(ctx); err != nil { + if c.runJoin { + if err := c.joined(); err != nil { + return err + } + } + + return c.startStorage(ctx) +} + +func (c *Cluster) startStorage(ctx context.Context) error { + if c.storageStarted { + return nil + } + c.storageStarted = true + + etcdConfig, err := endpoint.Listen(ctx, c.config.Storage) + if err != nil { return err } - return c.joined() + c.etcdConfig = etcdConfig + c.config.Storage.Config = etcdConfig.TLSConfig + c.config.Storage.Endpoint = strings.Join(etcdConfig.Endpoints, ",") + c.config.NoLeaderElect = !etcdConfig.LeaderElect + return nil } func New(config *config.Control) *Cluster { diff --git a/pkg/cluster/dqlite.go b/pkg/cluster/dqlite.go index ab2d05cb0a..faeb80f994 100644 --- a/pkg/cluster/dqlite.go +++ b/pkg/cluster/dqlite.go @@ -24,7 +24,7 @@ import ( ) func (c *Cluster) testClusterDB(ctx context.Context) error { - if !c.enabled() { + if !c.dqliteEnabled() { return nil } @@ -45,7 +45,7 @@ func (c *Cluster) testClusterDB(ctx context.Context) error { } func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) { - if !c.enabled() { + if !c.dqliteEnabled() { return l, handler, nil } @@ -61,17 +61,17 @@ func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler htt return nil, nil, err } - handler, err = dqlite.Start(ctx, c.config.ClusterInit, certs, handler) + handler, err = dqlite.Start(ctx, c.config.ClusterInit, c.config.ClusterReset, certs, handler) if err != nil { return nil, nil, err } if c.config.ClusterReset { if err := dqlite.Reset(ctx); err == nil { - logrus.Info("Cluster reset") + logrus.Info("Cluster reset successful, now rejoin members") os.Exit(0) } else { - logrus.Fatal("Cluster reset failed: %v", err) + logrus.Fatalf("Cluster reset failed: %v", err) } } @@ -85,17 +85,22 @@ func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler htt return l, handler, err } -func (c *Cluster) enabled() bool { +func (c *Cluster) dqliteEnabled() bool { stamp := filepath.Join(c.config.DataDir, "db", "state.dqlite") if _, err := os.Stat(stamp); err == nil { return true } - return c.config.Storage.Endpoint == "" && (c.config.ClusterInit || c.runtime.Cluster.Join) + driver, _ := endpoint.ParseStorageEndpoint(c.config.Storage.Endpoint) + if driver == endpoint.DQLiteBackend { + return true + } + + return c.config.Storage.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != "")) } func (c *Cluster) postJoin(ctx context.Context) error { - if !c.enabled() { + if !c.dqliteEnabled() { return nil } diff --git a/pkg/cluster/encrypt.go b/pkg/cluster/encrypt.go new file mode 100644 index 0000000000..be44ff2363 --- /dev/null +++ b/pkg/cluster/encrypt.go @@ -0,0 +1,81 @@ +package cluster + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "crypto/sha1" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "fmt" + "io" + "strings" + + "github.com/rancher/k3s/pkg/token" + "golang.org/x/crypto/pbkdf2" +) + +func storageKey(passphrase string) string { + d := sha256.New() + d.Write([]byte(passphrase)) + return "/bootstrap/" + hex.EncodeToString(d.Sum(nil)[:])[:12] +} + +func keyHash(passphrase string) string { + d := sha256.New() + d.Write([]byte(passphrase)) + return hex.EncodeToString(d.Sum(nil)[:])[:12] +} + +func encrypt(passphrase string, plaintext []byte) ([]byte, error) { + salt, err := token.Random(8) + if err != nil { + return nil, err + } + + clearKey := pbkdf2.Key([]byte(passphrase), []byte(salt), 4096, 32, sha1.New) + key, err := aes.NewCipher(clearKey) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(key) + if err != nil { + return nil, err + } + + nonce := make([]byte, gcm.NonceSize()) + _, err = io.ReadFull(rand.Reader, nonce) + if err != nil { + return nil, err + } + + sealed := gcm.Seal(nonce, nonce, plaintext, nil) + return []byte(salt + ":" + base64.StdEncoding.EncodeToString(sealed)), nil +} + +func decrypt(passphrase string, ciphertext []byte) ([]byte, error) { + parts := strings.SplitN(string(ciphertext), ":", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid cipher text, not : delimited") + } + + clearKey := pbkdf2.Key([]byte(passphrase), []byte(parts[0]), 4096, 32, sha1.New) + key, err := aes.NewCipher(clearKey) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(key) + if err != nil { + return nil, err + } + + data, err := base64.StdEncoding.DecodeString(parts[1]) + if err != nil { + return nil, err + } + + return gcm.Open(nil, data[:gcm.NonceSize()], data[gcm.NonceSize():], nil) +} diff --git a/pkg/cluster/join.go b/pkg/cluster/join.go index 99d1bdecdf..ecb85e2b7a 100644 --- a/pkg/cluster/join.go +++ b/pkg/cluster/join.go @@ -2,6 +2,7 @@ package cluster import ( "bytes" + "context" "fmt" "os" "path/filepath" @@ -11,18 +12,38 @@ import ( "github.com/sirupsen/logrus" ) +func (c *Cluster) Join(ctx context.Context) error { + runJoin, err := c.shouldJoin() + if err != nil { + return err + } + c.runJoin = runJoin + + if runJoin { + if err := c.join(ctx); err != nil { + return err + } + } + + return nil +} + func (c *Cluster) shouldJoin() (bool, error) { - if c.config.JoinURL == "" { - return false, nil + dqlite := c.dqliteEnabled() + if dqlite { + c.runtime.HTTPBootstrap = true + if c.config.JoinURL == "" { + return false, nil + } } - stamp := filepath.Join(c.config.DataDir, "db/joined") + stamp := c.joinStamp() if _, err := os.Stat(stamp); err == nil { - logrus.Info("Already joined to cluster, not rejoining") + logrus.Info("Cluster bootstrap already complete") return false, nil } - if c.config.Token == "" { + if dqlite && c.config.Token == "" { return false, fmt.Errorf("K3S_TOKEN is required to join a cluster") } @@ -46,14 +67,11 @@ func (c *Cluster) joined() error { return f.Close() } -func (c *Cluster) join() error { - c.runtime.Cluster.Join = true - +func (c *Cluster) httpJoin() error { token, err := clientaccess.NormalizeAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server") if err != nil { return err } - c.token = token info, err := clientaccess.ParseAndValidateToken(c.config.JoinURL, token) if err != nil { @@ -69,6 +87,20 @@ func (c *Cluster) join() error { return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap) } +func (c *Cluster) join(ctx context.Context) error { + c.joining = true + + if c.runtime.HTTPBootstrap { + return c.httpJoin() + } + + if err := c.storageJoin(ctx); err != nil { + return err + } + + return nil +} + func (c *Cluster) joinStamp() string { - return filepath.Join(c.config.DataDir, "db/joined") + return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token)) } diff --git a/pkg/cluster/nocluster.go b/pkg/cluster/nocluster.go index a7ed7be3d4..19f5728c2a 100644 --- a/pkg/cluster/nocluster.go +++ b/pkg/cluster/nocluster.go @@ -19,3 +19,7 @@ func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler htt func (c *Cluster) postJoin(ctx context.Context) error { return nil } + +func (c *Cluster) dqliteEnabled() bool { + return false +} diff --git a/pkg/cluster/storage.go b/pkg/cluster/storage.go new file mode 100644 index 0000000000..dc3d776de6 --- /dev/null +++ b/pkg/cluster/storage.go @@ -0,0 +1,50 @@ +package cluster + +import ( + "bytes" + "context" + + "github.com/rancher/k3s/pkg/bootstrap" + "github.com/rancher/kine/pkg/client" +) + +func (c *Cluster) save(ctx context.Context) error { + buf := &bytes.Buffer{} + if err := bootstrap.Write(buf, &c.runtime.ControlRuntimeBootstrap); err != nil { + return err + } + + data, err := encrypt(c.config.Token, buf.Bytes()) + if err != nil { + return err + } + + return c.storageClient.Create(ctx, storageKey(c.config.Token), data) +} + +func (c *Cluster) storageJoin(ctx context.Context) error { + if err := c.startStorage(ctx); err != nil { + return err + } + + storageClient, err := client.New(c.etcdConfig) + if err != nil { + return err + } + c.storageClient = storageClient + + value, err := storageClient.Get(ctx, storageKey(c.config.Token)) + if err == client.ErrNotFound { + c.saveBootstrap = true + return nil + } else if err != nil { + return err + } + + data, err := decrypt(c.config.Token, value.Data) + if err != nil { + return err + } + + return bootstrap.Read(bytes.NewBuffer(data), &c.runtime.ControlRuntimeBootstrap) +} diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 8c41740c33..c3750ffa84 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -132,6 +132,8 @@ type ControlRuntimeBootstrap struct { type ControlRuntime struct { ControlRuntimeBootstrap + HTTPBootstrap bool + ClientKubeAPICert string ClientKubeAPIKey string NodePasswdFile string @@ -169,12 +171,7 @@ type ControlRuntime struct { ClientK3sControllerCert string ClientK3sControllerKey string - Cluster ClusterConfig - Core *core.Factory -} - -type ClusterConfig struct { - Join bool + Core *core.Factory } type ArgString []string diff --git a/pkg/daemons/control/server.go b/pkg/daemons/control/server.go index 9990ddd3f9..b3ff90acd1 100644 --- a/pkg/daemons/control/server.go +++ b/pkg/daemons/control/server.go @@ -26,7 +26,6 @@ import ( "github.com/rancher/k3s/pkg/daemons/config" "github.com/rancher/k3s/pkg/passwd" "github.com/rancher/k3s/pkg/token" - "github.com/rancher/kine/pkg/endpoint" "github.com/rancher/wrangler-api/pkg/generated/controllers/rbac" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -317,11 +316,13 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro runtime.ClientAuthProxyCert = path.Join(config.DataDir, "tls", "client-auth-proxy.crt") runtime.ClientAuthProxyKey = path.Join(config.DataDir, "tls", "client-auth-proxy.key") - if err := genCerts(config, runtime); err != nil { + cluster := cluster.New(config) + + if err := cluster.Join(ctx); err != nil { return err } - if err := cluster.New(config).Start(ctx); err != nil { + if err := genCerts(config, runtime); err != nil { return err } @@ -337,23 +338,11 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro return err } - if err := prepareStorageBackend(ctx, config); err != nil { - return err - } - - return readTokens(runtime) -} - -func prepareStorageBackend(ctx context.Context, config *config.Control) error { - etcdConfig, err := endpoint.Listen(ctx, config.Storage) - if err != nil { + if err := readTokens(runtime); err != nil { return err } - config.Storage.Config = etcdConfig.TLSConfig - config.Storage.Endpoint = strings.Join(etcdConfig.Endpoints, ",") - config.NoLeaderElect = !etcdConfig.LeaderElect - return nil + return cluster.Start(ctx) } func readTokens(runtime *config.ControlRuntime) error { diff --git a/pkg/dqlite/controller/client/controller.go b/pkg/dqlite/controller/client/controller.go index 9df5780457..25027a7d3c 100644 --- a/pkg/dqlite/controller/client/controller.go +++ b/pkg/dqlite/controller/client/controller.go @@ -117,7 +117,11 @@ func (h *handler) updateNodeStore() error { return err } - var nodeInfos []client.NodeInfo + var ( + nodeInfos []client.NodeInfo + seen = map[string]bool{} + ) + for _, node := range nodes { address, ok := node.Annotations[nodeAddress] if !ok { @@ -135,10 +139,13 @@ func (h *handler) updateNodeStore() error { continue } - nodeInfos = append(nodeInfos, client.NodeInfo{ - ID: id, - Address: address, - }) + if !seen[address] { + nodeInfos = append(nodeInfos, client.NodeInfo{ + ID: id, + Address: address, + }) + seen[address] = true + } } if len(nodeInfos) == 0 { diff --git a/pkg/server/router.go b/pkg/server/router.go index 9909d13288..b9908b6dce 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -50,7 +50,9 @@ func router(serverConfig *config.Control, tunnel http.Handler, ca []byte) http.H serverAuthed.Use(authMiddleware(serverConfig, "k3s:server")) serverAuthed.NotFoundHandler = nodeAuthed serverAuthed.Path("/db/info").Handler(nodeAuthed) - serverAuthed.Path("/v1-k3s/server-bootstrap").Handler(bootstrap.Handler(&serverConfig.Runtime.ControlRuntimeBootstrap)) + if serverConfig.Runtime.HTTPBootstrap { + serverAuthed.Path("/v1-k3s/server-bootstrap").Handler(bootstrap.Handler(&serverConfig.Runtime.ControlRuntimeBootstrap)) + } staticDir := filepath.Join(serverConfig.DataDir, "static") router := mux.NewRouter()