Address issues with etcd snapshots

* Increase the default snapshot timeout. The timeout is not currently
  configurable from Rancher, and larger clusters are frequently seeing
  uploads fail at 30 seconds.
* Enable compression for scheduled snapshots if enabled on the
  command-line. The CLI flag was not being passed into the etcd config.
* Only set the S3 content-type to application/zip if the file is zipped.
* Don't run more than one snapshot at once, to prevent misconfigured
  etcd snapshot cron schedules from stacking up.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/5872/head
Brad Davidson 2022-07-08 18:27:05 -07:00 committed by Brad Davidson
parent abd7770ce6
commit 4dfa077ff3
6 changed files with 28 additions and 15 deletions

1
go.mod
View File

@ -133,6 +133,7 @@ require (
go.uber.org/zap v1.19.0
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
google.golang.org/grpc v1.45.0
gopkg.in/yaml.v2 v2.4.0

View File

@ -95,7 +95,7 @@ var EtcdSnapshotFlags = []cli.Flag{
Name: "s3-timeout,etcd-s3-timeout",
Usage: "(db) S3 timeout",
Destination: &ServerConfig.EtcdS3Timeout,
Value: 30 * time.Second,
Value: 5 * time.Minute,
},
}

View File

@ -374,7 +374,7 @@ var ServerFlags = []cli.Flag{
Name: "etcd-s3-timeout",
Usage: "(db) S3 timeout",
Destination: &ServerConfig.EtcdS3Timeout,
Value: 30 * time.Second,
Value: 5 * time.Minute,
},
cli.StringFlag{
Name: "default-local-storage-path",

View File

@ -151,6 +151,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.EtcdDisableSnapshots = cfg.EtcdDisableSnapshots
if !cfg.EtcdDisableSnapshots {
serverConfig.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress
serverConfig.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
serverConfig.ControlConfig.EtcdSnapshotCron = cfg.EtcdSnapshotCron
serverConfig.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir

View File

@ -44,6 +44,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -63,7 +64,9 @@ const (
defaultKeepAliveTime = 30 * time.Second
defaultKeepAliveTimeout = 10 * time.Second
maxBackupRetention = 5
maxBackupRetention = 5
maxConcurrentSnapshots = 1
compressedExtension = ".zip"
MasterLabel = "node-role.kubernetes.io/master"
ControlPlaneLabel = "node-role.kubernetes.io/control-plane"
@ -90,13 +93,14 @@ var (
type NodeControllerGetter func() controllerv1.NodeController
type ETCD struct {
client *clientv3.Client
config *config.Control
name string
address string
cron *cron.Cron
s3 *S3
cancel context.CancelFunc
client *clientv3.Client
config *config.Control
name string
address string
cron *cron.Cron
s3 *S3
cancel context.CancelFunc
snapshotSem *semaphore.Weighted
}
type learnerProgress struct {
@ -1162,6 +1166,9 @@ func snapshotDir(config *config.Control, create bool) (string, error) {
// to perform an Etcd snapshot. This is necessary primarily for on-demand
// snapshots since they're performed before normal Etcd setup is completed.
func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) error {
if e.snapshotSem == nil {
e.snapshotSem = semaphore.NewWeighted(maxConcurrentSnapshots)
}
if e.client == nil {
if e.config == nil {
e.config = config
@ -1180,8 +1187,6 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err
return nil
}
const compressedExtension = ".zip"
// compressSnapshot compresses the given snapshot and provides the
// caller with the path to the file.
func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string) (string, error) {
@ -1274,6 +1279,10 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
if err := e.preSnapshotSetup(ctx, config); err != nil {
return err
}
if !e.snapshotSem.TryAcquire(maxConcurrentSnapshots) {
return fmt.Errorf("%d snapshots already in progress", maxConcurrentSnapshots)
}
defer e.snapshotSem.Release(maxConcurrentSnapshots)
// make sure the core.Factory is initialized before attempting to add snapshot metadata
var extraMetadata string

View File

@ -106,9 +106,11 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
defer cancel()
opts := minio.PutObjectOptions{
ContentType: "application/zip",
NumThreads: 2,
opts := minio.PutObjectOptions{NumThreads: 2}
if strings.HasSuffix(snapshot, compressedExtension) {
opts.ContentType = "application/zip"
} else {
opts.ContentType = "application/octet-stream"
}
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts)
if err != nil {