mirror of https://github.com/k3s-io/k3s

* Move snapshot structs and functions into pkg/etcd/snapshot
* Move s3 client code and functions into pkg/etcd/s3
* Refactor pkg/etcd to track snapshot and s3 moves
* Add support for reading s3 client config from secret
* Add minio client cache, since S3 client configuration can now be changed at runtime by modifying the secret, and we don't want to have to create a new minio client every time we read config.
* Add tests for pkg/etcd/s3

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>

Branch: pull/10494/head
Brad Davidson committed 5 months ago
19 changed files with 3031 additions and 986 deletions
@ -1,494 +0,0 @@
package etcd

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/base64"
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/textproto"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/util"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

var (
	clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id")
	tokenHashKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-token-hash")
	nodeNameKey  = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name")
)

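Aside: these keys are canonicalized up front because minio returns S3 user metadata with canonical MIME header casing, so later lookups must use the same form. A quick standalone sketch, assuming the program name is "k3s":

```go
package main

import (
	"fmt"
	"net/textproto"
)

func main() {
	// Same canonicalization HTTP headers get: first letter and letters
	// after hyphens are uppercased.
	fmt.Println(textproto.CanonicalMIMEHeaderKey("k3s-cluster-id")) // K3s-Cluster-Id
}
```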
// S3 maintains state for S3 functionality.
type S3 struct {
	config    *config.Control
	client    *minio.Client
	clusterID string
	tokenHash string
	nodeName  string
}

// NewS3 creates a new S3 controller with a copy of the
// config.Control pointer and initializes
// a new minio client.
func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
	if config.EtcdS3BucketName == "" {
		return nil, errors.New("s3 bucket name was not set")
	}
	tr := http.DefaultTransport

	switch {
	case config.EtcdS3EndpointCA != "":
		trCA, err := setTransportCA(tr, config.EtcdS3EndpointCA, config.EtcdS3SkipSSLVerify)
		if err != nil {
			return nil, err
		}
		tr = trCA
	case config.EtcdS3 && config.EtcdS3SkipSSLVerify:
		tr.(*http.Transport).TLSClientConfig = &tls.Config{
			InsecureSkipVerify: config.EtcdS3SkipSSLVerify,
		}
	}

	var creds *credentials.Credentials
	if len(config.EtcdS3AccessKey) == 0 && len(config.EtcdS3SecretKey) == 0 {
		creds = credentials.NewIAM("") // for running on ec2 instance
	} else {
		creds = credentials.NewStaticV4(config.EtcdS3AccessKey, config.EtcdS3SecretKey, "")
	}

	opt := minio.Options{
		Creds:        creds,
		Secure:       !config.EtcdS3Insecure,
		Region:       config.EtcdS3Region,
		Transport:    tr,
		BucketLookup: bucketLookupType(config.EtcdS3Endpoint),
	}
	c, err := minio.New(config.EtcdS3Endpoint, &opt)
	if err != nil {
		return nil, err
	}

	logrus.Infof("Checking if S3 bucket %s exists", config.EtcdS3BucketName)

	ctx, cancel := context.WithTimeout(ctx, config.EtcdS3Timeout)
	defer cancel()

	exists, err := c.BucketExists(ctx, config.EtcdS3BucketName)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to test for existence of bucket %s", config.EtcdS3BucketName)
	}
	if !exists {
		return nil, fmt.Errorf("bucket %s does not exist", config.EtcdS3BucketName)
	}
	logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName)

	s3 := &S3{
		config:   config,
		client:   c,
		nodeName: os.Getenv("NODE_NAME"),
	}

	if config.ClusterReset {
		logrus.Debug("Skip setting S3 snapshot cluster ID and token during cluster-reset")
	} else {
		if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
			if config.Runtime.Core == nil {
				return false, nil
			}

			// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
			ns, err := config.Runtime.Core.Core().V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{})
			if err != nil {
				return false, errors.Wrap(err, "failed to set S3 snapshot cluster ID")
			}
			s3.clusterID = string(ns.UID)

			tokenHash, err := util.GetTokenHash(config)
			if err != nil {
				return false, errors.Wrap(err, "failed to set S3 snapshot server token hash")
			}
			s3.tokenHash = tokenHash

			return true, nil
		}); err != nil {
			return nil, err
		}
	}

	return s3, nil
}

// upload uploads the given snapshot to the configured S3
// compatible backend.
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
	basename := filepath.Base(snapshot)
	metadata := filepath.Join(filepath.Dir(snapshot), "..", metadataDir, basename)
	snapshotKey := path.Join(s.config.EtcdS3Folder, basename)
	metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, basename)

	sf := &snapshotFile{
		Name:     basename,
		Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey),
		NodeName: "s3",
		CreatedAt: &metav1.Time{
			Time: now,
		},
		S3: &s3Config{
			Endpoint:      s.config.EtcdS3Endpoint,
			EndpointCA:    s.config.EtcdS3EndpointCA,
			SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
			Bucket:        s.config.EtcdS3BucketName,
			Region:        s.config.EtcdS3Region,
			Folder:        s.config.EtcdS3Folder,
			Insecure:      s.config.EtcdS3Insecure,
		},
		Compressed:     strings.HasSuffix(snapshot, compressedExtension),
		metadataSource: extraMetadata,
		nodeSource:     s.nodeName,
	}

	logrus.Infof("Uploading snapshot to s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey)
	uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot)
	if err != nil {
		sf.Status = failedSnapshotStatus
		sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
	} else {
		sf.Status = successfulSnapshotStatus
		sf.Size = uploadInfo.Size
		sf.tokenHash = s.tokenHash
	}
	if _, err := s.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil {
		logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err)
	} else {
		logrus.Infof("Uploaded snapshot metadata s3://%s/%s", s.config.EtcdS3BucketName, metadataKey)
	}
	return sf, err
}

// uploadSnapshot uploads the snapshot file to S3 using the minio API.
func (s *S3) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
	opts := minio.PutObjectOptions{
		NumThreads: 2,
		UserMetadata: map[string]string{
			clusterIDKey: s.clusterID,
			nodeNameKey:  s.nodeName,
			tokenHashKey: s.tokenHash,
		},
	}
	if strings.HasSuffix(key, compressedExtension) {
		opts.ContentType = "application/zip"
	} else {
		opts.ContentType = "application/octet-stream"
	}
	ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()

	return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts)
}

// uploadSnapshotMetadata marshals and uploads the snapshot metadata to S3 using the minio API.
// The upload is silently skipped if no extra metadata is provided.
func (s *S3) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
	if _, err := os.Stat(path); err != nil {
		if os.IsNotExist(err) {
			return minio.UploadInfo{}, nil
		}
		return minio.UploadInfo{}, err
	}

	opts := minio.PutObjectOptions{
		NumThreads:  2,
		ContentType: "application/json",
		UserMetadata: map[string]string{
			clusterIDKey: s.clusterID,
			nodeNameKey:  s.nodeName,
		},
	}
	ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()
	return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts)
}

// Download downloads the given snapshot from the configured S3
// compatible backend.
func (s *S3) Download(ctx context.Context) error {
	snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
	metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, s.config.ClusterResetRestorePath)
	snapshotDir, err := snapshotDir(s.config, true)
	if err != nil {
		return errors.Wrap(err, "failed to get the snapshot dir")
	}
	snapshotFile := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath)
	metadataFile := filepath.Join(snapshotDir, "..", metadataDir, s.config.ClusterResetRestorePath)

	if err := s.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil {
		return err
	}
	if err := s.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil {
		return err
	}

	s.config.ClusterResetRestorePath = snapshotFile
	return nil
}

// downloadSnapshot downloads the snapshot file from S3 using the minio API.
func (s *S3) downloadSnapshot(ctx context.Context, key, file string) error {
	logrus.Debugf("Downloading snapshot from s3://%s/%s", s.config.EtcdS3BucketName, key)
	ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()
	defer os.Chmod(file, 0600)
	return s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{})
}

// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API.
// No error is returned if the metadata file does not exist, as it is optional.
func (s *S3) downloadSnapshotMetadata(ctx context.Context, key, file string) error {
	logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, key)
	ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()
	defer os.Chmod(file, 0600)
	err := s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{})
	if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
		return nil
	}
	return err
}

// snapshotPrefix returns the prefix used in the
// naming of the snapshots.
func (s *S3) snapshotPrefix() string {
	return path.Join(s.config.EtcdS3Folder, s.config.EtcdSnapshotName)
}

// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
// Returns a list of pruned snapshot names.
func (s *S3) snapshotRetention(ctx context.Context) ([]string, error) {
	if s.config.EtcdSnapshotRetention < 1 {
		return nil, nil
	}
	logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", s.config.EtcdSnapshotRetention, s.config.EtcdS3BucketName, s.snapshotPrefix())

	var snapshotFiles []minio.ObjectInfo

	toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()

	opts := minio.ListObjectsOptions{
		Prefix:    s.snapshotPrefix(),
		Recursive: true,
	}
	for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, opts) {
		if info.Err != nil {
			return nil, info.Err
		}

		// skip metadata
		if path.Base(path.Dir(info.Key)) == metadataDir {
			continue
		}

		snapshotFiles = append(snapshotFiles, info)
	}

	if len(snapshotFiles) <= s.config.EtcdSnapshotRetention {
		return nil, nil
	}

	// sort newest-first so we can prune entries past the retention count
	sort.Slice(snapshotFiles, func(i, j int) bool {
		return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
	})

	deleted := []string{}
	for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] {
		logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key)

		key := path.Base(df.Key)
		if err := s.deleteSnapshot(ctx, key); err != nil {
			return deleted, err
		}
		deleted = append(deleted, key)
	}

	return deleted, nil
}

func (s *S3) deleteSnapshot(ctx context.Context, key string) error {
	ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()

	key = path.Join(s.config.EtcdS3Folder, key)
	err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, key, minio.RemoveObjectOptions{})
	if err == nil || isNotExist(err) {
		metadataKey := path.Join(path.Dir(key), metadataDir, path.Base(key))
		if merr := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !isNotExist(merr) {
			err = merr
		}
	}

	return err
}

// listSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
	snapshots := map[string]snapshotFile{}
	metadatas := []string{}
	ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()

	opts := minio.ListObjectsOptions{
		Prefix:    s.config.EtcdS3Folder,
		Recursive: true,
	}

	objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, opts)

	for obj := range objects {
		if obj.Err != nil {
			return nil, obj.Err
		}
		if obj.Size == 0 {
			continue
		}

		if o, err := s.client.StatObject(ctx, s.config.EtcdS3BucketName, obj.Key, minio.StatObjectOptions{}); err != nil {
			logrus.Warnf("Failed to get object metadata: %v", err)
		} else {
			obj = o
		}

		filename := path.Base(obj.Key)
		if path.Base(path.Dir(obj.Key)) == metadataDir {
			metadatas = append(metadatas, obj.Key)
			continue
		}

		basename, compressed := strings.CutSuffix(filename, compressedExtension)
		ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
		if err != nil {
			ts = obj.LastModified.Unix()
		}

		sf := snapshotFile{
			Name:     filename,
			Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, obj.Key),
			NodeName: "s3",
			CreatedAt: &metav1.Time{
				Time: time.Unix(ts, 0),
			},
			Size: obj.Size,
			S3: &s3Config{
				Endpoint:      s.config.EtcdS3Endpoint,
				EndpointCA:    s.config.EtcdS3EndpointCA,
				SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
				Bucket:        s.config.EtcdS3BucketName,
				Region:        s.config.EtcdS3Region,
				Folder:        s.config.EtcdS3Folder,
				Insecure:      s.config.EtcdS3Insecure,
			},
			Status:     successfulSnapshotStatus,
			Compressed: compressed,
			nodeSource: obj.UserMetadata[nodeNameKey],
			tokenHash:  obj.UserMetadata[tokenHashKey],
		}
		sfKey := generateSnapshotConfigMapKey(sf)
		snapshots[sfKey] = sf
	}

	for _, metadataKey := range metadatas {
		filename := path.Base(metadataKey)
		sfKey := generateSnapshotConfigMapKey(snapshotFile{Name: filename, NodeName: "s3"})
		if sf, ok := snapshots[sfKey]; ok {
			logrus.Debugf("Loading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, metadataKey)
			if obj, err := s.client.GetObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.GetObjectOptions{}); err != nil {
				if isNotExist(err) {
					logrus.Debugf("Failed to get snapshot metadata: %v", err)
				} else {
					logrus.Warnf("Failed to get snapshot metadata for %s: %v", filename, err)
				}
			} else {
				if m, err := ioutil.ReadAll(obj); err != nil {
					if isNotExist(err) {
						logrus.Debugf("Failed to read snapshot metadata: %v", err)
					} else {
						logrus.Warnf("Failed to read snapshot metadata for %s: %v", filename, err)
					}
				} else {
					sf.Metadata = base64.StdEncoding.EncodeToString(m)
					snapshots[sfKey] = sf
				}
			}
		}
	}

	return snapshots, nil
}

func readS3EndpointCA(endpointCA string) ([]byte, error) {
	ca, err := base64.StdEncoding.DecodeString(endpointCA)
	if err != nil {
		return os.ReadFile(endpointCA)
	}
	return ca, nil
}

func setTransportCA(tr http.RoundTripper, endpointCA string, insecureSkipVerify bool) (http.RoundTripper, error) {
	ca, err := readS3EndpointCA(endpointCA)
	if err != nil {
		return tr, err
	}
	if !isValidCertificate(ca) {
		return tr, errors.New("endpoint-ca is not a valid x509 certificate")
	}

	certPool := x509.NewCertPool()
	certPool.AppendCertsFromPEM(ca)

	tr.(*http.Transport).TLSClientConfig = &tls.Config{
		RootCAs:            certPool,
		InsecureSkipVerify: insecureSkipVerify,
	}

	return tr, nil
}

// isValidCertificate checks to see if the given
// byte slice is a valid x509 certificate.
func isValidCertificate(c []byte) bool {
	p, _ := pem.Decode(c)
	if p == nil {
		return false
	}
	if _, err := x509.ParseCertificates(p.Bytes); err != nil {
		return false
	}
	return true
}

func bucketLookupType(endpoint string) minio.BucketLookupType {
	if strings.Contains(endpoint, "aliyun") { // backwards compat with RKE1
		return minio.BucketLookupDNS
	}
	return minio.BucketLookupAuto
}
@ -0,0 +1,119 @@
package s3

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/util"
	"github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var ErrNoConfigSecret = errNoConfigSecret()

type secretError struct {
	err error
}

func (e *secretError) Error() string {
	return fmt.Sprintf("failed to get etcd S3 config secret: %v", e.err)
}

func (e *secretError) Is(target error) bool {
	switch target {
	case ErrNoConfigSecret:
		return true
	}
	return false
}

func errNoConfigSecret() error { return &secretError{} }

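The sentinel plus the custom Is method lets callers match any secretError with errors.Is, regardless of the wrapped cause. A standalone toy re-declaration of the pattern, for illustration only:

```go
package main

import (
	"errors"
	"fmt"
)

// Toy copies of the declarations above, standalone so this compiles alone.
var ErrNoConfigSecret = errNoConfigSecret()

type secretError struct{ err error }

func (e *secretError) Error() string {
	return fmt.Sprintf("failed to get etcd S3 config secret: %v", e.err)
}

// Is makes every secretError match the exported sentinel.
func (e *secretError) Is(target error) bool { return target == ErrNoConfigSecret }

func errNoConfigSecret() error { return &secretError{} }

func main() {
	err := &secretError{err: errors.New("secret not found")}
	// Any secretError matches the sentinel, whatever the wrapped cause.
	fmt.Println(errors.Is(err, ErrNoConfigSecret)) // true
}
```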
func (c *Controller) getConfigFromSecret(secretName string) (*config.EtcdS3, error) {
	if c.core == nil {
		return nil, &secretError{err: util.ErrCoreNotReady}
	}

	secret, err := c.core.V1().Secret().Get(metav1.NamespaceSystem, secretName, metav1.GetOptions{})
	if err != nil {
		return nil, &secretError{err: err}
	}

	etcdS3 := &config.EtcdS3{
		AccessKey: string(secret.Data["etcd-s3-access-key"]),
		Bucket:    string(secret.Data["etcd-s3-bucket"]),
		Endpoint:  defaultEtcdS3.Endpoint,
		Folder:    string(secret.Data["etcd-s3-folder"]),
		Proxy:     string(secret.Data["etcd-s3-proxy"]),
		Region:    defaultEtcdS3.Region,
		SecretKey: string(secret.Data["etcd-s3-secret-key"]),
		Timeout:   *defaultEtcdS3.Timeout.DeepCopy(),
	}

	// Set endpoint from secret if set
	if v, ok := secret.Data["etcd-s3-endpoint"]; ok {
		etcdS3.Endpoint = string(v)
	}

	// Set region from secret if set
	if v, ok := secret.Data["etcd-s3-region"]; ok {
		etcdS3.Region = string(v)
	}

	// Set timeout from secret if set
	if v, ok := secret.Data["etcd-s3-timeout"]; ok {
		if duration, err := time.ParseDuration(string(v)); err != nil {
			logrus.Warnf("Failed to parse etcd-s3-timeout value from S3 config secret %s: %v", secretName, err)
		} else {
			etcdS3.Timeout.Duration = duration
		}
	}

	// configure ssl verification, if value can be parsed
	if v, ok := secret.Data["etcd-s3-skip-ssl-verify"]; ok {
		if b, err := strconv.ParseBool(string(v)); err != nil {
			logrus.Warnf("Failed to parse etcd-s3-skip-ssl-verify value from S3 config secret %s: %v", secretName, err)
		} else {
			etcdS3.SkipSSLVerify = b
		}
	}

	// configure insecure http, if value can be parsed
	if v, ok := secret.Data["etcd-s3-insecure"]; ok {
		if b, err := strconv.ParseBool(string(v)); err != nil {
			logrus.Warnf("Failed to parse etcd-s3-insecure value from S3 config secret %s: %v", secretName, err)
		} else {
			etcdS3.Insecure = b
		}
	}

	// encode CA bundles from value, and keys in configmap if one is named
	caBundles := []string{}
	// Add inline CA bundle if set
	if len(secret.Data["etcd-s3-endpoint-ca"]) > 0 {
		caBundles = append(caBundles, base64.StdEncoding.EncodeToString(secret.Data["etcd-s3-endpoint-ca"]))
	}

	// Add CA bundles from named configmap if set
	if caConfigMapName := string(secret.Data["etcd-s3-endpoint-ca-name"]); caConfigMapName != "" {
		configMap, err := c.core.V1().ConfigMap().Get(metav1.NamespaceSystem, caConfigMapName, metav1.GetOptions{})
		if err != nil {
			logrus.Warnf("Failed to get ConfigMap %s for etcd-s3-endpoint-ca-name value from S3 config secret %s: %v", caConfigMapName, secretName, err)
		} else {
			for _, v := range configMap.Data {
				caBundles = append(caBundles, base64.StdEncoding.EncodeToString([]byte(v)))
			}
			for _, v := range configMap.BinaryData {
				caBundles = append(caBundles, base64.StdEncoding.EncodeToString(v))
			}
		}
	}

	// Concatenate all requested CA bundle strings into config var
	etcdS3.EndpointCA = strings.Join(caBundles, " ")
	return etcdS3, nil
}
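For reference, a sketch of the Secret shape this function consumes. The key names come from the code above; the secret name and all field values are illustrative assumptions:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleSecret sketches a config secret in kube-system; etcd-s3-endpoint-ca
// and etcd-s3-endpoint-ca-name are also honored but omitted here for brevity.
func exampleSecret() *v1.Secret {
	return &v1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "k3s-etcd-s3-config", // hypothetical name
			Namespace: metav1.NamespaceSystem,
		},
		StringData: map[string]string{
			"etcd-s3-access-key":      "AKIAEXAMPLE",
			"etcd-s3-secret-key":      "example-secret",
			"etcd-s3-bucket":          "my-bucket",
			"etcd-s3-folder":          "snapshots",
			"etcd-s3-endpoint":        "s3.example.com",
			"etcd-s3-region":          "us-east-1",
			"etcd-s3-timeout":         "5m",
			"etcd-s3-skip-ssl-verify": "false",
			"etcd-s3-insecure":        "false",
		},
	}
}

func main() { fmt.Println(exampleSecret().Name) }
```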
@ -0,0 +1,567 @@
package s3

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/base64"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/textproto"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/etcd/snapshot"
	"github.com/k3s-io/k3s/pkg/util"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/pkg/errors"
	"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/lru"
)

var (
	clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id")
	tokenHashKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-token-hash")
	nodeNameKey  = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name")
)

var defaultEtcdS3 = &config.EtcdS3{
	Endpoint: "s3.amazonaws.com",
	Region:   "us-east-1",
	Timeout: metav1.Duration{
		Duration: 5 * time.Minute,
	},
}

var (
	controller *Controller
	cErr       error
	once       sync.Once
)

// Controller maintains state for S3 functionality,
// and can be used to get clients for interacting with
// an S3 service, given specific client configuration.
type Controller struct {
	clusterID   string
	tokenHash   string
	nodeName    string
	core        core.Interface
	clientCache *lru.Cache
}

// Client holds state for a given configuration - a preconfigured minio client,
// and a reference to the config it was created for.
type Client struct {
	mc         *minio.Client
	etcdS3     *config.EtcdS3
	controller *Controller
}

// Start initializes the cache and sets the cluster ID and token hash,
// returning a reference to the initialized controller. Initialization is
// locked by a sync.Once to prevent races, and multiple calls to Start will
// return the same controller or error.
func Start(ctx context.Context, config *config.Control) (*Controller, error) {
	once.Do(func() {
		c := &Controller{
			clientCache: lru.New(5),
			nodeName:    os.Getenv("NODE_NAME"),
		}

		if config.ClusterReset {
			logrus.Debug("Skip setting S3 snapshot cluster ID and server token hash during cluster-reset")
			controller = c
		} else {
			logrus.Debug("Getting S3 snapshot cluster ID and server token hash")
			if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
				if config.Runtime.Core == nil {
					return false, nil
				}
				c.core = config.Runtime.Core.Core()

				// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
				ns, err := c.core.V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{})
				if err != nil {
					return false, errors.Wrap(err, "failed to set S3 snapshot cluster ID")
				}
				c.clusterID = string(ns.UID)

				tokenHash, err := util.GetTokenHash(config)
				if err != nil {
					return false, errors.Wrap(err, "failed to set S3 snapshot server token hash")
				}
				c.tokenHash = tokenHash

				return true, nil
			}); err != nil {
				cErr = err
			} else {
				controller = c
			}
		}
	})

	return controller, cErr
}

// GetClient returns a cached client for the given S3 configuration, or creates and caches a new one.
func (c *Controller) GetClient(ctx context.Context, etcdS3 *config.EtcdS3) (*Client, error) {
	if etcdS3 == nil {
		return nil, errors.New("nil s3 configuration")
	}

	// update ConfigSecret in defaults so that comparisons between current and default config
	// ignore ConfigSecret when deciding if CLI configuration is present.
	defaultEtcdS3.ConfigSecret = etcdS3.ConfigSecret

	// If config is default, try to load config from secret, and fail if it cannot be retrieved or if the secret name is not set.
	// If config is not default, and secret name is set, warn that the secret is being ignored.
	isDefault := reflect.DeepEqual(defaultEtcdS3, etcdS3)
	if etcdS3.ConfigSecret != "" {
		if isDefault {
			e, err := c.getConfigFromSecret(etcdS3.ConfigSecret)
			if err != nil {
				return nil, errors.Wrapf(err, "failed to get config from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
			}
			logrus.Infof("Using etcd s3 configuration from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
			etcdS3 = e
		} else {
			logrus.Warnf("Ignoring s3 configuration from etcd-s3-config-secret %q due to existing configuration from CLI or config file", etcdS3.ConfigSecret)
		}
	} else if isDefault {
		return nil, errors.New("s3 configuration was not set")
	}

	// used just for logging
	scheme := "https://"
	if etcdS3.Insecure {
		scheme = "http://"
	}

	// Try to get an existing client from cache. The entire EtcdS3 struct
	// (including the key id and secret) is used as the cache key, but we only
	// print the endpoint and bucket name to avoid leaking creds into the logs.
	if client, ok := c.clientCache.Get(*etcdS3); ok {
		logrus.Infof("Reusing cached S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)
		return client.(*Client), nil
	}
	logrus.Infof("Attempting to create new S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)

	if etcdS3.Bucket == "" {
		return nil, errors.New("s3 bucket name was not set")
	}
	tr := http.DefaultTransport.(*http.Transport).Clone()

	// You can either disable SSL verification or use a custom CA bundle;
	// it doesn't make sense to do both - if verification is disabled,
	// the CA is not checked!
	if etcdS3.SkipSSLVerify {
		tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	} else if etcdS3.EndpointCA != "" {
		tlsConfig, err := loadEndpointCAs(etcdS3.EndpointCA)
		if err != nil {
			return nil, err
		}
		tr.TLSClientConfig = tlsConfig
	}

	// Set a fixed proxy URL, if requested by the user. This replaces the default,
	// which calls ProxyFromEnvironment to read proxy settings from the environment.
	if etcdS3.Proxy != "" {
		var u *url.URL
		var err error
		// proxy address of literal "none" disables all use of a proxy by S3
		if etcdS3.Proxy != "none" {
			u, err = url.Parse(etcdS3.Proxy)
			if err != nil {
				return nil, errors.Wrap(err, "failed to parse etcd-s3-proxy value as URL")
			}
			if u.Scheme == "" || u.Host == "" {
				return nil, fmt.Errorf("proxy URL must include scheme and host")
			}
		}
		tr.Proxy = http.ProxyURL(u)
	}

	var creds *credentials.Credentials
	if len(etcdS3.AccessKey) == 0 && len(etcdS3.SecretKey) == 0 {
		creds = credentials.NewIAM("") // for running on ec2 instance
		if _, err := creds.Get(); err != nil {
			return nil, errors.Wrap(err, "failed to get IAM credentials")
		}
	} else {
		creds = credentials.NewStaticV4(etcdS3.AccessKey, etcdS3.SecretKey, "")
	}

	opt := minio.Options{
		Creds:        creds,
		Secure:       !etcdS3.Insecure,
		Region:       etcdS3.Region,
		Transport:    tr,
		BucketLookup: bucketLookupType(etcdS3.Endpoint),
	}
	mc, err := minio.New(etcdS3.Endpoint, &opt)
	if err != nil {
		return nil, err
	}

	logrus.Infof("Checking if S3 bucket %s exists", etcdS3.Bucket)

	ctx, cancel := context.WithTimeout(ctx, etcdS3.Timeout.Duration)
	defer cancel()

	exists, err := mc.BucketExists(ctx, etcdS3.Bucket)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to test for existence of bucket %s", etcdS3.Bucket)
	}
	if !exists {
		return nil, fmt.Errorf("bucket %s does not exist", etcdS3.Bucket)
	}
	logrus.Infof("S3 bucket %s exists", etcdS3.Bucket)

	client := &Client{
		mc:         mc,
		etcdS3:     etcdS3,
		controller: c,
	}
	logrus.Infof("Adding S3 client to cache")
	c.clientCache.Add(*etcdS3, client)
	return client, nil
}
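A hedged usage sketch of the lifecycle: call Start once (repeated calls return the same controller), then GetClient per operation so that secret changes are picked up through the cache. The wrapper function and its parameters are assumptions; only the calls into this package come from the code above:

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/etcd/s3"
	v1 "k8s.io/api/core/v1"
)

// uploadOnce is a hypothetical caller: it assumes the caller already holds a
// populated *config.Control and *config.EtcdS3 from the k3s server runtime.
func uploadOnce(ctx context.Context, control *config.Control, etcdS3 *config.EtcdS3, snapshotPath string, extra *v1.ConfigMap) error {
	controller, err := s3.Start(ctx, control)
	if err != nil {
		return err
	}
	// Resolves CLI config vs. the named secret, and reuses a cached minio
	// client when the effective configuration is unchanged.
	client, err := controller.GetClient(ctx, etcdS3)
	if err != nil {
		return err
	}
	sf, err := client.Upload(ctx, snapshotPath, extra, time.Now())
	if err != nil {
		return err
	}
	fmt.Printf("uploaded %s (%d bytes)\n", sf.Name, sf.Size)
	return nil
}
```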

// Upload uploads the given snapshot to the configured S3
// compatible backend.
func (c *Client) Upload(ctx context.Context, snapshotPath string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshot.File, error) {
	basename := filepath.Base(snapshotPath)
	metadata := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir, basename)
	snapshotKey := path.Join(c.etcdS3.Folder, basename)
	metadataKey := path.Join(c.etcdS3.Folder, snapshot.MetadataDir, basename)

	sf := &snapshot.File{
		Name:     basename,
		Location: fmt.Sprintf("s3://%s/%s", c.etcdS3.Bucket, snapshotKey),
		NodeName: "s3",
		CreatedAt: &metav1.Time{
			Time: now,
		},
		S3:             &snapshot.S3Config{EtcdS3: *c.etcdS3},
		Compressed:     strings.HasSuffix(snapshotPath, snapshot.CompressedExtension),
		MetadataSource: extraMetadata,
		NodeSource:     c.controller.nodeName,
	}

	logrus.Infof("Uploading snapshot to s3://%s/%s", c.etcdS3.Bucket, snapshotKey)
	uploadInfo, err := c.uploadSnapshot(ctx, snapshotKey, snapshotPath)
	if err != nil {
		sf.Status = snapshot.FailedStatus
		sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
	} else {
		sf.Status = snapshot.SuccessfulStatus
		sf.Size = uploadInfo.Size
		sf.TokenHash = c.controller.tokenHash
	}
	if uploadInfo, err := c.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil {
		logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err)
	} else if uploadInfo.Size != 0 {
		logrus.Infof("Uploaded snapshot metadata s3://%s/%s", c.etcdS3.Bucket, metadataKey)
	}
	return sf, err
}

// uploadSnapshot uploads the snapshot file to S3 using the minio API.
func (c *Client) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
	opts := minio.PutObjectOptions{
		NumThreads: 2,
		UserMetadata: map[string]string{
			clusterIDKey: c.controller.clusterID,
			nodeNameKey:  c.controller.nodeName,
			tokenHashKey: c.controller.tokenHash,
		},
	}
	if strings.HasSuffix(key, snapshot.CompressedExtension) {
		opts.ContentType = "application/zip"
	} else {
		opts.ContentType = "application/octet-stream"
	}
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()
	return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, opts)
}

// uploadSnapshotMetadata marshals and uploads the snapshot metadata to S3 using the minio API.
// The upload is silently skipped if no extra metadata is provided.
func (c *Client) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
	if _, err := os.Stat(path); err != nil {
		if os.IsNotExist(err) {
			return minio.UploadInfo{}, nil
		}
		return minio.UploadInfo{}, err
	}

	opts := minio.PutObjectOptions{
		NumThreads:  2,
		ContentType: "application/json",
		UserMetadata: map[string]string{
			clusterIDKey: c.controller.clusterID,
			nodeNameKey:  c.controller.nodeName,
			tokenHashKey: c.controller.tokenHash,
		},
	}
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()
	return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, opts)
}

// Download downloads the given snapshot from the configured S3
// compatible backend. If the file is successfully downloaded, it returns
// the path the file was downloaded to.
func (c *Client) Download(ctx context.Context, snapshotName, snapshotDir string) (string, error) {
	snapshotKey := path.Join(c.etcdS3.Folder, snapshotName)
	metadataKey := path.Join(c.etcdS3.Folder, snapshot.MetadataDir, snapshotName)
	snapshotFile := filepath.Join(snapshotDir, snapshotName)
	metadataFile := filepath.Join(snapshotDir, "..", snapshot.MetadataDir, snapshotName)

	if err := c.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil {
		return "", err
	}
	if err := c.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil {
		return "", err
	}

	return snapshotFile, nil
}

// downloadSnapshot downloads the snapshot file from S3 using the minio API.
func (c *Client) downloadSnapshot(ctx context.Context, key, file string) error {
	logrus.Debugf("Downloading snapshot from s3://%s/%s", c.etcdS3.Bucket, key)
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()
	defer os.Chmod(file, 0600)
	return c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})
}

// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API.
// No error is returned if the metadata file does not exist, as it is optional.
func (c *Client) downloadSnapshotMetadata(ctx context.Context, key, file string) error {
	logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, key)
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()
	defer os.Chmod(file, 0600)
	err := c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})
	if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
		return nil
	}
	return err
}

// SnapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
// Returns a list of pruned snapshot names.
func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix string) ([]string, error) {
	if retention < 1 {
		return nil, nil
	}

	prefix = path.Join(c.etcdS3.Folder, prefix)
	logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", retention, c.etcdS3.Bucket, prefix)

	var snapshotFiles []minio.ObjectInfo

	toCtx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	opts := minio.ListObjectsOptions{
		Prefix:    prefix,
		Recursive: true,
	}
	for info := range c.mc.ListObjects(toCtx, c.etcdS3.Bucket, opts) {
		if info.Err != nil {
			return nil, info.Err
		}

		// skip metadata
		if path.Base(path.Dir(info.Key)) == snapshot.MetadataDir {
			continue
		}

		snapshotFiles = append(snapshotFiles, info)
	}

	if len(snapshotFiles) <= retention {
		return nil, nil
	}

	// sort newest-first so we can prune entries past the retention count
	sort.Slice(snapshotFiles, func(i, j int) bool {
		return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
	})

	deleted := []string{}
	for _, df := range snapshotFiles[retention:] {
		logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)

		key := path.Base(df.Key)
		if err := c.DeleteSnapshot(ctx, key); err != nil {
			return deleted, err
		}
		deleted = append(deleted, key)
	}

	return deleted, nil
}
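The comparator above reads backwards at first glance; a standalone toy sketch (fake object data, fake keys) showing that it sorts newest-first and that everything past the retention index gets pruned:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// objectInfo stands in for minio.ObjectInfo; only LastModified matters here.
type objectInfo struct {
	Key          string
	LastModified time.Time
}

func main() {
	now := time.Now()
	files := []objectInfo{
		{"snap-a", now.Add(-3 * time.Hour)},
		{"snap-b", now.Add(-1 * time.Hour)},
		{"snap-c", now.Add(-2 * time.Hour)},
	}
	// Same comparator as above: i sorts before j when j is older, i.e. newest-first.
	sort.Slice(files, func(i, j int) bool {
		return files[j].LastModified.Before(files[i].LastModified)
	})
	retention := 2
	for _, f := range files[retention:] {
		fmt.Println("would prune:", f.Key) // would prune: snap-a
	}
}
```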

// DeleteSnapshot deletes the selected snapshot (and its metadata) from S3.
func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	key = path.Join(c.etcdS3.Folder, key)
	err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{})
	if err == nil || snapshot.IsNotExist(err) {
		metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
		if merr := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !snapshot.IsNotExist(merr) {
			err = merr
		}
	}

	return err
}

// ListSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (c *Client) ListSnapshots(ctx context.Context) (map[string]snapshot.File, error) {
	snapshots := map[string]snapshot.File{}
	metadatas := []string{}
	ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
	defer cancel()

	opts := minio.ListObjectsOptions{
		Prefix:    c.etcdS3.Folder,
		Recursive: true,
	}

	objects := c.mc.ListObjects(ctx, c.etcdS3.Bucket, opts)

	for obj := range objects {
		if obj.Err != nil {
			return nil, obj.Err
		}
		if obj.Size == 0 {
			continue
		}

		if o, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, obj.Key, minio.StatObjectOptions{}); err != nil {
			logrus.Warnf("Failed to get object metadata: %v", err)
		} else {
			obj = o
		}

		filename := path.Base(obj.Key)
		if path.Base(path.Dir(obj.Key)) == snapshot.MetadataDir {
			metadatas = append(metadatas, obj.Key)
			continue
		}

		basename, compressed := strings.CutSuffix(filename, snapshot.CompressedExtension)
		ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
		if err != nil {
			ts = obj.LastModified.Unix()
		}

		sf := snapshot.File{
			Name:     filename,
			Location: fmt.Sprintf("s3://%s/%s", c.etcdS3.Bucket, obj.Key),
			NodeName: "s3",
			CreatedAt: &metav1.Time{
				Time: time.Unix(ts, 0),
			},
			Size:       obj.Size,
			S3:         &snapshot.S3Config{EtcdS3: *c.etcdS3},
			Status:     snapshot.SuccessfulStatus,
			Compressed: compressed,
			NodeSource: obj.UserMetadata[nodeNameKey],
			TokenHash:  obj.UserMetadata[tokenHashKey],
		}
		sfKey := sf.GenerateConfigMapKey()
		snapshots[sfKey] = sf
	}

	for _, metadataKey := range metadatas {
		filename := path.Base(metadataKey)
		dsf := &snapshot.File{Name: filename, NodeName: "s3"}
		sfKey := dsf.GenerateConfigMapKey()
		if sf, ok := snapshots[sfKey]; ok {
			logrus.Debugf("Loading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, metadataKey)
			if obj, err := c.mc.GetObject(ctx, c.etcdS3.Bucket, metadataKey, minio.GetObjectOptions{}); err != nil {
				if snapshot.IsNotExist(err) {
					logrus.Debugf("Failed to get snapshot metadata: %v", err)
				} else {
					logrus.Warnf("Failed to get snapshot metadata for %s: %v", filename, err)
				}
			} else {
				if m, err := ioutil.ReadAll(obj); err != nil {
					if snapshot.IsNotExist(err) {
						logrus.Debugf("Failed to read snapshot metadata: %v", err)
					} else {
						logrus.Warnf("Failed to read snapshot metadata for %s: %v", filename, err)
					}
				} else {
					sf.Metadata = base64.StdEncoding.EncodeToString(m)
					snapshots[sfKey] = sf
				}
			}
		}
	}

	return snapshots, nil
}
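The timestamp recovery above is worth seeing in isolation: the .zip suffix is cut, then everything after the last '-' is parsed as a unix timestamp, falling back to the object's LastModified. A standalone sketch with an illustrative filename:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

func main() {
	filename := "etcd-snapshot-node1-1700000000.zip" // illustrative example
	basename, compressed := strings.CutSuffix(filename, ".zip")
	ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
	if err != nil {
		ts = time.Now().Unix() // the real code falls back to obj.LastModified
	}
	fmt.Println(compressed, time.Unix(ts, 0).UTC()) // true 2023-11-14 22:13:20 +0000 UTC
}
```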

func loadEndpointCAs(etcdS3EndpointCA string) (*tls.Config, error) {
	var loaded bool
	certPool := x509.NewCertPool()

	for _, ca := range strings.Split(etcdS3EndpointCA, " ") {
		// Try to decode the value as base64-encoded data - yes, a base64 string that itself
		// contains multiline, ascii-armored, base64-encoded certificate data - as would be produced
		// by `base64 --wrap=0 /path/to/cert.pem`. If this fails, assume the value is the path to a
		// file on disk, and try to read that. This is backwards compatible with RKE1.
		caData, err := base64.StdEncoding.DecodeString(ca)
		if err != nil {
			caData, err = os.ReadFile(ca)
		}
		if err != nil {
			return nil, err
		}
		if certPool.AppendCertsFromPEM(caData) {
			loaded = true
		}
	}

	if loaded {
		return &tls.Config{RootCAs: certPool}, nil
	}
	return nil, errors.New("no certificates loaded from etcd-s3-endpoint-ca")
}
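To produce the inline form, the ascii-armored PEM bundle is base64-encoded a second time, as `base64 --wrap=0 /path/to/cert.pem` would do. A minimal Go equivalent (the path is hypothetical):

```go
package main

import (
	"encoding/base64"
	"fmt"
	"os"
)

func main() {
	// Read a PEM bundle and wrap it in a second layer of base64 - the
	// inline format that loadEndpointCAs tries to decode first.
	pemBytes, err := os.ReadFile("/path/to/cert.pem") // hypothetical path
	if err != nil {
		panic(err)
	}
	fmt.Println(base64.StdEncoding.EncodeToString(pemBytes))
}
```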

func bucketLookupType(endpoint string) minio.BucketLookupType {
	if strings.Contains(endpoint, "aliyun") { // backwards compatible with RKE1
		return minio.BucketLookupDNS
	}
	return minio.BucketLookupAuto
}
@ -0,0 +1,270 @@
package snapshot

import (
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"regexp"
	"strings"

	k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/minio/minio-go/v7"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/validation"
	"k8s.io/utils/ptr"
)

type SnapshotStatus string

const (
	SuccessfulStatus SnapshotStatus = "successful"
	FailedStatus     SnapshotStatus = "failed"

	CompressedExtension = ".zip"
	MetadataDir         = ".metadata"
)

var (
	InvalidKeyChars = regexp.MustCompile(`[^-._a-zA-Z0-9]`)

	LabelStorageNode    = "etcd." + version.Program + ".cattle.io/snapshot-storage-node"
	AnnotationTokenHash = "etcd." + version.Program + ".cattle.io/snapshot-token-hash"

	ExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata"
)

type S3Config struct {
	config.EtcdS3
	// Mask these fields in the embedded struct to avoid serializing their values in the snapshotFile record
	AccessKey    string          `json:"accessKey,omitempty"`
	ConfigSecret string          `json:"configSecret,omitempty"`
	Proxy        string          `json:"proxy,omitempty"`
	SecretKey    string          `json:"secretKey,omitempty"`
	Timeout      metav1.Duration `json:"timeout,omitempty"`
}

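The masking trick deserves a standalone illustration: re-declaring a field at the outer level shadows the embedded one in encoding/json's field resolution (shallower depth wins), and leaving it empty with omitempty drops it from the output entirely. Toy types and values, assumed for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Inner struct {
	Bucket    string `json:"bucket,omitempty"`
	SecretKey string `json:"secretKey,omitempty"`
}

type Masked struct {
	Inner
	// Shadows Inner.SecretKey; always left empty, so omitempty removes it.
	SecretKey string `json:"secretKey,omitempty"`
}

func main() {
	m := Masked{Inner: Inner{Bucket: "b", SecretKey: "hunter2"}}
	b, _ := json.Marshal(m)
	fmt.Println(string(b)) // {"bucket":"b"} - the secret is never serialized
}
```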

// File represents a single snapshot and its
// metadata.
type File struct {
	Name string `json:"name"`
	// Location contains the full path of the snapshot. For
	// local paths, the location will be prefixed with "file://".
	Location   string         `json:"location,omitempty"`
	Metadata   string         `json:"metadata,omitempty"`
	Message    string         `json:"message,omitempty"`
	NodeName   string         `json:"nodeName,omitempty"`
	CreatedAt  *metav1.Time   `json:"createdAt,omitempty"`
	Size       int64          `json:"size,omitempty"`
	Status     SnapshotStatus `json:"status,omitempty"`
	S3         *S3Config      `json:"s3Config,omitempty"`
	Compressed bool           `json:"compressed"`

	// these fields are used for the internal representation of the snapshot
	// to populate other fields before serialization to the legacy configmap.
	MetadataSource *v1.ConfigMap `json:"-"`
	NodeSource     string        `json:"-"`
	TokenHash      string        `json:"-"`
}

// GenerateConfigMapKey generates a derived name for the snapshot that is safe for use
// as a configmap key.
func (sf *File) GenerateConfigMapKey() string {
	name := InvalidKeyChars.ReplaceAllString(sf.Name, "_")
	if sf.NodeName == "s3" {
		return "s3-" + name
	}
	return "local-" + name
}

// GenerateName generates a derived name for the snapshot that is safe for use
// as a resource name.
func (sf *File) GenerateName() string {
	name := strings.ToLower(sf.Name)
	nodename := sf.NodeSource
	if nodename == "" {
		nodename = sf.NodeName
	}
	// Include a digest of the hostname and location to ensure unique resource
	// names. Snapshots should already include the hostname, but this ensures we
	// don't accidentally hide records if a snapshot with the same name somehow
	// exists on multiple nodes.
	digest := sha256.Sum256([]byte(nodename + sf.Location))
	// If the lowercase filename isn't usable as a resource name, or isn't short enough
	// that we can include a prefix and suffix, generate a safe name derived from the
	// hostname and timestamp.
	if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 || len(name)+13 > validation.DNS1123SubdomainMaxLength {
		nodename, _, _ := strings.Cut(nodename, ".")
		name = fmt.Sprintf("etcd-snapshot-%s-%d", nodename, sf.CreatedAt.Unix())
		if sf.Compressed {
			name += CompressedExtension
		}
	}
	if sf.NodeName == "s3" {
		return "s3-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
	}
	return "local-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
}
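A standalone sketch of the digest suffix (illustrative values): the first six hex characters of a SHA-256 over hostname plus location are appended, so identically-named snapshots on different nodes still yield unique resource names:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

func main() {
	// Same suffix logic as above, with made-up inputs.
	nodename := "node1"
	location := "s3://my-bucket/etcd-snapshot-node1-1700000000.zip"
	digest := sha256.Sum256([]byte(nodename + location))
	fmt.Println("s3-etcd-snapshot-node1-1700000000.zip-" + hex.EncodeToString(digest[:])[:6])
}
```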

// FromETCDSnapshotFile translates fields to the File from the ETCDSnapshotFile.
func (sf *File) FromETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
	if esf == nil {
		panic("cannot convert from nil ETCDSnapshotFile")
	}

	sf.Name = esf.Spec.SnapshotName
	sf.Location = esf.Spec.Location
	sf.CreatedAt = esf.Status.CreationTime
	sf.NodeSource = esf.Spec.NodeName
	sf.Compressed = strings.HasSuffix(esf.Spec.SnapshotName, CompressedExtension)

	if esf.Status.ReadyToUse != nil && *esf.Status.ReadyToUse {
		sf.Status = SuccessfulStatus
	} else {
		sf.Status = FailedStatus
	}

	if esf.Status.Size != nil {
		sf.Size = esf.Status.Size.Value()
	}

	if esf.Status.Error != nil {
		if esf.Status.Error.Time != nil {
			sf.CreatedAt = esf.Status.Error.Time
		}
		message := "etcd snapshot failed"
		if esf.Status.Error.Message != nil {
			message = *esf.Status.Error.Message
		}
		sf.Message = base64.StdEncoding.EncodeToString([]byte(message))
	}

	if len(esf.Spec.Metadata) > 0 {
		if b, err := json.Marshal(esf.Spec.Metadata); err != nil {
			logrus.Warnf("Failed to marshal metadata for %s: %v", esf.Name, err)
		} else {
			sf.Metadata = base64.StdEncoding.EncodeToString(b)
		}
	}

	if tokenHash := esf.Annotations[AnnotationTokenHash]; tokenHash != "" {
		sf.TokenHash = tokenHash
	}

	if esf.Spec.S3 == nil {
		sf.NodeName = esf.Spec.NodeName
	} else {
		sf.NodeName = "s3"
		sf.S3 = &S3Config{
			EtcdS3: config.EtcdS3{
				Endpoint:      esf.Spec.S3.Endpoint,
				EndpointCA:    esf.Spec.S3.EndpointCA,
				SkipSSLVerify: esf.Spec.S3.SkipSSLVerify,
				Bucket:        esf.Spec.S3.Bucket,
				Region:        esf.Spec.S3.Region,
				Folder:        esf.Spec.S3.Prefix,
				Insecure:      esf.Spec.S3.Insecure,
			},
		}
	}
}

// ToETCDSnapshotFile translates fields from the File to the ETCDSnapshotFile.
func (sf *File) ToETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) {
	if esf == nil {
		panic("cannot convert to nil ETCDSnapshotFile")
	}
	esf.Spec.SnapshotName = sf.Name
	esf.Spec.Location = sf.Location
	esf.Status.CreationTime = sf.CreatedAt
	esf.Status.ReadyToUse = ptr.To(sf.Status == SuccessfulStatus)
	esf.Status.Size = resource.NewQuantity(sf.Size, resource.DecimalSI)

	if sf.NodeSource != "" {
		esf.Spec.NodeName = sf.NodeSource
	} else {
		esf.Spec.NodeName = sf.NodeName
	}

	if sf.Message != "" {
		var message string
		b, err := base64.StdEncoding.DecodeString(sf.Message)
		if err != nil {
			logrus.Warnf("Failed to decode error message for %s: %v", sf.Name, err)
			message = "etcd snapshot failed"
		} else {
			message = string(b)
		}
		esf.Status.Error = &k3s.ETCDSnapshotError{
			Time:    sf.CreatedAt,
			Message: &message,
		}
	}

	if sf.MetadataSource != nil {
		esf.Spec.Metadata = sf.MetadataSource.Data
	} else if sf.Metadata != "" {
		metadata, err := base64.StdEncoding.DecodeString(sf.Metadata)
		if err != nil {
			logrus.Warnf("Failed to decode metadata for %s: %v", sf.Name, err)
		} else {
			if err := json.Unmarshal(metadata, &esf.Spec.Metadata); err != nil {
				logrus.Warnf("Failed to unmarshal metadata for %s: %v", sf.Name, err)
			}
		}
	}

	if esf.ObjectMeta.Labels == nil {
		esf.ObjectMeta.Labels = map[string]string{}
	}

	if esf.ObjectMeta.Annotations == nil {
		esf.ObjectMeta.Annotations = map[string]string{}
	}

	if sf.TokenHash != "" {
		esf.ObjectMeta.Annotations[AnnotationTokenHash] = sf.TokenHash
	}

	if sf.S3 == nil {
		esf.ObjectMeta.Labels[LabelStorageNode] = esf.Spec.NodeName
	} else {
		esf.ObjectMeta.Labels[LabelStorageNode] = "s3"
		esf.Spec.S3 = &k3s.ETCDSnapshotS3{
			Endpoint:      sf.S3.Endpoint,
			EndpointCA:    sf.S3.EndpointCA,
			SkipSSLVerify: sf.S3.SkipSSLVerify,
			Bucket:        sf.S3.Bucket,
			Region:        sf.S3.Region,
			Prefix:        sf.S3.Folder,
			Insecure:      sf.S3.Insecure,
		}
	}
}

// Marshal returns the JSON encoding of the snapshot File, with metadata inlined as base64.
func (sf *File) Marshal() ([]byte, error) {
	if sf.MetadataSource != nil {
		if m, err := json.Marshal(sf.MetadataSource.Data); err != nil {
			logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", ExtraMetadataConfigMapName, err)
		} else {
			sf.Metadata = base64.StdEncoding.EncodeToString(m)
		}
	}
	return json.Marshal(sf)
}

// IsNotExist returns true if the error is from http.StatusNotFound or os.IsNotExist.
func IsNotExist(err error) bool {
	if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) {
		return true
	}
	return false
}
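A trivial usage sketch covering the os.IsNotExist half of the helper (the minio half matches any error whose ToErrorResponse carries a 404 status code):

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	// Stat on a missing path yields an error that os.IsNotExist recognizes.
	_, err := os.Stat("/no/such/snapshot")
	fmt.Println(os.IsNotExist(err)) // true
}
```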