k3s/pkg/etcd/s3/s3.go

581 lines
19 KiB
Go

package s3
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
"net/textproto"
"net/url"
"os"
"path"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/lru"
)
var (
clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id")
tokenHashKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-token-hash")
nodeNameKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name")
)
var defaultEtcdS3 = &config.EtcdS3{
Endpoint: "s3.amazonaws.com",
Region: "us-east-1",
Timeout: metav1.Duration{
Duration: 5 * time.Minute,
},
}
var (
controller *Controller
cErr error
once sync.Once
)
// Controller maintains state for S3 functionality,
// and can be used to get clients for interacting with
// an S3 service, given specific client configuration.
type Controller struct {
clusterID string
tokenHash string
nodeName string
core core.Interface
clientCache *lru.Cache
}
// Client holds state for a given configuration - a preconfigured minio client,
// and reference to the config it was created for.
type Client struct {
mc *minio.Client
etcdS3 *config.EtcdS3
controller *Controller
}
// Start initializes the cache and sets the cluster id and token hash,
// returning a reference to the the initialized controller. Initialization is
// locked by a sync.Once to prevent races, and multiple calls to start will
// return the same controller or error.
func Start(ctx context.Context, config *config.Control) (*Controller, error) {
once.Do(func() {
c := &Controller{
clientCache: lru.New(5),
nodeName: os.Getenv("NODE_NAME"),
}
if config.ClusterReset {
logrus.Debug("Skip setting S3 snapshot cluster ID and server token hash during cluster-reset")
controller = c
} else {
logrus.Debug("Getting S3 snapshot cluster ID and server token hash")
if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
if config.Runtime.Core == nil {
return false, nil
}
c.core = config.Runtime.Core.Core()
// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
ns, err := c.core.V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{})
if err != nil {
return false, errors.Wrap(err, "failed to set S3 snapshot cluster ID")
}
c.clusterID = string(ns.UID)
tokenHash, err := util.GetTokenHash(config)
if err != nil {
return false, errors.Wrap(err, "failed to set S3 snapshot server token hash")
}
c.tokenHash = tokenHash
return true, nil
}); err != nil {
cErr = err
} else {
controller = c
}
}
})
return controller, cErr
}
func (c *Controller) GetClient(ctx context.Context, etcdS3 *config.EtcdS3) (*Client, error) {
if etcdS3 == nil {
return nil, errors.New("nil s3 configuration")
}
// update ConfigSecret in defaults so that comparisons between current and default config
// ignore ConfigSecret when deciding if CLI configuration is present.
defaultEtcdS3.ConfigSecret = etcdS3.ConfigSecret
// If config is default, try to load config from secret, and fail if it cannot be retrieved or if the secret name is not set.
// If config is not default, and secret name is set, warn that the secret is being ignored
isDefault := reflect.DeepEqual(defaultEtcdS3, etcdS3)
if etcdS3.ConfigSecret != "" {
if isDefault {
e, err := c.getConfigFromSecret(etcdS3.ConfigSecret)
if err != nil {
return nil, errors.Wrapf(err, "failed to get config from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
}
logrus.Infof("Using etcd s3 configuration from etcd-s3-config-secret %q", etcdS3.ConfigSecret)
etcdS3 = e
} else {
logrus.Warnf("Ignoring s3 configuration from etcd-s3-config-secret %q due to existing configuration from CLI or config file", etcdS3.ConfigSecret)
}
} else if isDefault {
return nil, errors.New("s3 configuration was not set")
}
// used just for logging
scheme := "https://"
if etcdS3.Insecure {
scheme = "http://"
}
// Try to get an existing client from cache. The entire EtcdS3 struct
// (including the key id and secret) is used as the cache key, but we only
// print the endpoint and bucket name to avoid leaking creds into the logs.
if client, ok := c.clientCache.Get(*etcdS3); ok {
logrus.Infof("Reusing cached S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)
return client.(*Client), nil
}
logrus.Infof("Attempting to create new S3 client for endpoint=%q bucket=%q folder=%q", scheme+etcdS3.Endpoint, etcdS3.Bucket, etcdS3.Folder)
if etcdS3.Bucket == "" {
return nil, errors.New("s3 bucket name was not set")
}
tr := http.DefaultTransport.(*http.Transport).Clone()
// You can either disable SSL verification or use a custom CA bundle,
// it doesn't make sense to do both - if verification is disabled,
// the CA is not checked!
if etcdS3.SkipSSLVerify {
tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
} else if etcdS3.EndpointCA != "" {
tlsConfig, err := loadEndpointCAs(etcdS3.EndpointCA)
if err != nil {
return nil, err
}
tr.TLSClientConfig = tlsConfig
}
// Set a fixed proxy URL, if requested by the user. This replaces the default,
// which calls ProxyFromEnvironment to read proxy settings from the environment.
if etcdS3.Proxy != "" {
var u *url.URL
var err error
// proxy address of literal "none" disables all use of a proxy by S3
if etcdS3.Proxy != "none" {
u, err = url.Parse(etcdS3.Proxy)
if err != nil {
return nil, errors.Wrap(err, "failed to parse etcd-s3-proxy value as URL")
}
if u.Scheme == "" || u.Host == "" {
return nil, fmt.Errorf("proxy URL must include scheme and host")
}
}
tr.Proxy = http.ProxyURL(u)
}
var creds *credentials.Credentials
if len(etcdS3.AccessKey) == 0 && len(etcdS3.SecretKey) == 0 {
creds = credentials.NewIAM("") // for running on ec2 instance
if _, err := creds.Get(); err != nil {
return nil, errors.Wrap(err, "failed to get IAM credentials")
}
} else {
creds = credentials.NewStaticV4(etcdS3.AccessKey, etcdS3.SecretKey, "")
}
opt := minio.Options{
Creds: creds,
Secure: !etcdS3.Insecure,
Region: etcdS3.Region,
Transport: tr,
BucketLookup: bucketLookupType(etcdS3.Endpoint),
}
mc, err := minio.New(etcdS3.Endpoint, &opt)
if err != nil {
return nil, err
}
logrus.Infof("Checking if S3 bucket %s exists", etcdS3.Bucket)
ctx, cancel := context.WithTimeout(ctx, etcdS3.Timeout.Duration)
defer cancel()
exists, err := mc.BucketExists(ctx, etcdS3.Bucket)
if err != nil {
return nil, errors.Wrapf(err, "failed to test for existence of bucket %s", etcdS3.Bucket)
}
if !exists {
return nil, fmt.Errorf("bucket %s does not exist", etcdS3.Bucket)
}
logrus.Infof("S3 bucket %s exists", etcdS3.Bucket)
client := &Client{
mc: mc,
etcdS3: etcdS3,
controller: c,
}
logrus.Infof("Adding S3 client to cache")
c.clientCache.Add(*etcdS3, client)
return client, nil
}
// upload uploads the given snapshot to the configured S3
// compatible backend.
func (c *Client) Upload(ctx context.Context, snapshotPath string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshot.File, error) {
basename := filepath.Base(snapshotPath)
metadata := filepath.Join(filepath.Dir(snapshotPath), "..", snapshot.MetadataDir, basename)
snapshotKey := path.Join(c.etcdS3.Folder, basename)
metadataKey := path.Join(c.etcdS3.Folder, snapshot.MetadataDir, basename)
sf := &snapshot.File{
Name: basename,
Location: fmt.Sprintf("s3://%s/%s", c.etcdS3.Bucket, snapshotKey),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: now,
},
S3: &snapshot.S3Config{EtcdS3: *c.etcdS3},
Compressed: strings.HasSuffix(snapshotPath, snapshot.CompressedExtension),
MetadataSource: extraMetadata,
NodeSource: c.controller.nodeName,
}
logrus.Infof("Uploading snapshot to s3://%s/%s", c.etcdS3.Bucket, snapshotKey)
uploadInfo, err := c.uploadSnapshot(ctx, snapshotKey, snapshotPath)
if err != nil {
sf.Status = snapshot.FailedStatus
sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
} else {
sf.Status = snapshot.SuccessfulStatus
sf.Size = uploadInfo.Size
sf.TokenHash = c.controller.tokenHash
}
if uploadInfo, err := c.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil {
logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err)
} else if uploadInfo.Size != 0 {
logrus.Infof("Uploaded snapshot metadata s3://%s/%s", c.etcdS3.Bucket, metadataKey)
}
return sf, err
}
// uploadSnapshot uploads the snapshot file to S3 using the minio API.
func (c *Client) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
opts := minio.PutObjectOptions{
NumThreads: 2,
UserMetadata: map[string]string{
clusterIDKey: c.controller.clusterID,
nodeNameKey: c.controller.nodeName,
tokenHashKey: c.controller.tokenHash,
},
}
if strings.HasSuffix(key, snapshot.CompressedExtension) {
opts.ContentType = "application/zip"
} else {
opts.ContentType = "application/octet-stream"
}
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, opts)
}
// uploadSnapshotMetadata marshals and uploads the snapshot metadata to S3 using the minio API.
// The upload is silently skipped if no extra metadata is provided.
func (c *Client) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return minio.UploadInfo{}, nil
}
return minio.UploadInfo{}, err
}
opts := minio.PutObjectOptions{
NumThreads: 2,
ContentType: "application/json",
UserMetadata: map[string]string{
clusterIDKey: c.controller.clusterID,
nodeNameKey: c.controller.nodeName,
tokenHashKey: c.controller.tokenHash,
},
}
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
return c.mc.FPutObject(ctx, c.etcdS3.Bucket, key, path, opts)
}
// Download downloads the given snapshot from the configured S3
// compatible backend. If the file is successfully downloaded, it returns
// the path the file was downloaded to.
func (c *Client) Download(ctx context.Context, snapshotName, snapshotDir string) (string, error) {
snapshotKey := path.Join(c.etcdS3.Folder, snapshotName)
metadataKey := path.Join(c.etcdS3.Folder, snapshot.MetadataDir, snapshotName)
snapshotFile := filepath.Join(snapshotDir, snapshotName)
metadataFile := filepath.Join(snapshotDir, "..", snapshot.MetadataDir, snapshotName)
if err := c.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil {
return "", err
}
if err := c.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil {
return "", err
}
return snapshotFile, nil
}
// downloadSnapshot downloads the snapshot file from S3 using the minio API.
func (c *Client) downloadSnapshot(ctx context.Context, key, file string) error {
logrus.Debugf("Downloading snapshot from s3://%s/%s", c.etcdS3.Bucket, key)
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
defer os.Chmod(file, 0600)
return c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})
}
// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API.
// No error is returned if the metadata file does not exist, as it is optional.
func (c *Client) downloadSnapshotMetadata(ctx context.Context, key, file string) error {
logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, key)
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
defer os.Chmod(file, 0600)
err := c.mc.FGetObject(ctx, c.etcdS3.Bucket, key, file, minio.GetObjectOptions{})
if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound {
return nil
}
return err
}
// SnapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node.
// Returns a list of pruned snapshot names.
func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix string) ([]string, error) {
if retention < 1 {
return nil, nil
}
prefix = path.Join(c.etcdS3.Folder, prefix)
logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", retention, c.etcdS3.Bucket, prefix)
var snapshotFiles []minio.ObjectInfo
toCtx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: prefix,
Recursive: true,
}
for info := range c.mc.ListObjects(toCtx, c.etcdS3.Bucket, opts) {
if info.Err != nil {
return nil, info.Err
}
// skip metadata
if path.Base(path.Dir(info.Key)) == snapshot.MetadataDir {
continue
}
snapshotFiles = append(snapshotFiles, info)
}
if len(snapshotFiles) <= retention {
return nil, nil
}
// sort newest-first so we can prune entries past the retention count
sort.Slice(snapshotFiles, func(i, j int) bool {
return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified)
})
deleted := []string{}
for _, df := range snapshotFiles[retention:] {
logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)
key := path.Base(df.Key)
if err := c.DeleteSnapshot(ctx, key); err != nil && !snapshot.IsNotExist(err) {
return deleted, err
}
deleted = append(deleted, key)
}
return deleted, nil
}
// DeleteSnapshot deletes the selected snapshot (and its metadata) from S3
func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
key = path.Join(c.etcdS3.Folder, key)
_, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, key, minio.StatObjectOptions{})
if err == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{}); err != nil {
return err
}
}
// check for and try to delete the metadata regardless of whether or not the
// snapshot existed, just to ensure that things are cleaned up in the case of
// ephemeral errors. Metadata delete errors are only exposed if the object
// exists and fails to delete.
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
_, merr := c.mc.StatObject(ctx, c.etcdS3.Bucket, metadataKey, minio.StatObjectOptions{})
if merr == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); err != nil {
return err
}
}
// return error from snapshot StatObject call, so that callers can determine
// if the object was actually deleted or not by checking for a NotFound error.
return err
}
// listSnapshots provides a list of currently stored
// snapshots in S3 along with their relevant
// metadata.
func (c *Client) ListSnapshots(ctx context.Context) (map[string]snapshot.File, error) {
snapshots := map[string]snapshot.File{}
metadatas := []string{}
ctx, cancel := context.WithTimeout(ctx, c.etcdS3.Timeout.Duration)
defer cancel()
opts := minio.ListObjectsOptions{
Prefix: c.etcdS3.Folder,
Recursive: true,
}
objects := c.mc.ListObjects(ctx, c.etcdS3.Bucket, opts)
for obj := range objects {
if obj.Err != nil {
return nil, obj.Err
}
if obj.Size == 0 {
continue
}
if o, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, obj.Key, minio.StatObjectOptions{}); err != nil {
logrus.Warnf("Failed to get object metadata: %v", err)
} else {
obj = o
}
filename := path.Base(obj.Key)
if path.Base(path.Dir(obj.Key)) == snapshot.MetadataDir {
metadatas = append(metadatas, obj.Key)
continue
}
basename, compressed := strings.CutSuffix(filename, snapshot.CompressedExtension)
ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
if err != nil {
ts = obj.LastModified.Unix()
}
sf := snapshot.File{
Name: filename,
Location: fmt.Sprintf("s3://%s/%s", c.etcdS3.Bucket, obj.Key),
NodeName: "s3",
CreatedAt: &metav1.Time{
Time: time.Unix(ts, 0),
},
Size: obj.Size,
S3: &snapshot.S3Config{EtcdS3: *c.etcdS3},
Status: snapshot.SuccessfulStatus,
Compressed: compressed,
NodeSource: obj.UserMetadata[nodeNameKey],
TokenHash: obj.UserMetadata[tokenHashKey],
}
sfKey := sf.GenerateConfigMapKey()
snapshots[sfKey] = sf
}
for _, metadataKey := range metadatas {
filename := path.Base(metadataKey)
dsf := &snapshot.File{Name: filename, NodeName: "s3"}
sfKey := dsf.GenerateConfigMapKey()
if sf, ok := snapshots[sfKey]; ok {
logrus.Debugf("Loading snapshot metadata from s3://%s/%s", c.etcdS3.Bucket, metadataKey)
if obj, err := c.mc.GetObject(ctx, c.etcdS3.Bucket, metadataKey, minio.GetObjectOptions{}); err != nil {
if snapshot.IsNotExist(err) {
logrus.Debugf("Failed to get snapshot metadata: %v", err)
} else {
logrus.Warnf("Failed to get snapshot metadata for %s: %v", filename, err)
}
} else {
if m, err := ioutil.ReadAll(obj); err != nil {
if snapshot.IsNotExist(err) {
logrus.Debugf("Failed to read snapshot metadata: %v", err)
} else {
logrus.Warnf("Failed to read snapshot metadata for %s: %v", filename, err)
}
} else {
sf.Metadata = base64.StdEncoding.EncodeToString(m)
snapshots[sfKey] = sf
}
}
}
}
return snapshots, nil
}
func loadEndpointCAs(etcdS3EndpointCA string) (*tls.Config, error) {
var loaded bool
certPool := x509.NewCertPool()
for _, ca := range strings.Split(etcdS3EndpointCA, " ") {
// Try to decode the value as base64-encoded data - yes, a base64 string that itself
// contains multiline, ascii-armored, base64-encoded certificate data - as would be produced
// by `base64 --wrap=0 /path/to/cert.pem`. If this fails, assume the value is the path to a
// file on disk, and try to read that. This is backwards compatible with RKE1.
caData, err := base64.StdEncoding.DecodeString(ca)
if err != nil {
caData, err = os.ReadFile(ca)
}
if err != nil {
return nil, err
}
if certPool.AppendCertsFromPEM(caData) {
loaded = true
}
}
if loaded {
return &tls.Config{RootCAs: certPool}, nil
}
return nil, errors.New("no certificates loaded from etcd-s3-endpoint-ca")
}
func bucketLookupType(endpoint string) minio.BucketLookupType {
if strings.Contains(endpoint, "aliyun") { // backwards compatible with RKE1
return minio.BucketLookupDNS
}
return minio.BucketLookupAuto
}