mirror of https://github.com/k3s-io/k3s
139 lines
2.9 KiB
Go
139 lines
2.9 KiB
Go
|
// +build dqlite
|
||
|
|
||
|
package cluster
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/tls"
|
||
|
"encoding/json"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/canonical/go-dqlite/client"
|
||
|
"github.com/rancher/dynamiclistener/factory"
|
||
|
"github.com/rancher/k3s/pkg/clientaccess"
|
||
|
"github.com/rancher/k3s/pkg/daemons/config"
|
||
|
"github.com/rancher/k3s/pkg/dqlite"
|
||
|
"github.com/rancher/kine/pkg/endpoint"
|
||
|
v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
|
||
|
"github.com/sirupsen/logrus"
|
||
|
)
|
||
|
|
||
|
func (c *Cluster) testClusterDB(ctx context.Context) error {
|
||
|
if !c.enabled() {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
dqlite := c.db.(*dqlite.DQLite)
|
||
|
for {
|
||
|
if err := dqlite.Test(ctx); err != nil {
|
||
|
logrus.Infof("Failed to test dqlite connection: %v", err)
|
||
|
} else {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-time.After(2 * time.Second):
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Cluster) initClusterDB(ctx context.Context, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
|
||
|
if !c.enabled() {
|
||
|
return l, handler, nil
|
||
|
}
|
||
|
|
||
|
dqlite := dqlite.New(c.config.DataDir, c.config.AdvertiseIP, c.config.AdvertisePort, func() v1.NodeController {
|
||
|
if c.runtime.Core == nil {
|
||
|
return nil
|
||
|
}
|
||
|
return c.runtime.Core.Core().V1().Node()
|
||
|
})
|
||
|
|
||
|
certs, err := toGetCerts(c.runtime)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
|
||
|
handler, err = dqlite.Start(ctx, c.config.ClusterInit, certs, handler)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
|
||
|
if c.config.ClusterReset {
|
||
|
if err := dqlite.Reset(ctx); err == nil {
|
||
|
logrus.Info("Cluster reset")
|
||
|
os.Exit(0)
|
||
|
} else {
|
||
|
logrus.Fatal("Cluster reset failed: %v", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
c.db = dqlite
|
||
|
if !strings.HasPrefix(c.config.Storage.Endpoint, "dqlite://") {
|
||
|
c.config.Storage = endpoint.Config{
|
||
|
Endpoint: dqlite.StorageEndpoint,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return l, handler, err
|
||
|
}
|
||
|
|
||
|
func (c *Cluster) enabled() bool {
|
||
|
stamp := filepath.Join(c.config.DataDir, "db", "state.dqlite")
|
||
|
if _, err := os.Stat(stamp); err == nil {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
return c.config.Storage.Endpoint == "" && (c.config.ClusterInit || c.runtime.Cluster.Join)
|
||
|
}
|
||
|
|
||
|
func (c *Cluster) postJoin(ctx context.Context) error {
|
||
|
if !c.enabled() {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
resp, err := clientaccess.Get("/db/info", c.clientAccessInfo)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
dqlite := c.db.(*dqlite.DQLite)
|
||
|
var nodes []client.NodeInfo
|
||
|
|
||
|
if err := json.Unmarshal(resp, &nodes); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return dqlite.Join(ctx, nodes)
|
||
|
}
|
||
|
|
||
|
func toGetCerts(runtime *config.ControlRuntime) (*dqlite.Certs, error) {
|
||
|
clientCA, _, err := factory.LoadCerts(runtime.ClientCA, runtime.ClientCAKey)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
ca, _, err := factory.LoadCerts(runtime.ServerCA, runtime.ServerCAKey)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
clientCert, err := tls.LoadX509KeyPair(runtime.ClientKubeAPICert, runtime.ClientKubeAPIKey)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &dqlite.Certs{
|
||
|
ServerTrust: ca,
|
||
|
ClientTrust: clientCA,
|
||
|
ClientCert: clientCert,
|
||
|
}, nil
|
||
|
}
|