mirror of https://github.com/k3s-io/k3s
Move s3 snapshot list functionality to s3.go
Also, don't list ONLY s3 snapshots if S3 is enabled.
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 80f909d0ca
)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/8644/head
parent
514bcade78
commit
db4ee1b2ae
|
@ -13,6 +13,7 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -226,6 +227,64 @@ func (s *S3) snapshotRetention(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// listSnapshots provides a list of currently stored
|
||||||
|
// snapshots in S3 along with their relevant
|
||||||
|
// metadata.
|
||||||
|
func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
||||||
|
snapshots := make(map[string]snapshotFile)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var loo minio.ListObjectsOptions
|
||||||
|
if s.config.EtcdS3Folder != "" {
|
||||||
|
loo = minio.ListObjectsOptions{
|
||||||
|
Prefix: s.config.EtcdS3Folder,
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo)
|
||||||
|
|
||||||
|
for obj := range objects {
|
||||||
|
if obj.Err != nil {
|
||||||
|
return nil, obj.Err
|
||||||
|
}
|
||||||
|
if obj.Size == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := path.Base(obj.Key)
|
||||||
|
basename, compressed := strings.CutSuffix(filename, compressedExtension)
|
||||||
|
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
ts = obj.LastModified.Unix()
|
||||||
|
}
|
||||||
|
|
||||||
|
sf := snapshotFile{
|
||||||
|
Name: filename,
|
||||||
|
NodeName: "s3",
|
||||||
|
CreatedAt: &metav1.Time{
|
||||||
|
Time: time.Unix(ts, 0),
|
||||||
|
},
|
||||||
|
Size: obj.Size,
|
||||||
|
S3: &s3Config{
|
||||||
|
Endpoint: s.config.EtcdS3Endpoint,
|
||||||
|
EndpointCA: s.config.EtcdS3EndpointCA,
|
||||||
|
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
|
||||||
|
Bucket: s.config.EtcdS3BucketName,
|
||||||
|
Region: s.config.EtcdS3Region,
|
||||||
|
Folder: s.config.EtcdS3Folder,
|
||||||
|
Insecure: s.config.EtcdS3Insecure,
|
||||||
|
},
|
||||||
|
Status: successfulSnapshotStatus,
|
||||||
|
Compressed: compressed,
|
||||||
|
}
|
||||||
|
sfKey := generateSnapshotConfigMapKey(sf)
|
||||||
|
snapshots[sfKey] = sf
|
||||||
|
}
|
||||||
|
return snapshots, nil
|
||||||
|
}
|
||||||
|
|
||||||
func readS3EndpointCA(endpointCA string) ([]byte, error) {
|
func readS3EndpointCA(endpointCA string) ([]byte, error) {
|
||||||
ca, err := base64.StdEncoding.DecodeString(endpointCA)
|
ca, err := base64.StdEncoding.DecodeString(endpointCA)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -436,71 +435,6 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
|
||||||
return snapshots, nil
|
return snapshots, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// listS3Snapshots provides a list of currently stored
|
|
||||||
// snapshots in S3 along with their relevant
|
|
||||||
// metadata.
|
|
||||||
func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
|
||||||
snapshots := make(map[string]snapshotFile)
|
|
||||||
|
|
||||||
if e.config.EtcdS3 {
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
if err := e.initS3IfNil(ctx); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var loo minio.ListObjectsOptions
|
|
||||||
if e.config.EtcdS3Folder != "" {
|
|
||||||
loo = minio.ListObjectsOptions{
|
|
||||||
Prefix: e.config.EtcdS3Folder,
|
|
||||||
Recursive: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
objects := e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, loo)
|
|
||||||
|
|
||||||
for obj := range objects {
|
|
||||||
if obj.Err != nil {
|
|
||||||
return nil, obj.Err
|
|
||||||
}
|
|
||||||
if obj.Size == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
filename := path.Base(obj.Key)
|
|
||||||
basename, compressed := strings.CutSuffix(filename, compressedExtension)
|
|
||||||
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
ts = obj.LastModified.Unix()
|
|
||||||
}
|
|
||||||
|
|
||||||
sf := snapshotFile{
|
|
||||||
Name: filename,
|
|
||||||
NodeName: "s3",
|
|
||||||
CreatedAt: &metav1.Time{
|
|
||||||
Time: time.Unix(ts, 0),
|
|
||||||
},
|
|
||||||
Size: obj.Size,
|
|
||||||
S3: &s3Config{
|
|
||||||
Endpoint: e.config.EtcdS3Endpoint,
|
|
||||||
EndpointCA: e.config.EtcdS3EndpointCA,
|
|
||||||
SkipSSLVerify: e.config.EtcdS3SkipSSLVerify,
|
|
||||||
Bucket: e.config.EtcdS3BucketName,
|
|
||||||
Region: e.config.EtcdS3Region,
|
|
||||||
Folder: e.config.EtcdS3Folder,
|
|
||||||
Insecure: e.config.EtcdS3Insecure,
|
|
||||||
},
|
|
||||||
Status: successfulSnapshotStatus,
|
|
||||||
Compressed: compressed,
|
|
||||||
}
|
|
||||||
sfKey := generateSnapshotConfigMapKey(sf)
|
|
||||||
snapshots[sfKey] = sf
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return snapshots, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// initS3IfNil initializes the S3 client
|
// initS3IfNil initializes the S3 client
|
||||||
// if it hasn't yet been initialized.
|
// if it hasn't yet been initialized.
|
||||||
func (e *ETCD) initS3IfNil(ctx context.Context) error {
|
func (e *ETCD) initS3IfNil(ctx context.Context) error {
|
||||||
|
@ -535,17 +469,33 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.ReconcileSnapshotData(ctx)
|
return e.ReconcileSnapshotData(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListSnapshots is an exported wrapper method that wraps an
|
// ListSnapshots is an exported wrapper method that wraps an
|
||||||
// unexported method of the same name.
|
// unexported method of the same name.
|
||||||
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
||||||
|
snapshotFiles := map[string]snapshotFile{}
|
||||||
if e.config.EtcdS3 {
|
if e.config.EtcdS3 {
|
||||||
return e.listS3Snapshots(ctx)
|
if err := e.initS3IfNil(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
sfs, err := e.s3.listSnapshots(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
snapshotFiles = sfs
|
||||||
}
|
}
|
||||||
return e.listLocalSnapshots()
|
|
||||||
|
sfs, err := e.listLocalSnapshots()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for k, sf := range sfs {
|
||||||
|
snapshotFiles[k] = sf
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshotFiles, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteSnapshots removes the given snapshots from
|
// deleteSnapshots removes the given snapshots from
|
||||||
|
@ -785,7 +735,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
var s3ListSuccessful bool
|
var s3ListSuccessful bool
|
||||||
|
|
||||||
if e.config.EtcdS3 {
|
if e.config.EtcdS3 {
|
||||||
if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil {
|
if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil {
|
||||||
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
|
logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for k, v := range s3Snapshots {
|
for k, v := range s3Snapshots {
|
||||||
|
|
Loading…
Reference in New Issue