diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 7858991be6..72a4ffc5e0 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -354,10 +354,10 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error { // If asked to restore from a snapshot, do so if e.config.ClusterResetRestorePath != "" { if e.config.EtcdS3 { + logrus.Infof("Retrieving etcd snapshot %s from S3", e.config.ClusterResetRestorePath) if err := e.initS3IfNil(ctx); err != nil { return err } - logrus.Infof("Retrieving etcd snapshot %s from S3", e.config.ClusterResetRestorePath) if err := e.s3.Download(ctx); err != nil { return err } diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 3409337d0b..abdbf8f86d 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -13,7 +13,6 @@ import ( "os" "path" "path/filepath" - "runtime" "sort" "strconv" "strings" @@ -28,6 +27,7 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" ) var ( @@ -100,30 +100,40 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) { } logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName) - for config.Runtime.Core == nil { - runtime.Gosched() + s3 := &S3{ + config: config, + client: c, + nodeName: os.Getenv("NODE_NAME"), } - // cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ - var clusterID string - if ns, err := config.Runtime.Core.Core().V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{}); err != nil { - logrus.Warnf("Failed to set cluster ID: %v", err) + if config.ClusterReset { + logrus.Debug("Skip setting S3 snapshot cluster ID and token during cluster-reset") } else { - clusterID = string(ns.UID) - } + if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) { + if config.Runtime.Core == nil { + return false, nil + } - tokenHash, err := util.GetTokenHash(config) - if err != nil { - return nil, errors.Wrap(err, "failed to get server token hash for etcd snapshot") + // cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ + ns, err := config.Runtime.Core.Core().V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrap(err, "failed to set S3 snapshot cluster ID") + } + s3.clusterID = string(ns.UID) + + tokenHash, err := util.GetTokenHash(config) + if err != nil { + return false, errors.Wrap(err, "failed to set S3 snapshot server token hash") + } + s3.tokenHash = tokenHash + + return true, nil + }); err != nil { + return nil, err + } } - return &S3{ - config: config, - client: c, - clusterID: clusterID, - tokenHash: tokenHash, - nodeName: os.Getenv("NODE_NAME"), - }, nil + return s3, nil } // upload uploads the given snapshot to the configured S3 @@ -229,7 +239,6 @@ func (s *S3) Download(ctx context.Context) error { snapshotFile := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath) metadataFile := filepath.Join(snapshotDir, "..", metadataDir, s.config.ClusterResetRestorePath) - logrus.Debugf("Downloading snapshot from s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey) if err := s.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil { return err } @@ -243,6 +252,7 @@ func (s *S3) Download(ctx context.Context) error { // downloadSnapshot downloads the snapshot file from S3 using the minio API. func (s *S3) downloadSnapshot(ctx context.Context, key, file string) error { + logrus.Debugf("Downloading snapshot from s3://%s/%s", s.config.EtcdS3BucketName, key) ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) defer cancel() defer os.Chmod(file, 0600)