mirror of https://github.com/k3s-io/k3s
Adds the ability to compress etcd snapshots (#4866)
parent
48ffed3852
commit
effcb15adb
|
@ -33,6 +33,10 @@
|
||||||
{
|
{
|
||||||
"linters": "revive",
|
"linters": "revive",
|
||||||
"text": "should have comment"
|
"text": "should have comment"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"linters": "revive",
|
||||||
|
"text": "exported"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,11 @@ var EtcdSnapshotFlags = []cli.Flag{
|
||||||
Destination: &ServerConfig.EtcdSnapshotName,
|
Destination: &ServerConfig.EtcdSnapshotName,
|
||||||
Value: "on-demand",
|
Value: "on-demand",
|
||||||
},
|
},
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "snapshot-compress,etcd-snapshot-compress",
|
||||||
|
Usage: "(db) Compress etcd snapshot",
|
||||||
|
Destination: &ServerConfig.EtcdSnapshotCompress,
|
||||||
|
},
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
Name: "s3,etcd-s3",
|
Name: "s3,etcd-s3",
|
||||||
Usage: "(db) Enable backup to S3",
|
Usage: "(db) Enable backup to S3",
|
||||||
|
|
|
@ -84,6 +84,7 @@ type Server struct {
|
||||||
EtcdSnapshotDir string
|
EtcdSnapshotDir string
|
||||||
EtcdSnapshotCron string
|
EtcdSnapshotCron string
|
||||||
EtcdSnapshotRetention int
|
EtcdSnapshotRetention int
|
||||||
|
EtcdSnapshotCompress bool
|
||||||
EtcdS3 bool
|
EtcdS3 bool
|
||||||
EtcdS3Endpoint string
|
EtcdS3Endpoint string
|
||||||
EtcdS3EndpointCA string
|
EtcdS3EndpointCA string
|
||||||
|
@ -289,6 +290,11 @@ var ServerFlags = []cli.Flag{
|
||||||
Usage: "(db) Directory to save db snapshots. (Default location: ${data-dir}/db/snapshots)",
|
Usage: "(db) Directory to save db snapshots. (Default location: ${data-dir}/db/snapshots)",
|
||||||
Destination: &ServerConfig.EtcdSnapshotDir,
|
Destination: &ServerConfig.EtcdSnapshotDir,
|
||||||
},
|
},
|
||||||
|
&cli.BoolFlag{
|
||||||
|
Name: "etcd-snapshot-compress",
|
||||||
|
Usage: "(db) Compress etcd snapshot",
|
||||||
|
Destination: &ServerConfig.EtcdSnapshotCompress,
|
||||||
|
},
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
Name: "etcd-s3",
|
Name: "etcd-s3",
|
||||||
Usage: "(db) Enable backup to S3",
|
Usage: "(db) Enable backup to S3",
|
||||||
|
|
|
@ -39,6 +39,7 @@ func commandSetup(app *cli.Context, cfg *cmds.Server, sc *server.Config) (string
|
||||||
sc.ControlConfig.DataDir = cfg.DataDir
|
sc.ControlConfig.DataDir = cfg.DataDir
|
||||||
sc.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
|
sc.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
|
||||||
sc.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
|
sc.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
|
||||||
|
sc.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress
|
||||||
sc.ControlConfig.EtcdS3 = cfg.EtcdS3
|
sc.ControlConfig.EtcdS3 = cfg.EtcdS3
|
||||||
sc.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint
|
sc.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint
|
||||||
sc.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA
|
sc.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA
|
||||||
|
|
|
@ -170,6 +170,7 @@ type Control struct {
|
||||||
EtcdSnapshotDir string
|
EtcdSnapshotDir string
|
||||||
EtcdSnapshotCron string
|
EtcdSnapshotCron string
|
||||||
EtcdSnapshotRetention int
|
EtcdSnapshotRetention int
|
||||||
|
EtcdSnapshotCompress bool
|
||||||
EtcdS3 bool
|
EtcdS3 bool
|
||||||
EtcdS3Endpoint string
|
EtcdS3Endpoint string
|
||||||
EtcdS3EndpointCA string
|
EtcdS3EndpointCA string
|
||||||
|
|
192
pkg/etcd/etcd.go
192
pkg/etcd/etcd.go
|
@ -1,12 +1,14 @@
|
||||||
package etcd
|
package etcd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"archive/zip"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -941,6 +943,93 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const compressedExtension = ".zip"
|
||||||
|
|
||||||
|
// compressSnapshot compresses the given snapshot and provides the
|
||||||
|
// caller with the path to the file.
|
||||||
|
func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string) (string, error) {
|
||||||
|
logrus.Info("Compressing etcd snapshot file: " + snapshotName)
|
||||||
|
|
||||||
|
zippedSnapshotName := snapshotName + compressedExtension
|
||||||
|
zipPath := filepath.Join(snapshotDir, zippedSnapshotName)
|
||||||
|
|
||||||
|
zf, err := os.Create(zipPath)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer zf.Close()
|
||||||
|
|
||||||
|
zipWriter := zip.NewWriter(zf)
|
||||||
|
defer zipWriter.Close()
|
||||||
|
|
||||||
|
uncompressedPath := filepath.Join(snapshotDir, snapshotName)
|
||||||
|
fileToZip, err := os.Open(uncompressedPath)
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(zipPath)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer fileToZip.Close()
|
||||||
|
|
||||||
|
info, err := fileToZip.Stat()
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(zipPath)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
header, err := zip.FileInfoHeader(info)
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(zipPath)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
header.Name = snapshotName
|
||||||
|
header.Method = zip.Deflate
|
||||||
|
header.Modified = time.Now()
|
||||||
|
|
||||||
|
writer, err := zipWriter.CreateHeader(header)
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(zipPath)
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
_, err = io.Copy(writer, fileToZip)
|
||||||
|
|
||||||
|
return zipPath, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// decompressSnapshot decompresses the given snapshot and provides the caller
|
||||||
|
// with the full path to the uncompressed snapshot.
|
||||||
|
func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, error) {
|
||||||
|
logrus.Info("Decompressing etcd snapshot file: " + snapshotFile)
|
||||||
|
|
||||||
|
r, err := zip.OpenReader(snapshotFile)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
var decompressed *os.File
|
||||||
|
for _, sf := range r.File {
|
||||||
|
decompressed, err = os.OpenFile(strings.Replace(sf.Name, compressedExtension, "", -1), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, sf.Mode())
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer decompressed.Close()
|
||||||
|
|
||||||
|
ss, err := sf.Open()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer ss.Close()
|
||||||
|
|
||||||
|
if _, err := io.Copy(decompressed, ss); err != nil {
|
||||||
|
os.Remove("")
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return decompressed.Name(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed
|
// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed
|
||||||
// snapshots in excess of the retention limits. This method is used in the internal cron snapshot
|
// snapshots in excess of the retention limits. This method is used in the internal cron snapshot
|
||||||
// system as well as used to do on-demand snapshots.
|
// system as well as used to do on-demand snapshots.
|
||||||
|
@ -992,10 +1081,10 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
|
|
||||||
logrus.Infof("Saving etcd snapshot to %s", snapshotPath)
|
logrus.Infof("Saving etcd snapshot to %s", snapshotPath)
|
||||||
|
|
||||||
var sf *SnapshotFile
|
var sf *snapshotFile
|
||||||
|
|
||||||
if err := snapshot.NewV3(nil).Save(ctx, *cfg, snapshotPath); err != nil {
|
if err := snapshot.NewV3(nil).Save(ctx, *cfg, snapshotPath); err != nil {
|
||||||
sf = &SnapshotFile{
|
sf = &snapshotFile{
|
||||||
Name: snapshotName,
|
Name: snapshotName,
|
||||||
Location: "",
|
Location: "",
|
||||||
Metadata: extraMetadata,
|
Metadata: extraMetadata,
|
||||||
|
@ -1003,9 +1092,10 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
CreatedAt: &metav1.Time{
|
CreatedAt: &metav1.Time{
|
||||||
Time: now,
|
Time: now,
|
||||||
},
|
},
|
||||||
Status: FailedSnapshotStatus,
|
Status: failedSnapshotStatus,
|
||||||
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
|
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
|
||||||
Size: 0,
|
Size: 0,
|
||||||
|
Compressed: e.config.EtcdSnapshotCompress,
|
||||||
}
|
}
|
||||||
logrus.Errorf("Failed to take etcd snapshot: %v", err)
|
logrus.Errorf("Failed to take etcd snapshot: %v", err)
|
||||||
if err := e.addSnapshotData(*sf); err != nil {
|
if err := e.addSnapshotData(*sf); err != nil {
|
||||||
|
@ -1013,13 +1103,25 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if e.config.EtcdSnapshotCompress {
|
||||||
|
zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.Remove(snapshotPath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
snapshotPath = zipPath
|
||||||
|
logrus.Info("Compressed snapshot: " + snapshotPath)
|
||||||
|
}
|
||||||
|
|
||||||
// If the snapshot attempt was successful, sf will be nil as we did not set it.
|
// If the snapshot attempt was successful, sf will be nil as we did not set it.
|
||||||
if sf == nil {
|
if sf == nil {
|
||||||
f, err := os.Stat(snapshotPath)
|
f, err := os.Stat(snapshotPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot")
|
return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot")
|
||||||
}
|
}
|
||||||
sf = &SnapshotFile{
|
sf = &snapshotFile{
|
||||||
Name: f.Name(),
|
Name: f.Name(),
|
||||||
Metadata: extraMetadata,
|
Metadata: extraMetadata,
|
||||||
Location: "file://" + snapshotPath,
|
Location: "file://" + snapshotPath,
|
||||||
|
@ -1027,8 +1129,9 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
CreatedAt: &metav1.Time{
|
CreatedAt: &metav1.Time{
|
||||||
Time: f.ModTime(),
|
Time: f.ModTime(),
|
||||||
},
|
},
|
||||||
Status: SuccessfulSnapshotStatus,
|
Status: successfulSnapshotStatus,
|
||||||
Size: f.Size(),
|
Size: f.Size(),
|
||||||
|
Compressed: e.config.EtcdSnapshotCompress,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := e.addSnapshotData(*sf); err != nil {
|
if err := e.addSnapshotData(*sf); err != nil {
|
||||||
|
@ -1045,7 +1148,7 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
sf = nil
|
sf = nil
|
||||||
if err := e.initS3IfNil(ctx); err != nil {
|
if err := e.initS3IfNil(ctx); err != nil {
|
||||||
logrus.Warnf("Unable to initialize S3 client: %v", err)
|
logrus.Warnf("Unable to initialize S3 client: %v", err)
|
||||||
sf = &SnapshotFile{
|
sf = &snapshotFile{
|
||||||
Name: filepath.Base(snapshotPath),
|
Name: filepath.Base(snapshotPath),
|
||||||
Metadata: extraMetadata,
|
Metadata: extraMetadata,
|
||||||
NodeName: "s3",
|
NodeName: "s3",
|
||||||
|
@ -1054,7 +1157,7 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
||||||
},
|
},
|
||||||
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
|
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
|
||||||
Size: 0,
|
Size: 0,
|
||||||
Status: FailedSnapshotStatus,
|
Status: failedSnapshotStatus,
|
||||||
S3: &s3Config{
|
S3: &s3Config{
|
||||||
Endpoint: e.config.EtcdS3Endpoint,
|
Endpoint: e.config.EtcdS3Endpoint,
|
||||||
EndpointCA: e.config.EtcdS3EndpointCA,
|
EndpointCA: e.config.EtcdS3EndpointCA,
|
||||||
|
@ -1096,14 +1199,16 @@ type s3Config struct {
|
||||||
Insecure bool `json:"insecure,omitempty"`
|
Insecure bool `json:"insecure,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SnapshotStatus string
|
type snapshotStatus string
|
||||||
|
|
||||||
const SuccessfulSnapshotStatus SnapshotStatus = "successful"
|
const (
|
||||||
const FailedSnapshotStatus SnapshotStatus = "failed"
|
successfulSnapshotStatus snapshotStatus = "successful"
|
||||||
|
failedSnapshotStatus snapshotStatus = "failed"
|
||||||
|
)
|
||||||
|
|
||||||
// SnapshotFile represents a single snapshot and it's
|
// snapshotFile represents a single snapshot and it's
|
||||||
// metadata.
|
// metadata.
|
||||||
type SnapshotFile struct {
|
type snapshotFile struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
// Location contains the full path of the snapshot. For
|
// Location contains the full path of the snapshot. For
|
||||||
// local paths, the location will be prefixed with "file://".
|
// local paths, the location will be prefixed with "file://".
|
||||||
|
@ -1113,15 +1218,16 @@ type SnapshotFile struct {
|
||||||
NodeName string `json:"nodeName,omitempty"`
|
NodeName string `json:"nodeName,omitempty"`
|
||||||
CreatedAt *metav1.Time `json:"createdAt,omitempty"`
|
CreatedAt *metav1.Time `json:"createdAt,omitempty"`
|
||||||
Size int64 `json:"size,omitempty"`
|
Size int64 `json:"size,omitempty"`
|
||||||
Status SnapshotStatus `json:"status,omitempty"`
|
Status snapshotStatus `json:"status,omitempty"`
|
||||||
S3 *s3Config `json:"s3Config,omitempty"`
|
S3 *s3Config `json:"s3Config,omitempty"`
|
||||||
|
Compressed bool `json:"compressed"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// listLocalSnapshots provides a list of the currently stored
|
// listLocalSnapshots provides a list of the currently stored
|
||||||
// snapshots on disk along with their relevant
|
// snapshots on disk along with their relevant
|
||||||
// metadata.
|
// metadata.
|
||||||
func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) {
|
func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) {
|
||||||
snapshots := make(map[string]SnapshotFile)
|
snapshots := make(map[string]snapshotFile)
|
||||||
snapshotDir, err := snapshotDir(e.config, true)
|
snapshotDir, err := snapshotDir(e.config, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snapshots, errors.Wrap(err, "failed to get the snapshot dir")
|
return snapshots, errors.Wrap(err, "failed to get the snapshot dir")
|
||||||
|
@ -1135,7 +1241,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) {
|
||||||
nodeName := os.Getenv("NODE_NAME")
|
nodeName := os.Getenv("NODE_NAME")
|
||||||
|
|
||||||
for _, f := range files {
|
for _, f := range files {
|
||||||
sf := SnapshotFile{
|
sf := snapshotFile{
|
||||||
Name: f.Name(),
|
Name: f.Name(),
|
||||||
Location: "file://" + filepath.Join(snapshotDir, f.Name()),
|
Location: "file://" + filepath.Join(snapshotDir, f.Name()),
|
||||||
NodeName: nodeName,
|
NodeName: nodeName,
|
||||||
|
@ -1143,7 +1249,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) {
|
||||||
Time: f.ModTime(),
|
Time: f.ModTime(),
|
||||||
},
|
},
|
||||||
Size: f.Size(),
|
Size: f.Size(),
|
||||||
Status: SuccessfulSnapshotStatus,
|
Status: successfulSnapshotStatus,
|
||||||
}
|
}
|
||||||
sfKey := generateSnapshotConfigMapKey(sf)
|
sfKey := generateSnapshotConfigMapKey(sf)
|
||||||
snapshots[sfKey] = sf
|
snapshots[sfKey] = sf
|
||||||
|
@ -1155,8 +1261,8 @@ func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) {
|
||||||
// listS3Snapshots provides a list of currently stored
|
// listS3Snapshots provides a list of currently stored
|
||||||
// snapshots in S3 along with their relevant
|
// snapshots in S3 along with their relevant
|
||||||
// metadata.
|
// metadata.
|
||||||
func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, error) {
|
func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
||||||
snapshots := make(map[string]SnapshotFile)
|
snapshots := make(map[string]snapshotFile)
|
||||||
|
|
||||||
if e.config.EtcdS3 {
|
if e.config.EtcdS3 {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
@ -1189,7 +1295,7 @@ func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sf := SnapshotFile{
|
sf := snapshotFile{
|
||||||
Name: filepath.Base(obj.Key),
|
Name: filepath.Base(obj.Key),
|
||||||
NodeName: "s3",
|
NodeName: "s3",
|
||||||
CreatedAt: &metav1.Time{
|
CreatedAt: &metav1.Time{
|
||||||
|
@ -1205,7 +1311,7 @@ func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, er
|
||||||
Folder: e.config.EtcdS3Folder,
|
Folder: e.config.EtcdS3Folder,
|
||||||
Insecure: e.config.EtcdS3Insecure,
|
Insecure: e.config.EtcdS3Insecure,
|
||||||
},
|
},
|
||||||
Status: SuccessfulSnapshotStatus,
|
Status: successfulSnapshotStatus,
|
||||||
}
|
}
|
||||||
sfKey := generateSnapshotConfigMapKey(sf)
|
sfKey := generateSnapshotConfigMapKey(sf)
|
||||||
snapshots[sfKey] = sf
|
snapshots[sfKey] = sf
|
||||||
|
@ -1254,7 +1360,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error {
|
||||||
|
|
||||||
// ListSnapshots is an exported wrapper method that wraps an
|
// ListSnapshots is an exported wrapper method that wraps an
|
||||||
// unexported method of the same name.
|
// unexported method of the same name.
|
||||||
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]SnapshotFile, error) {
|
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) {
|
||||||
if e.config.EtcdS3 {
|
if e.config.EtcdS3 {
|
||||||
return e.listS3Snapshots(ctx)
|
return e.listS3Snapshots(ctx)
|
||||||
}
|
}
|
||||||
|
@ -1346,7 +1452,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
|
||||||
|
|
||||||
// AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata
|
// AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata
|
||||||
// available at the time.
|
// available at the time.
|
||||||
func (e *ETCD) addSnapshotData(sf SnapshotFile) error {
|
func (e *ETCD) addSnapshotData(sf snapshotFile) error {
|
||||||
return retry.OnError(retry.DefaultBackoff, func(err error) bool {
|
return retry.OnError(retry.DefaultBackoff, func(err error) bool {
|
||||||
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
|
return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
|
||||||
}, func() error {
|
}, func() error {
|
||||||
|
@ -1385,7 +1491,7 @@ func (e *ETCD) addSnapshotData(sf SnapshotFile) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateSnapshotConfigMapKey(sf SnapshotFile) string {
|
func generateSnapshotConfigMapKey(sf snapshotFile) string {
|
||||||
var sfKey string
|
var sfKey string
|
||||||
if sf.NodeName == "s3" {
|
if sf.NodeName == "s3" {
|
||||||
sfKey = "s3-" + sf.Name
|
sfKey = "s3-" + sf.Name
|
||||||
|
@ -1453,25 +1559,25 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
deletedSnapshots := make(map[string]string)
|
deletedSnapshots := make(map[string]string)
|
||||||
// failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap
|
// failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap
|
||||||
// These are stored unmarshaled so we can sort based on name.
|
// These are stored unmarshaled so we can sort based on name.
|
||||||
var failedSnapshots []SnapshotFile
|
var failedSnapshots []snapshotFile
|
||||||
var failedS3Snapshots []SnapshotFile
|
var failedS3Snapshots []snapshotFile
|
||||||
|
|
||||||
// remove entries for this node and s3 (if S3 is enabled) only
|
// remove entries for this node and s3 (if S3 is enabled) only
|
||||||
for k, v := range snapshotConfigMap.Data {
|
for k, v := range snapshotConfigMap.Data {
|
||||||
var sf SnapshotFile
|
var sf snapshotFile
|
||||||
if err := json.Unmarshal([]byte(v), &sf); err != nil {
|
if err := json.Unmarshal([]byte(v), &sf); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != FailedSnapshotStatus {
|
if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != failedSnapshotStatus {
|
||||||
// Only delete the snapshot if the snapshot was not failed
|
// Only delete the snapshot if the snapshot was not failed
|
||||||
// sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status
|
// sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status
|
||||||
deletedSnapshots[generateSnapshotConfigMapKey(sf)] = v // store a copy of the snapshot
|
deletedSnapshots[generateSnapshotConfigMapKey(sf)] = v // store a copy of the snapshot
|
||||||
delete(snapshotConfigMap.Data, k)
|
delete(snapshotConfigMap.Data, k)
|
||||||
} else if sf.Status == FailedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 {
|
} else if sf.Status == failedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 {
|
||||||
// Handle locally failed snapshots.
|
// Handle locally failed snapshots.
|
||||||
failedSnapshots = append(failedSnapshots, sf)
|
failedSnapshots = append(failedSnapshots, sf)
|
||||||
delete(snapshotConfigMap.Data, k)
|
delete(snapshotConfigMap.Data, k)
|
||||||
} else if sf.Status == FailedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 {
|
} else if sf.Status == failedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 {
|
||||||
// If we're operating against S3, we can clean up failed S3 snapshots that failed on this node.
|
// If we're operating against S3, we can clean up failed S3 snapshots that failed on this node.
|
||||||
failedS3Snapshots = append(failedS3Snapshots, sf)
|
failedS3Snapshots = append(failedS3Snapshots, sf)
|
||||||
delete(snapshotConfigMap.Data, k)
|
delete(snapshotConfigMap.Data, k)
|
||||||
|
@ -1526,7 +1632,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
|
|
||||||
// save the local entries to the ConfigMap if they are still on disk or in S3.
|
// save the local entries to the ConfigMap if they are still on disk or in S3.
|
||||||
for _, snapshot := range snapshotFiles {
|
for _, snapshot := range snapshotFiles {
|
||||||
var sf SnapshotFile
|
var sf snapshotFile
|
||||||
sfKey := generateSnapshotConfigMapKey(snapshot)
|
sfKey := generateSnapshotConfigMapKey(snapshot)
|
||||||
if v, ok := deletedSnapshots[sfKey]; ok {
|
if v, ok := deletedSnapshots[sfKey]; ok {
|
||||||
// use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it
|
// use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it
|
||||||
|
@ -1539,7 +1645,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
sf = snapshot
|
sf = snapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
sf.Status = SuccessfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful.
|
sf.Status = successfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful.
|
||||||
|
|
||||||
marshalledSnapshot, err := json.Marshal(sf)
|
marshalledSnapshot, err := json.Marshal(sf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1577,13 +1683,33 @@ func (e *ETCD) Restore(ctx context.Context) error {
|
||||||
if _, err := os.Stat(e.config.ClusterResetRestorePath); err != nil {
|
if _, err := os.Stat(e.config.ClusterResetRestorePath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var restorePath string
|
||||||
|
if strings.HasSuffix(e.config.ClusterResetRestorePath, compressedExtension) {
|
||||||
|
snapshotDir, err := snapshotDir(e.config, true)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "failed to get the snapshot dir")
|
||||||
|
}
|
||||||
|
|
||||||
|
decompressSnapshot, err := e.decompressSnapshot(snapshotDir, e.config.ClusterResetRestorePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
restorePath = decompressSnapshot
|
||||||
|
} else {
|
||||||
|
restorePath = e.config.ClusterResetRestorePath
|
||||||
|
}
|
||||||
|
|
||||||
// move the data directory to a temp path
|
// move the data directory to a temp path
|
||||||
if err := os.Rename(DBDir(e.config), oldDataDir); err != nil {
|
if err := os.Rename(DBDir(e.config), oldDataDir); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("Pre-restore etcd database moved to %s", oldDataDir)
|
logrus.Infof("Pre-restore etcd database moved to %s", oldDataDir)
|
||||||
|
|
||||||
return snapshot.NewV3(nil).Restore(snapshot.RestoreConfig{
|
return snapshot.NewV3(nil).Restore(snapshot.RestoreConfig{
|
||||||
SnapshotPath: e.config.ClusterResetRestorePath,
|
SnapshotPath: restorePath,
|
||||||
Name: e.name,
|
Name: e.name,
|
||||||
OutputDataDir: DBDir(e.config),
|
OutputDataDir: DBDir(e.config),
|
||||||
OutputWALDir: walDir(e.config),
|
OutputWALDir: walDir(e.config),
|
||||||
|
|
|
@ -93,11 +93,11 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
|
||||||
|
|
||||||
// upload uploads the given snapshot to the configured S3
|
// upload uploads the given snapshot to the configured S3
|
||||||
// compatible backend.
|
// compatible backend.
|
||||||
func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*SnapshotFile, error) {
|
func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*snapshotFile, error) {
|
||||||
logrus.Infof("Uploading snapshot %s to S3", snapshot)
|
logrus.Infof("Uploading snapshot %s to S3", snapshot)
|
||||||
basename := filepath.Base(snapshot)
|
basename := filepath.Base(snapshot)
|
||||||
var snapshotFileName string
|
var snapshotFileName string
|
||||||
var snapshotFile SnapshotFile
|
var sf snapshotFile
|
||||||
if s.config.EtcdS3Folder != "" {
|
if s.config.EtcdS3Folder != "" {
|
||||||
snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename)
|
snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename)
|
||||||
} else {
|
} else {
|
||||||
|
@ -112,7 +112,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
|
||||||
}
|
}
|
||||||
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts)
|
uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
snapshotFile = SnapshotFile{
|
sf = snapshotFile{
|
||||||
Name: filepath.Base(uploadInfo.Key),
|
Name: filepath.Base(uploadInfo.Key),
|
||||||
Metadata: extraMetadata,
|
Metadata: extraMetadata,
|
||||||
NodeName: "s3",
|
NodeName: "s3",
|
||||||
|
@ -121,7 +121,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
|
||||||
},
|
},
|
||||||
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
|
Message: base64.StdEncoding.EncodeToString([]byte(err.Error())),
|
||||||
Size: 0,
|
Size: 0,
|
||||||
Status: FailedSnapshotStatus,
|
Status: failedSnapshotStatus,
|
||||||
S3: &s3Config{
|
S3: &s3Config{
|
||||||
Endpoint: s.config.EtcdS3Endpoint,
|
Endpoint: s.config.EtcdS3Endpoint,
|
||||||
EndpointCA: s.config.EtcdS3EndpointCA,
|
EndpointCA: s.config.EtcdS3EndpointCA,
|
||||||
|
@ -139,7 +139,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotFile = SnapshotFile{
|
sf = snapshotFile{
|
||||||
Name: filepath.Base(uploadInfo.Key),
|
Name: filepath.Base(uploadInfo.Key),
|
||||||
Metadata: extraMetadata,
|
Metadata: extraMetadata,
|
||||||
NodeName: "s3",
|
NodeName: "s3",
|
||||||
|
@ -147,7 +147,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
|
||||||
Time: ca,
|
Time: ca,
|
||||||
},
|
},
|
||||||
Size: uploadInfo.Size,
|
Size: uploadInfo.Size,
|
||||||
Status: SuccessfulSnapshotStatus,
|
Status: successfulSnapshotStatus,
|
||||||
S3: &s3Config{
|
S3: &s3Config{
|
||||||
Endpoint: s.config.EtcdS3Endpoint,
|
Endpoint: s.config.EtcdS3Endpoint,
|
||||||
EndpointCA: s.config.EtcdS3EndpointCA,
|
EndpointCA: s.config.EtcdS3EndpointCA,
|
||||||
|
@ -159,7 +159,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &snapshotFile, nil
|
return &sf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// download downloads the given snapshot from the configured S3
|
// download downloads the given snapshot from the configured S3
|
||||||
|
|
Loading…
Reference in New Issue