mirror of https://github.com/k3s-io/k3s
Move s3 snapshot list functionality to s3.go
Also, don't list ONLY s3 snapshots if S3 is enabled. Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/8624/head
parent
8d47645312
commit
80f909d0ca
|
@ -13,6 +13,7 @@ import (
|
|||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -226,6 +227,64 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// listSnapshots provides a list of currently stored
|
||||
// snapshots in S3 along with their relevant
|
||||
// metadata.
|
||||
func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
||||
snapshots := make(map[string]snapshotFile)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
var loo minio.ListObjectsOptions
|
||||
if s.config.EtcdS3Folder != "" {
|
||||
loo = minio.ListObjectsOptions{
|
||||
Prefix: s.config.EtcdS3Folder,
|
||||
Recursive: true,
|
||||
}
|
||||
}
|
||||
|
||||
objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo)
|
||||
|
||||
for obj := range objects {
|
||||
if obj.Err != nil {
|
||||
return nil, obj.Err
|
||||
}
|
||||
if obj.Size == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
filename := path.Base(obj.Key)
|
||||
basename, compressed := strings.CutSuffix(filename, compressedExtension)
|
||||
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
|
||||
if err != nil {
|
||||
ts = obj.LastModified.Unix()
|
||||
}
|
||||
|
||||
sf := snapshotFile{
|
||||
Name: filename,
|
||||
NodeName: "s3",
|
||||
CreatedAt: &metav1.Time{
|
||||
Time: time.Unix(ts, 0),
|
||||
},
|
||||
Size: obj.Size,
|
||||
S3: &s3Config{
|
||||
Endpoint: s.config.EtcdS3Endpoint,
|
||||
EndpointCA: s.config.EtcdS3EndpointCA,
|
||||
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
|
||||
Bucket: s.config.EtcdS3BucketName,
|
||||
Region: s.config.EtcdS3Region,
|
||||
Folder: s.config.EtcdS3Folder,
|
||||
Insecure: s.config.EtcdS3Insecure,
|
||||
},
|
||||
Status: successfulSnapshotStatus,
|
||||
Compressed: compressed,
|
||||
}
|
||||
sfKey := generateSnapshotConfigMapKey(sf)
|
||||
snapshots[sfKey] = sf
|
||||
}
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
func readS3EndpointCA(endpointCA string) ([]byte, error) {
|
||||
ca, err := base64.StdEncoding.DecodeString(endpointCA)
|
||||
if err != nil {
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
|
@ -436,71 +435,6 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
|
|||
return snapshots, nil
|
||||
}
|
||||
|
||||
// listS3Snapshots provides a list of currently stored
|
||||
// snapshots in S3 along with their relevant
|
||||
// metadata.
|
||||
func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
||||
snapshots := make(map[string]snapshotFile)
|
||||
|
||||
if e.config.EtcdS3 {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := e.initS3IfNil(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var loo minio.ListObjectsOptions
|
||||
if e.config.EtcdS3Folder != "" {
|
||||
loo = minio.ListObjectsOptions{
|
||||
Prefix: e.config.EtcdS3Folder,
|
||||
Recursive: true,
|
||||
}
|
||||
}
|
||||
|
||||
objects := e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, loo)
|
||||
|
||||
for obj := range objects {
|
||||
if obj.Err != nil {
|
||||
return nil, obj.Err
|
||||
}
|
||||
if obj.Size == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
filename := path.Base(obj.Key)
|
||||
basename, compressed := strings.CutSuffix(filename, compressedExtension)
|
||||
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
|
||||
if err != nil {
|
||||
ts = obj.LastModified.Unix()
|
||||
}
|
||||
|
||||
sf := snapshotFile{
|
||||
Name: filename,
|
||||
NodeName: "s3",
|
||||
CreatedAt: &metav1.Time{
|
||||
Time: time.Unix(ts, 0),
|
||||
},
|
||||
Size: obj.Size,
|
||||
S3: &s3Config{
|
||||
Endpoint: e.config.EtcdS3Endpoint,
|
||||
EndpointCA: e.config.EtcdS3EndpointCA,
|
||||
SkipSSLVerify: e.config.EtcdS3SkipSSLVerify,
|
||||
Bucket: e.config.EtcdS3BucketName,
|
||||
Region: e.config.EtcdS3Region,
|
||||
Folder: e.config.EtcdS3Folder,
|
||||
Insecure: e.config.EtcdS3Insecure,
|
||||
},
|
||||
Status: successfulSnapshotStatus,
|
||||
Compressed: compressed,
|
||||
}
|
||||
sfKey := generateSnapshotConfigMapKey(sf)
|
||||
snapshots[sfKey] = sf
|
||||
}
|
||||
}
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
// initS3IfNil initializes the S3 client
|
||||
// if it hasn't yet been initialized.
|
||||
func (e *ETCD) initS3IfNil(ctx context.Context) error {
|
||||
|
@ -535,17 +469,33 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return e.ReconcileSnapshotData(ctx)
|
||||
}
|
||||
|
||||
// ListSnapshots is an exported wrapper method that wraps an
|
||||
// unexported method of the same name.
|
||||
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
||||
snapshotFiles := map[string]snapshotFile{}
|
||||
if e.config.EtcdS3 {
|
||||
return e.listS3Snapshots(ctx)
|
||||
if err := e.initS3IfNil(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sfs, err := e.s3.listSnapshots(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
snapshotFiles = sfs
|
||||
}
|
||||
return e.listLocalSnapshots()
|
||||
|
||||
sfs, err := e.listLocalSnapshots()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k, sf := range sfs {
|
||||
snapshotFiles[k] = sf
|
||||
}
|
||||
|
||||
return snapshotFiles, err
|
||||
}
|
||||
|
||||
// deleteSnapshots removes the given snapshots from
|
||||
|
@ -785,7 +735,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
|||
var s3ListSuccessful bool
|
||||
|
||||
if e.config.EtcdS3 {
|
||||
if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil {
|
||||
if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil {
|
||||
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
|
||||
} else {
|
||||
for k, v := range s3Snapshots {
|
||||
|
|
Loading…
Reference in New Issue