From 7464007037a692817368146e53886a0775b27e1b Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Mon, 2 Oct 2023 23:20:22 +0000 Subject: [PATCH] Store extra metadata and cluster ID for snapshots Write the extra metadata both locally and to S3. These files are placed such that they will not be used by older versions of K3s that do not make use of them. Signed-off-by: Brad Davidson --- pkg/cli/etcdsnapshot/etcd_snapshot.go | 183 +++++++++------------ pkg/cluster/bootstrap.go | 9 - pkg/cluster/bootstrap_test.go | 47 ------ pkg/daemons/config/types.go | 2 + pkg/etcd/s3.go | 226 +++++++++++++++++++------- pkg/etcd/snapshot.go | 120 ++++++++++---- pkg/server/server.go | 1 + 7 files changed, 330 insertions(+), 258 deletions(-) diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index 714ccc982a..93dd738cd7 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -1,6 +1,7 @@ package etcdsnapshot import ( + "context" "encoding/json" "errors" "fmt" @@ -12,8 +13,7 @@ import ( "github.com/erikdubbelboer/gspt" "github.com/k3s-io/k3s/pkg/cli/cmds" - "github.com/k3s-io/k3s/pkg/cluster" - "github.com/k3s-io/k3s/pkg/daemons/config" + daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/etcd" "github.com/k3s-io/k3s/pkg/server" util2 "github.com/k3s-io/k3s/pkg/util" @@ -22,16 +22,22 @@ import ( "gopkg.in/yaml.v2" ) +type etcdCommand struct { + etcd *etcd.ETCD + ctx context.Context +} + // commandSetup setups up common things needed // for each etcd command. -func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error { +func commandSetup(app *cli.Context, cfg *cmds.Server, config *server.Config) (*etcdCommand, error) { + ctx := signals.SetupSignalContext() gspt.SetProcTitle(os.Args[0]) nodeName := app.String("node-name") if nodeName == "" { h, err := os.Hostname() if err != nil { - return err + return nil, err } nodeName = h } @@ -40,33 +46,53 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error { dataDir, err := server.ResolveDataDir(cfg.DataDir) if err != nil { - return err + return nil, err } - sc.DisableAgent = true - sc.ControlConfig.DataDir = dataDir - sc.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName - sc.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir - sc.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress - sc.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat) - sc.ControlConfig.EtcdS3 = cfg.EtcdS3 - sc.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint - sc.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA - sc.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify - sc.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey - sc.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey - sc.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName - sc.ControlConfig.EtcdS3Region = cfg.EtcdS3Region - sc.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder - sc.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure - sc.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout - sc.ControlConfig.Runtime = config.NewRuntime(nil) - sc.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt") - sc.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt") - sc.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key") - sc.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig") + config.DisableAgent = true + config.ControlConfig.DataDir = dataDir + config.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName + config.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir + config.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress + config.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat) + config.ControlConfig.EtcdS3 = cfg.EtcdS3 + config.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint + config.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA + config.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify + config.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey + config.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey + config.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName + config.ControlConfig.EtcdS3Region = cfg.EtcdS3Region + config.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder + config.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure + config.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout + config.ControlConfig.Runtime = daemonconfig.NewRuntime(nil) + config.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt") + config.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt") + config.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key") + config.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig") - return nil + e := etcd.NewETCD() + if err := e.SetControlConfig(&config.ControlConfig); err != nil { + return nil, err + } + + initialized, err := e.IsInitialized() + if err != nil { + return nil, err + } + if !initialized { + return nil, fmt.Errorf("etcd database not found in %s", config.ControlConfig.DataDir) + } + + sc, err := server.NewContext(ctx, config.ControlConfig.Runtime.KubeConfigAdmin, false) + if err != nil { + return nil, err + } + config.ControlConfig.Runtime.K3s = sc.K3s + config.ControlConfig.Runtime.Core = sc.Core + + return &etcdCommand{etcd: e, ctx: ctx}, nil } // Save triggers an on-demand etcd snapshot operation @@ -80,43 +106,18 @@ func Save(app *cli.Context) error { func save(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { - return err - } - if len(app.Args()) > 0 { return util2.ErrCommandNoArgs } + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { + return err + } + serverConfig.ControlConfig.EtcdSnapshotRetention = 0 // disable retention check - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - initialized, err := e.IsInitialized() - if err != nil { - return err - } - if !initialized { - return fmt.Errorf("etcd database not found in %s", serverConfig.ControlConfig.DataDir) - } - - cluster := cluster.New(&serverConfig.ControlConfig) - - if err := cluster.Bootstrap(ctx, true); err != nil { - return err - } - - sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false) - if err != nil { - return err - } - serverConfig.ControlConfig.Runtime.Core = sc.Core - - return cluster.Snapshot(ctx, &serverConfig.ControlConfig) + return ec.etcd.Snapshot(ec.ctx) } func Delete(app *cli.Context) error { @@ -129,7 +130,8 @@ func Delete(app *cli.Context) error { func delete(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { return err } @@ -138,19 +140,7 @@ func delete(app *cli.Context, cfg *cmds.Server) error { return errors.New("no snapshots given for removal") } - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false) - if err != nil { - return err - } - serverConfig.ControlConfig.Runtime.Core = sc.Core - - return e.DeleteSnapshots(ctx, app.Args()) + return ec.etcd.DeleteSnapshots(ec.ctx, app.Args()) } func List(app *cli.Context) error { @@ -160,7 +150,7 @@ func List(app *cli.Context) error { return list(app, &cmds.ServerConfig) } -var etcdListFormats = []string{"json", "yaml"} +var etcdListFormats = []string{"json", "yaml", "table"} func validEtcdListFormat(format string) bool { for _, supportedFormat := range etcdListFormats { @@ -174,17 +164,12 @@ func validEtcdListFormat(format string) bool { func list(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { return err } - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - sf, err := e.ListSnapshots(ctx) + sf, err := ec.etcd.ListSnapshots(ec.ctx) if err != nil { return err } @@ -208,20 +193,9 @@ func list(app *cli.Context, cfg *cmds.Server) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) defer w.Flush() - if cfg.EtcdS3 { - fmt.Fprint(w, "Name\tSize\tCreated\n") - for _, s := range sf { - if s.NodeName == "s3" { - fmt.Fprintf(w, "%s\t%d\t%s\n", s.Name, s.Size, s.CreatedAt.Format(time.RFC3339)) - } - } - } else { - fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n") - for _, s := range sf { - if s.NodeName != "s3" { - fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339)) - } - } + fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n") + for _, s := range sf { + fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339)) } } @@ -238,23 +212,12 @@ func Prune(app *cli.Context) error { func prune(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { return err } serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false) - if err != nil { - return err - } - serverConfig.ControlConfig.Runtime.Core = sc.Core - - return e.PruneSnapshots(ctx) + return ec.etcd.PruneSnapshots(ec.ctx) } diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go index a2c63a974e..4a5e636a21 100644 --- a/pkg/cluster/bootstrap.go +++ b/pkg/cluster/bootstrap.go @@ -424,15 +424,6 @@ func (c *Cluster) bootstrap(ctx context.Context) error { return c.storageBootstrap(ctx) } -// Snapshot is a proxy method to call the snapshot method on the managedb -// interface for etcd clusters. -func (c *Cluster) Snapshot(ctx context.Context, config *config.Control) error { - if c.managedDB == nil { - return errors.New("unable to perform etcd snapshot on non-etcd system") - } - return c.managedDB.Snapshot(ctx) -} - // compareConfig verifies that the config of the joining control plane node coincides with the cluster's config func (c *Cluster) compareConfig() error { token := c.config.AgentToken diff --git a/pkg/cluster/bootstrap_test.go b/pkg/cluster/bootstrap_test.go index b20a36fd68..3531fcab25 100644 --- a/pkg/cluster/bootstrap_test.go +++ b/pkg/cluster/bootstrap_test.go @@ -197,50 +197,3 @@ func TestCluster_migrateBootstrapData(t *testing.T) { }) } } - -func TestCluster_Snapshot(t *testing.T) { - type fields struct { - clientAccessInfo *clientaccess.Info - config *config.Control - managedDB managed.Driver - joining bool - storageStarted bool - saveBootstrap bool - shouldBootstrap bool - } - type args struct { - ctx context.Context - config *config.Control - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "Fail on non etcd cluster", - fields: fields{}, - args: args{ - ctx: context.Background(), - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &Cluster{ - clientAccessInfo: tt.fields.clientAccessInfo, - config: tt.fields.config, - managedDB: tt.fields.managedDB, - joining: tt.fields.joining, - storageStarted: tt.fields.storageStarted, - saveBootstrap: tt.fields.saveBootstrap, - shouldBootstrap: tt.fields.shouldBootstrap, - } - if err := c.Snapshot(tt.args.ctx, tt.args.config); (err != nil) != tt.wantErr { - t.Errorf("Cluster.Snapshot() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 6bd022735e..7744ce26db 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io" "github.com/k3s-io/kine/pkg/endpoint" "github.com/rancher/wrangler/pkg/generated/controllers/core" "github.com/rancher/wrangler/pkg/leader" @@ -342,6 +343,7 @@ type ControlRuntime struct { ClientETCDCert string ClientETCDKey string + K3s *k3s.Factory Core *core.Factory Event record.EventRecorder EtcdConfig endpoint.ETCDConfig diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index e38a58ed88..952e98849a 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -7,17 +7,20 @@ import ( "encoding/base64" "encoding/pem" "fmt" - "io" + "io/ioutil" "net/http" + "net/textproto" "os" "path" "path/filepath" + "runtime" "sort" "strconv" "strings" "time" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/version" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pkg/errors" @@ -26,10 +29,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +var ( + clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id") + nodeNameKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name") +) + // S3 maintains state for S3 functionality. type S3 struct { - config *config.Control - client *minio.Client + config *config.Control + client *minio.Client + clusterID string + nodeName string } // newS3 creates a new value of type s3 pointer with a @@ -83,23 +93,42 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) { return nil, err } if !exists { - return nil, fmt.Errorf("bucket: %s does not exist", config.EtcdS3BucketName) + return nil, fmt.Errorf("bucket %s does not exist", config.EtcdS3BucketName) } logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName) + for config.Runtime.Core == nil { + runtime.Gosched() + } + + // cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ + var clusterID string + if ns, err := config.Runtime.Core.Core().V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{}); err != nil { + logrus.Warnf("Failed to set cluster ID: %v", err) + } else { + clusterID = string(ns.UID) + } + return &S3{ - config: config, - client: c, + config: config, + client: c, + clusterID: clusterID, + nodeName: os.Getenv("NODE_NAME"), }, nil } // upload uploads the given snapshot to the configured S3 // compatible backend. func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) { - logrus.Infof("Uploading snapshot %s to S3", snapshot) + logrus.Infof("Uploading snapshot to s3://%s/%s", s.config.EtcdS3BucketName, snapshot) basename := filepath.Base(snapshot) + metadata := filepath.Join(filepath.Dir(snapshot), "..", metadataDir, basename) + snapshotKey := path.Join(s.config.EtcdS3Folder, basename) + metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, basename) + sf := &snapshotFile{ Name: basename, + Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey), NodeName: "s3", CreatedAt: &metav1.Time{ Time: now, @@ -113,21 +142,11 @@ func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.Conf Folder: s.config.EtcdS3Folder, Insecure: s.config.EtcdS3Insecure, }, + Compressed: strings.HasSuffix(snapshot, compressedExtension), metadataSource: extraMetadata, } - snapshotKey := path.Join(s.config.EtcdS3Folder, basename) - - toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) - defer cancel() - opts := minio.PutObjectOptions{NumThreads: 2} - if strings.HasSuffix(snapshot, compressedExtension) { - opts.ContentType = "application/zip" - sf.Compressed = true - } else { - opts.ContentType = "application/octet-stream" - } - uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, snapshot, opts) + uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot) if err != nil { sf.Status = failedSnapshotStatus sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error())) @@ -135,48 +154,101 @@ func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.Conf sf.Status = successfulSnapshotStatus sf.Size = uploadInfo.Size } + if _, err := s.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil { + logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err) + } else { + logrus.Infof("Uploaded snapshot metadata s3://%s/%s", s.config.EtcdS3BucketName, metadata) + } return sf, err } +// uploadSnapshot uploads the snapshot file to S3 using the minio API. +func (s *S3) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) { + opts := minio.PutObjectOptions{ + NumThreads: 2, + UserMetadata: map[string]string{ + clusterIDKey: s.clusterID, + nodeNameKey: s.nodeName, + }, + } + if strings.HasSuffix(key, compressedExtension) { + opts.ContentType = "application/zip" + } else { + opts.ContentType = "application/octet-stream" + } + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + + return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts) +} + +// uploadSnapshotMetadata marshals and uploads the snapshot metadata to S3 using the minio API. +// The upload is silently skipped if no extra metadata is provided. +func (s *S3) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return minio.UploadInfo{}, nil + } + return minio.UploadInfo{}, err + } + + opts := minio.PutObjectOptions{ + NumThreads: 2, + ContentType: "application/json", + UserMetadata: map[string]string{ + clusterIDKey: s.clusterID, + nodeNameKey: s.nodeName, + }, + } + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts) +} + // download downloads the given snapshot from the configured S3 // compatible backend. func (s *S3) Download(ctx context.Context) error { snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath) - - logrus.Debugf("retrieving snapshot: %s", snapshotKey) - toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) - defer cancel() - - r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, minio.GetObjectOptions{}) - if err != nil { - return nil - } - defer r.Close() - + metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, s.config.ClusterResetRestorePath) snapshotDir, err := snapshotDir(s.config, true) if err != nil { return errors.Wrap(err, "failed to get the snapshot dir") } + snapshotFile := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath) + metadataFile := filepath.Join(snapshotDir, "..", metadataDir, s.config.ClusterResetRestorePath) - fullSnapshotPath := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath) - sf, err := os.Create(fullSnapshotPath) - if err != nil { + logrus.Debugf("Downloading snapshot from s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey) + if err := s.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil { return err } - defer sf.Close() - - stat, err := r.Stat() - if err != nil { + if err := s.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil { return err } - if _, err := io.CopyN(sf, r, stat.Size); err != nil { - return err + s.config.ClusterResetRestorePath = snapshotFile + return nil +} + +// downloadSnapshot downloads the snapshot file from S3 using the minio API. +func (s *S3) downloadSnapshot(ctx context.Context, key, file string) error { + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + defer os.Chmod(file, 0600) + return s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{}) +} + +// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API. +// No error is returned if the metadata file does not exist, as it is optional. +func (s *S3) downloadSnapshotMetadata(ctx context.Context, key, file string) error { + logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, key) + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + defer os.Chmod(file, 0600) + err := s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{}) + if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound { + return nil } - - s.config.ClusterResetRestorePath = fullSnapshotPath - - return os.Chmod(fullSnapshotPath, 0600) + return err } // snapshotPrefix returns the prefix used in the @@ -190,21 +262,27 @@ func (s *S3) snapshotRetention(ctx context.Context) error { if s.config.EtcdSnapshotRetention < 1 { return nil } - logrus.Infof("Applying snapshot retention policy to snapshots stored in S3: retention: %d, snapshotPrefix: %s", s.config.EtcdSnapshotRetention, s.snapshotPrefix()) + logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", s.config.EtcdSnapshotRetention, s.config.EtcdS3BucketName, s.snapshotPrefix()) var snapshotFiles []minio.ObjectInfo toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) defer cancel() - loo := minio.ListObjectsOptions{ - Recursive: true, + opts := minio.ListObjectsOptions{ Prefix: s.snapshotPrefix(), + Recursive: true, } - for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, loo) { + for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, opts) { if info.Err != nil { return info.Err } + + // skip metadata + if path.Base(path.Dir(info.Key)) == metadataDir { + continue + } + snapshotFiles = append(snapshotFiles, info) } @@ -218,10 +296,17 @@ func (s *S3) snapshotRetention(ctx context.Context) error { }) for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] { - logrus.Infof("Removing S3 snapshot: %s", df.Key) + logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key) if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { return err } + metadataKey := path.Join(path.Dir(df.Key), metadataDir, path.Base(df.Key)) + if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil { + if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound { + return nil + } + return err + } } return nil @@ -231,19 +316,17 @@ func (s *S3) snapshotRetention(ctx context.Context) error { // snapshots in S3 along with their relevant // metadata. func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) { - snapshots := make(map[string]snapshotFile) + snapshots := map[string]snapshotFile{} + metadatas := []string{} ctx, cancel := context.WithCancel(ctx) defer cancel() - var loo minio.ListObjectsOptions - if s.config.EtcdS3Folder != "" { - loo = minio.ListObjectsOptions{ - Prefix: s.config.EtcdS3Folder, - Recursive: true, - } + opts := minio.ListObjectsOptions{ + Prefix: s.config.EtcdS3Folder, + Recursive: true, } - objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo) + objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, opts) for obj := range objects { if obj.Err != nil { @@ -253,7 +336,18 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) continue } + if o, err := s.client.StatObject(ctx, s.config.EtcdS3BucketName, obj.Key, minio.StatObjectOptions{}); err != nil { + logrus.Warnf("Failed to get object metadata: %v", err) + } else { + obj = o + } + filename := path.Base(obj.Key) + if path.Base(path.Dir(obj.Key)) == metadataDir { + metadatas = append(metadatas, obj.Key) + continue + } + basename, compressed := strings.CutSuffix(filename, compressedExtension) ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) if err != nil { @@ -262,6 +356,7 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) sf := snapshotFile{ Name: filename, + Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, obj.Key), NodeName: "s3", CreatedAt: &metav1.Time{ Time: time.Unix(ts, 0), @@ -282,6 +377,25 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) sfKey := generateSnapshotConfigMapKey(sf) snapshots[sfKey] = sf } + + for _, metadataKey := range metadatas { + filename := path.Base(metadataKey) + sfKey := generateSnapshotConfigMapKey(snapshotFile{Name: filename, NodeName: "s3"}) + if sf, ok := snapshots[sfKey]; ok { + logrus.Debugf("Loading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, metadataKey) + if obj, err := s.client.GetObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.GetObjectOptions{}); err != nil { + logrus.Warnf("Failed to get snapshot metadata: %v", err) + } else { + if m, err := ioutil.ReadAll(obj); err != nil { + logrus.Warnf("Failed to read snapshot metadata: %v", err) + } else { + sf.Metadata = base64.StdEncoding.EncodeToString(m) + snapshots[sfKey] = sf + } + } + } + } + return snapshots, nil } diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index d640b69ead..82fc553ad0 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -38,6 +38,7 @@ const ( maxConcurrentSnapshots = 1 pruneStepSize = 5 compressedExtension = ".zip" + metadataDir = ".metadata" ) var ( @@ -272,20 +273,20 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } } - if e.config.EtcdSnapshotCompress { - zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now) - if err != nil { - return err - } - if err := os.Remove(snapshotPath); err != nil { - return err - } - snapshotPath = zipPath - logrus.Info("Compressed snapshot: " + snapshotPath) - } - - // If the snapshot attempt was successful, sf will be nil as we did not set it. + // If the snapshot attempt was successful, sf will be nil as we did not set it to store the error message. if sf == nil { + if e.config.EtcdSnapshotCompress { + zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now) + if err != nil { + return errors.Wrap(err, "failed to compress snapshot") + } + if err := os.Remove(snapshotPath); err != nil { + return errors.Wrap(err, "failed to remove uncompressed snapshot") + } + snapshotPath = zipPath + logrus.Info("Compressed snapshot: " + snapshotPath) + } + f, err := os.Stat(snapshotPath) if err != nil { return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") @@ -303,15 +304,19 @@ func (e *ETCD) Snapshot(ctx context.Context) error { metadataSource: extraMetadata, } + if err := saveSnapshotMetadata(snapshotPath, extraMetadata); err != nil { + return errors.Wrap(err, "failed to save local snapshot metadata") + } + if err := e.addSnapshotData(*sf); err != nil { return errors.Wrap(err, "failed to save local snapshot data to configmap") } + if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { return errors.Wrap(err, "failed to apply local snapshot retention policy") } if e.config.EtcdS3 { - logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) sf = &snapshotFile{ @@ -335,6 +340,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { metadataSource: extraMetadata, } } else { + logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) // upload will return a snapshotFile even on error - if there was an // error, it will be reflected in the status and message. sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now) @@ -414,10 +420,21 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) { if err != nil { ts = file.ModTime().Unix() } + + // try to read metadata from disk; don't warn if it is missing as it will not exist + // for snapshot files from old releases or if there was no metadata provided. + var metadata string + metadataFile := filepath.Join(filepath.Dir(path), "..", metadataDir, file.Name()) + if m, err := os.ReadFile(metadataFile); err == nil { + logrus.Debugf("Loading snapshot metadata from %s", metadataFile) + metadata = base64.StdEncoding.EncodeToString(m) + } + sf := snapshotFile{ Name: file.Name(), Location: "file://" + filepath.Join(snapshotDir, file.Name()), NodeName: nodeName, + Metadata: metadata, CreatedAt: &metav1.Time{ Time: time.Unix(ts, 0), }, @@ -462,7 +479,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error { if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { - logrus.Warnf("Unable to initialize S3 client during prune: %v", err) + logrus.Warnf("Unable to initialize S3 client: %v", err) } else { if err := e.s3.snapshotRetention(ctx); err != nil { logrus.Errorf("Error applying S3 snapshot retention policy: %v", err) @@ -478,6 +495,7 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro snapshotFiles := map[string]snapshotFile{} if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client: %v", err) return nil, err } sfs, err := e.s3.listSnapshots(ctx) @@ -506,13 +524,30 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { return errors.Wrap(err, "failed to get the snapshot dir") } - if e.config.EtcdS3 { - logrus.Info("Removing the given etcd snapshot(s) from S3") - logrus.Debugf("Removing the given etcd snapshot(s) from S3: %v", snapshots) + logrus.Info("Removing the given locally stored etcd snapshot(s)") + logrus.Debugf("Attempting to remove the given locally stored etcd snapshot(s): %v", snapshots) - if e.initS3IfNil(ctx); err != nil { + for _, s := range snapshots { + // check if the given snapshot exists. If it does, + // remove it, otherwise continue. + sf := filepath.Join(snapshotDir, s) + if _, err := os.Stat(sf); os.IsNotExist(err) { + logrus.Infof("Snapshot %s, does not exist", s) + continue + } + if err := os.Remove(sf); err != nil { return err } + logrus.Debug("Removed snapshot ", s) + } + + if e.config.EtcdS3 { + if e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client: %v", err) + return err + } + logrus.Info("Removing the given etcd snapshot(s) from S3") + logrus.Debugf("Removing the given etcd snapshot(s) from S3: %v", snapshots) objectsCh := make(chan minio.ObjectInfo) @@ -566,23 +601,6 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { } } - logrus.Info("Removing the given locally stored etcd snapshot(s)") - logrus.Debugf("Attempting to remove the given locally stored etcd snapshot(s): %v", snapshots) - - for _, s := range snapshots { - // check if the given snapshot exists. If it does, - // remove it, otherwise continue. - sf := filepath.Join(snapshotDir, s) - if _, err := os.Stat(sf); os.IsNotExist(err) { - logrus.Infof("Snapshot %s, does not exist", s) - continue - } - if err := os.Remove(sf); err != nil { - return err - } - logrus.Debug("Removed snapshot ", s) - } - return e.ReconcileSnapshotData(ctx) } @@ -735,6 +753,11 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { var s3ListSuccessful bool if e.config.EtcdS3 { + if err := e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client: %v", err) + return err + } + if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) } else { @@ -906,10 +929,14 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) for _, df := range snapshotFiles[retention:] { snapshotPath := filepath.Join(snapshotDir, df.Name) + metadataPath := filepath.Join(snapshotDir, "..", metadataDir, df.Name) logrus.Infof("Removing local snapshot %s", snapshotPath) if err := os.Remove(snapshotPath); err != nil { return err } + if err := os.Remove(metadataPath); err != nil && !os.IsNotExist(err) { + return err + } } return nil @@ -919,3 +946,24 @@ func isTooLargeError(err error) bool { // There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string. return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) } + +// saveSnapshotMetadata writes extra metadata to disk. +// The upload is silently skipped if no extra metadata is provided. +func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) error { + if extraMetadata == nil || len(extraMetadata.Data) == 0 { + return nil + } + + dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir) + filename := filepath.Base(snapshotPath) + metadataPath := filepath.Join(dir, filename) + logrus.Infof("Saving snapshot metadata to %s", metadataPath) + m, err := json.Marshal(extraMetadata.Data) + if err != nil { + return err + } + if err := os.MkdirAll(dir, 0700); err != nil { + return err + } + return os.WriteFile(metadataPath, m, 0700) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 2734f6ed51..7ddc7c23fa 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -114,6 +114,7 @@ func runControllers(ctx context.Context, config *Config) error { controlConfig.Runtime.NodePasswdFile); err != nil { logrus.Warn(errors.Wrap(err, "error migrating node-password file")) } + controlConfig.Runtime.K3s = sc.K3s controlConfig.Runtime.Event = sc.Event controlConfig.Runtime.Core = sc.Core