mirror of https://github.com/k3s-io/k3s
Tidy s3 upload functions
Consistently refer to object keys as such, simplify error handling.
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit f1afe153a3
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/8644/head
parent
a8d117dc42
commit
a89645dfb4
110
pkg/etcd/s3.go
110
pkg/etcd/s3.go
|
@ -10,6 +10,7 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
|
@ -96,89 +97,56 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
|
|||
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
|
||||
logrus.Infof("Uploading snapshot %s to S3", snapshot)
|
||||
basename := filepath.Base(snapshot)
|
||||
var snapshotFileName string
|
||||
var sf snapshotFile
|
||||
if s.config.EtcdS3Folder != "" {
|
||||
snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename)
|
||||
} else {
|
||||
snapshotFileName = basename
|
||||
sf := &snapshotFile{
|
||||
Name: basename,
|
||||
NodeName: "s3",
|
||||
CreatedAt: &metav1.Time{},
|
||||
S3: &s3Config{
|
||||
Endpoint: s.config.EtcdS3Endpoint,
|
||||
EndpointCA: s.config.EtcdS3EndpointCA,
|
||||
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
|
||||
Bucket: s.config.EtcdS3BucketName,
|
||||
Region: s.config.EtcdS3Region,
|
||||
Folder: s.config.EtcdS3Folder,
|
||||
Insecure: s.config.EtcdS3Insecure,
|
||||
},
|
||||
metadataSource: extraMetadata,
|
||||
}
|
||||
|
||||
snapshotKey := path.Join(s.config.EtcdS3Folder, basename)
|
||||
|
||||
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
|
||||
defer cancel()
|
||||
opts := minio.PutObjectOptions{NumThreads: 2}
|
||||
if strings.HasSuffix(snapshot, compressedExtension) {
|
||||
opts.ContentType = "application/zip"
|
||||
sf.Compressed = true
|
||||
} else {
|
||||
opts.ContentType = "application/octet-stream"
|
||||
}
|
||||
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts)
|
||||
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, snapshot, opts)
|
||||
if err != nil {
|
||||
sf = snapshotFile{
|
||||
Name: filepath.Base(uploadInfo.Key),
|
||||
NodeName: "s3",
|
||||
CreatedAt: &metav1.Time{
|
||||
Time: now,
|
||||
},
|
||||
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
|
||||
Size: 0,
|
||||
Status: failedSnapshotStatus,
|
||||
S3: &s3Config{
|
||||
Endpoint: s.config.EtcdS3Endpoint,
|
||||
EndpointCA: s.config.EtcdS3EndpointCA,
|
||||
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
|
||||
Bucket: s.config.EtcdS3BucketName,
|
||||
Region: s.config.EtcdS3Region,
|
||||
Folder: s.config.EtcdS3Folder,
|
||||
Insecure: s.config.EtcdS3Insecure,
|
||||
},
|
||||
metadataSource: extraMetadata,
|
||||
}
|
||||
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
|
||||
sf.CreatedAt.Time = now
|
||||
sf.Status = failedSnapshotStatus
|
||||
sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
|
||||
} else {
|
||||
ca, err := time.Parse(time.RFC3339, uploadInfo.LastModified.Format(time.RFC3339))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sf = snapshotFile{
|
||||
Name: filepath.Base(uploadInfo.Key),
|
||||
NodeName: "s3",
|
||||
CreatedAt: &metav1.Time{
|
||||
Time: ca,
|
||||
},
|
||||
Size: uploadInfo.Size,
|
||||
Status: successfulSnapshotStatus,
|
||||
S3: &s3Config{
|
||||
Endpoint: s.config.EtcdS3Endpoint,
|
||||
EndpointCA: s.config.EtcdS3EndpointCA,
|
||||
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
|
||||
Bucket: s.config.EtcdS3BucketName,
|
||||
Region: s.config.EtcdS3Region,
|
||||
Folder: s.config.EtcdS3Folder,
|
||||
Insecure: s.config.EtcdS3Insecure,
|
||||
},
|
||||
metadataSource: extraMetadata,
|
||||
}
|
||||
sf.CreatedAt.Time = uploadInfo.LastModified
|
||||
sf.Status = successfulSnapshotStatus
|
||||
sf.Size = uploadInfo.Size
|
||||
}
|
||||
return &sf, nil
|
||||
return sf, err
|
||||
}
|
||||
|
||||
// download downloads the given snapshot from the configured S3
|
||||
// compatible backend.
|
||||
func (s *S3) Download(ctx context.Context) error {
|
||||
var remotePath string
|
||||
if s.config.EtcdS3Folder != "" {
|
||||
remotePath = filepath.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
|
||||
} else {
|
||||
remotePath = s.config.ClusterResetRestorePath
|
||||
}
|
||||
snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
|
||||
|
||||
logrus.Debugf("retrieving snapshot: %s", remotePath)
|
||||
logrus.Debugf("retrieving snapshot: %s", snapshotKey)
|
||||
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, remotePath, minio.GetObjectOptions{})
|
||||
r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -213,14 +181,7 @@ func (s *S3) Download(ctx context.Context) error {
|
|||
// snapshotPrefix returns the prefix used in the
|
||||
// naming of the snapshots.
|
||||
func (s *S3) snapshotPrefix() string {
|
||||
fullSnapshotPrefix := s.config.EtcdSnapshotName
|
||||
var prefix string
|
||||
if s.config.EtcdS3Folder != "" {
|
||||
prefix = filepath.Join(s.config.EtcdS3Folder, fullSnapshotPrefix)
|
||||
} else {
|
||||
prefix = fullSnapshotPrefix
|
||||
}
|
||||
return prefix
|
||||
return path.Join(s.config.EtcdS3Folder, s.config.EtcdSnapshotName)
|
||||
}
|
||||
|
||||
// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
|
||||
|
@ -250,15 +211,12 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
sort.Slice(snapshotFiles, func(firstSnapshot, secondSnapshot int) bool {
|
||||
// it takes the key from the snapshot file ex: etcd-snapshot-example-{date}, makes the split using "-" to find the date, takes the date and sort by date
|
||||
firstSnapshotName, secondSnapshotName := strings.Split(snapshotFiles[firstSnapshot].Key, "-"), strings.Split(snapshotFiles[secondSnapshot].Key, "-")
|
||||
firstSnapshotDate, secondSnapshotDate := firstSnapshotName[len(firstSnapshotName)-1], secondSnapshotName[len(secondSnapshotName)-1]
|
||||
return firstSnapshotDate < secondSnapshotDate
|
||||
// sort newest-first so we can prune entries past the retention count
|
||||
sort.Slice(snapshotFiles, func(i, j int) bool {
|
||||
return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
|
||||
})
|
||||
|
||||
delCount := len(snapshotFiles) - s.config.EtcdSnapshotRetention
|
||||
for _, df := range snapshotFiles[:delCount] {
|
||||
for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] {
|
||||
logrus.Infof("Removing S3 snapshot: %s", df.Key)
|
||||
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
|
||||
return err
|
||||
|
|
|
@ -312,8 +312,6 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
|
|||
|
||||
if e.config.EtcdS3 {
|
||||
logrus.Infof("Saving etcd snapshot %s to S3", snapshotName)
|
||||
// Set sf to nil so that we can attempt to now upload the snapshot to S3 if needed
|
||||
sf = nil
|
||||
if err := e.initS3IfNil(ctx); err != nil {
|
||||
logrus.Warnf("Unable to initialize S3 client: %v", err)
|
||||
sf = &snapshotFile{
|
||||
|
@ -336,21 +334,23 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
|
|||
},
|
||||
metadataSource: extraMetadata,
|
||||
}
|
||||
}
|
||||
// sf should be nil if we were able to successfully initialize the S3 client.
|
||||
if sf == nil {
|
||||
} else {
|
||||
// upload will return a snapshotFile even on error - if there was an
|
||||
// error, it will be reflected in the status and message.
|
||||
sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Infof("S3 upload complete for %s", snapshotName)
|
||||
if err := e.s3.snapshotRetention(ctx); err != nil {
|
||||
return errors.Wrap(err, "failed to apply s3 snapshot retention policy")
|
||||
logrus.Errorf("Error received during snapshot upload to S3: %s", err)
|
||||
} else {
|
||||
logrus.Infof("S3 upload complete for %s", snapshotName)
|
||||
}
|
||||
}
|
||||
if err := e.addSnapshotData(*sf); err != nil {
|
||||
return errors.Wrap(err, "failed to save snapshot data to configmap")
|
||||
}
|
||||
if err := e.s3.snapshotRetention(ctx); err != nil {
|
||||
logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -463,17 +463,11 @@ func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, er
|
|||
if obj.Size == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
ca, err := time.Parse(time.RFC3339, obj.LastModified.Format(time.RFC3339))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sf := snapshotFile{
|
||||
Name: filepath.Base(obj.Key),
|
||||
NodeName: "s3",
|
||||
CreatedAt: &metav1.Time{
|
||||
Time: ca,
|
||||
Time: obj.LastModified,
|
||||
},
|
||||
Size: obj.Size,
|
||||
S3: &s3Config{
|
||||
|
@ -634,7 +628,6 @@ func marshalSnapshotFile(sf snapshotFile) ([]byte, error) {
|
|||
if m, err := json.Marshal(sf.metadataSource.Data); err != nil {
|
||||
logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
|
||||
} else {
|
||||
logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m))
|
||||
sf.Metadata = base64.StdEncoding.EncodeToString(m)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue