diff --git a/.golangci.json b/.golangci.json index 3e2499bfaf..c8b5684071 100644 --- a/.golangci.json +++ b/.golangci.json @@ -33,6 +33,10 @@ { "linters": "revive", "text": "should have comment" + }, + { + "linters": "revive", + "text": "exported" } ] } diff --git a/pkg/cli/cmds/etcd_snapshot.go b/pkg/cli/cmds/etcd_snapshot.go index e1d5c7208a..3325821b10 100644 --- a/pkg/cli/cmds/etcd_snapshot.go +++ b/pkg/cli/cmds/etcd_snapshot.go @@ -32,6 +32,11 @@ var EtcdSnapshotFlags = []cli.Flag{ Destination: &ServerConfig.EtcdSnapshotName, Value: "on-demand", }, + &cli.BoolFlag{ + Name: "snapshot-compress,etcd-snapshot-compress", + Usage: "(db) Compress etcd snapshot", + Destination: &ServerConfig.EtcdSnapshotCompress, + }, &cli.BoolFlag{ Name: "s3,etcd-s3", Usage: "(db) Enable backup to S3", diff --git a/pkg/cli/cmds/server.go b/pkg/cli/cmds/server.go index 3d7a58fd39..9ddbf05615 100644 --- a/pkg/cli/cmds/server.go +++ b/pkg/cli/cmds/server.go @@ -84,6 +84,7 @@ type Server struct { EtcdSnapshotDir string EtcdSnapshotCron string EtcdSnapshotRetention int + EtcdSnapshotCompress bool EtcdS3 bool EtcdS3Endpoint string EtcdS3EndpointCA string @@ -289,6 +290,11 @@ var ServerFlags = []cli.Flag{ Usage: "(db) Directory to save db snapshots. (Default location: ${data-dir}/db/snapshots)", Destination: &ServerConfig.EtcdSnapshotDir, }, + &cli.BoolFlag{ + Name: "etcd-snapshot-compress", + Usage: "(db) Compress etcd snapshot", + Destination: &ServerConfig.EtcdSnapshotCompress, + }, &cli.BoolFlag{ Name: "etcd-s3", Usage: "(db) Enable backup to S3", diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index ccdc72d67a..4a3b3b0f91 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -39,6 +39,7 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) (string sc.ControlConfig.DataDir = cfg.DataDir sc.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName sc.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir + sc.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress sc.ControlConfig.EtcdS3 = cfg.EtcdS3 sc.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint sc.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA diff --git a/pkg/daemons/config/types.go b/pkg/daemons/config/types.go index 6f6014fd55..84400ab7b2 100644 --- a/pkg/daemons/config/types.go +++ b/pkg/daemons/config/types.go @@ -170,6 +170,7 @@ type Control struct { EtcdSnapshotDir string EtcdSnapshotCron string EtcdSnapshotRetention int + EtcdSnapshotCompress bool EtcdS3 bool EtcdS3Endpoint string EtcdS3EndpointCA string diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 94077dcf9e..6aac98e11b 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -1,12 +1,14 @@ package etcd import ( + "archive/zip" "bytes" "context" "crypto/tls" "encoding/base64" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" @@ -941,6 +943,93 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err return nil } +const compressedExtension = ".zip" + +// compressSnapshot compresses the given snapshot and provides the +// caller with the path to the file. +func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string) (string, error) { + logrus.Info("Compressing etcd snapshot file: " + snapshotName) + + zippedSnapshotName := snapshotName + compressedExtension + zipPath := filepath.Join(snapshotDir, zippedSnapshotName) + + zf, err := os.Create(zipPath) + if err != nil { + return "", err + } + defer zf.Close() + + zipWriter := zip.NewWriter(zf) + defer zipWriter.Close() + + uncompressedPath := filepath.Join(snapshotDir, snapshotName) + fileToZip, err := os.Open(uncompressedPath) + if err != nil { + os.Remove(zipPath) + return "", err + } + defer fileToZip.Close() + + info, err := fileToZip.Stat() + if err != nil { + os.Remove(zipPath) + return "", err + } + + header, err := zip.FileInfoHeader(info) + if err != nil { + os.Remove(zipPath) + return "", err + } + + header.Name = snapshotName + header.Method = zip.Deflate + header.Modified = time.Now() + + writer, err := zipWriter.CreateHeader(header) + if err != nil { + os.Remove(zipPath) + return "", err + } + _, err = io.Copy(writer, fileToZip) + + return zipPath, err +} + +// decompressSnapshot decompresses the given snapshot and provides the caller +// with the full path to the uncompressed snapshot. +func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, error) { + logrus.Info("Decompressing etcd snapshot file: " + snapshotFile) + + r, err := zip.OpenReader(snapshotFile) + if err != nil { + return "", err + } + defer r.Close() + + var decompressed *os.File + for _, sf := range r.File { + decompressed, err = os.OpenFile(strings.Replace(sf.Name, compressedExtension, "", -1), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, sf.Mode()) + if err != nil { + return "", err + } + defer decompressed.Close() + + ss, err := sf.Open() + if err != nil { + return "", err + } + defer ss.Close() + + if _, err := io.Copy(decompressed, ss); err != nil { + os.Remove("") + return "", err + } + } + + return decompressed.Name(), nil +} + // Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed // snapshots in excess of the retention limits. This method is used in the internal cron snapshot // system as well as used to do on-demand snapshots. @@ -992,10 +1081,10 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { logrus.Infof("Saving etcd snapshot to %s", snapshotPath) - var sf *SnapshotFile + var sf *snapshotFile if err := snapshot.NewV3(nil).Save(ctx, *cfg, snapshotPath); err != nil { - sf = &SnapshotFile{ + sf = &snapshotFile{ Name: snapshotName, Location: "", Metadata: extraMetadata, @@ -1003,9 +1092,10 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { CreatedAt: &metav1.Time{ Time: now, }, - Status: FailedSnapshotStatus, - Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), - Size: 0, + Status: failedSnapshotStatus, + Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), + Size: 0, + Compressed: e.config.EtcdSnapshotCompress, } logrus.Errorf("Failed to take etcd snapshot: %v", err) if err := e.addSnapshotData(*sf); err != nil { @@ -1013,13 +1103,25 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { } } + if e.config.EtcdSnapshotCompress { + zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath) + if err != nil { + return err + } + if err := os.Remove(snapshotPath); err != nil { + return err + } + snapshotPath = zipPath + logrus.Info("Compressed snapshot: " + snapshotPath) + } + // If the snapshot attempt was successful, sf will be nil as we did not set it. if sf == nil { f, err := os.Stat(snapshotPath) if err != nil { return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") } - sf = &SnapshotFile{ + sf = &snapshotFile{ Name: f.Name(), Metadata: extraMetadata, Location: "file://" + snapshotPath, @@ -1027,8 +1129,9 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { CreatedAt: &metav1.Time{ Time: f.ModTime(), }, - Status: SuccessfulSnapshotStatus, - Size: f.Size(), + Status: successfulSnapshotStatus, + Size: f.Size(), + Compressed: e.config.EtcdSnapshotCompress, } if err := e.addSnapshotData(*sf); err != nil { @@ -1045,7 +1148,7 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { sf = nil if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) - sf = &SnapshotFile{ + sf = &snapshotFile{ Name: filepath.Base(snapshotPath), Metadata: extraMetadata, NodeName: "s3", @@ -1054,7 +1157,7 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { }, Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), Size: 0, - Status: FailedSnapshotStatus, + Status: failedSnapshotStatus, S3: &s3Config{ Endpoint: e.config.EtcdS3Endpoint, EndpointCA: e.config.EtcdS3EndpointCA, @@ -1096,32 +1199,35 @@ type s3Config struct { Insecure bool `json:"insecure,omitempty"` } -type SnapshotStatus string +type snapshotStatus string -const SuccessfulSnapshotStatus SnapshotStatus = "successful" -const FailedSnapshotStatus SnapshotStatus = "failed" +const ( + successfulSnapshotStatus snapshotStatus = "successful" + failedSnapshotStatus snapshotStatus = "failed" +) -// SnapshotFile represents a single snapshot and it's +// snapshotFile represents a single snapshot and it's // metadata. -type SnapshotFile struct { +type snapshotFile struct { Name string `json:"name"` // Location contains the full path of the snapshot. For // local paths, the location will be prefixed with "file://". - Location string `json:"location,omitempty"` - Metadata string `json:"metadata,omitempty"` - Message string `json:"message,omitempty"` - NodeName string `json:"nodeName,omitempty"` - CreatedAt *metav1.Time `json:"createdAt,omitempty"` - Size int64 `json:"size,omitempty"` - Status SnapshotStatus `json:"status,omitempty"` - S3 *s3Config `json:"s3Config,omitempty"` + Location string `json:"location,omitempty"` + Metadata string `json:"metadata,omitempty"` + Message string `json:"message,omitempty"` + NodeName string `json:"nodeName,omitempty"` + CreatedAt *metav1.Time `json:"createdAt,omitempty"` + Size int64 `json:"size,omitempty"` + Status snapshotStatus `json:"status,omitempty"` + S3 *s3Config `json:"s3Config,omitempty"` + Compressed bool `json:"compressed"` } // listLocalSnapshots provides a list of the currently stored // snapshots on disk along with their relevant // metadata. -func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) { - snapshots := make(map[string]SnapshotFile) +func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) { + snapshots := make(map[string]snapshotFile) snapshotDir, err := snapshotDir(e.config, true) if err != nil { return snapshots, errors.Wrap(err, "failed to get the snapshot dir") @@ -1135,7 +1241,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) { nodeName := os.Getenv("NODE_NAME") for _, f := range files { - sf := SnapshotFile{ + sf := snapshotFile{ Name: f.Name(), Location: "file://" + filepath.Join(snapshotDir, f.Name()), NodeName: nodeName, @@ -1143,7 +1249,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) { Time: f.ModTime(), }, Size: f.Size(), - Status: SuccessfulSnapshotStatus, + Status: successfulSnapshotStatus, } sfKey := generateSnapshotConfigMapKey(sf) snapshots[sfKey] = sf @@ -1155,8 +1261,8 @@ func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) { // listS3Snapshots provides a list of currently stored // snapshots in S3 along with their relevant // metadata. -func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, error) { - snapshots := make(map[string]SnapshotFile) +func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, error) { + snapshots := make(map[string]snapshotFile) if e.config.EtcdS3 { ctx, cancel := context.WithCancel(ctx) @@ -1189,7 +1295,7 @@ func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, er return nil, err } - sf := SnapshotFile{ + sf := snapshotFile{ Name: filepath.Base(obj.Key), NodeName: "s3", CreatedAt: &metav1.Time{ @@ -1205,7 +1311,7 @@ func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, er Folder: e.config.EtcdS3Folder, Insecure: e.config.EtcdS3Insecure, }, - Status: SuccessfulSnapshotStatus, + Status: successfulSnapshotStatus, } sfKey := generateSnapshotConfigMapKey(sf) snapshots[sfKey] = sf @@ -1254,7 +1360,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error { // ListSnapshots is an exported wrapper method that wraps an // unexported method of the same name. -func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]SnapshotFile, error) { +func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) { if e.config.EtcdS3 { return e.listS3Snapshots(ctx) } @@ -1346,7 +1452,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { // AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata // available at the time. -func (e *ETCD) addSnapshotData(sf SnapshotFile) error { +func (e *ETCD) addSnapshotData(sf snapshotFile) error { return retry.OnError(retry.DefaultBackoff, func(err error) bool { return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) }, func() error { @@ -1385,7 +1491,7 @@ func (e *ETCD) addSnapshotData(sf SnapshotFile) error { }) } -func generateSnapshotConfigMapKey(sf SnapshotFile) string { +func generateSnapshotConfigMapKey(sf snapshotFile) string { var sfKey string if sf.NodeName == "s3" { sfKey = "s3-" + sf.Name @@ -1453,25 +1559,25 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { deletedSnapshots := make(map[string]string) // failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap // These are stored unmarshaled so we can sort based on name. - var failedSnapshots []SnapshotFile - var failedS3Snapshots []SnapshotFile + var failedSnapshots []snapshotFile + var failedS3Snapshots []snapshotFile // remove entries for this node and s3 (if S3 is enabled) only for k, v := range snapshotConfigMap.Data { - var sf SnapshotFile + var sf snapshotFile if err := json.Unmarshal([]byte(v), &sf); err != nil { return err } - if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != FailedSnapshotStatus { + if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != failedSnapshotStatus { // Only delete the snapshot if the snapshot was not failed // sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status deletedSnapshots[generateSnapshotConfigMapKey(sf)] = v // store a copy of the snapshot delete(snapshotConfigMap.Data, k) - } else if sf.Status == FailedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 { + } else if sf.Status == failedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 { // Handle locally failed snapshots. failedSnapshots = append(failedSnapshots, sf) delete(snapshotConfigMap.Data, k) - } else if sf.Status == FailedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 { + } else if sf.Status == failedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 { // If we're operating against S3, we can clean up failed S3 snapshots that failed on this node. failedS3Snapshots = append(failedS3Snapshots, sf) delete(snapshotConfigMap.Data, k) @@ -1526,7 +1632,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { // save the local entries to the ConfigMap if they are still on disk or in S3. for _, snapshot := range snapshotFiles { - var sf SnapshotFile + var sf snapshotFile sfKey := generateSnapshotConfigMapKey(snapshot) if v, ok := deletedSnapshots[sfKey]; ok { // use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it @@ -1539,7 +1645,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { sf = snapshot } - sf.Status = SuccessfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful. + sf.Status = successfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful. marshalledSnapshot, err := json.Marshal(sf) if err != nil { @@ -1577,13 +1683,33 @@ func (e *ETCD) Restore(ctx context.Context) error { if _, err := os.Stat(e.config.ClusterResetRestorePath); err != nil { return err } + + var restorePath string + if strings.HasSuffix(e.config.ClusterResetRestorePath, compressedExtension) { + snapshotDir, err := snapshotDir(e.config, true) + if err != nil { + return errors.Wrap(err, "failed to get the snapshot dir") + } + + decompressSnapshot, err := e.decompressSnapshot(snapshotDir, e.config.ClusterResetRestorePath) + if err != nil { + return err + } + + restorePath = decompressSnapshot + } else { + restorePath = e.config.ClusterResetRestorePath + } + // move the data directory to a temp path if err := os.Rename(DBDir(e.config), oldDataDir); err != nil { return err } + logrus.Infof("Pre-restore etcd database moved to %s", oldDataDir) + return snapshot.NewV3(nil).Restore(snapshot.RestoreConfig{ - SnapshotPath: e.config.ClusterResetRestorePath, + SnapshotPath: restorePath, Name: e.name, OutputDataDir: DBDir(e.config), OutputWALDir: walDir(e.config), diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 6eee3092c2..3a8d5ccf2d 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -93,11 +93,11 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) { // upload uploads the given snapshot to the configured S3 // compatible backend. -func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*SnapshotFile, error) { +func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*snapshotFile, error) { logrus.Infof("Uploading snapshot %s to S3", snapshot) basename := filepath.Base(snapshot) var snapshotFileName string - var snapshotFile SnapshotFile + var sf snapshotFile if s.config.EtcdS3Folder != "" { snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename) } else { @@ -112,7 +112,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim } uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts) if err != nil { - snapshotFile = SnapshotFile{ + sf = snapshotFile{ Name: filepath.Base(uploadInfo.Key), Metadata: extraMetadata, NodeName: "s3", @@ -121,7 +121,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim }, Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), Size: 0, - Status: FailedSnapshotStatus, + Status: failedSnapshotStatus, S3: &s3Config{ Endpoint: s.config.EtcdS3Endpoint, EndpointCA: s.config.EtcdS3EndpointCA, @@ -139,7 +139,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim return nil, err } - snapshotFile = SnapshotFile{ + sf = snapshotFile{ Name: filepath.Base(uploadInfo.Key), Metadata: extraMetadata, NodeName: "s3", @@ -147,7 +147,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim Time: ca, }, Size: uploadInfo.Size, - Status: SuccessfulSnapshotStatus, + Status: successfulSnapshotStatus, S3: &s3Config{ Endpoint: s.config.EtcdS3Endpoint, EndpointCA: s.config.EtcdS3EndpointCA, @@ -159,7 +159,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim }, } } - return &snapshotFile, nil + return &sf, nil } // download downloads the given snapshot from the configured S3