diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index 9a0a1ad09d..d15a3d285d 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -1,6 +1,7 @@ package etcdsnapshot import ( + "context" "encoding/json" "errors" "fmt" @@ -12,8 +13,7 @@ import ( "github.com/erikdubbelboer/gspt" "github.com/k3s-io/k3s/pkg/cli/cmds" - "github.com/k3s-io/k3s/pkg/cluster" - "github.com/k3s-io/k3s/pkg/daemons/config" + daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/etcd" "github.com/k3s-io/k3s/pkg/server" util2 "github.com/k3s-io/k3s/pkg/util" @@ -22,16 +22,22 @@ import ( "gopkg.in/yaml.v2" ) +type etcdCommand struct { + etcd *etcd.ETCD + ctx context.Context +} + // commandSetup setups up common things needed // for each etcd command. -func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error { +func commandSetup(app *cli.Context, cfg *cmds.Server, config *server.Config) (*etcdCommand, error) { + ctx := signals.SetupSignalContext() gspt.SetProcTitle(os.Args[0]) nodeName := app.String("node-name") if nodeName == "" { h, err := os.Hostname() if err != nil { - return err + return nil, err } nodeName = h } @@ -40,33 +46,53 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) error { dataDir, err := server.ResolveDataDir(cfg.DataDir) if err != nil { - return err + return nil, err } - sc.DisableAgent = true - sc.ControlConfig.DataDir = dataDir - sc.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName - sc.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir - sc.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress - sc.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat) - sc.ControlConfig.EtcdS3 = cfg.EtcdS3 - sc.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint - sc.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA - sc.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify - 
sc.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey - sc.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey - sc.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName - sc.ControlConfig.EtcdS3Region = cfg.EtcdS3Region - sc.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder - sc.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure - sc.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout - sc.ControlConfig.Runtime = config.NewRuntime(nil) - sc.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt") - sc.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt") - sc.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key") - sc.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig") + config.DisableAgent = true + config.ControlConfig.DataDir = dataDir + config.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName + config.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir + config.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress + config.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat) + config.ControlConfig.EtcdS3 = cfg.EtcdS3 + config.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint + config.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA + config.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify + config.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey + config.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey + config.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName + config.ControlConfig.EtcdS3Region = cfg.EtcdS3Region + config.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder + config.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure + config.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout + config.ControlConfig.Runtime = daemonconfig.NewRuntime(nil) + config.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt") + 
config.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt") + config.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key") + config.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig") - return nil + e := etcd.NewETCD() + if err := e.SetControlConfig(&config.ControlConfig); err != nil { + return nil, err + } + + initialized, err := e.IsInitialized() + if err != nil { + return nil, err + } + if !initialized { + return nil, fmt.Errorf("etcd database not found in %s", config.ControlConfig.DataDir) + } + + sc, err := server.NewContext(ctx, config.ControlConfig.Runtime.KubeConfigAdmin, false) + if err != nil { + return nil, err + } + config.ControlConfig.Runtime.K3s = sc.K3s + config.ControlConfig.Runtime.Core = sc.Core + + return &etcdCommand{etcd: e, ctx: ctx}, nil } // Run is an alias for Save, retained for compatibility reasons. @@ -85,43 +111,18 @@ func Save(app *cli.Context) error { func save(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { - return err - } - if len(app.Args()) > 0 { return util2.ErrCommandNoArgs } + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { + return err + } + serverConfig.ControlConfig.EtcdSnapshotRetention = 0 // disable retention check - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - initialized, err := e.IsInitialized() - if err != nil { - return err - } - if !initialized { - return fmt.Errorf("etcd database not found in %s", serverConfig.ControlConfig.DataDir) - } - - cluster := cluster.New(&serverConfig.ControlConfig) - - if err := cluster.Bootstrap(ctx, true); err != nil { - return err - } - - sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false) - if err != nil { - return 
err - } - serverConfig.ControlConfig.Runtime.Core = sc.Core - - return cluster.Snapshot(ctx, &serverConfig.ControlConfig) + return ec.etcd.Snapshot(ec.ctx) } func Delete(app *cli.Context) error { @@ -134,7 +135,8 @@ func Delete(app *cli.Context) error { func delete(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { return err } @@ -143,19 +145,7 @@ func delete(app *cli.Context, cfg *cmds.Server) error { return errors.New("no snapshots given for removal") } - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false) - if err != nil { - return err - } - serverConfig.ControlConfig.Runtime.Core = sc.Core - - return e.DeleteSnapshots(ctx, app.Args()) + return ec.etcd.DeleteSnapshots(ec.ctx, app.Args()) } func List(app *cli.Context) error { @@ -165,7 +155,7 @@ func List(app *cli.Context) error { return list(app, &cmds.ServerConfig) } -var etcdListFormats = []string{"json", "yaml"} +var etcdListFormats = []string{"json", "yaml", "table"} func validEtcdListFormat(format string) bool { for _, supportedFormat := range etcdListFormats { @@ -179,17 +169,12 @@ func validEtcdListFormat(format string) bool { func list(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { return err } - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - sf, err := e.ListSnapshots(ctx) + sf, err := ec.etcd.ListSnapshots(ec.ctx) if err != nil { return err } @@ -213,20 +198,9 @@ func list(app *cli.Context, cfg 
*cmds.Server) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) defer w.Flush() - if cfg.EtcdS3 { - fmt.Fprint(w, "Name\tSize\tCreated\n") - for _, s := range sf { - if s.NodeName == "s3" { - fmt.Fprintf(w, "%s\t%d\t%s\n", s.Name, s.Size, s.CreatedAt.Format(time.RFC3339)) - } - } - } else { - fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n") - for _, s := range sf { - if s.NodeName != "s3" { - fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339)) - } - } + fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n") + for _, s := range sf { + fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339)) } } @@ -243,23 +217,12 @@ func Prune(app *cli.Context) error { func prune(app *cli.Context, cfg *cmds.Server) error { var serverConfig server.Config - if err := commandSetup(app, cfg, &serverConfig); err != nil { + ec, err := commandSetup(app, cfg, &serverConfig) + if err != nil { return err } serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention - ctx := signals.SetupSignalContext() - e := etcd.NewETCD() - if err := e.SetControlConfig(&serverConfig.ControlConfig); err != nil { - return err - } - - sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin, false) - if err != nil { - return err - } - serverConfig.ControlConfig.Runtime.Core = sc.Core - - return e.PruneSnapshots(ctx) + return ec.etcd.PruneSnapshots(ec.ctx) } diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go index a2c63a974e..4a5e636a21 100644 --- a/pkg/cluster/bootstrap.go +++ b/pkg/cluster/bootstrap.go @@ -424,15 +424,6 @@ func (c *Cluster) bootstrap(ctx context.Context) error { return c.storageBootstrap(ctx) } -// Snapshot is a proxy method to call the snapshot method on the managedb -// interface for etcd clusters. 
-func (c *Cluster) Snapshot(ctx context.Context, config *config.Control) error { - if c.managedDB == nil { - return errors.New("unable to perform etcd snapshot on non-etcd system") - } - return c.managedDB.Snapshot(ctx) -} - // compareConfig verifies that the config of the joining control plane node coincides with the cluster's config func (c *Cluster) compareConfig() error { token := c.config.AgentToken diff --git a/pkg/cluster/bootstrap_test.go b/pkg/cluster/bootstrap_test.go index b20a36fd68..3531fcab25 100644 --- a/pkg/cluster/bootstrap_test.go +++ b/pkg/cluster/bootstrap_test.go @@ -197,50 +197,3 @@ func TestCluster_migrateBootstrapData(t *testing.T) { }) } } - -func TestCluster_Snapshot(t *testing.T) { - type fields struct { - clientAccessInfo *clientaccess.Info - config *config.Control - managedDB managed.Driver - joining bool - storageStarted bool - saveBootstrap bool - shouldBootstrap bool - } - type args struct { - ctx context.Context - config *config.Control - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "Fail on non etcd cluster", - fields: fields{}, - args: args{ - ctx: context.Background(), - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &Cluster{ - clientAccessInfo: tt.fields.clientAccessInfo, - config: tt.fields.config, - managedDB: tt.fields.managedDB, - joining: tt.fields.joining, - storageStarted: tt.fields.storageStarted, - saveBootstrap: tt.fields.saveBootstrap, - shouldBootstrap: tt.fields.shouldBootstrap, - } - if err := c.Snapshot(tt.args.ctx, tt.args.config); (err != nil) != tt.wantErr { - t.Errorf("Cluster.Snapshot() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 70d07e65a1..b13a52108f 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -10,6 +10,7 @@ import ( "sync" "time" + 
"github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io" "github.com/k3s-io/kine/pkg/endpoint" "github.com/rancher/wrangler/pkg/generated/controllers/core" "github.com/rancher/wrangler/pkg/leader" @@ -341,6 +342,7 @@ type ControlRuntime struct { ClientETCDCert string ClientETCDKey string + K3s *k3s.Factory Core *core.Factory Event record.EventRecorder EtcdConfig endpoint.ETCDConfig diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index e38a58ed88..952e98849a 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -7,17 +7,20 @@ import ( "encoding/base64" "encoding/pem" "fmt" - "io" + "io/ioutil" "net/http" + "net/textproto" "os" "path" "path/filepath" + "runtime" "sort" "strconv" "strings" "time" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/version" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pkg/errors" @@ -26,10 +29,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +var ( + clusterIDKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-cluster-id") + nodeNameKey = textproto.CanonicalMIMEHeaderKey(version.Program + "-node-name") +) + // S3 maintains state for S3 functionality. 
type S3 struct { - config *config.Control - client *minio.Client + config *config.Control + client *minio.Client + clusterID string + nodeName string } // newS3 creates a new value of type s3 pointer with a @@ -83,23 +93,42 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) { return nil, err } if !exists { - return nil, fmt.Errorf("bucket: %s does not exist", config.EtcdS3BucketName) + return nil, fmt.Errorf("bucket %s does not exist", config.EtcdS3BucketName) } logrus.Infof("S3 bucket %s exists", config.EtcdS3BucketName) + for config.Runtime.Core == nil { + runtime.Gosched() + } + + // cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ + var clusterID string + if ns, err := config.Runtime.Core.Core().V1().Namespace().Get(metav1.NamespaceSystem, metav1.GetOptions{}); err != nil { + logrus.Warnf("Failed to set cluster ID: %v", err) + } else { + clusterID = string(ns.UID) + } + return &S3{ - config: config, - client: c, + config: config, + client: c, + clusterID: clusterID, + nodeName: os.Getenv("NODE_NAME"), }, nil } // upload uploads the given snapshot to the configured S3 // compatible backend. 
func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) { - logrus.Infof("Uploading snapshot %s to S3", snapshot) + logrus.Infof("Uploading snapshot to s3://%s/%s", s.config.EtcdS3BucketName, snapshot) basename := filepath.Base(snapshot) + metadata := filepath.Join(filepath.Dir(snapshot), "..", metadataDir, basename) + snapshotKey := path.Join(s.config.EtcdS3Folder, basename) + metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, basename) + sf := &snapshotFile{ Name: basename, + Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey), NodeName: "s3", CreatedAt: &metav1.Time{ Time: now, @@ -113,21 +142,11 @@ func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.Conf Folder: s.config.EtcdS3Folder, Insecure: s.config.EtcdS3Insecure, }, + Compressed: strings.HasSuffix(snapshot, compressedExtension), metadataSource: extraMetadata, } - snapshotKey := path.Join(s.config.EtcdS3Folder, basename) - - toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) - defer cancel() - opts := minio.PutObjectOptions{NumThreads: 2} - if strings.HasSuffix(snapshot, compressedExtension) { - opts.ContentType = "application/zip" - sf.Compressed = true - } else { - opts.ContentType = "application/octet-stream" - } - uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, snapshot, opts) + uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot) if err != nil { sf.Status = failedSnapshotStatus sf.Message = base64.StdEncoding.EncodeToString([]byte(err.Error())) @@ -135,48 +154,101 @@ func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.Conf sf.Status = successfulSnapshotStatus sf.Size = uploadInfo.Size } + if _, err := s.uploadSnapshotMetadata(ctx, metadataKey, metadata); err != nil { + logrus.Warnf("Failed to upload snapshot metadata to S3: %v", err) + } else { + logrus.Infof("Uploaded snapshot metadata 
s3://%s/%s", s.config.EtcdS3BucketName, metadata) + } return sf, err } +// uploadSnapshot uploads the snapshot file to S3 using the minio API. +func (s *S3) uploadSnapshot(ctx context.Context, key, path string) (info minio.UploadInfo, err error) { + opts := minio.PutObjectOptions{ + NumThreads: 2, + UserMetadata: map[string]string{ + clusterIDKey: s.clusterID, + nodeNameKey: s.nodeName, + }, + } + if strings.HasSuffix(key, compressedExtension) { + opts.ContentType = "application/zip" + } else { + opts.ContentType = "application/octet-stream" + } + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + + return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts) +} + +// uploadSnapshotMetadata uploads the on-disk snapshot metadata file to S3 using the minio API. +// The upload is silently skipped if the metadata file does not exist. +func (s *S3) uploadSnapshotMetadata(ctx context.Context, key, path string) (info minio.UploadInfo, err error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return minio.UploadInfo{}, nil + } + return minio.UploadInfo{}, err + } + + opts := minio.PutObjectOptions{ + NumThreads: 2, + ContentType: "application/json", + UserMetadata: map[string]string{ + clusterIDKey: s.clusterID, + nodeNameKey: s.nodeName, + }, + } + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + return s.client.FPutObject(ctx, s.config.EtcdS3BucketName, key, path, opts) +} + // download downloads the given snapshot from the configured S3 // compatible backend. 
func (s *S3) Download(ctx context.Context) error { snapshotKey := path.Join(s.config.EtcdS3Folder, s.config.ClusterResetRestorePath) - - logrus.Debugf("retrieving snapshot: %s", snapshotKey) - toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) - defer cancel() - - r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, snapshotKey, minio.GetObjectOptions{}) - if err != nil { - return nil - } - defer r.Close() - + metadataKey := path.Join(s.config.EtcdS3Folder, metadataDir, s.config.ClusterResetRestorePath) snapshotDir, err := snapshotDir(s.config, true) if err != nil { return errors.Wrap(err, "failed to get the snapshot dir") } + snapshotFile := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath) + metadataFile := filepath.Join(snapshotDir, "..", metadataDir, s.config.ClusterResetRestorePath) - fullSnapshotPath := filepath.Join(snapshotDir, s.config.ClusterResetRestorePath) - sf, err := os.Create(fullSnapshotPath) - if err != nil { + logrus.Debugf("Downloading snapshot from s3://%s/%s", s.config.EtcdS3BucketName, snapshotKey) + if err := s.downloadSnapshot(ctx, snapshotKey, snapshotFile); err != nil { return err } - defer sf.Close() - - stat, err := r.Stat() - if err != nil { + if err := s.downloadSnapshotMetadata(ctx, metadataKey, metadataFile); err != nil { return err } - if _, err := io.CopyN(sf, r, stat.Size); err != nil { - return err + s.config.ClusterResetRestorePath = snapshotFile + return nil +} + +// downloadSnapshot downloads the snapshot file from S3 using the minio API. +func (s *S3) downloadSnapshot(ctx context.Context, key, file string) error { + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + defer os.Chmod(file, 0600) + return s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{}) +} + +// downloadSnapshotMetadata downloads the snapshot metadata file from S3 using the minio API. 
+// No error is returned if the metadata file does not exist, as it is optional. +func (s *S3) downloadSnapshotMetadata(ctx context.Context, key, file string) error { + logrus.Debugf("Downloading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, key) + ctx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) + defer cancel() + defer os.Chmod(file, 0600) + err := s.client.FGetObject(ctx, s.config.EtcdS3BucketName, key, file, minio.GetObjectOptions{}) + if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound { + return nil } - - s.config.ClusterResetRestorePath = fullSnapshotPath - - return os.Chmod(fullSnapshotPath, 0600) + return err } // snapshotPrefix returns the prefix used in the @@ -190,21 +262,27 @@ func (s *S3) snapshotRetention(ctx context.Context) error { if s.config.EtcdSnapshotRetention < 1 { return nil } - logrus.Infof("Applying snapshot retention policy to snapshots stored in S3: retention: %d, snapshotPrefix: %s", s.config.EtcdSnapshotRetention, s.snapshotPrefix()) + logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", s.config.EtcdSnapshotRetention, s.config.EtcdS3BucketName, s.snapshotPrefix()) var snapshotFiles []minio.ObjectInfo toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) defer cancel() - loo := minio.ListObjectsOptions{ - Recursive: true, + opts := minio.ListObjectsOptions{ Prefix: s.snapshotPrefix(), + Recursive: true, } - for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, loo) { + for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, opts) { if info.Err != nil { return info.Err } + + // skip metadata + if path.Base(path.Dir(info.Key)) == metadataDir { + continue + } + snapshotFiles = append(snapshotFiles, info) } @@ -218,10 +296,17 @@ func (s *S3) snapshotRetention(ctx context.Context) error { }) for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] { - logrus.Infof("Removing S3 snapshot: %s", df.Key) 
+ logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key) if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { return err } + metadataKey := path.Join(path.Dir(df.Key), metadataDir, path.Base(df.Key)) + if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil { + if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound { + return nil + } + return err + } } return nil @@ -231,19 +316,17 @@ func (s *S3) snapshotRetention(ctx context.Context) error { // snapshots in S3 along with their relevant // metadata. func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) { - snapshots := make(map[string]snapshotFile) + snapshots := map[string]snapshotFile{} + metadatas := []string{} ctx, cancel := context.WithCancel(ctx) defer cancel() - var loo minio.ListObjectsOptions - if s.config.EtcdS3Folder != "" { - loo = minio.ListObjectsOptions{ - Prefix: s.config.EtcdS3Folder, - Recursive: true, - } + opts := minio.ListObjectsOptions{ + Prefix: s.config.EtcdS3Folder, + Recursive: true, } - objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo) + objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, opts) for obj := range objects { if obj.Err != nil { @@ -253,7 +336,18 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) continue } + if o, err := s.client.StatObject(ctx, s.config.EtcdS3BucketName, obj.Key, minio.StatObjectOptions{}); err != nil { + logrus.Warnf("Failed to get object metadata: %v", err) + } else { + obj = o + } + filename := path.Base(obj.Key) + if path.Base(path.Dir(obj.Key)) == metadataDir { + metadatas = append(metadatas, obj.Key) + continue + } + basename, compressed := strings.CutSuffix(filename, compressedExtension) ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) if err != nil 
{ @@ -262,6 +356,7 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) sf := snapshotFile{ Name: filename, + Location: fmt.Sprintf("s3://%s/%s", s.config.EtcdS3BucketName, obj.Key), NodeName: "s3", CreatedAt: &metav1.Time{ Time: time.Unix(ts, 0), @@ -282,6 +377,25 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) sfKey := generateSnapshotConfigMapKey(sf) snapshots[sfKey] = sf } + + for _, metadataKey := range metadatas { + filename := path.Base(metadataKey) + sfKey := generateSnapshotConfigMapKey(snapshotFile{Name: filename, NodeName: "s3"}) + if sf, ok := snapshots[sfKey]; ok { + logrus.Debugf("Loading snapshot metadata from s3://%s/%s", s.config.EtcdS3BucketName, metadataKey) + if obj, err := s.client.GetObject(ctx, s.config.EtcdS3BucketName, metadataKey, minio.GetObjectOptions{}); err != nil { + logrus.Warnf("Failed to get snapshot metadata: %v", err) + } else { + if m, err := ioutil.ReadAll(obj); err != nil { + logrus.Warnf("Failed to read snapshot metadata: %v", err) + } else { + sf.Metadata = base64.StdEncoding.EncodeToString(m) + snapshots[sfKey] = sf + } + } + } + } + return snapshots, nil } diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index d640b69ead..82fc553ad0 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -38,6 +38,7 @@ const ( maxConcurrentSnapshots = 1 pruneStepSize = 5 compressedExtension = ".zip" + metadataDir = ".metadata" ) var ( @@ -272,20 +273,20 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } } - if e.config.EtcdSnapshotCompress { - zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now) - if err != nil { - return err - } - if err := os.Remove(snapshotPath); err != nil { - return err - } - snapshotPath = zipPath - logrus.Info("Compressed snapshot: " + snapshotPath) - } - - // If the snapshot attempt was successful, sf will be nil as we did not set it. 
+ // If the snapshot attempt was successful, sf will be nil as we did not set it to store the error message. if sf == nil { + if e.config.EtcdSnapshotCompress { + zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now) + if err != nil { + return errors.Wrap(err, "failed to compress snapshot") + } + if err := os.Remove(snapshotPath); err != nil { + return errors.Wrap(err, "failed to remove uncompressed snapshot") + } + snapshotPath = zipPath + logrus.Info("Compressed snapshot: " + snapshotPath) + } + f, err := os.Stat(snapshotPath) if err != nil { return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") @@ -303,15 +304,19 @@ func (e *ETCD) Snapshot(ctx context.Context) error { metadataSource: extraMetadata, } + if err := saveSnapshotMetadata(snapshotPath, extraMetadata); err != nil { + return errors.Wrap(err, "failed to save local snapshot metadata") + } + if err := e.addSnapshotData(*sf); err != nil { return errors.Wrap(err, "failed to save local snapshot data to configmap") } + if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { return errors.Wrap(err, "failed to apply local snapshot retention policy") } if e.config.EtcdS3 { - logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) sf = &snapshotFile{ @@ -335,6 +340,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { metadataSource: extraMetadata, } } else { + logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) // upload will return a snapshotFile even on error - if there was an // error, it will be reflected in the status and message. 
sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now) @@ -414,10 +420,21 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) { if err != nil { ts = file.ModTime().Unix() } + + // try to read metadata from disk; don't warn if it is missing as it will not exist + // for snapshot files from old releases or if there was no metadata provided. + var metadata string + metadataFile := filepath.Join(filepath.Dir(path), "..", metadataDir, file.Name()) + if m, err := os.ReadFile(metadataFile); err == nil { + logrus.Debugf("Loading snapshot metadata from %s", metadataFile) + metadata = base64.StdEncoding.EncodeToString(m) + } + sf := snapshotFile{ Name: file.Name(), Location: "file://" + filepath.Join(snapshotDir, file.Name()), NodeName: nodeName, + Metadata: metadata, CreatedAt: &metav1.Time{ Time: time.Unix(ts, 0), }, @@ -462,7 +479,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error { if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { - logrus.Warnf("Unable to initialize S3 client during prune: %v", err) + logrus.Warnf("Unable to initialize S3 client: %v", err) } else { if err := e.s3.snapshotRetention(ctx); err != nil { logrus.Errorf("Error applying S3 snapshot retention policy: %v", err) @@ -478,6 +495,7 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro snapshotFiles := map[string]snapshotFile{} if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client: %v", err) return nil, err } sfs, err := e.s3.listSnapshots(ctx) @@ -506,13 +524,30 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { return errors.Wrap(err, "failed to get the snapshot dir") } - if e.config.EtcdS3 { - logrus.Info("Removing the given etcd snapshot(s) from S3") - logrus.Debugf("Removing the given etcd snapshot(s) from S3: %v", snapshots) + logrus.Info("Removing the given locally stored etcd snapshot(s)") + logrus.Debugf("Attempting to remove 
the given locally stored etcd snapshot(s): %v", snapshots) - if e.initS3IfNil(ctx); err != nil { + for _, s := range snapshots { + // check if the given snapshot exists. If it does, + // remove it, otherwise continue. + sf := filepath.Join(snapshotDir, s) + if _, err := os.Stat(sf); os.IsNotExist(err) { + logrus.Infof("Snapshot %s, does not exist", s) + continue + } + if err := os.Remove(sf); err != nil { return err } + logrus.Debug("Removed snapshot ", s) + } + + if e.config.EtcdS3 { + if e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client: %v", err) + return err + } + logrus.Info("Removing the given etcd snapshot(s) from S3") + logrus.Debugf("Removing the given etcd snapshot(s) from S3: %v", snapshots) objectsCh := make(chan minio.ObjectInfo) @@ -566,23 +601,6 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { } } - logrus.Info("Removing the given locally stored etcd snapshot(s)") - logrus.Debugf("Attempting to remove the given locally stored etcd snapshot(s): %v", snapshots) - - for _, s := range snapshots { - // check if the given snapshot exists. If it does, - // remove it, otherwise continue. 
- sf := filepath.Join(snapshotDir, s) - if _, err := os.Stat(sf); os.IsNotExist(err) { - logrus.Infof("Snapshot %s, does not exist", s) - continue - } - if err := os.Remove(sf); err != nil { - return err - } - logrus.Debug("Removed snapshot ", s) - } - return e.ReconcileSnapshotData(ctx) } @@ -735,6 +753,11 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { var s3ListSuccessful bool if e.config.EtcdS3 { + if err := e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client: %v", err) + return err + } + if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) } else { @@ -906,10 +929,14 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) for _, df := range snapshotFiles[retention:] { snapshotPath := filepath.Join(snapshotDir, df.Name) + metadataPath := filepath.Join(snapshotDir, "..", metadataDir, df.Name) logrus.Infof("Removing local snapshot %s", snapshotPath) if err := os.Remove(snapshotPath); err != nil { return err } + if err := os.Remove(metadataPath); err != nil && !os.IsNotExist(err) { + return err + } } return nil @@ -919,3 +946,24 @@ func isTooLargeError(err error) bool { // There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string. return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) } + +// saveSnapshotMetadata writes extra metadata to disk. +// The write is silently skipped if no extra metadata is provided. 
+func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) error { + if extraMetadata == nil || len(extraMetadata.Data) == 0 { + return nil + } + + dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir) + filename := filepath.Base(snapshotPath) + metadataPath := filepath.Join(dir, filename) + logrus.Infof("Saving snapshot metadata to %s", metadataPath) + m, err := json.Marshal(extraMetadata.Data) + if err != nil { + return err + } + if err := os.MkdirAll(dir, 0700); err != nil { + return err + } + return os.WriteFile(metadataPath, m, 0700) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index e18001a51a..90e8084b78 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -115,6 +115,7 @@ func runControllers(ctx context.Context, config *Config) error { controlConfig.Runtime.NodePasswdFile); err != nil { logrus.Warn(errors.Wrap(err, "error migrating node-password file")) } + controlConfig.Runtime.K3s = sc.K3s controlConfig.Runtime.Event = sc.Event controlConfig.Runtime.Core = sc.Core