From e2431bdf9d698f13a77342f0bd27ffe60163f4b3 Mon Sep 17 00:00:00 2001
From: Darren Shepherd
Date: Wed, 30 Oct 2019 19:05:40 -0700
Subject: [PATCH] Add dqlite support

---
Changes in this revision:
- cluster: log reset failures with Fatalf so the error is formatted
- join: pass the arguments the "Joining dqlite cluster" message expects
- pipe: close the TLS connection on every ToHTTP failure path
- proxy: never close the connection channel handlers may still send on
- reset: close the client, verify this node is a member before removing peers
- router: reject non-TLS requests in the checker, log JSON encode failures
- server: store the started node for Reset, stop startController on ctx
- controller/dialer: nil guard in onRemove, honor ctx when dialing locally
Blob ids on the "index" lines were not regenerated.

 pkg/cli/cmds/dqlite.go                     |   7 +
 pkg/cluster/cluster.go                     |   4 +
 pkg/cluster/dqlite.go                      | 148 +++++++++++++
 pkg/cluster/nocluster.go                   |   4 +
 pkg/dqlite/controller/client/controller.go | 156 ++++++++++++++
 pkg/dqlite/dialer/dial.go                  |  40 ++++
 pkg/dqlite/join.go                         |  56 +++++
 pkg/dqlite/log.go                          |  22 ++
 pkg/dqlite/pipe/http.go                    |  63 ++++++
 pkg/dqlite/pipe/pipe.go                    |  52 +++++
 pkg/dqlite/proxy.go                        |  70 +++++++
 pkg/dqlite/reset.go                        |  49 +++++
 pkg/dqlite/router.go                       |  94 +++++++++
 pkg/dqlite/server.go                       | 230 +++++++++++++++++++++
 pkg/server/router.go                       |   1 +
 15 files changed, 996 insertions(+)
 create mode 100644 pkg/cli/cmds/dqlite.go
 create mode 100644 pkg/cluster/dqlite.go
 create mode 100644 pkg/dqlite/controller/client/controller.go
 create mode 100644 pkg/dqlite/dialer/dial.go
 create mode 100644 pkg/dqlite/join.go
 create mode 100644 pkg/dqlite/log.go
 create mode 100644 pkg/dqlite/pipe/http.go
 create mode 100644 pkg/dqlite/pipe/pipe.go
 create mode 100644 pkg/dqlite/proxy.go
 create mode 100644 pkg/dqlite/reset.go
 create mode 100644 pkg/dqlite/router.go
 create mode 100644 pkg/dqlite/server.go

diff --git a/pkg/cli/cmds/dqlite.go b/pkg/cli/cmds/dqlite.go
new file mode 100644
index 0000000000..b7f77e36cd
--- /dev/null
+++ b/pkg/cli/cmds/dqlite.go
@@ -0,0 +1,7 @@
+// +build dqlite
+
+package cmds
+
+const (
+	hideDqlite = false
+)
diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go
index d5372dae0c..73eeac70ab 100644
--- a/pkg/cluster/cluster.go
+++ b/pkg/cluster/cluster.go
@@ -37,6 +37,10 @@ func (c *Cluster) Start(ctx context.Context) error {
 		}
 	}
 
+	if err := c.testClusterDB(ctx); err != nil {
+		return err
+	}
+
 	return c.joined()
 }
 
