mirror of https://github.com/k3s-io/k3s
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
313 lines
8.3 KiB
313 lines
8.3 KiB
package etcd |
|
|
|
import ( |
|
"context" |
|
"crypto/tls" |
|
"crypto/x509" |
|
"encoding/base64" |
|
"encoding/pem" |
|
"fmt" |
|
"io" |
|
"io/ioutil" |
|
"net/http" |
|
"os" |
|
"path/filepath" |
|
"sort" |
|
"strings" |
|
"time" |
|
|
|
"github.com/k3s-io/k3s/pkg/daemons/config" |
|
"github.com/minio/minio-go/v7" |
|
"github.com/minio/minio-go/v7/pkg/credentials" |
|
"github.com/pkg/errors" |
|
"github.com/sirupsen/logrus" |
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
|
) |
|
|
|
// S3 maintains state for S3 functionality. |
|
type S3 struct { |
|
config *config.Control |
|
client *minio.Client |
|
} |
|
|
|
// newS3 creates a new value of type s3 pointer with a |
|
// copy of the config.Control pointer and initializes |
|
// a new Minio client. |
|
func NewS3(ctx context.Context, config *config.Control) (*S3, error) { |
|
if config.EtcdS3BucketName == "" { |
|
return nil, errors.New("s3 bucket name was not set") |
|
} |
|
tr := http.DefaultTransport |
|
|
|
switch { |
|
case config.EtcdS3EndpointCA != "": |
|
trCA, err := setTransportCA(tr, config.EtcdS3EndpointCA, config.EtcdS3SkipSSLVerify) |
|
if err != nil { |
|
return nil, err |
|
} |
|
tr = trCA |
|
case config.EtcdS3 && config.EtcdS3SkipSSLVerify: |
|
tr.(*http.Transport).TLSClientConfig = &tls.Config{ |
|
InsecureSkipVerify: config.EtcdS3SkipSSLVerify, |
|
} |
|
} |
|
|
|
var creds *credentials.Credentials |
|
if len(config.EtcdS3AccessKey) == 0 && len(config.EtcdS3SecretKey) == 0 { |
|
creds = credentials.NewIAM("") // for running on ec2 instance |
|
} else { |
|
creds = credentials.NewStaticV4(config.EtcdS3AccessKey, config.EtcdS3SecretKey, "") |
|
} |
|
|
|
opt := minio.Options{ |
|
Creds: creds, |
|
Secure: !config.EtcdS3Insecure, |
|
Region: config.EtcdS3Region, |
|
Transport: tr, |
|
BucketLookup: bucketLookupType(config.EtcdS3Endpoint), |
|
} |
|
c, err := minio.New(config.EtcdS3Endpoint, &opt) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
logrus.Infof("Checking if S3 bucket %s exists", config.EtcdS3BucketName) |
|
|
|
ctx, cancel := context.WithTimeout(ctx, config.EtcdS3Timeout) |
|
defer cancel() |
|
|
|
exists, err := c.BucketExists(ctx, config.EtcdS3BucketName) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if !exists { |
|
return nil, fmt.Errorf("bucket: %s does not exist", config.EtcdS3BucketName) |
|
} |
|
logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName) |
|
|
|
return &S3{ |
|
config: config, |
|
client: c, |
|
}, nil |
|
} |
|
|
|
// upload uploads the given snapshot to the configured S3 |
|
// compatible backend. |
|
func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*snapshotFile, error) { |
|
logrus.Infof("Uploading snapshot %s to S3", snapshot) |
|
basename := filepath.Base(snapshot) |
|
var snapshotFileName string |
|
var sf snapshotFile |
|
if s.config.EtcdS3Folder != "" { |
|
snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename) |
|
} else { |
|
snapshotFileName = basename |
|
} |
|
|
|
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) |
|
defer cancel() |
|
opts := minio.PutObjectOptions{ |
|
ContentType: "application/zip", |
|
NumThreads: 2, |
|
} |
|
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts) |
|
if err != nil { |
|
sf = snapshotFile{ |
|
Name: filepath.Base(uploadInfo.Key), |
|
Metadata: extraMetadata, |
|
NodeName: "s3", |
|
CreatedAt: &metav1.Time{ |
|
Time: now, |
|
}, |
|
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), |
|
Size: 0, |
|
Status: failedSnapshotStatus, |
|
S3: &s3Config{ |
|
Endpoint: s.config.EtcdS3Endpoint, |
|
EndpointCA: s.config.EtcdS3EndpointCA, |
|
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, |
|
Bucket: s.config.EtcdS3BucketName, |
|
Region: s.config.EtcdS3Region, |
|
Folder: s.config.EtcdS3Folder, |
|
Insecure: s.config.EtcdS3Insecure, |
|
}, |
|
} |
|
logrus.Errorf("Error received during snapshot upload to S3: %s", err) |
|
} else { |
|
ca, err := time.Parse(time.RFC3339, uploadInfo.LastModified.Format(time.RFC3339)) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
sf = snapshotFile{ |
|
Name: filepath.Base(uploadInfo.Key), |
|
Metadata: extraMetadata, |
|
NodeName: "s3", |
|
CreatedAt: &metav1.Time{ |
|
Time: ca, |
|
}, |
|
Size: uploadInfo.Size, |
|
Status: successfulSnapshotStatus, |
|
S3: &s3Config{ |
|
Endpoint: s.config.EtcdS3Endpoint, |
|
EndpointCA: s.config.EtcdS3EndpointCA, |
|
SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, |
|
Bucket: s.config.EtcdS3BucketName, |
|
Region: s.config.EtcdS3Region, |
|
Folder: s.config.EtcdS3Folder, |
|
Insecure: s.config.EtcdS3Insecure, |
|
}, |
|
} |
|
} |
|
return &sf, nil |
|
} |
|
|
|
// download downloads the given snapshot from the configured S3 |
|
// compatible backend. |
|
func (s *S3) Download(ctx context.Context) error { |
|
var remotePath string |
|
if s.config.EtcdS3Folder != "" { |
|
remotePath = filepath.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath) |
|
} else { |
|
remotePath = s.config.ClusterResetRestorePath |
|
} |
|
|
|
logrus.Debugf("retrieving snapshot: %s", remotePath) |
|
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) |
|
defer cancel() |
|
|
|
r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, remotePath, minio.GetObjectOptions{}) |
|
if err != nil { |
|
return nil |
|
} |
|
defer r.Close() |
|
|
|
snapshotDir, err := snapshotDir(s.config, true) |
|
if err != nil { |
|
return errors.Wrap(err, "failed to get the snapshot dir") |
|
} |
|
|
|
fullSnapshotPath := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath) |
|
sf, err := os.Create(fullSnapshotPath) |
|
if err != nil { |
|
return err |
|
} |
|
defer sf.Close() |
|
|
|
stat, err := r.Stat() |
|
if err != nil { |
|
return err |
|
} |
|
|
|
if _, err := io.CopyN(sf, r, stat.Size); err != nil { |
|
return err |
|
} |
|
|
|
s.config.ClusterResetRestorePath = fullSnapshotPath |
|
|
|
return os.Chmod(fullSnapshotPath, 0600) |
|
} |
|
|
|
// snapshotPrefix returns the prefix used in the |
|
// naming of the snapshots. |
|
func (s *S3) snapshotPrefix() string { |
|
nodeName := os.Getenv("NODE_NAME") |
|
fullSnapshotPrefix := s.config.EtcdSnapshotName + "-" + nodeName |
|
var prefix string |
|
if s.config.EtcdS3Folder != "" { |
|
prefix = filepath.Join(s.config.EtcdS3Folder, fullSnapshotPrefix) |
|
} else { |
|
prefix = fullSnapshotPrefix |
|
} |
|
return prefix |
|
} |
|
|
|
// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node. |
|
func (s *S3) snapshotRetention(ctx context.Context) error { |
|
if s.config.EtcdSnapshotRetention < 1 { |
|
return nil |
|
} |
|
logrus.Infof("Applying snapshot retention policy to snapshots stored in S3: retention: %d, snapshotPrefix: %s", s.config.EtcdSnapshotRetention, s.snapshotPrefix()) |
|
|
|
var snapshotFiles []minio.ObjectInfo |
|
|
|
toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) |
|
defer cancel() |
|
|
|
loo := minio.ListObjectsOptions{ |
|
Recursive: true, |
|
Prefix: s.snapshotPrefix(), |
|
} |
|
for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, loo) { |
|
if info.Err != nil { |
|
return info.Err |
|
} |
|
snapshotFiles = append(snapshotFiles, info) |
|
} |
|
|
|
if len(snapshotFiles) <= s.config.EtcdSnapshotRetention { |
|
return nil |
|
} |
|
|
|
sort.Slice(snapshotFiles, func(i, j int) bool { |
|
return snapshotFiles[i].Key < snapshotFiles[j].Key |
|
}) |
|
|
|
delCount := len(snapshotFiles) - s.config.EtcdSnapshotRetention |
|
for _, df := range snapshotFiles[:delCount] { |
|
logrus.Infof("Removing S3 snapshot: %s", df.Key) |
|
if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// readS3EndpointCA resolves the endpoint CA setting to raw bytes. The value
// is first interpreted as standard base64 and the decoded bytes are returned;
// if it does not decode, it is treated as a file path and the file's contents
// (or the read error) are returned.
func readS3EndpointCA(endpointCA string) ([]byte, error) {
	if decoded, err := base64.StdEncoding.DecodeString(endpointCA); err == nil {
		return decoded, nil
	}
	return ioutil.ReadFile(endpointCA)
}
|
|
|
func setTransportCA(tr http.RoundTripper, endpointCA string, insecureSkipVerify bool) (http.RoundTripper, error) { |
|
ca, err := readS3EndpointCA(endpointCA) |
|
if err != nil { |
|
return tr, err |
|
} |
|
if !isValidCertificate(ca) { |
|
return tr, errors.New("endpoint-ca is not a valid x509 certificate") |
|
} |
|
|
|
certPool := x509.NewCertPool() |
|
certPool.AppendCertsFromPEM(ca) |
|
|
|
tr.(*http.Transport).TLSClientConfig = &tls.Config{ |
|
RootCAs: certPool, |
|
InsecureSkipVerify: insecureSkipVerify, |
|
} |
|
|
|
return tr, nil |
|
} |
|
|
|
// isValidCertificate reports whether c starts with a PEM block whose payload
// parses as x509 certificate data. Only the first PEM block is inspected;
// input without any PEM block is rejected.
func isValidCertificate(c []byte) bool {
	block, _ := pem.Decode(c)
	if block == nil {
		return false
	}
	_, err := x509.ParseCertificates(block.Bytes)
	return err == nil
}
|
|
|
func bucketLookupType(endpoint string) minio.BucketLookupType { |
|
if strings.Contains(endpoint, "aliyun") { // backwards compt with RKE1 |
|
return minio.BucketLookupDNS |
|
} |
|
return minio.BucketLookupAuto |
|
}
|
|
|