Browse Source

Support both http and db based bootstrap

pull/1045/head
Darren Shepherd 5 years ago
parent
commit
0ae20eb7a3
  1. 47
      pkg/cluster/cluster.go
  2. 21
      pkg/cluster/dqlite.go
  3. 81
      pkg/cluster/encrypt.go
  4. 52
      pkg/cluster/join.go
  5. 4
      pkg/cluster/nocluster.go
  6. 50
      pkg/cluster/storage.go
  7. 9
      pkg/daemons/config/types.go
  8. 23
      pkg/daemons/control/server.go
  9. 17
      pkg/dqlite/controller/client/controller.go
  10. 4
      pkg/server/router.go

47
pkg/cluster/cluster.go

@ -2,46 +2,73 @@ package cluster
import (
"context"
"strings"
"github.com/rancher/k3s/pkg/clientaccess"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/kine/pkg/client"
"github.com/rancher/kine/pkg/endpoint"
)
type Cluster struct {
token string
clientAccessInfo *clientaccess.Info
config *config.Control
runtime *config.ControlRuntime
db interface{}
runJoin bool
storageStarted bool
etcdConfig endpoint.ETCDConfig
joining bool
saveBootstrap bool
storageClient client.Client
}
func (c *Cluster) Start(ctx context.Context) error {
join, err := c.shouldJoin()
if err != nil {
if err := c.startClusterAndHTTPS(ctx); err != nil {
return err
}
if join {
if err := c.join(); err != nil {
if c.runJoin {
if err := c.postJoin(ctx); err != nil {
return err
}
}
if err := c.startClusterAndHTTPS(ctx); err != nil {
if err := c.testClusterDB(ctx); err != nil {
return err
}
if join {
if err := c.postJoin(ctx); err != nil {
if c.saveBootstrap {
if err := c.save(ctx); err != nil {
return err
}
}
if err := c.testClusterDB(ctx); err != nil {
if c.runJoin {
if err := c.joined(); err != nil {
return err
}
}
return c.startStorage(ctx)
}
func (c *Cluster) startStorage(ctx context.Context) error {
if c.storageStarted {
return nil
}
c.storageStarted = true
etcdConfig, err := endpoint.Listen(ctx, c.config.Storage)
if err != nil {
return err
}
return c.joined()
c.etcdConfig = etcdConfig
c.config.Storage.Config = etcdConfig.TLSConfig
c.config.Storage.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
c.config.NoLeaderElect = !etcdConfig.LeaderElect
return nil
}
func New(config *config.Control) *Cluster {

21
pkg/cluster/dqlite.go

@ -24,7 +24,7 @@ import (
)
func (c *Cluster) testClusterDB(ctx context.Context) error {
if !c.enabled() {
if !c.dqliteEnabled() {
return nil
}
@ -45,7 +45,7 @@ func (c *Cluster) testClusterDB(ctx context.Context) error {
}
func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
if !c.enabled() {
if !c.dqliteEnabled() {
return l, handler, nil
}
@ -61,17 +61,17 @@ func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler htt
return nil, nil, err
}
handler, err = dqlite.Start(ctx, c.config.ClusterInit, certs, handler)
handler, err = dqlite.Start(ctx, c.config.ClusterInit, c.config.ClusterReset, certs, handler)
if err != nil {
return nil, nil, err
}
if c.config.ClusterReset {
if err := dqlite.Reset(ctx); err == nil {
logrus.Info("Cluster reset")
logrus.Info("Cluster reset successful, now rejoin members")
os.Exit(0)
} else {
logrus.Fatal("Cluster reset failed: %v", err)
logrus.Fatalf("Cluster reset failed: %v", err)
}
}
@ -85,17 +85,22 @@ func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler htt
return l, handler, err
}
func (c *Cluster) enabled() bool {
func (c *Cluster) dqliteEnabled() bool {
stamp := filepath.Join(c.config.DataDir, "db", "state.dqlite")
if _, err := os.Stat(stamp); err == nil {
return true
}
return c.config.Storage.Endpoint == "" && (c.config.ClusterInit || c.runtime.Cluster.Join)
driver, _ := endpoint.ParseStorageEndpoint(c.config.Storage.Endpoint)
if driver == endpoint.DQLiteBackend {
return true
}
return c.config.Storage.Endpoint == "" && (c.config.ClusterInit || (c.config.Token != "" && c.config.JoinURL != ""))
}
func (c *Cluster) postJoin(ctx context.Context) error {
if !c.enabled() {
if !c.dqliteEnabled() {
return nil
}

81
pkg/cluster/encrypt.go

@ -0,0 +1,81 @@
package cluster
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"strings"
"github.com/rancher/k3s/pkg/token"
"golang.org/x/crypto/pbkdf2"
)
func storageKey(passphrase string) string {
d := sha256.New()
d.Write([]byte(passphrase))
return "/bootstrap/" + hex.EncodeToString(d.Sum(nil)[:])[:12]
}
func keyHash(passphrase string) string {
d := sha256.New()
d.Write([]byte(passphrase))
return hex.EncodeToString(d.Sum(nil)[:])[:12]
}
func encrypt(passphrase string, plaintext []byte) ([]byte, error) {
salt, err := token.Random(8)
if err != nil {
return nil, err
}
clearKey := pbkdf2.Key([]byte(passphrase), []byte(salt), 4096, 32, sha1.New)
key, err := aes.NewCipher(clearKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(key)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
_, err = io.ReadFull(rand.Reader, nonce)
if err != nil {
return nil, err
}
sealed := gcm.Seal(nonce, nonce, plaintext, nil)
return []byte(salt + ":" + base64.StdEncoding.EncodeToString(sealed)), nil
}
func decrypt(passphrase string, ciphertext []byte) ([]byte, error) {
parts := strings.SplitN(string(ciphertext), ":", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("invalid cipher text, not : delimited")
}
clearKey := pbkdf2.Key([]byte(passphrase), []byte(parts[0]), 4096, 32, sha1.New)
key, err := aes.NewCipher(clearKey)
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(key)
if err != nil {
return nil, err
}
data, err := base64.StdEncoding.DecodeString(parts[1])
if err != nil {
return nil, err
}
return gcm.Open(nil, data[:gcm.NonceSize()], data[gcm.NonceSize():], nil)
}

52
pkg/cluster/join.go

@ -2,6 +2,7 @@ package cluster
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
@ -11,18 +12,38 @@ import (
"github.com/sirupsen/logrus"
)
func (c *Cluster) Join(ctx context.Context) error {
runJoin, err := c.shouldJoin()
if err != nil {
return err
}
c.runJoin = runJoin
if runJoin {
if err := c.join(ctx); err != nil {
return err
}
}
return nil
}
func (c *Cluster) shouldJoin() (bool, error) {
if c.config.JoinURL == "" {
return false, nil
dqlite := c.dqliteEnabled()
if dqlite {
c.runtime.HTTPBootstrap = true
if c.config.JoinURL == "" {
return false, nil
}
}
stamp := filepath.Join(c.config.DataDir, "db/joined")
stamp := c.joinStamp()
if _, err := os.Stat(stamp); err == nil {
logrus.Info("Already joined to cluster, not rejoining")
logrus.Info("Cluster bootstrap already complete")
return false, nil
}
if c.config.Token == "" {
if dqlite && c.config.Token == "" {
return false, fmt.Errorf("K3S_TOKEN is required to join a cluster")
}
@ -46,14 +67,11 @@ func (c *Cluster) joined() error {
return f.Close()
}
func (c *Cluster) join() error {
c.runtime.Cluster.Join = true
func (c *Cluster) httpJoin() error {
token, err := clientaccess.NormalizeAndValidateTokenForUser(c.config.JoinURL, c.config.Token, "server")
if err != nil {
return err
}
c.token = token
info, err := clientaccess.ParseAndValidateToken(c.config.JoinURL, token)
if err != nil {
@ -69,6 +87,20 @@ func (c *Cluster) join() error {
return bootstrap.Read(bytes.NewBuffer(content), &c.runtime.ControlRuntimeBootstrap)
}
func (c *Cluster) join(ctx context.Context) error {
c.joining = true
if c.runtime.HTTPBootstrap {
return c.httpJoin()
}
if err := c.storageJoin(ctx); err != nil {
return err
}
return nil
}
func (c *Cluster) joinStamp() string {
return filepath.Join(c.config.DataDir, "db/joined")
return filepath.Join(c.config.DataDir, "db/joined-"+keyHash(c.config.Token))
}

4
pkg/cluster/nocluster.go

@ -19,3 +19,7 @@ func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler htt
func (c *Cluster) postJoin(ctx context.Context) error {
return nil
}
func (c *Cluster) dqliteEnabled() bool {
return false
}

50
pkg/cluster/storage.go

@ -0,0 +1,50 @@
package cluster
import (
"bytes"
"context"
"github.com/rancher/k3s/pkg/bootstrap"
"github.com/rancher/kine/pkg/client"
)
func (c *Cluster) save(ctx context.Context) error {
buf := &bytes.Buffer{}
if err := bootstrap.Write(buf, &c.runtime.ControlRuntimeBootstrap); err != nil {
return err
}
data, err := encrypt(c.config.Token, buf.Bytes())
if err != nil {
return err
}
return c.storageClient.Create(ctx, storageKey(c.config.Token), data)
}
func (c *Cluster) storageJoin(ctx context.Context) error {
if err := c.startStorage(ctx); err != nil {
return err
}
storageClient, err := client.New(c.etcdConfig)
if err != nil {
return err
}
c.storageClient = storageClient
value, err := storageClient.Get(ctx, storageKey(c.config.Token))
if err == client.ErrNotFound {
c.saveBootstrap = true
return nil
} else if err != nil {
return err
}
data, err := decrypt(c.config.Token, value.Data)
if err != nil {
return err
}
return bootstrap.Read(bytes.NewBuffer(data), &c.runtime.ControlRuntimeBootstrap)
}

9
pkg/daemons/config/types.go

@ -132,6 +132,8 @@ type ControlRuntimeBootstrap struct {
type ControlRuntime struct {
ControlRuntimeBootstrap
HTTPBootstrap bool
ClientKubeAPICert string
ClientKubeAPIKey string
NodePasswdFile string
@ -169,12 +171,7 @@ type ControlRuntime struct {
ClientK3sControllerCert string
ClientK3sControllerKey string
Cluster ClusterConfig
Core *core.Factory
}
type ClusterConfig struct {
Join bool
Core *core.Factory
}
type ArgString []string

23
pkg/daemons/control/server.go

@ -26,7 +26,6 @@ import (
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/passwd"
"github.com/rancher/k3s/pkg/token"
"github.com/rancher/kine/pkg/endpoint"
"github.com/rancher/wrangler-api/pkg/generated/controllers/rbac"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -317,11 +316,13 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro
runtime.ClientAuthProxyCert = path.Join(config.DataDir, "tls", "client-auth-proxy.crt")
runtime.ClientAuthProxyKey = path.Join(config.DataDir, "tls", "client-auth-proxy.key")
if err := genCerts(config, runtime); err != nil {
cluster := cluster.New(config)
if err := cluster.Join(ctx); err != nil {
return err
}
if err := cluster.New(config).Start(ctx); err != nil {
if err := genCerts(config, runtime); err != nil {
return err
}
@ -337,23 +338,11 @@ func prepare(ctx context.Context, config *config.Control, runtime *config.Contro
return err
}
if err := prepareStorageBackend(ctx, config); err != nil {
return err
}
return readTokens(runtime)
}
func prepareStorageBackend(ctx context.Context, config *config.Control) error {
etcdConfig, err := endpoint.Listen(ctx, config.Storage)
if err != nil {
if err := readTokens(runtime); err != nil {
return err
}
config.Storage.Config = etcdConfig.TLSConfig
config.Storage.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
config.NoLeaderElect = !etcdConfig.LeaderElect
return nil
return cluster.Start(ctx)
}
func readTokens(runtime *config.ControlRuntime) error {

17
pkg/dqlite/controller/client/controller.go

@ -117,7 +117,11 @@ func (h *handler) updateNodeStore() error {
return err
}
var nodeInfos []client.NodeInfo
var (
nodeInfos []client.NodeInfo
seen = map[string]bool{}
)
for _, node := range nodes {
address, ok := node.Annotations[nodeAddress]
if !ok {
@ -135,10 +139,13 @@ func (h *handler) updateNodeStore() error {
continue
}
nodeInfos = append(nodeInfos, client.NodeInfo{
ID: id,
Address: address,
})
if !seen[address] {
nodeInfos = append(nodeInfos, client.NodeInfo{
ID: id,
Address: address,
})
seen[address] = true
}
}
if len(nodeInfos) == 0 {

4
pkg/server/router.go

@ -50,7 +50,9 @@ func router(serverConfig *config.Control, tunnel http.Handler, ca []byte) http.H
serverAuthed.Use(authMiddleware(serverConfig, "k3s:server"))
serverAuthed.NotFoundHandler = nodeAuthed
serverAuthed.Path("/db/info").Handler(nodeAuthed)
serverAuthed.Path("/v1-k3s/server-bootstrap").Handler(bootstrap.Handler(&serverConfig.Runtime.ControlRuntimeBootstrap))
if serverConfig.Runtime.HTTPBootstrap {
serverAuthed.Path("/v1-k3s/server-bootstrap").Handler(bootstrap.Handler(&serverConfig.Runtime.ControlRuntimeBootstrap))
}
staticDir := filepath.Join(serverConfig.DataDir, "static")
router := mux.NewRouter()

Loading…
Cancel
Save