mirror of https://github.com/k3s-io/k3s
Skip etcd snapshots if the local endpoint is still a learner (#2295)
* Don't take snapshots if the local endpoint is still a learner * Configure timeouts for etcd client dialerpull/2318/head
parent
f5b506ccaf
commit
42bba04651
134
pkg/etcd/etcd.go
134
pkg/etcd/etcd.go
|
@ -51,8 +51,12 @@ func NewETCD() *ETCD {
|
|||
const (
|
||||
snapshotPrefix = "etcd-snapshot-"
|
||||
endpoint = "https://127.0.0.1:2379"
|
||||
testTimeout = time.Second * 10
|
||||
|
||||
testTimeout = time.Second * 10
|
||||
// defaults from etcdctl/ctlv3/ctl.go
|
||||
defaultDialTimeout = 2 * time.Second
|
||||
defaultKeepAliveTime = 2 * time.Second
|
||||
defaultKeepAliveTimeOut = 6 * time.Second
|
||||
)
|
||||
|
||||
// Members contains a slice that holds all
|
||||
|
@ -66,6 +70,8 @@ func (e *ETCD) EndpointName() string {
|
|||
return "etcd"
|
||||
}
|
||||
|
||||
// Test ensures that the local node is a part of the target cluster. If it is a learner, a goroutine
|
||||
// will be started to promote it to full member. If it is not a part of the cluster, an error is raised.
|
||||
func (e *ETCD) Test(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, testTimeout)
|
||||
defer cancel()
|
||||
|
@ -100,29 +106,37 @@ func (e *ETCD) Test(ctx context.Context, clientAccessInfo *clientaccess.Info) er
|
|||
return fmt.Errorf(msg)
|
||||
}
|
||||
|
||||
// walDir returns the path to dataDir/member/wal
|
||||
func walDir(config *config.Control) string {
|
||||
return filepath.Join(dataDir(config), "member", "wal")
|
||||
}
|
||||
|
||||
// dataDir returns the path to dataDir/db/etcd
|
||||
func dataDir(config *config.Control) string {
|
||||
return filepath.Join(config.DataDir, "db", "etcd")
|
||||
}
|
||||
|
||||
// nameFile returns the path to dataDir/name
|
||||
func nameFile(config *config.Control) string {
|
||||
return filepath.Join(dataDir(config), "name")
|
||||
}
|
||||
|
||||
// IsInitialized checks to see if a WAL directory exists. If so, we assume that etcd
|
||||
// has already been brought up at least once.
|
||||
func (e *ETCD) IsInitialized(ctx context.Context, config *config.Control) (bool, error) {
|
||||
if s, err := os.Stat(walDir(config)); err == nil && s.IsDir() {
|
||||
dir := walDir(config)
|
||||
if s, err := os.Stat(dir); err == nil && s.IsDir() {
|
||||
return true, nil
|
||||
} else if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
} else {
|
||||
return false, errors.Wrapf(err, "failed to test if etcd is initialized")
|
||||
return false, errors.Wrapf(err, "invalid state for wal directory %s", dir)
|
||||
}
|
||||
}
|
||||
|
||||
// Reset resets an etcd node
|
||||
func (e *ETCD) Reset(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
|
||||
// Wait for etcd to come up as a new single-node cluster, then exit
|
||||
go func() {
|
||||
t := time.NewTicker(5 * time.Second)
|
||||
defer t.Stop()
|
||||
|
@ -140,18 +154,21 @@ func (e *ETCD) Reset(ctx context.Context, clientAccessInfo *clientaccess.Info) e
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// If asked to restore from a snapshot, do so
|
||||
if e.config.ClusterResetRestorePath != "" {
|
||||
info, err := os.Stat(e.config.ClusterResetRestorePath)
|
||||
if os.IsNotExist(err) {
|
||||
return fmt.Errorf("etcd: snapshot path does not exist: %s", e.config.ClusterResetRestorePath)
|
||||
}
|
||||
if info.IsDir() {
|
||||
return fmt.Errorf("etcd: snapshot path is directory: %s", e.config.ClusterResetRestorePath)
|
||||
return fmt.Errorf("etcd: snapshot path must be a file, not a directory: %s", e.config.ClusterResetRestorePath)
|
||||
}
|
||||
if err := e.Restore(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := e.setName(true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -159,10 +176,11 @@ func (e *ETCD) Reset(ctx context.Context, clientAccessInfo *clientaccess.Info) e
|
|||
return e.newCluster(ctx, true)
|
||||
}
|
||||
|
||||
// Start starts the datastore
|
||||
func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
|
||||
existingCluster, err := e.IsInitialized(ctx, e.config)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to validation")
|
||||
return errors.Wrapf(err, "configuration validation failed")
|
||||
}
|
||||
|
||||
e.config.Runtime.ClusterControllerStart = func(ctx context.Context) error {
|
||||
|
@ -190,13 +208,14 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
|
|||
return errors.Wrap(err, "joining etcd cluster")
|
||||
}
|
||||
|
||||
// join attempts to add a member to an existing cluster
|
||||
func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
|
||||
clientURLs, memberList, err := e.clientURLs(ctx, clientAccessInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := joinClient(ctx, e.runtime, clientURLs)
|
||||
client, err := getClient(ctx, e.runtime, clientURLs...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -257,11 +276,12 @@ func (e *ETCD) join(ctx context.Context, clientAccessInfo *clientaccess.Info) er
|
|||
})
|
||||
}
|
||||
|
||||
// Register configures a new etcd client and adds db info routes for the http listener.
|
||||
func (e *ETCD) Register(ctx context.Context, config *config.Control, l net.Listener, handler http.Handler) (net.Listener, http.Handler, error) {
|
||||
e.config = config
|
||||
e.runtime = config.Runtime
|
||||
|
||||
client, err := newClient(ctx, e.runtime)
|
||||
client, err := getClient(ctx, e.runtime, endpoint)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -285,6 +305,9 @@ func (e *ETCD) Register(ctx context.Context, config *config.Control, l net.Liste
|
|||
return l, e.handler(handler), err
|
||||
}
|
||||
|
||||
// setName sets a unique name for this cluster member. The first time this is called,
|
||||
// or if force is set to true, a new name will be generated and written to disk. The persistent
|
||||
// name is used on subsequent calls.
|
||||
func (e *ETCD) setName(force bool) error {
|
||||
fileName := nameFile(e.config)
|
||||
data, err := ioutil.ReadFile(fileName)
|
||||
|
@ -305,6 +328,7 @@ func (e *ETCD) setName(force bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// handler handles request routing for the base http listener
|
||||
func (e *ETCD) handler(next http.Handler) http.Handler {
|
||||
mux := mux.NewRouter()
|
||||
mux.Handle("/db/info", e.infoHandler())
|
||||
|
@ -312,6 +336,7 @@ func (e *ETCD) handler(next http.Handler) http.Handler {
|
|||
return mux
|
||||
}
|
||||
|
||||
// infoHandler returns etcd cluster information. This is used by new members when joining the custer.
|
||||
func (e *ETCD) infoHandler() http.Handler {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second)
|
||||
|
@ -338,36 +363,36 @@ func (e *ETCD) infoHandler() http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
func joinClient(ctx context.Context, runtime *config.ControlRuntime, peers []string) (*etcd.Client, error) {
|
||||
// getClient returns an etcd client connected to the specified endpoints
|
||||
func getClient(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Client, error) {
|
||||
cfg, err := getClientConfig(ctx, runtime, endpoints...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return etcd.New(*cfg)
|
||||
}
|
||||
|
||||
//getClientConfig generates an etcd client config connected to the specified endpoints
|
||||
func getClientConfig(ctx context.Context, runtime *config.ControlRuntime, endpoints ...string) (*etcd.Config, error) {
|
||||
tlsConfig, err := toTLSConfig(runtime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := etcd.Config{
|
||||
Endpoints: peers,
|
||||
TLS: tlsConfig,
|
||||
Context: ctx,
|
||||
cfg := &etcd.Config{
|
||||
Endpoints: endpoints,
|
||||
TLS: tlsConfig,
|
||||
Context: ctx,
|
||||
DialTimeout: defaultDialTimeout,
|
||||
DialKeepAliveTime: defaultKeepAliveTime,
|
||||
DialKeepAliveTimeout: defaultKeepAliveTimeOut,
|
||||
}
|
||||
|
||||
return etcd.New(cfg)
|
||||
}
|
||||
|
||||
func newClient(ctx context.Context, runtime *config.ControlRuntime) (*etcd.Client, error) {
|
||||
tlsConfig, err := toTLSConfig(runtime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := etcd.Config{
|
||||
Context: ctx,
|
||||
Endpoints: []string{endpoint},
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
|
||||
return etcd.New(cfg)
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// toTLSConfig converts the ControlRuntime configuration to TLS configuration suitable
|
||||
// for use by etcd.
|
||||
func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
|
||||
clientCert, err := tls.LoadX509KeyPair(runtime.ClientETCDCert, runtime.ClientETCDKey)
|
||||
if err != nil {
|
||||
|
@ -385,6 +410,7 @@ func toTLSConfig(runtime *config.ControlRuntime) (*tls.Config, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// getAdvertiseAddress returns the IP address best suited for advertising to clients
|
||||
func getAdvertiseAddress(advertiseIP string) (string, error) {
|
||||
ip := advertiseIP
|
||||
if ip == "" {
|
||||
|
@ -398,6 +424,7 @@ func getAdvertiseAddress(advertiseIP string) (string, error) {
|
|||
return ip, nil
|
||||
}
|
||||
|
||||
// newCluster returns options to set up etcd for a new cluster
|
||||
func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
|
||||
return e.cluster(ctx, reset, executor.InitialOptions{
|
||||
AdvertisePeerURL: fmt.Sprintf("https://%s:2380", e.address),
|
||||
|
@ -406,14 +433,17 @@ func (e *ETCD) newCluster(ctx context.Context, reset bool) error {
|
|||
})
|
||||
}
|
||||
|
||||
// peerURL returns the peer access address for the local node
|
||||
func (e *ETCD) peerURL() string {
|
||||
return fmt.Sprintf("https://%s:2380", e.address)
|
||||
}
|
||||
|
||||
// clientURL returns the client access address for the local node
|
||||
func (e *ETCD) clientURL() string {
|
||||
return fmt.Sprintf("https://%s:2379", e.address)
|
||||
}
|
||||
|
||||
// cluster returns ETCDConfig for a cluster
|
||||
func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.InitialOptions) error {
|
||||
return executor.ETCD(executor.ETCDConfig{
|
||||
Name: e.name,
|
||||
|
@ -441,6 +471,7 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
|
|||
})
|
||||
}
|
||||
|
||||
// removePeer removes a peer from the cluster. The peer ID and IP address must both match.
|
||||
func (e *ETCD) removePeer(ctx context.Context, id, address string) error {
|
||||
members, err := e.client.MemberList(ctx)
|
||||
if err != nil {
|
||||
|
@ -467,6 +498,10 @@ func (e *ETCD) removePeer(ctx context.Context, id, address string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// promoteMember attempts to promote any learners to full members at 5 second intervals.
|
||||
// It will return when a member has been promoted. Usually this function is run on the node
|
||||
// that has just been added to the cluster and is trying to promote itself. If it is run when there
|
||||
// are no learners, it will never return.
|
||||
func (e *ETCD) promoteMember(ctx context.Context, clientAccessInfo *clientaccess.Info) error {
|
||||
clientURLs, _, err := e.clientURLs(ctx, clientAccessInfo)
|
||||
if err != nil {
|
||||
|
@ -476,7 +511,7 @@ func (e *ETCD) promoteMember(ctx context.Context, clientAccessInfo *clientaccess
|
|||
t := time.NewTicker(5 * time.Second)
|
||||
defer t.Stop()
|
||||
for range t.C {
|
||||
client, err := joinClient(ctx, e.runtime, clientURLs)
|
||||
client, err := getClient(ctx, e.runtime, clientURLs...)
|
||||
// continue on errors to keep trying to promote member
|
||||
// grpc error are shown so no need to re log them
|
||||
if err != nil {
|
||||
|
@ -503,6 +538,7 @@ func (e *ETCD) promoteMember(ctx context.Context, clientAccessInfo *clientaccess
|
|||
return nil
|
||||
}
|
||||
|
||||
// clientURLs returns a list of all non-learner etcd cluster member client access URLs
|
||||
func (e *ETCD) clientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info) ([]string, Members, error) {
|
||||
var memberList Members
|
||||
resp, err := clientaccess.Get("/db/info", clientAccessInfo)
|
||||
|
@ -525,6 +561,7 @@ func (e *ETCD) clientURLs(ctx context.Context, clientAccessInfo *clientaccess.In
|
|||
return clientURLs, memberList, nil
|
||||
}
|
||||
|
||||
// snapshotDir ensures that the snapshot directory exists, and then returns its path.
|
||||
func snapshotDir(config *config.Control) (string, error) {
|
||||
if config.EtcdSnapshotDir == "" {
|
||||
// we have to create the snapshot dir if we are using
|
||||
|
@ -547,30 +584,37 @@ func snapshotDir(config *config.Control) (string, error) {
|
|||
return config.EtcdSnapshotDir, nil
|
||||
}
|
||||
|
||||
// snapshot attempts to save a new snapshot to the configured directory, and then clean up any old
|
||||
// snapshots in excess of the retention limits.
|
||||
func (e *ETCD) snapshot(ctx context.Context) {
|
||||
snapshotTime := time.Now()
|
||||
logrus.Infof("Snapshot retention check")
|
||||
status, err := e.client.Status(ctx, endpoint)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to check etcd status for snapshot: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if status.IsLearner {
|
||||
logrus.Warnf("Skipping snapshot: not supported for learner")
|
||||
return
|
||||
}
|
||||
|
||||
snapshotDir, err := snapshotDir(e.config)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to get the snapshot dir: %v", err)
|
||||
return
|
||||
}
|
||||
logrus.Infof("Taking etcd snapshot at %s", snapshotTime.String())
|
||||
sManager := snapshot.NewV3(nil)
|
||||
tlsConfig, err := toTLSConfig(e.runtime)
|
||||
|
||||
cfg, err := getClientConfig(ctx, e.runtime, endpoint)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to get tls config for etcd: %v", err)
|
||||
logrus.Errorf("Failed to get config for etcd snapshot: %v", err)
|
||||
return
|
||||
}
|
||||
etcdConfig := etcd.Config{
|
||||
Context: ctx,
|
||||
Endpoints: []string{endpoint},
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
snapshotPath := filepath.Join(snapshotDir, snapshotPrefix+strconv.Itoa(int(snapshotTime.Unix())))
|
||||
|
||||
if err := sManager.Save(ctx, etcdConfig, snapshotPath); err != nil {
|
||||
logrus.Errorf("Failed to save snapshot %s: %v", snapshotPath, err)
|
||||
snapshotPath := filepath.Join(snapshotDir, snapshotPrefix+strconv.Itoa(int(time.Now().Unix())))
|
||||
logrus.Infof("Saving etcd snapshot to %s", snapshotPath)
|
||||
|
||||
if err := snapshot.NewV3(nil).Save(ctx, *cfg, snapshotPath); err != nil {
|
||||
logrus.Errorf("Failed to save snapshot: %v", err)
|
||||
return
|
||||
}
|
||||
if err := snapshotRetention(e.config.EtcdSnapshotRetention, snapshotDir); err != nil {
|
||||
|
@ -579,9 +623,7 @@ func (e *ETCD) snapshot(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// snapshot performs an ETCD snapshot at the given interval and
|
||||
// saves the file to either the default snapshot directory or
|
||||
// the user provided directory.
|
||||
// setSnapshotFunction schedules snapshots at the configured interval
|
||||
func (e *ETCD) setSnapshotFunction(ctx context.Context) {
|
||||
e.cron.AddFunc(e.config.EtcdSnapshotCron, func() { e.snapshot(ctx) })
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue