package etcd
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// S3 maintains state for S3 functionality: the control configuration the
// S3 settings are read from and the Minio client used to talk to the
// S3 compatible backend.
type S3 struct {
// config is the control configuration; the EtcdS3* fields are read from
// it, and Download rewrites its ClusterResetRestorePath.
config * config . Control
// client is the Minio client created by NewS3.
client * minio . Client
}
// NewS3 validates the S3 settings in config, initializes a new Minio client
// for the configured endpoint, and verifies that the configured bucket
// exists.
//
// The returned S3 keeps the config pointer itself (not a copy of the struct).
// The bucket existence check is bounded by config.EtcdS3Timeout. An error is
// returned when no bucket name is set, the endpoint CA cannot be applied, the
// client cannot be created, the bucket lookup fails, or the bucket does not
// exist.
func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
	if config.EtcdS3BucketName == "" {
		return nil, errors.New("s3 bucket name was not set")
	}

	// Work on a private copy of the default transport: the TLS settings
	// applied below must not leak into every other user of the
	// process-wide http.DefaultTransport.
	var tr http.RoundTripper = http.DefaultTransport
	if base, ok := tr.(*http.Transport); ok {
		tr = base.Clone()
	}

	switch {
	case config.EtcdS3EndpointCA != "":
		trCA, err := setTransportCA(tr, config.EtcdS3EndpointCA, config.EtcdS3SkipSSLVerify)
		if err != nil {
			return nil, err
		}
		tr = trCA
	case config.EtcdS3 && config.EtcdS3SkipSSLVerify:
		t, ok := tr.(*http.Transport)
		if !ok {
			return nil, fmt.Errorf("cannot configure TLS on transport of type %T", tr)
		}
		t.TLSClientConfig = &tls.Config{
			InsecureSkipVerify: config.EtcdS3SkipSSLVerify,
		}
	}

	var creds *credentials.Credentials
	if len(config.EtcdS3AccessKey) == 0 && len(config.EtcdS3SecretKey) == 0 {
		creds = credentials.NewIAM("") // for running on ec2 instance
	} else {
		creds = credentials.NewStaticV4(config.EtcdS3AccessKey, config.EtcdS3SecretKey, "")
	}

	opt := minio.Options{
		Creds:        creds,
		Secure:       !config.EtcdS3Insecure,
		Region:       config.EtcdS3Region,
		Transport:    tr,
		BucketLookup: bucketLookupType(config.EtcdS3Endpoint),
	}
	c, err := minio.New(config.EtcdS3Endpoint, &opt)
	if err != nil {
		return nil, err
	}

	logrus.Infof("Checking if S3 bucket %s exists", config.EtcdS3BucketName)

	// Only the bucket lookup is bound to the timeout; the derived context
	// is released when NewS3 returns.
	ctx, cancel := context.WithTimeout(ctx, config.EtcdS3Timeout)
	defer cancel()

	exists, err := c.BucketExists(ctx, config.EtcdS3BucketName)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, fmt.Errorf("bucket: %s does not exist", config.EtcdS3BucketName)
	}
	logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName)

	return &S3{
		config: config,
		client: c,
	}, nil
}
// upload uploads the given snapshot to the configured S3
// compatible backend.
//
// The object key is the snapshot's base name, placed under EtcdS3Folder when
// one is configured. The upload is bounded by EtcdS3Timeout.
//
// A failed upload is NOT reported through the error return: it is logged and
// described by the returned snapshotFile (failedSnapshotStatus, CreatedAt set
// to now, and the error text base64-encoded in Message). The error return is
// non-nil only if the upload succeeded but its modification time could not be
// converted.
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
	logrus.Infof("Uploading snapshot %s to S3", snapshot)
	basename := filepath.Base(snapshot)
	snapshotFileName := basename
	if s.config.EtcdS3Folder != "" {
		snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename)
	}

	toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()

	opts := minio.PutObjectOptions{NumThreads: 2}
	if strings.HasSuffix(snapshot, compressedExtension) {
		opts.ContentType = "application/zip"
	} else {
		opts.ContentType = "application/octet-stream"
	}

	// Fields shared by the success and failure records.
	sf := &snapshotFile{
		Name:     basename,
		NodeName: "s3",
		S3: &s3Config{
			Endpoint:      s.config.EtcdS3Endpoint,
			EndpointCA:    s.config.EtcdS3EndpointCA,
			SkipSSLVerify: s.config.EtcdS3SkipSSLVerify,
			Bucket:        s.config.EtcdS3BucketName,
			Region:        s.config.EtcdS3Region,
			Folder:        s.config.EtcdS3Folder,
			Insecure:      s.config.EtcdS3Insecure,
		},
		metadataSource: extraMetadata,
	}

	uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts)
	if err != nil {
		// uploadInfo is not populated on failure, so the record keeps the
		// local base name instead of deriving one from an empty key
		// (filepath.Base("") would yield ".").
		sf.CreatedAt = &metav1.Time{Time: now}
		sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error()))
		sf.Size = 0
		sf.Status = failedSnapshotStatus
		logrus.Errorf("Error received during snapshot upload to S3: %s", err)
		return sf, nil
	}

	// Round-trip through RFC3339 to drop sub-second precision from the
	// reported modification time.
	ca, err := time.Parse(time.RFC3339, uploadInfo.LastModified.Format(time.RFC3339))
	if err != nil {
		return nil, err
	}
	sf.Name = filepath.Base(uploadInfo.Key)
	sf.CreatedAt = &metav1.Time{Time: ca}
	sf.Size = uploadInfo.Size
	sf.Status = successfulSnapshotStatus
	return sf, nil
}
// Download downloads the snapshot named by ClusterResetRestorePath from the
// configured S3 compatible backend into the local snapshot directory.
//
// The remote key is ClusterResetRestorePath, placed under EtcdS3Folder when
// one is configured. The whole transfer is bounded by EtcdS3Timeout. On
// success, ClusterResetRestorePath is rewritten to the full local path of the
// downloaded file and the file mode is set to 0600.
func (s *S3) Download(ctx context.Context) error {
	remotePath := s.config.ClusterResetRestorePath
	if s.config.EtcdS3Folder != "" {
		remotePath = filepath.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath)
	}

	logrus.Debugf("retrieving snapshot: %s", remotePath)
	toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()

	r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, remotePath, minio.GetObjectOptions{})
	if err != nil {
		// Propagate the failure; returning nil here would report a
		// successful download although nothing was written.
		return err
	}
	defer r.Close()

	// Stat before touching the local filesystem so that a missing or
	// unreadable object does not leave an empty snapshot file behind.
	stat, err := r.Stat()
	if err != nil {
		return err
	}

	dir, err := snapshotDir(s.config, true)
	if err != nil {
		return errors.Wrap(err, "failed to get the snapshot dir")
	}

	fullSnapshotPath := filepath.Join(dir, s.config.ClusterResetRestorePath)
	// Create the file owner-only from the start; the Chmod below still
	// covers a pre-existing file with wider permissions.
	out, err := os.OpenFile(fullSnapshotPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}

	if _, err := io.CopyN(out, r, stat.Size); err != nil {
		out.Close()
		return err
	}
	// A failed Close can mean the data never reached disk.
	if err := out.Close(); err != nil {
		return err
	}

	s.config.ClusterResetRestorePath = fullSnapshotPath

	return os.Chmod(fullSnapshotPath, 0600)
}
// snapshotPrefix returns the object-key prefix used when naming snapshots:
// EtcdSnapshotName, joined under EtcdS3Folder when a folder is configured.
func (s *S3) snapshotPrefix() string {
	name := s.config.EtcdSnapshotName
	if folder := s.config.EtcdS3Folder; folder != "" {
		return filepath.Join(folder, name)
	}
	return name
}
// snapshotRetention prunes snapshots in the configured S3 compatible backend
// whose key starts with snapshotPrefix().
//
// It does nothing when EtcdSnapshotRetention is less than 1 or when no more
// than EtcdSnapshotRetention matching objects exist. Otherwise the objects
// are ordered by the text following the last "-" in their key and the
// lowest-ordered surplus objects are removed. Listing and removal together
// are bounded by EtcdS3Timeout. The first listing or removal error aborts the
// pass and is returned.
func (s *S3) snapshotRetention(ctx context.Context) error {
	if s.config.EtcdSnapshotRetention < 1 {
		return nil
	}
	prefix := s.snapshotPrefix()
	logrus.Infof("Applying snapshot retention policy to snapshots stored in S3: retention: %d, snapshotPrefix: %s", s.config.EtcdSnapshotRetention, prefix)

	toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout)
	defer cancel()

	loo := minio.ListObjectsOptions{
		Recursive: true,
		Prefix:    prefix,
	}

	var snapshotFiles []minio.ObjectInfo
	for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, loo) {
		if info.Err != nil {
			return info.Err
		}
		snapshotFiles = append(snapshotFiles, info)
	}

	if len(snapshotFiles) <= s.config.EtcdSnapshotRetention {
		return nil
	}

	// Keys look like etcd-snapshot-example-{date}: everything after the last
	// "-" is taken as the date and compared as a string. A key without any
	// "-" is compared as a whole.
	dateOf := func(key string) string {
		return key[strings.LastIndex(key, "-")+1:]
	}
	sort.Slice(snapshotFiles, func(i, j int) bool {
		return dateOf(snapshotFiles[i].Key) < dateOf(snapshotFiles[j].Key)
	})

	delCount := len(snapshotFiles) - s.config.EtcdSnapshotRetention
	for _, df := range snapshotFiles[:delCount] {
		logrus.Infof("Removing S3 snapshot: %s", df.Key)
		// Use the timeout-bound context so removals honor EtcdS3Timeout
		// just like the listing above.
		if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil {
			return err
		}
	}

	return nil
}
// readS3EndpointCA returns the endpoint CA bytes. The argument is first
// treated as standard base64; if it does not decode, it is treated as a file
// path and the file's contents (or the read error) are returned.
func readS3EndpointCA(endpointCA string) ([]byte, error) {
	if decoded, decodeErr := base64.StdEncoding.DecodeString(endpointCA); decodeErr == nil {
		return decoded, nil
	}
	return os.ReadFile(endpointCA)
}
// setTransportCA returns a transport that trusts the given endpoint CA.
//
// endpointCA is resolved by readS3EndpointCA and must pass
// isValidCertificate. tr must be an *http.Transport; it is cloned rather than
// modified, so a shared transport such as http.DefaultTransport is left
// untouched. The clone's TLS configuration uses a root pool containing only
// the supplied CA, with InsecureSkipVerify set from insecureSkipVerify.
//
// On any error the original tr is returned alongside the error.
func setTransportCA(tr http.RoundTripper, endpointCA string, insecureSkipVerify bool) (http.RoundTripper, error) {
	ca, err := readS3EndpointCA(endpointCA)
	if err != nil {
		return tr, err
	}
	if !isValidCertificate(ca) {
		return tr, errors.New("endpoint-ca is not a valid x509 certificate")
	}

	// Report an unsupported transport as an error instead of panicking on
	// the type assertion.
	base, ok := tr.(*http.Transport)
	if !ok {
		return tr, fmt.Errorf("cannot set endpoint-ca on transport of type %T", tr)
	}

	certPool := x509.NewCertPool()
	// An empty pool would make every TLS verification fail later with a
	// less helpful error, so fail here if nothing was added.
	if !certPool.AppendCertsFromPEM(ca) {
		return tr, errors.New("endpoint-ca does not contain a PEM encoded certificate")
	}

	withCA := base.Clone()
	withCA.TLSClientConfig = &tls.Config{
		RootCAs:            certPool,
		InsecureSkipVerify: insecureSkipVerify,
	}

	return withCA, nil
}
// isValidCertificate reports whether the first PEM block in c decodes and
// its payload parses as one or more x509 certificates. Input without a PEM
// block is rejected.
func isValidCertificate(c []byte) bool {
	block, _ := pem.Decode(c)
	if block == nil {
		return false
	}
	_, parseErr := x509.ParseCertificates(block.Bytes)
	return parseErr == nil
}
// bucketLookupType selects the Minio bucket lookup style for an endpoint:
// DNS lookup when the endpoint contains "aliyun", automatic detection
// otherwise.
func bucketLookupType(endpoint string) minio.BucketLookupType {
	lookup := minio.BucketLookupAuto
	if strings.Contains(endpoint, "aliyun") { // backwards compt with RKE1
		lookup = minio.BucketLookupDNS
	}
	return lookup
}