@ -13,7 +13,6 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
@ -28,6 +27,7 @@ import (
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
var (
@ -100,30 +100,40 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
}
logrus . Infof ( "S3 bucket %s exists" , config . EtcdS3BucketName )
for config . Runtime . Core == nil {
runtime . Gosched ( )
s3 := & S3 {
config : config ,
client : c ,
nodeName : os . Getenv ( "NODE_NAME" ) ,
}
// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
var clusterID string
if ns , err := config . Runtime . Core . Core ( ) . V1 ( ) . Namespace ( ) . Get ( metav1 . NamespaceSystem , metav1 . GetOptions { } ) ; err != nil {
logrus . Warnf ( "Failed to set cluster ID: %v" , err )
if config . ClusterReset {
logrus . Debug ( "Skip setting S3 snapshot cluster ID and token during cluster-reset" )
} else {
clusterID = string ( ns . UID )
}
if err := wait . PollImmediateUntilWithContext ( ctx , time . Second , func ( ctx context . Context ) ( bool , error ) {
if config . Runtime . Core == nil {
return false , nil
}
tokenHash , err := util . GetTokenHash ( config )
if err != nil {
return nil , errors . Wrap ( err , "failed to get server token hash for etcd snapshot" )
// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
ns , err := config . Runtime . Core . Core ( ) . V1 ( ) . Namespace ( ) . Get ( metav1 . NamespaceSystem , metav1 . GetOptions { } )
if err != nil {
return false , errors . Wrap ( err , "failed to set S3 snapshot cluster ID" )
}
s3 . clusterID = string ( ns . UID )
tokenHash , err := util . GetTokenHash ( config )
if err != nil {
return false , errors . Wrap ( err , "failed to set S3 snapshot server token hash" )
}
s3 . tokenHash = tokenHash
return true , nil
} ) ; err != nil {
return nil , err
}
}
return & S3 {
config : config ,
client : c ,
clusterID : clusterID ,
tokenHash : tokenHash ,
nodeName : os . Getenv ( "NODE_NAME" ) ,
} , nil
return s3 , nil
}
// upload uploads the given snapshot to the configured S3
@ -229,7 +239,6 @@ func (s *S3) Download(ctx context.Context) error {
snapshotFile := filepath . Join ( snapshotDir , s . config . ClusterResetRestorePath )
metadataFile := filepath . Join ( snapshotDir , ".." , metadataDir , s . config . ClusterResetRestorePath )
logrus . Debugf ( "Downloading snapshot from s3://%s/%s" , s . config . EtcdS3BucketName , snapshotKey )
if err := s . downloadSnapshot ( ctx , snapshotKey , snapshotFile ) ; err != nil {
return err
}
@ -243,6 +252,7 @@ func (s *S3) Download(ctx context.Context) error {
// downloadSnapshot downloads the snapshot file from S3 using the minio API.
func ( s * S3 ) downloadSnapshot ( ctx context . Context , key , file string ) error {
logrus . Debugf ( "Downloading snapshot from s3://%s/%s" , s . config . EtcdS3BucketName , key )
ctx , cancel := context . WithTimeout ( ctx , s . config . EtcdS3Timeout )
defer cancel ( )
defer os . Chmod ( file , 0600 )