From a89645dfb4e9d1ca3575530b1c8305a7b58eb7f5 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Fri, 29 Sep 2023 08:42:34 +0000 Subject: [PATCH] Tidy s3 upload functions Consistently refer to object keys as such, simplify error handling. Signed-off-by: Brad Davidson (cherry picked from commit f1afe153a39c788f1b549a3bb9ac9f06ed136e1a) Signed-off-by: Brad Davidson --- pkg/etcd/s3.go | 110 +++++++++++++------------------------------ pkg/etcd/snapshot.go | 29 +++++------- 2 files changed, 45 insertions(+), 94 deletions(-) diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 0bd09a2c24..dcd061d93c 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "os" + "path" "path/filepath" "sort" "strings" @@ -96,89 +97,56 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) { func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) { logrus.Infof("Uploading snapshot %s to S3", snapshot) basename := filepath.Base(snapshot) - var snapshotFileName string - var sf snapshotFile - if s.config.EtcdS3Folder != "" { - snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename) - } else { - snapshotFileName = basename + sf := &snapshotFile{ + Name: basename, + NodeName: "s3", + CreatedAt: &metav1.Time{}, + S3: &s3Config{ + Endpoint: s.config.EtcdS3Endpoint, + EndpointCA: s.config.EtcdS3EndpointCA, + SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, + Bucket: s.config.EtcdS3BucketName, + Region: s.config.EtcdS3Region, + Folder: s.config.EtcdS3Folder, + Insecure: s.config.EtcdS3Insecure, + }, + metadataSource: extraMetadata, } + snapshotKey := path.Join(s.config.EtcdS3Folder, basename) + toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) defer cancel() opts := minio.PutObjectOptions{NumThreads: 2} if strings.HasSuffix(snapshot, compressedExtension) { opts.ContentType = "application/zip" + sf.Compressed = true } else { opts.ContentType = "application/octet-stream" } - uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts) + uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, snapshot, opts) if err != nil { - sf = snapshotFile{ - Name: filepath.Base(uploadInfo.Key), - NodeName: "s3", - CreatedAt: &metav1.Time{ - Time: now, - }, - Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), - Size: 0, - Status: failedSnapshotStatus, - S3: &s3Config{ - Endpoint: s.config.EtcdS3Endpoint, - EndpointCA: s.config.EtcdS3EndpointCA, - SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, - Bucket: s.config.EtcdS3BucketName, - Region: s.config.EtcdS3Region, - Folder: s.config.EtcdS3Folder, - Insecure: s.config.EtcdS3Insecure, - }, - metadataSource: extraMetadata, - } - logrus.Errorf("Error received during snapshot upload to S3: %s", err) + sf.CreatedAt.Time = now + sf.Status = failedSnapshotStatus + sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error())) } else { - ca, err := time.Parse(time.RFC3339, uploadInfo.LastModified.Format(time.RFC3339)) - if err != nil { - return nil, err - } - - sf = snapshotFile{ - Name: filepath.Base(uploadInfo.Key), - NodeName: "s3", - CreatedAt: &metav1.Time{ - Time: ca, - }, - Size: uploadInfo.Size, - Status: successfulSnapshotStatus, - S3: &s3Config{ - Endpoint: s.config.EtcdS3Endpoint, - EndpointCA: s.config.EtcdS3EndpointCA, - SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, - Bucket: s.config.EtcdS3BucketName, - Region: s.config.EtcdS3Region, - Folder: s.config.EtcdS3Folder, - Insecure: s.config.EtcdS3Insecure, - }, - metadataSource: extraMetadata, - } + sf.CreatedAt.Time = uploadInfo.LastModified + sf.Status = successfulSnapshotStatus + sf.Size = uploadInfo.Size } - return &sf, nil + return sf, err } // download downloads the given snapshot from the configured S3 // compatible backend. func (s *S3) Download(ctx context.Context) error { - var remotePath string - if s.config.EtcdS3Folder != "" { - remotePath = filepath.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath) - } else { - remotePath = s.config.ClusterResetRestorePath - } + snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath) - logrus.Debugf("retrieving snapshot: %s", remotePath) + logrus.Debugf("retrieving snapshot: %s", snapshotKey) toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) defer cancel() - r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, remotePath, minio.GetObjectOptions{}) + r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, minio.GetObjectOptions{}) if err != nil { return nil } @@ -213,14 +181,7 @@ func (s *S3) Download(ctx context.Context) error { // snapshotPrefix returns the prefix used in the // naming of the snapshots. func (s *S3) snapshotPrefix() string { - fullSnapshotPrefix := s.config.EtcdSnapshotName - var prefix string - if s.config.EtcdS3Folder != "" { - prefix = filepath.Join(s.config.EtcdS3Folder, fullSnapshotPrefix) - } else { - prefix = fullSnapshotPrefix - } - return prefix + return path.Join(s.config.EtcdS3Folder, s.config.EtcdSnapshotName) } // snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node. @@ -250,15 +211,12 @@ func (s *S3) snapshotRetention(ctx context.Context) error { return nil } - sort.Slice(snapshotFiles, func(firstSnapshot, secondSnapshot int) bool { - // it takes the key from the snapshot file ex: etcd-snapshot-example-{date}, makes the split using "-" to find the date, takes the date and sort by date - firstSnapshotName, secondSnapshotName := strings.Split(snapshotFiles[firstSnapshot].Key, "-"), strings.Split(snapshotFiles[secondSnapshot].Key, "-") - firstSnapshotDate, secondSnapshotDate := firstSnapshotName[len(firstSnapshotName)-1], secondSnapshotName[len(secondSnapshotName)-1] - return firstSnapshotDate < secondSnapshotDate + // sort newest-first so we can prune entries past the retention count + sort.Slice(snapshotFiles, func(i, j int) bool { + return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified) }) - delCount := len(snapshotFiles) - s.config.EtcdSnapshotRetention - for _, df := range snapshotFiles[:delCount] { + for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] { logrus.Infof("Removing S3 snapshot: %s", df.Key) if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { return err diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 9c8d210884..ced9f90f5b 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -312,8 +312,6 @@ func (e *ETCD) Snapshot(ctx context.Context) error { if e.config.EtcdS3 { logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) - // Set sf to nil so that we can attempt to now upload the snapshot to S3 if needed - sf = nil if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) sf = &snapshotFile{ @@ -336,21 +334,23 @@ func (e *ETCD) Snapshot(ctx context.Context) error { }, metadataSource: extraMetadata, } - } - // sf should be nil if we were able to successfully initialize the S3 client. - if sf == nil { + } else { + // upload will return a snapshotFile even on error - if there was an + // error, it will be reflected in the status and message. sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now) if err != nil { - return err - } - logrus.Infof("S3 upload complete for %s", snapshotName) - if err := e.s3.snapshotRetention(ctx); err != nil { - return errors.Wrap(err, "failed to apply s3 snapshot retention policy") + logrus.Errorf("Error received during snapshot upload to S3: %s", err) + } else { + logrus.Infof("S3 upload complete for %s", snapshotName) } } if err := e.addSnapshotData(*sf); err != nil { return errors.Wrap(err, "failed to save snapshot data to configmap") } + if err := e.s3.snapshotRetention(ctx); err != nil { + logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err) + } + } } @@ -463,17 +463,11 @@ func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, er if obj.Size == 0 { continue } - - ca, err := time.Parse(time.RFC3339, obj.LastModified.Format(time.RFC3339)) - if err != nil { - return nil, err - } - sf := snapshotFile{ Name: filepath.Base(obj.Key), NodeName: "s3", CreatedAt: &metav1.Time{ - Time: ca, + Time: obj.LastModified, }, Size: obj.Size, S3: &s3Config{ @@ -634,7 +628,6 @@ func marshalSnapshotFile(sf snapshotFile) ([]byte, error) { if m, err := json.Marshal(sf.metadataSource.Data); err != nil { logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err) } else { - logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m)) sf.Metadata = base64.StdEncoding.EncodeToString(m) } }