mirror of https://github.com/k3s-io/k3s
Add the ability to delete an etcd snapshot locally or from S3 (#3277)
* Add the ability to delete a given set of etcd snapshots from the CLI for locally stored and S3 store snapshots.pull/3303/head
parent
e77fd18270
commit
e998cd110d
|
@ -13,7 +13,7 @@ import (
|
|||
func main() {
|
||||
app := cmds.NewApp()
|
||||
app.Commands = []cli.Command{
|
||||
cmds.NewEtcdSnapshotCommand(etcdsnapshot.Run),
|
||||
cmds.NewEtcdSnapshotCommand(etcdsnapshot.Run, cmds.NewEtcdSnapshotSubcommands(etcdsnapshot.Delete)),
|
||||
}
|
||||
|
||||
if err := app.Run(configfilearg.MustParse(os.Args)); err != nil {
|
||||
|
|
|
@ -33,6 +33,8 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
etcdsnapshotCommand := internalCLIAction(version.Program+"-"+cmds.EtcdSnapshotCommand, dataDir, os.Args)
|
||||
|
||||
// Handle subcommand invocation (k3s server, k3s crictl, etc)
|
||||
app := cmds.NewApp()
|
||||
app.Commands = []cli.Command{
|
||||
|
@ -42,7 +44,7 @@ func main() {
|
|||
cmds.NewCRICTL(externalCLIAction("crictl", dataDir)),
|
||||
cmds.NewCtrCommand(externalCLIAction("ctr", dataDir)),
|
||||
cmds.NewCheckConfigCommand(externalCLIAction("check-config", dataDir)),
|
||||
cmds.NewEtcdSnapshotCommand(internalCLIAction(version.Program+"-"+cmds.EtcdSnapshotCommand, dataDir, os.Args)),
|
||||
cmds.NewEtcdSnapshotCommand(etcdsnapshotCommand, cmds.NewEtcdSnapshotSubcommands(etcdsnapshotCommand)),
|
||||
}
|
||||
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
|
|
|
@ -43,7 +43,7 @@ func main() {
|
|||
cmds.NewKubectlCommand(kubectl.Run),
|
||||
cmds.NewCRICTL(crictl.Run),
|
||||
cmds.NewCtrCommand(ctr.Run),
|
||||
cmds.NewEtcdSnapshotCommand(etcdsnapshot.Run),
|
||||
cmds.NewEtcdSnapshotCommand(etcdsnapshot.Run, cmds.NewEtcdSnapshotSubcommands(etcdsnapshot.Delete)),
|
||||
}
|
||||
|
||||
err := app.Run(configfilearg.MustParse(os.Args))
|
||||
|
|
2
main.go
2
main.go
|
@ -27,7 +27,7 @@ func main() {
|
|||
cmds.NewAgentCommand(agent.Run),
|
||||
cmds.NewKubectlCommand(kubectl.Run),
|
||||
cmds.NewCRICTL(crictl.Run),
|
||||
cmds.NewEtcdSnapshotCommand(etcdsnapshot.Run),
|
||||
cmds.NewEtcdSnapshotCommand(etcdsnapshot.Run, cmds.NewEtcdSnapshotSubcommands(etcdsnapshot.Delete)),
|
||||
}
|
||||
|
||||
if err := app.Run(configfilearg.MustParse(os.Args)); err != nil {
|
||||
|
|
|
@ -7,88 +7,103 @@ import (
|
|||
|
||||
const EtcdSnapshotCommand = "etcd-snapshot"
|
||||
|
||||
func NewEtcdSnapshotCommand(action func(*cli.Context) error) cli.Command {
|
||||
var EtcdSnapshotFlags = []cli.Flag{
|
||||
DebugFlag,
|
||||
LogFile,
|
||||
AlsoLogToStderr,
|
||||
cli.StringFlag{
|
||||
Name: "node-name",
|
||||
Usage: "(agent/node) Node name",
|
||||
EnvVar: version.ProgramUpper + "_NODE_NAME",
|
||||
Destination: &AgentConfig.NodeName,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "data-dir,d",
|
||||
Usage: "(data) Folder to hold state default /var/lib/rancher/" + version.Program + " or ${HOME}/.rancher/" + version.Program + " if not root",
|
||||
Destination: &ServerConfig.DataDir,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "(db) Set the base name of the etcd on-demand snapshot (appended with UNIX timestamp).",
|
||||
Destination: &ServerConfig.EtcdSnapshotName,
|
||||
Value: "on-demand",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "s3",
|
||||
Usage: "(db) Enable backup to S3",
|
||||
Destination: &ServerConfig.EtcdS3,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-endpoint",
|
||||
Usage: "(db) S3 endpoint url",
|
||||
Destination: &ServerConfig.EtcdS3Endpoint,
|
||||
Value: "s3.amazonaws.com",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-endpoint-ca",
|
||||
Usage: "(db) S3 custom CA cert to connect to S3 endpoint",
|
||||
Destination: &ServerConfig.EtcdS3EndpointCA,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "s3-skip-ssl-verify",
|
||||
Usage: "(db) Disables S3 SSL certificate validation",
|
||||
Destination: &ServerConfig.EtcdS3SkipSSLVerify,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-access-key",
|
||||
Usage: "(db) S3 access key",
|
||||
EnvVar: "AWS_ACCESS_KEY_ID",
|
||||
Destination: &ServerConfig.EtcdS3AccessKey,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-secret-key",
|
||||
Usage: "(db) S3 secret key",
|
||||
EnvVar: "AWS_SECRET_ACCESS_KEY",
|
||||
Destination: &ServerConfig.EtcdS3SecretKey,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-bucket",
|
||||
Usage: "(db) S3 bucket name",
|
||||
Destination: &ServerConfig.EtcdS3BucketName,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-region",
|
||||
Usage: "(db) S3 region / bucket location (optional)",
|
||||
Destination: &ServerConfig.EtcdS3Region,
|
||||
Value: "us-east-1",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-folder",
|
||||
Usage: "(db) S3 folder",
|
||||
Destination: &ServerConfig.EtcdS3Folder,
|
||||
},
|
||||
}
|
||||
|
||||
func NewEtcdSnapshotCommand(action func(*cli.Context) error, subcommands []cli.Command) cli.Command {
|
||||
return cli.Command{
|
||||
Name: EtcdSnapshotCommand,
|
||||
Usage: "Trigger an immediate etcd snapshot",
|
||||
SkipFlagParsing: false,
|
||||
SkipArgReorder: true,
|
||||
Action: action,
|
||||
Flags: []cli.Flag{
|
||||
DebugFlag,
|
||||
LogFile,
|
||||
AlsoLogToStderr,
|
||||
cli.StringFlag{
|
||||
Name: "node-name",
|
||||
Usage: "(agent/node) Node name",
|
||||
EnvVar: version.ProgramUpper + "_NODE_NAME",
|
||||
Destination: &AgentConfig.NodeName,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "data-dir,d",
|
||||
Usage: "(data) Folder to hold state default /var/lib/rancher/" + version.Program + " or ${HOME}/.rancher/" + version.Program + " if not root",
|
||||
Destination: &ServerConfig.DataDir,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "(db) Set the base name of the etcd on-demand snapshot (appended with UNIX timestamp).",
|
||||
Destination: &ServerConfig.EtcdSnapshotName,
|
||||
Value: "on-demand",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "dir",
|
||||
Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)",
|
||||
Destination: &ServerConfig.EtcdSnapshotDir,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "s3",
|
||||
Usage: "(db) Enable backup to S3",
|
||||
Destination: &ServerConfig.EtcdS3,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-endpoint",
|
||||
Usage: "(db) S3 endpoint url",
|
||||
Destination: &ServerConfig.EtcdS3Endpoint,
|
||||
Value: "s3.amazonaws.com",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-endpoint-ca",
|
||||
Usage: "(db) S3 custom CA cert to connect to S3 endpoint",
|
||||
Destination: &ServerConfig.EtcdS3EndpointCA,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "s3-skip-ssl-verify",
|
||||
Usage: "(db) Disables S3 SSL certificate validation",
|
||||
Destination: &ServerConfig.EtcdS3SkipSSLVerify,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-access-key",
|
||||
Usage: "(db) S3 access key",
|
||||
EnvVar: "AWS_ACCESS_KEY_ID",
|
||||
Destination: &ServerConfig.EtcdS3AccessKey,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-secret-key",
|
||||
Usage: "(db) S3 secret key",
|
||||
EnvVar: "AWS_SECRET_ACCESS_KEY",
|
||||
Destination: &ServerConfig.EtcdS3SecretKey,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-bucket",
|
||||
Usage: "(db) S3 bucket name",
|
||||
Destination: &ServerConfig.EtcdS3BucketName,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-region",
|
||||
Usage: "(db) S3 region / bucket location (optional)",
|
||||
Destination: &ServerConfig.EtcdS3Region,
|
||||
Value: "us-east-1",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "s3-folder",
|
||||
Usage: "(db) S3 folder",
|
||||
Destination: &ServerConfig.EtcdS3Folder,
|
||||
},
|
||||
Subcommands: subcommands,
|
||||
Flags: append(EtcdSnapshotFlags, &cli.StringFlag{
|
||||
Name: "dir",
|
||||
Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)",
|
||||
Destination: &ServerConfig.EtcdSnapshotDir,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func NewEtcdSnapshotSubcommands(delete func(ctx *cli.Context) error) []cli.Command {
|
||||
return []cli.Command{
|
||||
{
|
||||
Name: "delete",
|
||||
Usage: "Delete given snapshot(s)",
|
||||
SkipFlagParsing: false,
|
||||
SkipArgReorder: true,
|
||||
Action: delete,
|
||||
Flags: EtcdSnapshotFlags,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package cmds
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
|
@ -20,6 +21,8 @@ var (
|
|||
}
|
||||
)
|
||||
|
||||
var ErrCommandNoArgs = errors.New("this command does not take any arguments")
|
||||
|
||||
func init() {
|
||||
// hack - force "file,dns" lookup order if go dns is used
|
||||
if os.Getenv("RES_OPTIONS") == "" {
|
||||
|
|
|
@ -16,6 +16,25 @@ import (
|
|||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
// commandSetup setups up common things needed
|
||||
// for each etcd command.
|
||||
func commandSetup(app *cli.Context, cfg *cmds.Server) (string, error) {
|
||||
gspt.SetProcTitle(os.Args[0])
|
||||
|
||||
nodeName := app.String("node-name")
|
||||
if nodeName == "" {
|
||||
h, err := os.Hostname()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
nodeName = h
|
||||
}
|
||||
|
||||
os.Setenv("NODE_NAME", nodeName)
|
||||
|
||||
return server.ResolveDataDir(cfg.DataDir)
|
||||
}
|
||||
|
||||
func Run(app *cli.Context) error {
|
||||
if err := cmds.InitLogging(); err != nil {
|
||||
return err
|
||||
|
@ -24,24 +43,15 @@ func Run(app *cli.Context) error {
|
|||
}
|
||||
|
||||
func run(app *cli.Context, cfg *cmds.Server) error {
|
||||
gspt.SetProcTitle(os.Args[0])
|
||||
|
||||
dataDir, err := server.ResolveDataDir(cfg.DataDir)
|
||||
dataDir, err := commandSetup(app, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nodeName := app.String("node-name")
|
||||
if nodeName == "" {
|
||||
h, err := os.Hostname()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nodeName = h
|
||||
if len(app.Args()) > 0 {
|
||||
return cmds.ErrCommandNoArgs
|
||||
}
|
||||
|
||||
os.Setenv("NODE_NAME", nodeName)
|
||||
|
||||
var serverConfig server.Config
|
||||
serverConfig.DisableAgent = true
|
||||
serverConfig.ControlConfig.DataDir = dataDir
|
||||
|
@ -64,8 +74,10 @@ func run(app *cli.Context, cfg *cmds.Server) error {
|
|||
serverConfig.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")
|
||||
|
||||
ctx := signals.SetupSignalHandler(context.Background())
|
||||
e := etcd.NewETCD()
|
||||
e.SetControlConfig(&serverConfig.ControlConfig)
|
||||
|
||||
initialized, err := etcd.NewETCD().IsInitialized(ctx, &serverConfig.ControlConfig)
|
||||
initialized, err := e.IsInitialized(ctx, &serverConfig.ControlConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -87,3 +99,51 @@ func run(app *cli.Context, cfg *cmds.Server) error {
|
|||
|
||||
return cluster.Snapshot(ctx, &serverConfig.ControlConfig)
|
||||
}
|
||||
|
||||
func Delete(app *cli.Context) error {
|
||||
if err := cmds.InitLogging(); err != nil {
|
||||
return err
|
||||
}
|
||||
return delete(app, &cmds.ServerConfig)
|
||||
}
|
||||
|
||||
func delete(app *cli.Context, cfg *cmds.Server) error {
|
||||
dataDir, err := commandSetup(app, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
snapshots := app.Args()
|
||||
if len(snapshots) == 0 {
|
||||
return errors.New("no snapshots given for removal")
|
||||
}
|
||||
|
||||
var serverConfig server.Config
|
||||
serverConfig.DisableAgent = true
|
||||
serverConfig.ControlConfig.DataDir = dataDir
|
||||
serverConfig.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName
|
||||
serverConfig.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir
|
||||
serverConfig.ControlConfig.EtcdS3 = cfg.EtcdS3
|
||||
serverConfig.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint
|
||||
serverConfig.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA
|
||||
serverConfig.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify
|
||||
serverConfig.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey
|
||||
serverConfig.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey
|
||||
serverConfig.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName
|
||||
serverConfig.ControlConfig.EtcdS3Region = cfg.EtcdS3Region
|
||||
serverConfig.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder
|
||||
serverConfig.ControlConfig.Runtime = &config.ControlRuntime{}
|
||||
serverConfig.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig")
|
||||
|
||||
ctx := signals.SetupSignalHandler(context.Background())
|
||||
e := etcd.NewETCD()
|
||||
e.SetControlConfig(&serverConfig.ControlConfig)
|
||||
|
||||
sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
serverConfig.ControlConfig.Runtime.Core = sc.Core
|
||||
|
||||
return e.DeleteSnapshots(ctx, app.Args())
|
||||
}
|
||||
|
|
123
pkg/etcd/etcd.go
123
pkg/etcd/etcd.go
|
@ -100,6 +100,11 @@ func (e *ETCD) EndpointName() string {
|
|||
return "etcd"
|
||||
}
|
||||
|
||||
// SetControlConfig sets the given config on the etcd struct.
|
||||
func (e *ETCD) SetControlConfig(config *config.Control) {
|
||||
e.config = config
|
||||
}
|
||||
|
||||
// Test ensures that the local node is a voting member of the target cluster.
|
||||
// If it is still a learner or not a part of the cluster, an error is raised.
|
||||
func (e *ETCD) Test(ctx context.Context) error {
|
||||
|
@ -201,12 +206,8 @@ func (e *ETCD) Reset(ctx context.Context, rebootstrap func() error) error {
|
|||
// If asked to restore from a snapshot, do so
|
||||
if e.config.ClusterResetRestorePath != "" {
|
||||
if e.config.EtcdS3 {
|
||||
if e.s3 == nil {
|
||||
s3, err := newS3(ctx, e.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.s3 = s3
|
||||
if err := e.initS3IfNil(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Infof("Retrieving etcd snapshot %s from S3", e.config.ClusterResetRestorePath)
|
||||
if err := e.s3.download(ctx); err != nil {
|
||||
|
@ -836,12 +837,8 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error {
|
|||
|
||||
if e.config.EtcdS3 {
|
||||
logrus.Infof("Saving etcd snapshot %s to S3", snapshotName)
|
||||
if e.s3 == nil {
|
||||
s3, err := newS3(ctx, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.s3 = s3
|
||||
if err := e.initS3IfNil(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := e.s3.upload(ctx, snapshotPath); err != nil {
|
||||
return err
|
||||
|
@ -897,12 +894,8 @@ func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]snapsho
|
|||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if e.s3 == nil {
|
||||
s3, err := newS3(ctx, e.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.s3 = s3
|
||||
if err := e.initS3IfNil(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objects := e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, minio.ListObjectsOptions{})
|
||||
|
@ -963,6 +956,100 @@ func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]snapsho
|
|||
return snapshots, nil
|
||||
}
|
||||
|
||||
// initS3IfNil initializes the S3 client
|
||||
// if it hasn't yet been initialized.
|
||||
func (e *ETCD) initS3IfNil(ctx context.Context) error {
|
||||
if e.s3 == nil {
|
||||
s3, err := newS3(ctx, e.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.s3 = s3
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteSnapshots removes the given snapshots from
|
||||
// either local storage or S3.
|
||||
func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
|
||||
snapshotDir, err := snapshotDir(e.config)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to get the snapshot dir")
|
||||
}
|
||||
|
||||
if e.config.EtcdS3 {
|
||||
logrus.Info("Removing the given etcd snapshot(s) from S3")
|
||||
logrus.Debugf("Removing the given etcd snapshot(s) from S3: %v", snapshots)
|
||||
|
||||
if e.initS3IfNil(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objectsCh := make(chan minio.ObjectInfo)
|
||||
defer close(objectsCh)
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, defaultS3OpTimeout)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
opts := minio.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
}
|
||||
|
||||
for obj := range e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, opts) {
|
||||
if obj.Err != nil {
|
||||
logrus.Error(obj.Err)
|
||||
return
|
||||
}
|
||||
|
||||
// iterate through the given snapshots and only
|
||||
// add them to the channel for remove if they're
|
||||
// actually found from the bucket listing.
|
||||
for _, snapshot := range snapshots {
|
||||
if snapshot == obj.Key {
|
||||
objectsCh <- obj
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logrus.Errorf("Unable to delete snapshot: %v", ctx.Err())
|
||||
return e.StoreSnapshotData(ctx)
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
continue
|
||||
case err, ok := <-e.s3.client.RemoveObjects(ctx, e.config.EtcdS3BucketName, objectsCh, minio.RemoveObjectsOptions{}):
|
||||
if err.Err != nil {
|
||||
logrus.Errorf("Unable to delete snapshot: %v", err.Err)
|
||||
}
|
||||
if !ok {
|
||||
return e.StoreSnapshotData(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logrus.Info("Removing the given locally stored etcd snapshot(s)")
|
||||
logrus.Debugf("Removing the given locally stored etcd snapshot(s): %v", snapshots)
|
||||
|
||||
for _, s := range snapshots {
|
||||
// check if the given snapshot exists. If it does,
|
||||
// remove it, otherwise continue.
|
||||
sf := filepath.Join(snapshotDir, s)
|
||||
if _, err := os.Stat(sf); os.IsNotExist(err) {
|
||||
continue
|
||||
}
|
||||
if err := os.Remove(sf); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return e.StoreSnapshotData(ctx)
|
||||
}
|
||||
|
||||
// updateSnapshotData populates the given map with the contents of the given slice.
|
||||
func updateSnapshotData(data map[string]string, snapshotFiles []snapshotFile) error {
|
||||
for _, v := range snapshotFiles {
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
|
@ -22,6 +23,8 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const defaultS3OpTimeout = time.Second * 30
|
||||
|
||||
// s3 maintains state for S3 functionality.
|
||||
type s3 struct {
|
||||
config *config.Control
|
||||
|
@ -61,6 +64,10 @@ func newS3(ctx context.Context, config *config.Control) (*s3, error) {
|
|||
}
|
||||
|
||||
logrus.Infof("Checking if S3 bucket %s exists", config.EtcdS3BucketName)
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, defaultS3OpTimeout)
|
||||
defer cancel()
|
||||
|
||||
exists, err := c.BucketExists(ctx, config.EtcdS3BucketName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -87,11 +94,13 @@ func (s *s3) upload(ctx context.Context, snapshot string) error {
|
|||
snapshotFileName = basename
|
||||
}
|
||||
|
||||
toCtx, cancel := context.WithTimeout(ctx, defaultS3OpTimeout)
|
||||
defer cancel()
|
||||
opts := minio.PutObjectOptions{
|
||||
ContentType: "application/zip",
|
||||
NumThreads: 2,
|
||||
}
|
||||
if _, err := s.client.FPutObject(ctx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts); err != nil {
|
||||
if _, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts); err != nil {
|
||||
logrus.Errorf("Error received in attempt to upload snapshot to S3: %s", err)
|
||||
}
|
||||
|
||||
|
@ -109,7 +118,10 @@ func (s *s3) download(ctx context.Context) error {
|
|||
}
|
||||
|
||||
logrus.Debugf("retrieving snapshot: %s", remotePath)
|
||||
r, err := s.client.GetObject(ctx, s.config.EtcdS3BucketName, remotePath, minio.GetObjectOptions{})
|
||||
toCtx, cancel := context.WithTimeout(ctx, defaultS3OpTimeout)
|
||||
defer cancel()
|
||||
|
||||
r, err := s.client.GetObject(toCtx, s.config.EtcdS3BucketName, remotePath, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -160,11 +172,14 @@ func (s *s3) snapshotPrefix() string {
|
|||
func (s *s3) snapshotRetention(ctx context.Context) error {
|
||||
var snapshotFiles []minio.ObjectInfo
|
||||
|
||||
toCtx, cancel := context.WithTimeout(ctx, defaultS3OpTimeout)
|
||||
defer cancel()
|
||||
|
||||
loo := minio.ListObjectsOptions{
|
||||
Recursive: true,
|
||||
Prefix: s.snapshotPrefix(),
|
||||
}
|
||||
for info := range s.client.ListObjects(ctx, s.config.EtcdS3BucketName, loo) {
|
||||
for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, loo) {
|
||||
if info.Err != nil {
|
||||
return info.Err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue