From e0222ac1a2ba6f977f42241d94a3c9542f05d46b Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 5 Oct 2023 16:49:22 +0000 Subject: [PATCH] Move snapshot delete into local/s3 functions Signed-off-by: Brad Davidson (cherry picked from commit 5cd4f69bfa4174acc9698df2b3623ff8f9b63a9c) Signed-off-by: Brad Davidson --- pkg/etcd/s3.go | 26 +++- pkg/etcd/snapshot.go | 112 +++++++----------- .../etcdsnapshot/etcdsnapshot_int_test.go | 6 +- 3 files changed, 67 insertions(+), 77 deletions(-) diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 952e98849a..ebe5abaf1d 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -205,7 +205,7 @@ func (s *S3) uploadSnapshotMetadata(ctx context.Context, key, path string) (info return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts) } -// download downloads the given snapshot from the configured S3 +// Download downloads the given snapshot from the configured S3 // compatible backend. func (s *S3) Download(ctx context.Context) error { snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath) @@ -297,12 +297,12 @@ func (s *S3) snapshotRetention(ctx context.Context) error { for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] { logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key) - if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { + if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { return err } metadataKey := path.Join(path.Dir(df.Key), metadataDir, path.Base(df.Key)) - if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil { - if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound { + if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil { + if isNotExist(err) { return nil } return err @@ -312,13 +312,29 @@ func (s *S3) snapshotRetention(ctx context.Context) error { return nil } +func (s *S3) deleteSnapshot(ctx context.Context, key string) error { + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + + key = path.Join(s.config.EtcdS3Folder, key) + err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, key, minio.RemoveObjectOptions{}) + if err == nil || isNotExist(err) { + metadataKey := path.Join(path.Dir(key), metadataDir, path.Base(key)) + if merr := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !isNotExist(merr) { + err = merr + } + } + + return err +} + // listSnapshots provides a list of currently stored // snapshots in S3 along with their relevant // metadata. func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) { snapshots := map[string]snapshotFile{} metadatas := []string{} - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) defer cancel() opts := minio.ListObjectsOptions{ diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 82fc553ad0..3b436e26ac 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "math/rand" + "net/http" "os" "path/filepath" "runtime" @@ -516,94 +517,60 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro return snapshotFiles, err } -// deleteSnapshots removes the given snapshots from -// either local storage or S3. +// DeleteSnapshots removes the given snapshots from local storage and S3. func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { snapshotDir, err := snapshotDir(e.config, false) if err != nil { return errors.Wrap(err, "failed to get the snapshot dir") } - - logrus.Info("Removing the given locally stored etcd snapshot(s)") - logrus.Debugf("Attempting to remove the given locally stored etcd snapshot(s): %v", snapshots) - - for _, s := range snapshots { - // check if the given snapshot exists. If it does, - // remove it, otherwise continue. - sf := filepath.Join(snapshotDir, s) - if _, err := os.Stat(sf); os.IsNotExist(err) { - logrus.Infof("Snapshot %s, does not exist", s) - continue - } - if err := os.Remove(sf); err != nil { + if e.config.EtcdS3 { + if err := e.initS3IfNil(ctx); err != nil { return err } - logrus.Debug("Removed snapshot ", s) } - if e.config.EtcdS3 { - if e.initS3IfNil(ctx); err != nil { - logrus.Warnf("Unable to initialize S3 client: %v", err) - return err + for _, s := range snapshots { + if err := e.deleteSnapshot(filepath.Join(snapshotDir, s)); err != nil { + if isNotExist(err) { + logrus.Infof("Snapshot %s not found locally", s) + } else { + logrus.Errorf("Failed to delete local snapshot %s: %v", s, err) + } + } else { + logrus.Infof("Snapshot %s deleted locally", s) } - logrus.Info("Removing the given etcd snapshot(s) from S3") - logrus.Debugf("Removing the given etcd snapshot(s) from S3: %v", snapshots) - objectsCh := make(chan minio.ObjectInfo) - - ctx, cancel := context.WithTimeout(ctx, e.config.EtcdS3Timeout) - defer cancel() - - go func() { - defer close(objectsCh) - - opts := minio.ListObjectsOptions{ - Recursive: true, - } - - for obj := range e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, opts) { - if obj.Err != nil { - logrus.Errorf("Failed to list snapshots from S3: %v", obj.Err) - return - } - - // iterate through the given snapshots and only - // add them to the channel for remove if they're - // actually found from the bucket listing. - for _, snapshot := range snapshots { - if snapshot == obj.Key { - objectsCh <- obj - } + if e.config.EtcdS3 { + if err := e.s3.deleteSnapshot(s); err != nil { + if isNotExist(err) { + logrus.Infof("Snapshot %s not found in S3", s) + } else { + logrus.Errorf("Failed to delete S3 snapshot %s: %v", s, err) } + } else { + logrus.Infof("Snapshot %s deleted from S3", s) } - }() - - err = func() error { - for { - select { - case <-ctx.Done(): - logrus.Errorf("Unable to delete snapshot: %v", ctx.Err()) - return e.ReconcileSnapshotData(ctx) - case <-time.After(time.Millisecond * 100): - continue - case err, ok := <-e.s3.client.RemoveObjects(ctx, e.config.EtcdS3BucketName, objectsCh, minio.RemoveObjectsOptions{}): - if err.Err != nil { - logrus.Errorf("Unable to delete snapshot: %v", err.Err) - } - if !ok { - return e.ReconcileSnapshotData(ctx) - } - } - } - }() - if err != nil { - return err } } return e.ReconcileSnapshotData(ctx) } +func (e *ETCD) deleteSnapshot(snapshotPath string) error { + dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir) + filename := filepath.Base(snapshotPath) + metadataPath := filepath.Join(dir, filename) + + err := os.Remove(snapshotPath) + if err == nil || os.IsNotExist(err) { + if merr := os.Remove(metadataPath); err != nil && !isNotExist(err) { + err = merr + } + } + + return err +} + func marshalSnapshotFile(sf snapshotFile) ([]byte, error) { if sf.metadataSource != nil { if m, err := json.Marshal(sf.metadataSource.Data); err != nil { @@ -947,6 +914,13 @@ func isTooLargeError(err error) bool { return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) } +func isNotExist(err error) bool { + if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) { + return true + } + return false +} + // saveSnapshotMetadata writes extra metadata to disk. // The upload is silently skipped if no extra metadata is provided. func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) error { diff --git a/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go b/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go index 3fe9f4152b..ee5ec6b049 100644 --- a/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go +++ b/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go @@ -58,7 +58,7 @@ var _ = Describe("etcd snapshots", Ordered, func() { Expect(err).ToNot(HaveOccurred()) snapshotName := reg.FindString(lsResult) Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)). - To(ContainSubstring("Removing the given locally stored etcd snapshot")) + To(ContainSubstring("Snapshot " + snapshotName + " deleted locally")) }) }) When("saving a custom name", func() { @@ -73,7 +73,7 @@ var _ = Describe("etcd snapshots", Ordered, func() { Expect(err).ToNot(HaveOccurred()) snapshotName := reg.FindString(lsResult) Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)). - To(ContainSubstring("Removing the given locally stored etcd snapshot")) + To(ContainSubstring("Snapshot " + snapshotName + " deleted locally")) }) }) When("using etcd snapshot prune", func() { @@ -113,7 +113,7 @@ var _ = Describe("etcd snapshots", Ordered, func() { Expect(err).ToNot(HaveOccurred()) for _, snapshotName := range reg.FindAllString(lsResult, -1) { Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)). - To(ContainSubstring("Removing the given locally stored etcd snapshot")) + To(ContainSubstring("Snapshot " + snapshotName + " deleted locally")) } }) })