mirror of https://github.com/k3s-io/k3s
Move temporary etcd startup into etcd module
Reuse the existing etcd library code to start up the temporary etcd server for bootstrap reconcile. This allows us to do proper health-checking of the datastore on startup, including handling of alarms. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/5157/head
parent
4b064b5d22
commit
e4846c92b4
2
go.mod
2
go.mod
|
@ -93,7 +93,7 @@ require (
|
|||
github.com/onsi/gomega v1.17.0
|
||||
github.com/opencontainers/runc v1.0.3
|
||||
github.com/opencontainers/selinux v1.8.3
|
||||
github.com/otiai10/copy v1.6.0
|
||||
github.com/otiai10/copy v1.7.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/rancher/dynamiclistener v0.3.1
|
||||
github.com/rancher/lasso v0.0.0-20210616224652-fc3ebd901c08
|
||||
|
|
8
go.sum
8
go.sum
|
@ -922,13 +922,13 @@ github.com/opencontainers/selinux v1.8.3 h1:tzZR7AuKB5gU1+53uBkoG4XdIFGZzvJTOVoN
|
|||
github.com/opencontainers/selinux v1.8.3/go.mod h1:HTvjPFoGMbpQsG886e3lQwnsRWtE4TC1OF3OUvG9FAo=
|
||||
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/osrg/gobgp v0.0.0-20210801043420-9e48a36ed97c/go.mod h1:aNi0T2X6FSkl1evOifmJUsdxiQ1AQkiV7fIEtLIVv/U=
|
||||
github.com/otiai10/copy v1.6.0 h1:IinKAryFFuPONZ7cm6T6E2QX/vcJwSnlaA5lfoaXIiQ=
|
||||
github.com/otiai10/copy v1.6.0/go.mod h1:XWfuS3CrI0R6IE0FbgHsEazaXO8G0LpMp9o8tos0x4E=
|
||||
github.com/otiai10/copy v1.7.0 h1:hVoPiN+t+7d2nzzwMiDHPSOogsWAStewq3TwU05+clE=
|
||||
github.com/otiai10/copy v1.7.0/go.mod h1:rmRl6QPdJj6EiUqXQ/4Nn2lLXoNQjFCQbbNrxgc/t3U=
|
||||
github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE=
|
||||
github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs=
|
||||
github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo=
|
||||
github.com/otiai10/mint v1.3.2 h1:VYWnrP5fXmz1MXvjuUvcBrXSjGE6xjON+axB/UrpO3E=
|
||||
github.com/otiai10/mint v1.3.2/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
|
||||
github.com/otiai10/mint v1.3.3 h1:7JgpsBaN0uMkyju4tbYHu0mnM55hNKVYLsXmwr15NQI=
|
||||
github.com/otiai10/mint v1.3.3/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/paulmach/orb v0.1.3/go.mod h1:VFlX/8C+IQ1p6FTRRKzKoOPJnvEtA5G0Veuqwbu//Vk=
|
||||
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
|
||||
|
|
|
@ -88,7 +88,9 @@ func run(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
e.SetControlConfig(&serverConfig.ControlConfig)
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
initialized, err := e.IsInitialized(ctx, &serverConfig.ControlConfig)
|
||||
if err != nil {
|
||||
|
@ -138,7 +140,9 @@ func delete(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
e.SetControlConfig(&serverConfig.ControlConfig)
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin)
|
||||
if err != nil {
|
||||
|
@ -179,7 +183,9 @@ func list(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
e.SetControlConfig(&serverConfig.ControlConfig)
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sf, err := e.ListSnapshots(ctx)
|
||||
if err != nil {
|
||||
|
@ -246,7 +252,9 @@ func prune(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
ctx := signals.SetupSignalContext()
|
||||
e := etcd.NewETCD()
|
||||
e.SetControlConfig(&serverConfig.ControlConfig)
|
||||
if err := e.SetControlConfig(ctx, &serverConfig.ControlConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin)
|
||||
if err != nil {
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
@ -23,11 +22,9 @@ import (
|
|||
"github.com/rancher/k3s/pkg/bootstrap"
|
||||
"github.com/rancher/k3s/pkg/clientaccess"
|
||||
"github.com/rancher/k3s/pkg/daemons/config"
|
||||
"github.com/rancher/k3s/pkg/daemons/executor"
|
||||
"github.com/rancher/k3s/pkg/etcd"
|
||||
"github.com/rancher/k3s/pkg/version"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
)
|
||||
|
||||
// Bootstrap attempts to load a managed database driver, if one has been initialized or should be created/joined.
|
||||
|
@ -63,58 +60,8 @@ func (c *Cluster) Bootstrap(ctx context.Context, snapshot bool) error {
|
|||
// reading the data, and comparing that to the data on disk, all the while
|
||||
// starting normal etcd.
|
||||
if isInitialized {
|
||||
logrus.Info("Starting local etcd to reconcile with datastore")
|
||||
tmpDataDir := filepath.Join(c.config.DataDir, "db", "tmp-etcd")
|
||||
os.RemoveAll(tmpDataDir)
|
||||
if err := os.Mkdir(tmpDataDir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
etcdDataDir := etcd.DBDir(c.config)
|
||||
if err := createTmpDataDir(etcdDataDir, tmpDataDir); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := os.RemoveAll(tmpDataDir); err != nil {
|
||||
logrus.Warn("failed to remove etcd temp dir", err)
|
||||
}
|
||||
}()
|
||||
|
||||
args := executor.ETCDConfig{
|
||||
DataDir: tmpDataDir,
|
||||
ForceNewCluster: true,
|
||||
ListenClientURLs: "http://127.0.0.1:2399",
|
||||
Logger: "zap",
|
||||
HeartbeatInterval: 500,
|
||||
ElectionTimeout: 5000,
|
||||
LogOutputs: []string{"stderr"},
|
||||
}
|
||||
configFile, err := args.ToConfigFile(c.config.ExtraEtcdArgs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg, err := embed.ConfigFromFile(configFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
etcd, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer etcd.Close()
|
||||
|
||||
data, err := c.retrieveInitializedDBdata(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ec := endpoint.ETCDConfig{
|
||||
Endpoints: []string{"http://127.0.0.1:2399"},
|
||||
LeaderElect: false,
|
||||
}
|
||||
|
||||
if err := c.ReconcileBootstrapData(ctx, bytes.NewReader(data.Bytes()), &c.config.Runtime.ControlRuntimeBootstrap, false, &ec); err != nil {
|
||||
logrus.Fatal(err)
|
||||
if err := c.reconcileEtcd(ctx); err != nil {
|
||||
logrus.Fatalf("Failed to reconcile with temporary etcd: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -127,69 +74,6 @@ func (c *Cluster) Bootstrap(ctx context.Context, snapshot bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// copyFile copies the contents of the src file
|
||||
// to the given destination file.
|
||||
func copyFile(src, dst string) error {
|
||||
srcfd, err := os.Open(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer srcfd.Close()
|
||||
|
||||
dstfd, err := os.Create(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dstfd.Close()
|
||||
|
||||
if _, err = io.Copy(dstfd, srcfd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
srcinfo, err := os.Stat(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.Chmod(dst, srcinfo.Mode())
|
||||
}
|
||||
|
||||
// createTmpDataDir creates a temporary directory and copies the
|
||||
// contents of the original etcd data dir to be used
|
||||
// by etcd when reading data.
|
||||
func createTmpDataDir(src, dst string) error {
|
||||
srcinfo, err := os.Stat(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dst, srcinfo.Mode()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fds, err := ioutil.ReadDir(src)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fd := range fds {
|
||||
srcfp := path.Join(src, fd.Name())
|
||||
dstfp := path.Join(dst, fd.Name())
|
||||
|
||||
if fd.IsDir() {
|
||||
if err = createTmpDataDir(srcfp, dstfp); err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
} else {
|
||||
if err = copyFile(srcfp, dstfp); err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldBootstrapLoad returns true if we need to load ControlRuntimeBootstrap data again and a second boolean
|
||||
// indicating that the server has or has not been initialized, if etcd. This is controlled by a stamp file on
|
||||
// disk that records successful bootstrap using a hash of the join token.
|
||||
|
@ -340,7 +224,7 @@ func isMigrated(buf io.ReadSeeker, files *bootstrap.PathsDataformat) bool {
|
|||
// and depending on where the difference is. If the datastore is newer,
|
||||
// then the data will be written to disk. If the data on disk is newer,
|
||||
// k3s will exit with an error.
|
||||
func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, crb *config.ControlRuntimeBootstrap, isHTTP bool, ec *endpoint.ETCDConfig) error {
|
||||
func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker, crb *config.ControlRuntimeBootstrap, isHTTP bool) error {
|
||||
logrus.Info("Reconciling bootstrap data between datastore and disk")
|
||||
|
||||
if err := c.certDirsExist(); err != nil {
|
||||
|
@ -383,14 +267,7 @@ func (c *Cluster) ReconcileBootstrapData(ctx context.Context, buf io.ReadSeeker,
|
|||
|
||||
var value *client.Value
|
||||
|
||||
var etcdConfig endpoint.ETCDConfig
|
||||
if ec != nil {
|
||||
etcdConfig = *ec
|
||||
} else {
|
||||
etcdConfig = c.config.Runtime.EtcdConfig
|
||||
}
|
||||
|
||||
storageClient, err := client.New(etcdConfig)
|
||||
storageClient, err := client.New(c.config.Runtime.EtcdConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -596,7 +473,7 @@ func (c *Cluster) httpBootstrap(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return c.ReconcileBootstrapData(ctx, bytes.NewReader(content), &c.config.Runtime.ControlRuntimeBootstrap, true, nil)
|
||||
return c.ReconcileBootstrapData(ctx, bytes.NewReader(content), &c.config.Runtime.ControlRuntimeBootstrap, true)
|
||||
}
|
||||
|
||||
func (c *Cluster) retrieveInitializedDBdata(ctx context.Context) (*bytes.Buffer, error) {
|
||||
|
@ -671,3 +548,49 @@ func ipsTo16Bytes(mySlice []*net.IPNet) {
|
|||
ipNet.IP = ipNet.IP.To16()
|
||||
}
|
||||
}
|
||||
|
||||
// reconcileEtcd starts a temporary single-member etcd cluster using a copy of the
|
||||
// etcd database, and uses it to reconcile bootstrap data. This is necessary
|
||||
// because the full etcd cluster may not have quorum during startup, but we still
|
||||
// need to extract data from the datastore.
|
||||
func (c *Cluster) reconcileEtcd(ctx context.Context) error {
|
||||
logrus.Info("Starting temporary etcd to reconcile with datastore")
|
||||
|
||||
tempConfig := endpoint.ETCDConfig{Endpoints: []string{"http://127.0.0.1:2399"}}
|
||||
originalConfig := c.config.Runtime.EtcdConfig
|
||||
c.config.Runtime.EtcdConfig = tempConfig
|
||||
reconcileCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
c.config.Runtime.EtcdConfig = originalConfig
|
||||
}()
|
||||
|
||||
e := etcd.NewETCD()
|
||||
if err := e.SetControlConfig(reconcileCtx, c.config); err != nil {
|
||||
return err
|
||||
}
|
||||
e.StartEmbeddedTemporary(reconcileCtx)
|
||||
|
||||
for {
|
||||
if err := e.Test(reconcileCtx); err != nil {
|
||||
logrus.Infof("Failed to test data store connection: %v", err)
|
||||
} else {
|
||||
logrus.Info(e.EndpointName() + " data store connection OK")
|
||||
break
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-reconcileCtx.Done():
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
data, err := c.retrieveInitializedDBdata(reconcileCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.ReconcileBootstrapData(reconcileCtx, bytes.NewReader(data.Bytes()), &c.config.Runtime.ControlRuntimeBootstrap, false)
|
||||
}
|
||||
|
|
|
@ -136,7 +136,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return c.ReconcileBootstrapData(ctx, bytes.NewReader(data), &c.config.Runtime.ControlRuntimeBootstrap, false, nil)
|
||||
return c.ReconcileBootstrapData(ctx, bytes.NewReader(data), &c.config.Runtime.ControlRuntimeBootstrap, false)
|
||||
}
|
||||
|
||||
// getBootstrapKeyFromStorage will list all keys that has prefix /bootstrap and will check for key that is
|
||||
|
|
|
@ -41,10 +41,6 @@ func init() {
|
|||
executor = &Embedded{}
|
||||
}
|
||||
|
||||
type Embedded struct {
|
||||
nodeConfig *daemonconfig.Node
|
||||
}
|
||||
|
||||
func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
|
||||
e.nodeConfig = nodeConfig
|
||||
return nil
|
||||
|
@ -193,6 +189,10 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *Embedded) CurrentETCDOptions() (InitialOptions, error) {
|
||||
return InitialOptions{}, nil
|
||||
}
|
||||
|
||||
// waitForUntaintedNode watches nodes, waiting to find one not tainted as
|
||||
// uninitialized by the external cloud provider.
|
||||
func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
//go:build !no_embedded_executor
|
||||
// +build !no_embedded_executor
|
||||
|
||||
package executor
|
||||
|
||||
import (
|
||||
|
@ -9,17 +6,18 @@ import (
|
|||
"io/ioutil"
|
||||
"path/filepath"
|
||||
|
||||
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
|
||||
"github.com/rancher/k3s/pkg/version"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
|
||||
)
|
||||
|
||||
func (e Embedded) CurrentETCDOptions() (InitialOptions, error) {
|
||||
return InitialOptions{}, nil
|
||||
type Embedded struct {
|
||||
nodeConfig *daemonconfig.Node
|
||||
}
|
||||
|
||||
func (e Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {
|
||||
func (e *Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {
|
||||
configFile, err := args.ToConfigFile(extraArgs)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
125
pkg/etcd/etcd.go
125
pkg/etcd/etcd.go
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/k3s-io/kine/pkg/client"
|
||||
endpoint2 "github.com/k3s-io/kine/pkg/endpoint"
|
||||
"github.com/minio/minio-go/v7"
|
||||
cp "github.com/otiai10/copy"
|
||||
"github.com/pkg/errors"
|
||||
certutil "github.com/rancher/dynamiclistener/cert"
|
||||
"github.com/rancher/k3s/pkg/clientaccess"
|
||||
|
@ -47,7 +48,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
endpoint = "https://127.0.0.1:2379"
|
||||
defaultEndpoint = "https://127.0.0.1:2379"
|
||||
testTimeout = time.Second * 10
|
||||
manageTickerTime = time.Second * 15
|
||||
learnerMaxStallTime = time.Minute * 5
|
||||
|
@ -118,8 +119,22 @@ func (e *ETCD) EndpointName() string {
|
|||
}
|
||||
|
||||
// SetControlConfig sets the given config on the etcd struct.
|
||||
func (e *ETCD) SetControlConfig(config *config.Control) {
|
||||
func (e *ETCD) SetControlConfig(ctx context.Context, config *config.Control) error {
|
||||
e.config = config
|
||||
|
||||
client, err := GetClient(ctx, e.config.Runtime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.client = client
|
||||
|
||||
address, err := GetAdvertiseAddress(config.PrivateIP)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.address = address
|
||||
|
||||
return e.setName(false)
|
||||
}
|
||||
|
||||
// Test ensures that the local node is a voting member of the target cluster.
|
||||
|
@ -129,7 +144,8 @@ func (e *ETCD) Test(ctx context.Context) error {
|
|||
ctx, cancel := context.WithTimeout(ctx, testTimeout)
|
||||
defer cancel()
|
||||
|
||||
status, err := e.client.Status(ctx, endpoint)
|
||||
endpoints := getEndpoints(e.config.Runtime)
|
||||
status, err := e.client.Status(ctx, endpoints[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -318,6 +334,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Infof("Starting etcd for existing cluster member")
|
||||
return e.cluster(ctx, false, opt)
|
||||
}
|
||||
|
||||
|
@ -425,7 +442,7 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
|
|||
cluster = append(cluster, fmt.Sprintf("%s=%s", e.name, e.peerURL()))
|
||||
}
|
||||
|
||||
logrus.Infof("Starting etcd for cluster %v", cluster)
|
||||
logrus.Infof("Starting etcd to join cluster with members %v", cluster)
|
||||
return e.cluster(ctx, false, executor.InitialOptions{
|
||||
Cluster: strings.Join(cluster, ","),
|
||||
State: "existing",
|
||||
|
@ -436,7 +453,7 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
|
|||
func (e *ETCD) Register(ctx context.Context, config *config.Control, handler http.Handler) (http.Handler, error) {
|
||||
e.config = config
|
||||
|
||||
client, err := GetClient(ctx, e.config.Runtime, endpoint)
|
||||
client, err := GetClient(ctx, e.config.Runtime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -447,7 +464,9 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, handler htt
|
|||
return nil, err
|
||||
}
|
||||
e.address = address
|
||||
e.config.Datastore.Endpoint = endpoint
|
||||
|
||||
endpoints := getEndpoints(config.Runtime)
|
||||
e.config.Datastore.Endpoint = endpoints[0]
|
||||
e.config.Datastore.BackendTLSConfig.CAFile = e.config.Runtime.ETCDServerCA
|
||||
e.config.Datastore.BackendTLSConfig.CertFile = e.config.Runtime.ClientETCDCert
|
||||
e.config.Datastore.BackendTLSConfig.KeyFile = e.config.Runtime.ClientETCDKey
|
||||
|
@ -530,7 +549,11 @@ func (e *ETCD) infoHandler() http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
// GetClient returns an etcd client connected to the specified endpoints
|
||||
// GetClient returns an etcd client connected to the specified endpoints.
|
||||
// If no endpoints are provided, endpoints are retrieved from the provided runtime config.
|
||||
// If the runtime config does not list any endpoints, the default endpoint is used.
|
||||
// The returned client should be closed when no longer needed, in order to avoid leaking GRPC
|
||||
// client goroutines.
|
||||
func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*clientv3.Client, error) {
|
||||
cfg, err := getClientConfig(ctx, runtime, endpoints...)
|
||||
if err != nil {
|
||||
|
@ -540,8 +563,12 @@ func GetClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ..
|
|||
return clientv3.New(*cfg)
|
||||
}
|
||||
|
||||
// getClientConfig generates an etcd client config connected to the specified endpoints
|
||||
// getClientConfig generates an etcd client config connected to the specified endpoints.
|
||||
// If no endpoints are provided, getEndpoints is called to provide defaults.
|
||||
func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*clientv3.Config, error) {
|
||||
if len(endpoints) == 0 {
|
||||
endpoints = getEndpoints(runtime)
|
||||
}
|
||||
tlsConfig, err := toTLSConfig(runtime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -556,6 +583,14 @@ func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoi
|
|||
}, nil
|
||||
}
|
||||
|
||||
// getEndpoints returns the endpoints from the runtime config if set, otherwise the default endpoint.
|
||||
func getEndpoints(runtime *config.ControlRuntime) []string {
|
||||
if len(runtime.EtcdConfig.Endpoints) > 0 {
|
||||
return runtime.EtcdConfig.Endpoints
|
||||
}
|
||||
return []string{defaultEndpoint}
|
||||
}
|
||||
|
||||
// toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable
|
||||
// for use by etcd.
|
||||
func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
|
||||
|
@ -595,6 +630,7 @@ func GetAdvertiseAddress(advertiseIP string) (string, error) {
|
|||
|
||||
// newCluster returns options to set up etcd for a new cluster
|
||||
func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
|
||||
logrus.Infof("Starting etcd for new cluster")
|
||||
err := e.cluster(ctx, reset, executor.InitialOptions{
|
||||
AdvertisePeerURL: e.peerURL(),
|
||||
Cluster: fmt.Sprintf("%s=%s", e.name, e.peerURL()),
|
||||
|
@ -637,7 +673,7 @@ func (e *ETCD) migrateFromSQLite(ctx context.Context) error {
|
|||
}
|
||||
defer sqliteClient.Close()
|
||||
|
||||
etcdClient, err := GetClient(ctx, e.config.Runtime, "https://localhost:2379")
|
||||
etcdClient, err := GetClient(ctx, e.config.Runtime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -708,6 +744,61 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
|
|||
}, e.config.ExtraEtcdArgs)
|
||||
}
|
||||
|
||||
func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
|
||||
etcdDataDir := DBDir(e.config)
|
||||
tmpDataDir := etcdDataDir + "-tmp"
|
||||
|
||||
os.RemoveAll(tmpDataDir)
|
||||
if err := os.Mkdir(tmpDataDir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := os.RemoveAll(tmpDataDir); err != nil {
|
||||
logrus.Warnf("Failed to remove etcd temp dir: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := cp.Copy(etcdDataDir, tmpDataDir, cp.Options{PreserveOwner: true}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
endpoints := getEndpoints(e.config.Runtime)
|
||||
clientURL := endpoints[0]
|
||||
peerURL, err := addPort(endpoints[0], 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
embedded := executor.Embedded{}
|
||||
return embedded.ETCD(ctx, executor.ETCDConfig{
|
||||
InitialOptions: executor.InitialOptions{AdvertisePeerURL: peerURL},
|
||||
DataDir: tmpDataDir,
|
||||
ForceNewCluster: true,
|
||||
AdvertiseClientURLs: clientURL,
|
||||
ListenClientURLs: clientURL,
|
||||
ListenPeerURLs: peerURL,
|
||||
Logger: "zap",
|
||||
HeartbeatInterval: 500,
|
||||
ElectionTimeout: 5000,
|
||||
Name: e.name,
|
||||
LogOutputs: []string{"stderr"},
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func addPort(address string, offset int) (string, error) {
|
||||
u, err := url.Parse(address)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
port, err := strconv.Atoi(u.Port())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
port += offset
|
||||
return fmt.Sprintf("%s://%s:%d", u.Scheme, u.Hostname(), port), nil
|
||||
}
|
||||
|
||||
// RemovePeer removes a peer from the cluster. The peer name and IP address must both match.
|
||||
func (e *ETCD) RemovePeer(ctx context.Context, name, address string, allowSelfRemoval bool) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, memberRemovalTimeout)
|
||||
|
@ -760,7 +851,8 @@ func (e *ETCD) manageLearners(ctx context.Context) {
|
|||
logrus.Debug("Etcd client was nil")
|
||||
continue
|
||||
}
|
||||
if status, err := e.client.Status(ctx, endpoint); err != nil {
|
||||
endpoints := getEndpoints(e.config.Runtime)
|
||||
if status, err := e.client.Status(ctx, endpoints[0]); err != nil {
|
||||
logrus.Errorf("Failed to check local etcd status for learner management: %v", err)
|
||||
continue
|
||||
} else if status.Header.MemberId != status.Leader {
|
||||
|
@ -913,7 +1005,8 @@ func (e *ETCD) clearAlarms(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// clientURLs returns a list of all non-learner etcd cluster member client access URLs
|
||||
// clientURLs returns a list of all non-learner etcd cluster member client access URLs.
|
||||
// The list is retrieved from the remote server that is being joined.
|
||||
func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP string) ([]string, Members, error) {
|
||||
var memberList Members
|
||||
resp, err := clientAccessInfo.Get("/db/info")
|
||||
|
@ -980,7 +1073,7 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err
|
|||
if e.config == nil {
|
||||
e.config = config
|
||||
}
|
||||
client, err := GetClient(ctx, e.config.Runtime, endpoint)
|
||||
client, err := GetClient(ctx, e.config.Runtime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1100,7 +1193,8 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
|||
}
|
||||
}
|
||||
|
||||
status, err := e.client.Status(ctx, endpoint)
|
||||
endpoints := getEndpoints(e.config.Runtime)
|
||||
status, err := e.client.Status(ctx, endpoints[0])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to check etcd status for snapshot")
|
||||
}
|
||||
|
@ -1115,7 +1209,7 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
|||
return errors.Wrap(err, "failed to get the snapshot dir")
|
||||
}
|
||||
|
||||
cfg, err := getClientConfig(ctx, e.config.Runtime, endpoint)
|
||||
cfg, err := getClientConfig(ctx, e.config.Runtime)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to get config for etcd snapshot")
|
||||
}
|
||||
|
@ -1838,8 +1932,9 @@ func backupDirWithRetention(dir string, maxBackupRetention int) (string, error)
|
|||
}
|
||||
|
||||
// GetAPIServerURLsFromETCD will try to fetch the version.Program/apiaddresses key from etcd
|
||||
// and unmarshal it to a list of apiserver endpoints.
|
||||
func GetAPIServerURLsFromETCD(ctx context.Context, cfg *config.Control) ([]string, error) {
|
||||
cl, err := GetClient(ctx, cfg.Runtime, endpoint)
|
||||
cl, err := GetClient(ctx, cfg.Runtime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -244,7 +244,7 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
|
||||
e.config.EtcdDisableSnapshots = true
|
||||
testutil.GenerateRuntime(e.config)
|
||||
client, err := GetClient(ctxInfo.ctx, e.config.Runtime, endpoint)
|
||||
client, err := GetClient(ctxInfo.ctx, e.config.Runtime)
|
||||
e.client = client
|
||||
|
||||
return err
|
||||
|
@ -254,6 +254,7 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
|
||||
return err
|
||||
}
|
||||
e.client.Close()
|
||||
ctxInfo.cancel()
|
||||
time.Sleep(10 * time.Second)
|
||||
testutil.CleanupDataDir(e.config)
|
||||
|
@ -274,7 +275,7 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
setup: func(e *ETCD, ctxInfo *contextInfo) error {
|
||||
ctxInfo.ctx, ctxInfo.cancel = context.WithCancel(context.Background())
|
||||
testutil.GenerateRuntime(e.config)
|
||||
client, err := GetClient(ctxInfo.ctx, e.config.Runtime, endpoint)
|
||||
client, err := GetClient(ctxInfo.ctx, e.config.Runtime)
|
||||
e.client = client
|
||||
|
||||
return err
|
||||
|
@ -284,6 +285,7 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
|
||||
return err
|
||||
}
|
||||
e.client.Close()
|
||||
ctxInfo.cancel()
|
||||
time.Sleep(5 * time.Second)
|
||||
testutil.CleanupDataDir(e.config)
|
||||
|
@ -306,7 +308,7 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
if err := testutil.GenerateRuntime(e.config); err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := GetClient(ctxInfo.ctx, e.config.Runtime, endpoint)
|
||||
client, err := GetClient(ctxInfo.ctx, e.config.Runtime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -318,6 +320,7 @@ func Test_UnitETCD_Start(t *testing.T) {
|
|||
if err := e.RemoveSelf(ctxInfo.ctx); err != nil && err.Error() != etcdserver.ErrNotEnoughStartedMembers.Error() {
|
||||
return err
|
||||
}
|
||||
e.client.Close()
|
||||
ctxInfo.cancel()
|
||||
time.Sleep(5 * time.Second)
|
||||
testutil.CleanupDataDir(e.config)
|
||||
|
|
Loading…
Reference in New Issue