diff --git a/pkg/cluster/dqlite.go b/pkg/cluster/dqlite.go
new file mode 100644
index 0000000000..ab2d05cb0a
--- /dev/null
+++ b/pkg/cluster/dqlite.go
@@ -0,0 +1,148 @@
+// +build dqlite
+
+package cluster
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"net"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/canonical/go-dqlite/client"
+	"github.com/rancher/dynamiclistener/factory"
+	"github.com/rancher/k3s/pkg/clientaccess"
+	"github.com/rancher/k3s/pkg/daemons/config"
+	"github.com/rancher/k3s/pkg/dqlite"
+	"github.com/rancher/kine/pkg/endpoint"
+	v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
+	"github.com/sirupsen/logrus"
+)
+
+// testClusterDB blocks until the dqlite connection test succeeds, retrying
+// every two seconds, and returns ctx.Err() if ctx is cancelled first.
+func (c *Cluster) testClusterDB(ctx context.Context) error {
+	if !c.enabled() {
+		return nil
+	}
+
+	dqlite := c.db.(*dqlite.DQLite)
+	for {
+		if err := dqlite.Test(ctx); err != nil {
+			logrus.Infof("Failed to test dqlite connection: %v", err)
+		} else {
+			return nil
+		}
+
+		select {
+		case <-time.After(2 * time.Second):
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+// initClusterDB starts dqlite when clustering is enabled and returns the
+// handler wrapped with the /db routes. With ClusterReset set it resets the
+// cluster to a single member and exits the process.
+func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
+	if !c.enabled() {
+		return l, handler, nil
+	}
+
+	dqlite := dqlite.New(c.config.DataDir, c.config.AdvertiseIP, c.config.AdvertisePort, func() v1.NodeController {
+		if c.runtime.Core == nil {
+			return nil
+		}
+		return c.runtime.Core.Core().V1().Node()
+	})
+
+	certs, err := toGetCerts(c.runtime)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	handler, err = dqlite.Start(ctx, c.config.ClusterInit, certs, handler)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	if c.config.ClusterReset {
+		if err := dqlite.Reset(ctx); err != nil {
+			logrus.Fatalf("Cluster reset failed: %v", err)
+		}
+		logrus.Info("Cluster reset")
+		os.Exit(0)
+	}
+
+	c.db = dqlite
+	if !strings.HasPrefix(c.config.Storage.Endpoint, "dqlite://") {
+		c.config.Storage = endpoint.Config{
+			Endpoint: dqlite.StorageEndpoint,
+		}
+	}
+
+	return l, handler, err
+}
+
+// enabled reports whether dqlite is in use: either its state directory exists
+// or no storage endpoint is set and the node is initializing or joining.
+func (c *Cluster) enabled() bool {
+	stamp := filepath.Join(c.config.DataDir, "db", "state.dqlite")
+	if _, err := os.Stat(stamp); err == nil {
+		return true
+	}
+
+	return c.config.Storage.Endpoint == "" && (c.config.ClusterInit || c.runtime.Cluster.Join)
+}
+
+// postJoin fetches the member list from /db/info using the client access
+// info and joins the dqlite cluster with it.
+func (c *Cluster) postJoin(ctx context.Context) error {
+	if !c.enabled() {
+		return nil
+	}
+
+	resp, err := clientaccess.Get("/db/info", c.clientAccessInfo)
+	if err != nil {
+		return err
+	}
+
+	dqlite := c.db.(*dqlite.DQLite)
+	var nodes []client.NodeInfo
+
+	if err := json.Unmarshal(resp, &nodes); err != nil {
+		return err
+	}
+
+	return dqlite.Join(ctx, nodes)
+}
+
+// toGetCerts loads the server CA, the client CA and the kube-apiserver client
+// keypair from the runtime paths into a dqlite.Certs.
+func toGetCerts(runtime *config.ControlRuntime) (*dqlite.Certs, error) {
+	clientCA, _, err := factory.LoadCerts(runtime.ClientCA, runtime.ClientCAKey)
+	if err != nil {
+		return nil, err
+	}
+
+	ca, _, err := factory.LoadCerts(runtime.ServerCA, runtime.ServerCAKey)
+	if err != nil {
+		return nil, err
+	}
+
+	clientCert, err := tls.LoadX509KeyPair(runtime.ClientKubeAPICert, runtime.ClientKubeAPIKey)
+	if err != nil {
+		return nil, err
+	}
+
+	return &dqlite.Certs{
+		ServerTrust: ca,
+		ClientTrust: clientCA,
+		ClientCert:  clientCert,
+	}, nil
+}
diff --git a/pkg/cluster/nocluster.go b/pkg/cluster/nocluster.go
index ce5029156a..a7ed7be3d4 100644
--- a/pkg/cluster/nocluster.go
+++ b/pkg/cluster/nocluster.go
@@ -8,6 +8,10 @@ import (
 	"net/http"
 )
 
+func (c *Cluster) testClusterDB(ctx context.Context) error {
+	return nil
+}
+
 func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
 	return l, handler, nil
 }
diff --git a/pkg/dqlite/controller/client/controller.go b/pkg/dqlite/controller/client/controller.go
new file mode 100644
index 0000000000..9df5780457
--- /dev/null
+++ b/pkg/dqlite/controller/client/controller.go
@@ -0,0 +1,156 @@
+package client
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	"github.com/canonical/go-dqlite/client"
+	controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
+	"github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+)
+
+const (
+	allKey      = "_all_"
+	nodeID      = "cluster.k3s.cattle.io/node-id"
+	nodeAddress = "cluster.k3s.cattle.io/node-address"
+	master      = "node-role.kubernetes.io/master"
+)
+
+// Register installs node change and removal handlers that annotate this node
+// with its dqlite ID and address, rebuild the NodeStore from the annotated
+// master nodes, and remove deleted nodes from the dqlite cluster.
+func Register(ctx context.Context, nodeName string, nodeInfo client.NodeInfo,
+	nodeStore client.NodeStore, nodes controllerv1.NodeController, opts []client.Option) {
+	h := &handler{
+		nodeStore:      nodeStore,
+		nodeController: nodes,
+		nodeName:       nodeName,
+		id:             strconv.FormatUint(nodeInfo.ID, 10),
+		address:        nodeInfo.Address,
+		ctx:            ctx,
+		opts:           opts,
+	}
+	nodes.OnChange(ctx, "dqlite-client", h.sync)
+	nodes.OnRemove(ctx, "dqlite-client", h.onRemove)
+}
+
+type handler struct {
+	nodeStore      client.NodeStore
+	nodeController controllerv1.NodeController
+	nodeName       string
+	id             string
+	address        string
+	ctx            context.Context
+	opts           []client.Option
+}
+
+func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
+	if key == allKey {
+		return nil, h.updateNodeStore()
+	}
+
+	if node == nil {
+		return nil, nil
+	}
+
+	if key == h.nodeName {
+		return h.handleSelf(node)
+	}
+
+	if node.Labels[master] == "true" {
+		h.nodeController.Enqueue(allKey)
+	}
+
+	return node, nil
+}
+
+func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
+	if node.Annotations[nodeID] == h.id && node.Annotations[nodeAddress] == h.address {
+		return node, nil
+	}
+
+	node = node.DeepCopy()
+	if node.Annotations == nil {
+		node.Annotations = map[string]string{}
+	}
+	node.Annotations[nodeID] = h.id
+	node.Annotations[nodeAddress] = h.address
+
+	return h.nodeController.Update(node)
+}
+
+func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
+	if node == nil {
+		return nil, nil
+	}
+
+	address := node.Annotations[nodeAddress]
+	if address == "" {
+		return node, nil
+	}
+	return node, h.delete(address)
+}
+
+func (h *handler) delete(address string) error {
+	c, err := client.FindLeader(h.ctx, h.nodeStore, h.opts...)
+	if err != nil {
+		return err
+	}
+	defer c.Close()
+
+	members, err := c.Cluster(h.ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, member := range members {
+		if member.Address == address {
+			logrus.Infof("Removing %s %d from dqlite", member.Address, member.ID)
+			return c.Remove(h.ctx, member.ID)
+		}
+	}
+
+	return nil
+}
+
+func (h *handler) updateNodeStore() error {
+	nodes, err := h.nodeController.Cache().List(labels.SelectorFromSet(labels.Set{
+		master: "true",
+	}))
+	if err != nil {
+		return err
+	}
+
+	var nodeInfos []client.NodeInfo
+	for _, node := range nodes {
+		address, ok := node.Annotations[nodeAddress]
+		if !ok {
+			continue
+		}
+
+		nodeIDStr, ok := node.Annotations[nodeID]
+		if !ok {
+			continue
+		}
+
+		id, err := strconv.ParseUint(nodeIDStr, 10, 64)
+		if err != nil {
+			logrus.Errorf("invalid %s=%s, must be a number: %v", nodeID, nodeIDStr, err)
+			continue
+		}
+
+		nodeInfos = append(nodeInfos, client.NodeInfo{
+			ID:      id,
+			Address: address,
+		})
+	}
+
+	if len(nodeInfos) == 0 {
+		return fmt.Errorf("not setting dqlient NodeStore len to 0")
+	}
+
+	return h.nodeStore.Set(h.ctx, nodeInfos)
+}
diff --git a/pkg/dqlite/dialer/dial.go b/pkg/dqlite/dialer/dial.go
new file mode 100644
index 0000000000..87b801a7dc
--- /dev/null
+++ b/pkg/dqlite/dialer/dial.go
@@ -0,0 +1,40 @@
+package dialer
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"net"
+
+	"github.com/canonical/go-dqlite/client"
+	"github.com/rancher/k3s/pkg/dqlite/pipe"
+)
+
+// NewHTTPDialer returns a dqlite dial function that reaches advertiseAddress
+// through the unix socket bindAddress and any other address through an HTTPS
+// upgrade request to /db/connect.
+func NewHTTPDialer(advertiseAddress, bindAddress string, tls *tls.Config) (client.DialFunc, error) {
+	d := &dialer{
+		advertiseAddress: advertiseAddress,
+		bindAddress:      bindAddress,
+		tls:              tls,
+	}
+
+	return d.dial, nil
+}
+
+type dialer struct {
+	advertiseAddress string
+	bindAddress      string
+	tls              *tls.Config
+}
+
+func (d *dialer) dial(ctx context.Context, address string) (net.Conn, error) {
+	if address == d.advertiseAddress {
+		var local net.Dialer
+		return local.DialContext(ctx, "unix", d.bindAddress)
+	}
+
+	url := fmt.Sprintf("https://%s/db/connect", address)
+	return pipe.ToHTTP(ctx, url, d.tls)
+}
diff --git a/pkg/dqlite/join.go b/pkg/dqlite/join.go
new file mode 100644
index 0000000000..27cfd76ecf
--- /dev/null
+++ b/pkg/dqlite/join.go
@@ -0,0 +1,56 @@
+package dqlite
+
+import (
+	"context"
+
+	"github.com/canonical/go-dqlite/client"
+	"github.com/sirupsen/logrus"
+)
+
+// Test logs the peers known to the NodeStore and then calls Join without new
+// nodes, which fails while no leader can be reached.
+func (d *DQLite) Test(ctx context.Context) error {
+	var ips []string
+	peers, err := d.NodeStore.Get(ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, peer := range peers {
+		ips = append(ips, peer.Address)
+	}
+
+	logrus.Infof("Testing connection to peers %v", ips)
+	return d.Join(ctx, nil)
+}
+
+// Join stores nodes in the NodeStore when any are given, connects to the
+// current leader and adds this node to the cluster unless its address is
+// already listed as a member.
+func (d *DQLite) Join(ctx context.Context, nodes []client.NodeInfo) error {
+	if len(nodes) > 0 {
+		if err := d.NodeStore.Set(ctx, nodes); err != nil {
+			return err
+		}
+	}
+
+	client, err := client.FindLeader(ctx, d.NodeStore, d.clientOpts...)
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	current, err := client.Cluster(ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, testNode := range current {
+		if testNode.Address == d.NodeInfo.Address {
+			return nil
+		}
+	}
+
+	logrus.Infof("Joining dqlite cluster as address=%s, id=%d", d.NodeInfo.Address, d.NodeInfo.ID)
+	return client.Add(ctx, d.NodeInfo)
+}
diff --git a/pkg/dqlite/log.go b/pkg/dqlite/log.go
new file mode 100644
index 0000000000..e7185b7797
--- /dev/null
+++ b/pkg/dqlite/log.go
@@ -0,0 +1,22 @@
+package dqlite
+
+import (
+	"github.com/canonical/go-dqlite/client"
+	"github.com/sirupsen/logrus"
+)
+
+// log returns a client.LogFunc that forwards each message to logrus by level.
+func log() client.LogFunc {
+	return func(level client.LogLevel, s string, i ...interface{}) {
+		switch level {
+		case client.LogDebug:
+			logrus.Debugf(s, i...)
+		case client.LogError:
+			logrus.Errorf(s, i...)
+		case client.LogInfo:
+			logrus.Infof(s, i...)
+		case client.LogWarn:
+			logrus.Warnf(s, i...)
+		}
+	}
+}
diff --git a/pkg/dqlite/pipe/http.go b/pkg/dqlite/pipe/http.go
new file mode 100644
index 0000000000..036d4c69a2
--- /dev/null
+++ b/pkg/dqlite/pipe/http.go
@@ -0,0 +1,63 @@
+package pipe
+
+import (
+	"bufio"
+	"context"
+	"crypto/tls"
+	"fmt"
+	"net"
+	"net/http"
+
+	"github.com/pkg/errors"
+)
+
+// ToHTTP dials the TLS host named in url, sends a POST and expects a 101
+// Switching Protocols response. The connection is then piped through a
+// temporary unix listener and the accepted end is returned. The TLS
+// connection is closed on every failure path before the pipe is set up.
+func ToHTTP(ctx context.Context, url string, tlsConfig *tls.Config) (net.Conn, error) {
+	request, err := http.NewRequest(http.MethodPost, url, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	request = request.WithContext(ctx)
+	netDial := &net.Dialer{}
+
+	if deadline, ok := ctx.Deadline(); ok {
+		netDial.Deadline = deadline
+	}
+
+	conn, err := tls.DialWithDialer(netDial, "tcp", request.URL.Host, tlsConfig)
+	if err != nil {
+		return nil, errors.Wrap(err, "tls dial")
+	}
+
+	if err := request.Write(conn); err != nil {
+		conn.Close()
+		return nil, errors.Wrap(err, "request write")
+	}
+
+	response, err := http.ReadResponse(bufio.NewReader(conn), request)
+	if err != nil {
+		conn.Close()
+		return nil, errors.Wrap(err, "read request")
+	}
+	if response.StatusCode != http.StatusSwitchingProtocols {
+		conn.Close()
+		return nil, fmt.Errorf("expected 101 response, got: %d", response.StatusCode)
+	}
+
+	listener, err := net.Listen("unix", "")
+	if err != nil {
+		conn.Close()
+		return nil, errors.Wrap(err, "Failed to create unix listener")
+	}
+	defer listener.Close()
+
+	if err := Unix(conn, listener.Addr().String()); err != nil {
+		return nil, err
+	}
+
+	return listener.Accept()
+}
diff --git a/pkg/dqlite/pipe/pipe.go b/pkg/dqlite/pipe/pipe.go
new file mode 100644
index 0000000000..cf8d6113c0
--- /dev/null
+++ b/pkg/dqlite/pipe/pipe.go
@@ -0,0 +1,52 @@
+package pipe
+
+import (
+	"io"
+	"net"
+
+	"github.com/lxc/lxd/shared/eagain"
+	"github.com/sirupsen/logrus"
+)
+
+// UnixPiper pipes every connection received on srcs to the unix socket at
+// bindAddress and returns once srcs is closed.
+func UnixPiper(srcs <-chan net.Conn, bindAddress string) {
+	for src := range srcs {
+		go Unix(src, bindAddress)
+	}
+}
+
+// Unix dials the unix socket target and pipes src to it; src is closed when
+// the dial fails.
+func Unix(src net.Conn, target string) error {
+	dst, err := net.Dial("unix", target)
+	if err != nil {
+		src.Close()
+		return err
+	}
+
+	Connect(src, dst)
+	return nil
+}
+
+// Connect copies data in both directions between src and dst in background
+// goroutines and closes both once either direction finishes.
+func Connect(src net.Conn, dst net.Conn) {
+	go func() {
+		_, err := io.Copy(eagain.Writer{Writer: dst}, eagain.Reader{Reader: src})
+		if err != nil && err != io.EOF {
+			logrus.Warnf("copy pipe src->dst closed: %v", err)
+		}
+		src.Close()
+		dst.Close()
+	}()
+
+	go func() {
+		_, err := io.Copy(eagain.Writer{Writer: src}, eagain.Reader{Reader: dst})
+		if err != nil {
+			logrus.Warnf("copy pipe dst->src closed: %v", err)
+		}
+		src.Close()
+		dst.Close()
+	}()
+}
diff --git a/pkg/dqlite/proxy.go b/pkg/dqlite/proxy.go
new file mode 100644
index 0000000000..0387ad023e
--- /dev/null
+++ b/pkg/dqlite/proxy.go
@@ -0,0 +1,70 @@
+package dqlite
+
+import (
+	"context"
+	"net"
+	"net/http"
+
+	"github.com/pkg/errors"
+	"github.com/rancher/k3s/pkg/dqlite/pipe"
+)
+
+var (
+	upgradeResponse = []byte("HTTP/1.1 101 Switching Protocols\r\nUpgrade: dqlite\r\n\r\n")
+)
+
+type proxy struct {
+	conns chan net.Conn
+	done  <-chan struct{}
+}
+
+// newProxy returns a handler that hijacks each request and pipes the raw
+// connection to the unix socket at bindAddress until ctx is cancelled.
+func newProxy(ctx context.Context, bindAddress string) http.Handler {
+	p := &proxy{
+		conns: make(chan net.Conn, 100),
+		done:  ctx.Done(),
+	}
+	// The channel is never closed: ServeHTTP may still be sending during
+	// shutdown and a send on a closed channel panics.
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case conn := <-p.conns:
+				go pipe.Unix(conn, bindAddress)
+			}
+		}
+	}()
+
+	return p
+}
+
+// ServeHTTP hijacks the connection, answers with a 101 upgrade response and
+// hands the raw connection to the piping goroutine.
+func (h *proxy) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+	hijacker, ok := rw.(http.Hijacker)
+	if !ok {
+		http.Error(rw, "failed to hijack", http.StatusInternalServerError)
+		return
+	}
+
+	conn, _, err := hijacker.Hijack()
+	if err != nil {
+		err := errors.Wrap(err, "Hijack connection")
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	if n, err := conn.Write(upgradeResponse); err != nil || n != len(upgradeResponse) {
+		conn.Close()
+		return
+	}
+
+	select {
+	case h.conns <- conn:
+	case <-h.done:
+		conn.Close()
+	}
+}
diff --git a/pkg/dqlite/reset.go b/pkg/dqlite/reset.go
new file mode 100644
index 0000000000..95b263e55d
--- /dev/null
+++ b/pkg/dqlite/reset.go
@@ -0,0 +1,49 @@
+package dqlite
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/canonical/go-dqlite/client"
+	"github.com/sirupsen/logrus"
+)
+
+// Reset shrinks the cluster to this node only: every other member is removed
+// through the local dqlite client and the node is then recovered with itself
+// as the sole member. Nothing is removed if this node is not a member.
+func (d *DQLite) Reset(ctx context.Context) error {
+	dqClient, err := client.New(ctx, d.getBindAddress(), client.WithLogFunc(log()))
+	if err != nil {
+		return err
+	}
+	defer dqClient.Close()
+
+	current, err := dqClient.Cluster(ctx)
+	if err != nil {
+		return err
+	}
+
+	// There's a chance our ID and the ID the server has doesn't match, so find
+	// this node before removing anything.
+	var surviving []client.NodeInfo
+	for _, testNode := range current {
+		if testNode.Address == d.NodeInfo.Address && testNode.ID == d.NodeInfo.ID {
+			surviving = append(surviving, testNode)
+		}
+	}
+	if len(surviving) != 1 {
+		return fmt.Errorf("failed to find %s in the current node, can not reset", d.NodeInfo.Address)
+	}
+
+	for _, testNode := range current {
+		if testNode.Address == d.NodeInfo.Address && testNode.ID == d.NodeInfo.ID {
+			continue
+		}
+		if err := dqClient.Remove(ctx, testNode.ID); err != nil {
+			return err
+		}
+	}
+
+	logrus.Infof("Resetting cluster to single master, please rejoin members")
+	return d.node.Recover(surviving)
+}
diff --git a/pkg/dqlite/router.go b/pkg/dqlite/router.go
new file mode 100644
index 0000000000..a88ce07e4a
--- /dev/null
+++ b/pkg/dqlite/router.go
@@ -0,0 +1,94 @@
+package dqlite
+
+import (
+	"context"
+	"crypto/x509"
+	"encoding/json"
+	"net/http"
+
+	"github.com/canonical/go-dqlite"
+	"github.com/canonical/go-dqlite/client"
+	"github.com/gorilla/mux"
+	"github.com/sirupsen/logrus"
+)
+
+// router serves /db/connect (proxy behind the client certificate checker) and
+// /db/info (cluster members) and passes every other request to next.
+func router(ctx context.Context, next http.Handler, nodeInfo dqlite.NodeInfo, clientCA *x509.Certificate, clientCN string, bindAddress string) http.Handler {
+	mux := mux.NewRouter()
+	mux.Handle("/db/connect", newChecker(newProxy(ctx, bindAddress), clientCA, clientCN))
+	mux.Handle("/db/info", infoHandler(ctx, nodeInfo, bindAddress))
+	mux.NotFoundHandler = next
+	return mux
+}
+
+// infoHandler writes the cluster members reported by the dqlite node at
+// bindAddress as JSON.
+func infoHandler(ctx context.Context, nodeInfo dqlite.NodeInfo, bindAddress string) http.Handler {
+	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		client, err := client.New(ctx, bindAddress, client.WithLogFunc(log()))
+		if err != nil {
+			http.Error(rw, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		defer client.Close()
+
+		info, err := client.Cluster(ctx)
+		if err != nil {
+			http.Error(rw, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		rw.Header().Set("Content-Type", "application/json")
+		if err := json.NewEncoder(rw).Encode(info); err != nil {
+			logrus.Warnf("failed to write dqlite cluster info: %v", err)
+		}
+	})
+}
+
+type checker struct {
+	next   http.Handler
+	verify x509.VerifyOptions
+	cn     string
+}
+
+// newChecker wraps next so that it is only reached by requests presenting a
+// client certificate that verifies against ca and has common name cn.
+func newChecker(next http.Handler, ca *x509.Certificate, cn string) http.Handler {
+	pool := x509.NewCertPool()
+	pool.AddCert(ca)
+	return &checker{
+		next: next,
+		verify: x509.VerifyOptions{
+			Roots: pool,
+			KeyUsages: []x509.ExtKeyUsage{
+				x509.ExtKeyUsageClientAuth,
+			},
+			DNSName: cn,
+		},
+		cn: cn,
+	}
+}
+
+func (c *checker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
+	if !c.check(req) {
+		http.Error(rw, "unauthorized", http.StatusUnauthorized)
+		return
+	}
+	c.next.ServeHTTP(rw, req)
+}
+
+func (c *checker) check(r *http.Request) bool {
+	// Requests without TLS state carry no peer certificate to verify.
+	if r.TLS == nil {
+		return false
+	}
+
+	for _, cert := range r.TLS.PeerCertificates {
+		_, err := cert.Verify(c.verify)
+		if err == nil {
+			return cert.Subject.CommonName == c.cn
+		}
+	}
+	return false
+}
diff --git a/pkg/dqlite/server.go b/pkg/dqlite/server.go
new file mode 100644
index 0000000000..b5e6b04a4a
--- /dev/null
+++ b/pkg/dqlite/server.go
@@ -0,0 +1,230 @@
+package dqlite
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/canonical/go-dqlite"
+	"github.com/canonical/go-dqlite/client"
+	"github.com/pkg/errors"
+	controllerclient "github.com/rancher/k3s/pkg/dqlite/controller/client"
+	"github.com/rancher/k3s/pkg/dqlite/dialer"
+	dqlitedriver "github.com/rancher/kine/pkg/drivers/dqlite"
+	v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
+	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/util/net"
+)
+
+// Certs holds the CA certificates and the client keypair handed to Start.
+type Certs struct {
+	ServerTrust *x509.Certificate
+	ClientTrust *x509.Certificate
+	ClientCert  tls.Certificate
+}
+
+// DQLite holds the configuration and runtime state of the local dqlite node.
+type DQLite struct {
+	ClientCA             string
+	ClientCAKey          string
+	ClientCert           string
+	ClientCertKey        string
+	ServerCA             string
+	ServerCAKey          string
+	AdvertiseIP          string
+	AdvertisePort        int
+	DataDir              string
+	NodeStore            client.NodeStore
+	NodeInfo             client.NodeInfo
+	node                 *dqlite.Node
+	StorageEndpoint      string
+	NodeControllerGetter NodeControllerGetter
+	clientOpts           []client.Option
+}
+
+// NodeControllerGetter returns the node controller, or nil while it is not
+// yet available.
+type NodeControllerGetter func() v1.NodeController
+
+// New returns a DQLite that has not been started yet.
+func New(dataDir, advertiseIP string, advertisePort int, getter NodeControllerGetter) *DQLite {
+	return &DQLite{
+		AdvertiseIP:          advertiseIP,
+		AdvertisePort:        advertisePort,
+		DataDir:              dataDir,
+		NodeControllerGetter: getter,
+	}
+}
+
+// Start creates and starts the local dqlite node, points the kine dqlite
+// driver at its dialer and returns next wrapped with the /db routes. The node
+// is closed when ctx is cancelled.
+func (d *DQLite) Start(ctx context.Context, initCluster bool, certs *Certs, next http.Handler) (http.Handler, error) {
+	bindAddress := d.getBindAddress()
+
+	clientTLSConfig, err := getClientTLSConfig(certs.ClientCert, certs.ServerTrust)
+	if err != nil {
+		return nil, err
+	}
+
+	advertise, err := getAdvertiseAddress(d.AdvertiseIP, d.AdvertisePort)
+	if err != nil {
+		return nil, errors.Wrap(err, "get advertise address")
+	}
+
+	dial, err := getDialer(advertise, bindAddress, clientTLSConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	dqlitedriver.Dialer = dial
+	dqlitedriver.Logger = log()
+
+	d.clientOpts = append(d.clientOpts, client.WithDialFunc(dial), client.WithLogFunc(log()))
+
+	nodeInfo, node, err := getNode(d.DataDir, advertise, bindAddress, initCluster, dial)
+	if err != nil {
+		return nil, err
+	}
+
+	d.NodeInfo = nodeInfo
+	d.node = node
+
+	go func() {
+		<-ctx.Done()
+		node.Close()
+	}()
+
+	if err := d.nodeStore(ctx, initCluster); err != nil {
+		return nil, err
+	}
+
+	go d.startController(ctx)
+
+	return router(ctx, next, nodeInfo, certs.ClientTrust, "kube-apiserver", bindAddress), node.Start()
+}
+
+// startController polls once a second until the node controller is available
+// and NODE_NAME is set, then registers the dqlite node handlers. It returns
+func (d *DQLite) startController(ctx context.Context) {
+	for {
+		if nc := d.NodeControllerGetter(); nc != nil {
+			if os.Getenv("NODE_NAME") != "" {
+				break
+			}
+			logrus.Errorf("--disable-agent is not compatible with dqlite")
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(time.Second):
+		}
+	}
+
+	controllerclient.Register(ctx, os.Getenv("NODE_NAME"), d.NodeInfo, d.NodeStore, d.NodeControllerGetter(), d.clientOpts)
+}
+
+func (d *DQLite) nodeStore(ctx context.Context, initCluster bool) error {
+	peerDB := filepath.Join(d.DataDir, "db", "state.dqlite", "peers.db")
+	ns, err := client.DefaultNodeStore(peerDB)
+	if err != nil {
+		return err
+	}
+	d.NodeStore = ns
+	d.StorageEndpoint = fmt.Sprintf("dqlite://?peer-file=%s", peerDB)
+	if initCluster {
+		if err := dqlitedriver.AddPeers(ctx, d.NodeStore, d.NodeInfo); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func getAdvertiseAddress(advertiseIP string, advertisePort int) (string, error) {
+	ip := advertiseIP
+	if ip == "" {
+		ipAddr, err := net.ChooseHostInterface()
+		if err != nil {
+			return "", err
+		}
+		ip = ipAddr.String()
+	}
+
+	return fmt.Sprintf("%s:%d", ip, advertisePort), nil
+}
+
+func getClientTLSConfig(cert tls.Certificate, ca *x509.Certificate) (*tls.Config, error) {
+	tlsConfig := &tls.Config{
+		RootCAs: x509.NewCertPool(),
+		Certificates: []tls.Certificate{
+			cert,
+		},
+		ServerName: "kubernetes",
+	}
+	tlsConfig.RootCAs.AddCert(ca)
+
+	return tlsConfig, nil
+}
+
+func getDialer(advertiseAddress, bindAddress string, tlsConfig *tls.Config) (client.DialFunc, error) {
+	return dialer.NewHTTPDialer(advertiseAddress, bindAddress, tlsConfig)
+}
+
+func getNode(dataDir string, advertiseAddress, bindAddress string, initCluster bool, dial client.DialFunc) (dqlite.NodeInfo, *dqlite.Node, error) {
+	id, err := getClusterID(initCluster, dataDir)
+	if err != nil {
+		return dqlite.NodeInfo{}, nil, errors.Wrap(err, "reading cluster id")
+	}
+
+	dbDir := filepath.Join(dataDir, "db", "state.dqlite")
+
+	node, err := dqlite.New(id, advertiseAddress, dbDir,
+		dqlite.WithBindAddress(bindAddress),
+		dqlite.WithDialFunc(dial),
+		dqlite.WithNetworkLatency(20*time.Millisecond))
+	return dqlite.NodeInfo{
+		ID:      id,
+		Address: advertiseAddress,
+	}, node, err
+}
+
+// getClusterID returns the node ID stored in the node-id file under dataDir,
+// creating it on first use: 1 when initCluster is set, random otherwise.
+func getClusterID(initCluster bool, dataDir string) (uint64, error) {
+	idFile := filepath.Join(dataDir, "db/state.dqlite/node-id")
+	content, err := ioutil.ReadFile(idFile)
+	if os.IsNotExist(err) {
+		content = nil
+	} else if err != nil {
+		return 0, err
+	}
+
+	idStr := strings.TrimSpace(string(content))
+	if idStr == "" {
+		if err := os.MkdirAll(filepath.Dir(idFile), 0700); err != nil {
+			return 0, err
+		}
+		id := rand.Uint64()
+		if initCluster {
+			id = 1
+		}
+		return id, ioutil.WriteFile(idFile, []byte(strconv.FormatUint(id, 10)), 0644)
+	}
+
+	return strconv.ParseUint(idStr, 10, 64)
+}
+
+func (d *DQLite) getBindAddress() string {
+	// only anonymous works???
+	return "@" + filepath.Join(d.DataDir, "db", "state.dqlite", "dqlite.sock")
+}
diff --git a/pkg/server/router.go b/pkg/server/router.go
index 83ed3e06f0..9909d13288 100644
--- a/pkg/server/router.go
+++ b/pkg/server/router.go
@@ -49,6 +49,7 @@ func router(serverConfig *config.Control, tunnel http.Handler, ca []byte) http.H
 	serverAuthed := mux.NewRouter()
 	serverAuthed.Use(authMiddleware(serverConfig, "k3s:server"))
 	serverAuthed.NotFoundHandler = nodeAuthed
+	serverAuthed.Path("/db/info").Handler(nodeAuthed)
 	serverAuthed.Path("/v1-k3s/server-bootstrap").Handler(bootstrap.Handler(&serverConfig.Runtime.ControlRuntimeBootstrap))
 
 	staticDir := filepath.Join(serverConfig.DataDir, "static")