diff --git a/go.mod b/go.mod index 77b31277e1..46d9b30523 100644 --- a/go.mod +++ b/go.mod @@ -133,6 +133,7 @@ require ( go.uber.org/zap v1.19.0 golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 golang.org/x/net v0.0.0-20211216030914-fe4d6282115f + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20220412211240-33da011f77ad google.golang.org/grpc v1.45.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/pkg/cli/cmds/etcd_snapshot.go b/pkg/cli/cmds/etcd_snapshot.go index 23ace5c546..54d94548ef 100644 --- a/pkg/cli/cmds/etcd_snapshot.go +++ b/pkg/cli/cmds/etcd_snapshot.go @@ -95,7 +95,7 @@ var EtcdSnapshotFlags = []cli.Flag{ Name: "s3-timeout,etcd-s3-timeout", Usage: "(db) S3 timeout", Destination: &ServerConfig.EtcdS3Timeout, - Value: 30 * time.Second, + Value: 5 * time.Minute, }, } diff --git a/pkg/cli/cmds/server.go b/pkg/cli/cmds/server.go index 04b429feb2..2f845cb240 100644 --- a/pkg/cli/cmds/server.go +++ b/pkg/cli/cmds/server.go @@ -374,7 +374,7 @@ var ServerFlags = []cli.Flag{ Name: "etcd-s3-timeout", Usage: "(db) S3 timeout", Destination: &ServerConfig.EtcdS3Timeout, - Value: 30 * time.Second, + Value: 5 * time.Minute, }, cli.StringFlag{ Name: "default-local-storage-path", diff --git a/pkg/cli/server/server.go b/pkg/cli/server/server.go index 25134e569d..6e5f8fdfb0 100644 --- a/pkg/cli/server/server.go +++ b/pkg/cli/server/server.go @@ -151,6 +151,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont serverConfig.ControlConfig.EtcdDisableSnapshots = cfg.EtcdDisableSnapshots if !cfg.EtcdDisableSnapshots { + serverConfig.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress serverConfig.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName serverConfig.ControlConfig.EtcdSnapshotCron = cfg.EtcdSnapshotCron serverConfig.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 4ea98466b2..09c43e3ca7 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -44,6 +44,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/etcdutl/v3/snapshot" "go.uber.org/zap" + "golang.org/x/sync/semaphore" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -63,7 +64,9 @@ const ( defaultKeepAliveTime = 30 * time.Second defaultKeepAliveTimeout = 10 * time.Second - maxBackupRetention = 5 + maxBackupRetention = 5 + maxConcurrentSnapshots = 1 + compressedExtension = ".zip" MasterLabel = "node-role.kubernetes.io/master" ControlPlaneLabel = "node-role.kubernetes.io/control-plane" @@ -90,13 +93,14 @@ var ( type NodeControllerGetter func() controllerv1.NodeController type ETCD struct { - client *clientv3.Client - config *config.Control - name string - address string - cron *cron.Cron - s3 *S3 - cancel context.CancelFunc + client *clientv3.Client + config *config.Control + name string + address string + cron *cron.Cron + s3 *S3 + cancel context.CancelFunc + snapshotSem *semaphore.Weighted } type learnerProgress struct { @@ -1162,6 +1166,9 @@ func snapshotDir(config *config.Control, create bool) (string, error) { // to perform an Etcd snapshot. This is necessary primarily for on-demand // snapshots since they're performed before normal Etcd setup is completed. func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) error { + if e.snapshotSem == nil { + e.snapshotSem = semaphore.NewWeighted(maxConcurrentSnapshots) + } if e.client == nil { if e.config == nil { e.config = config @@ -1180,8 +1187,6 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err return nil } -const compressedExtension = ".zip" - // compressSnapshot compresses the given snapshot and provides the // caller with the path to the file. func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string) (string, error) { @@ -1274,6 +1279,10 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { if err := e.preSnapshotSetup(ctx, config); err != nil { return err } + if !e.snapshotSem.TryAcquire(maxConcurrentSnapshots) { + return fmt.Errorf("%d snapshots already in progress", maxConcurrentSnapshots) + } + defer e.snapshotSem.Release(maxConcurrentSnapshots) // make sure the core.Factory is initialized before attempting to add snapshot metadata var extraMetadata string diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 573809f131..86a30da8a4 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -106,9 +106,11 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) defer cancel() - opts := minio.PutObjectOptions{ - ContentType: "application/zip", - NumThreads: 2, + opts := minio.PutObjectOptions{NumThreads: 2} + if strings.HasSuffix(snapshot, compressedExtension) { + opts.ContentType = "application/zip" + } else { + opts.ContentType = "application/octet-stream" } uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts) if err != nil {