From fe465cc83253fbc9139418f8a2a6254d24af1888 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 28 Mar 2024 00:48:13 +0000 Subject: [PATCH] Move etcd snapshot management CLI to request/response Signed-off-by: Brad Davidson --- go.mod | 2 +- pkg/cli/cmds/etcd_snapshot.go | 8 + pkg/cli/etcdsnapshot/etcd_snapshot.go | 294 +++++++++--------- pkg/cli/secretsencrypt/secrets_encrypt.go | 1 - pkg/clientaccess/token.go | 80 ++++- pkg/cluster/managed/drivers.go | 9 +- pkg/configfilearg/defaultparser_test.go | 2 +- pkg/etcd/etcd.go | 8 + pkg/etcd/s3.go | 26 +- pkg/etcd/snapshot.go | 202 ++++++------ pkg/etcd/snapshot_handler.go | 221 +++++++++++++ pkg/server/{ => auth}/auth.go | 4 +- pkg/server/cert.go | 3 +- pkg/server/router.go | 10 +- pkg/server/secrets-encrypt.go | 29 +- pkg/server/token.go | 2 +- pkg/util/apierrors.go | 12 + tests/e2e/s3/s3_test.go | 26 +- .../etcdsnapshot/etcdsnapshot_int_test.go | 10 +- 19 files changed, 629 insertions(+), 320 deletions(-) create mode 100644 pkg/etcd/snapshot_handler.go rename pkg/server/{ => auth}/auth.go (94%) diff --git a/go.mod b/go.mod index c55484698d..6804c6cf7f 100644 --- a/go.mod +++ b/go.mod @@ -151,6 +151,7 @@ require ( k8s.io/api v0.29.3 k8s.io/apimachinery v0.29.3 k8s.io/apiserver v0.29.3 + k8s.io/cli-runtime v0.22.2 k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible k8s.io/cloud-provider v0.29.3 k8s.io/cluster-bootstrap v0.0.0 @@ -485,7 +486,6 @@ require ( gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.3 // indirect - k8s.io/cli-runtime v0.22.2 // indirect k8s.io/code-generator v0.29.3 // indirect k8s.io/controller-manager v0.25.4 // indirect k8s.io/csi-translation-lib v0.0.0 // indirect diff --git a/pkg/cli/cmds/etcd_snapshot.go b/pkg/cli/cmds/etcd_snapshot.go index b0e373e840..c885031b3a 100644 --- a/pkg/cli/cmds/etcd_snapshot.go +++ b/pkg/cli/cmds/etcd_snapshot.go @@ -21,6 +21,14 @@ var EtcdSnapshotFlags = []cli.Flag{ Destination: &AgentConfig.NodeName, }, DataDirFlag, + ServerToken, + &cli.StringFlag{ + Name: "server, s", + Usage: "(cluster) Server to connect to", + EnvVar: version.ProgramUpper + "_URL", + Value: "https://127.0.0.1:6443", + Destination: &ServerConfig.ServerURL, + }, &cli.StringFlag{ Name: "dir,etcd-snapshot-dir", Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)", diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index 18f9959cab..93cb66c16a 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -1,143 +1,84 @@ package etcdsnapshot import ( - "context" + "bytes" "encoding/json" "fmt" - "net" "os" "path/filepath" + "slices" "sort" - "strings" "text/tabwriter" "time" "github.com/erikdubbelboer/gspt" + k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" "github.com/k3s-io/k3s/pkg/cli/cmds" - daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/clientaccess" + "github.com/k3s-io/k3s/pkg/cluster/managed" "github.com/k3s-io/k3s/pkg/etcd" "github.com/k3s-io/k3s/pkg/server" - "github.com/k3s-io/k3s/pkg/util" util2 "github.com/k3s-io/k3s/pkg/util" "github.com/pkg/errors" - "github.com/rancher/wrangler/pkg/signals" + "github.com/sirupsen/logrus" "github.com/urfave/cli" - "gopkg.in/yaml.v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/printers" ) -type etcdCommand struct { - etcd *etcd.ETCD - ctx context.Context -} - // commandSetup setups up common things needed // for each etcd command. -func commandSetup(app *cli.Context, cfg *cmds.Server, config *server.Config) (*etcdCommand, error) { - ctx := signals.SetupSignalContext() - gspt.SetProcTitle(os.Args[0]) - - nodeName := app.String("node-name") - if nodeName == "" { - h, err := os.Hostname() - if err != nil { - return nil, err - } - nodeName = h +func commandSetup(app *cli.Context, cfg *cmds.Server) (*etcd.SnapshotRequest, *clientaccess.Info, error) { + // hide process arguments from ps output, since they may contain + // database credentials or other secrets. + gspt.SetProcTitle(os.Args[0] + " etcd-snapshot") + + sr := &etcd.SnapshotRequest{} + // Operation and name are set by the command handler. + // Compression, dir, and retention take the server defaults if not overridden on the CLI. + if app.IsSet("etcd-snapshot-compress") { + sr.Compress = &cfg.EtcdSnapshotCompress } - - os.Setenv("NODE_NAME", nodeName) - - dataDir, err := server.ResolveDataDir(cfg.DataDir) - if err != nil { - return nil, err - } - - config.DisableAgent = true - config.ControlConfig.DataDir = dataDir - config.ControlConfig.BindAddress = cfg.BindAddress - config.ControlConfig.EtcdSnapshotName = cfg.EtcdSnapshotName - config.ControlConfig.EtcdSnapshotDir = cfg.EtcdSnapshotDir - config.ControlConfig.EtcdSnapshotCompress = cfg.EtcdSnapshotCompress - config.ControlConfig.EtcdListFormat = strings.ToLower(cfg.EtcdListFormat) - config.ControlConfig.EtcdS3 = cfg.EtcdS3 - config.ControlConfig.EtcdS3Endpoint = cfg.EtcdS3Endpoint - config.ControlConfig.EtcdS3EndpointCA = cfg.EtcdS3EndpointCA - config.ControlConfig.EtcdS3SkipSSLVerify = cfg.EtcdS3SkipSSLVerify - config.ControlConfig.EtcdS3AccessKey = cfg.EtcdS3AccessKey - config.ControlConfig.EtcdS3SecretKey = cfg.EtcdS3SecretKey - config.ControlConfig.EtcdS3BucketName = cfg.EtcdS3BucketName - config.ControlConfig.EtcdS3Region = cfg.EtcdS3Region - config.ControlConfig.EtcdS3Folder = cfg.EtcdS3Folder - config.ControlConfig.EtcdS3Insecure = cfg.EtcdS3Insecure - config.ControlConfig.EtcdS3Timeout = cfg.EtcdS3Timeout - config.ControlConfig.Runtime = daemonconfig.NewRuntime(nil) - config.ControlConfig.Runtime.ETCDServerCA = filepath.Join(dataDir, "tls", "etcd", "server-ca.crt") - config.ControlConfig.Runtime.ClientETCDCert = filepath.Join(dataDir, "tls", "etcd", "client.crt") - config.ControlConfig.Runtime.ClientETCDKey = filepath.Join(dataDir, "tls", "etcd", "client.key") - config.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig") - - // We need to go through defaulting of cluster addresses to ensure that the etcd config for the standalone - // command uses the same endpoint selection logic as it does when starting up the full server. Specifically, - // we need to set an IPv6 service CIDR on IPv6-only or IPv6-first nodes, as the etcd default endpoints check - // the service CIDR primary addresss family to determine what loopback address to use. - nodeName, nodeIPs, err := util.GetHostnameAndIPs(cmds.AgentConfig.NodeName, cmds.AgentConfig.NodeIP) - if err != nil { - return nil, err + if app.IsSet("etcd-snapshot-dir") { + sr.Dir = &cfg.EtcdSnapshotDir } - config.ControlConfig.ServerNodeName = nodeName - - // configure ClusterIPRanges. Use default 10.42.0.0/16 or fd00:42::/56 if user did not set it - _, defaultClusterCIDR, defaultServiceCIDR, _ := util.GetDefaultAddresses(nodeIPs[0]) - if len(cfg.ClusterCIDR) == 0 { - cfg.ClusterCIDR.Set(defaultClusterCIDR) + if app.IsSet("etcd-snapshot-retention") { + sr.Retention = &cfg.EtcdSnapshotRetention } - for _, cidr := range util.SplitStringSlice(cfg.ClusterCIDR) { - _, parsed, err := net.ParseCIDR(cidr) - if err != nil { - return nil, errors.Wrapf(err, "invalid cluster-cidr %s", cidr) - } - config.ControlConfig.ClusterIPRanges = append(config.ControlConfig.ClusterIPRanges, parsed) - } - - // set ClusterIPRange to the first address (first defined IPFamily is preferred) - config.ControlConfig.ClusterIPRange = config.ControlConfig.ClusterIPRanges[0] - - // configure ServiceIPRanges. Use default 10.43.0.0/16 or fd00:43::/112 if user did not set it - if len(cfg.ServiceCIDR) == 0 { - cfg.ServiceCIDR.Set(defaultServiceCIDR) - } - for _, cidr := range util.SplitStringSlice(cfg.ServiceCIDR) { - _, parsed, err := net.ParseCIDR(cidr) - if err != nil { - return nil, errors.Wrapf(err, "invalid service-cidr %s", cidr) - } - config.ControlConfig.ServiceIPRanges = append(config.ControlConfig.ServiceIPRanges, parsed) - } - - // set ServiceIPRange to the first address (first defined IPFamily is preferred) - config.ControlConfig.ServiceIPRange = config.ControlConfig.ServiceIPRanges[0] - e := etcd.NewETCD() - if err := e.SetControlConfig(&config.ControlConfig); err != nil { - return nil, err + if cfg.EtcdS3 { + sr.S3 = &etcd.SnapshotRequestS3{} + sr.S3.AccessKey = cfg.EtcdS3AccessKey + sr.S3.Bucket = cfg.EtcdS3BucketName + sr.S3.Endpoint = cfg.EtcdS3Endpoint + sr.S3.EndpointCA = cfg.EtcdS3EndpointCA + sr.S3.Folder = cfg.EtcdS3Folder + sr.S3.Insecure = cfg.EtcdS3Insecure + sr.S3.Region = cfg.EtcdS3Region + sr.S3.SecretKey = cfg.EtcdS3SecretKey + sr.S3.SkipSSLVerify = cfg.EtcdS3SkipSSLVerify + sr.S3.Timeout = metav1.Duration{Duration: cfg.EtcdS3Timeout} } - initialized, err := e.IsInitialized() + dataDir, err := server.ResolveDataDir(cfg.DataDir) if err != nil { - return nil, err - } - if !initialized { - return nil, fmt.Errorf("etcd database not found in %s", config.ControlConfig.DataDir) + return nil, nil, err } - sc, err := server.NewContext(ctx, config, false) - if err != nil { - return nil, err + if cfg.Token == "" { + fp := filepath.Join(dataDir, "token") + tokenByte, err := os.ReadFile(fp) + if err != nil { + return nil, nil, err + } + cfg.Token = string(bytes.TrimRight(tokenByte, "\n")) } - config.ControlConfig.Runtime.K3s = sc.K3s - config.ControlConfig.Runtime.Core = sc.Core + info, err := clientaccess.ParseAndValidateToken(cmds.ServerConfig.ServerURL, cfg.Token, clientaccess.WithUser("server")) + return sr, info, err +} - return &etcdCommand{etcd: e, ctx: ctx}, nil +func wrapServerError(err error) error { + return errors.Wrap(err, "see server log for details") } // Save triggers an on-demand etcd snapshot operation @@ -149,20 +90,40 @@ func Save(app *cli.Context) error { } func save(app *cli.Context, cfg *cmds.Server) error { - var serverConfig server.Config - if len(app.Args()) > 0 { return util2.ErrCommandNoArgs } - ec, err := commandSetup(app, cfg, &serverConfig) + // Save always sets retention to 0 to disable automatic pruning. + // Prune can be run manually after save, if desired. + app.Set("etcd-snapshot-retention", "0") + + sr, info, err := commandSetup(app, cfg) + if err != nil { + return err + } + + sr.Operation = etcd.SnapshotOperationSave + sr.Name = []string{cfg.EtcdSnapshotName} + + b, err := json.Marshal(sr) + if err != nil { + return err + } + r, err := info.Post("/db/snapshot", b) if err != nil { + return wrapServerError(err) + } + resp := &managed.SnapshotResult{} + if err := json.Unmarshal(r, resp); err != nil { return err } - serverConfig.ControlConfig.EtcdSnapshotRetention = 0 // disable retention check + for _, name := range resp.Created { + logrus.Infof("Snapshot %s saved.", name) + } - return ec.etcd.Snapshot(ec.ctx) + return nil } func Delete(app *cli.Context) error { @@ -173,19 +134,42 @@ func Delete(app *cli.Context) error { } func delete(app *cli.Context, cfg *cmds.Server) error { - var serverConfig server.Config + snapshots := app.Args() + if len(snapshots) == 0 { + return errors.New("no snapshots given for removal") + } - ec, err := commandSetup(app, cfg, &serverConfig) + sr, info, err := commandSetup(app, cfg) if err != nil { return err } - snapshots := app.Args() - if len(snapshots) == 0 { - return errors.New("no snapshots given for removal") + sr.Operation = etcd.SnapshotOperationDelete + sr.Name = snapshots + + b, err := json.Marshal(sr) + if err != nil { + return err + } + r, err := info.Post("/db/snapshot", b) + if err != nil { + return wrapServerError(err) + } + resp := &managed.SnapshotResult{} + if err := json.Unmarshal(r, resp); err != nil { + return err + } + + for _, name := range resp.Deleted { + logrus.Infof("Snapshot %s deleted.", name) + } + for _, name := range snapshots { + if !slices.Contains(resp.Deleted, name) { + logrus.Warnf("Snapshot %s not found.", name) + } } - return ec.etcd.DeleteSnapshots(ec.ctx, app.Args()) + return nil } func List(app *cli.Context) error { @@ -207,30 +191,48 @@ func validEtcdListFormat(format string) bool { } func list(app *cli.Context, cfg *cmds.Server) error { - var serverConfig server.Config + if cfg.EtcdListFormat != "" && !validEtcdListFormat(cfg.EtcdListFormat) { + return errors.New("invalid output format: " + cfg.EtcdListFormat) + } - ec, err := commandSetup(app, cfg, &serverConfig) + sr, info, err := commandSetup(app, cfg) if err != nil { return err } - sf, err := ec.etcd.ListSnapshots(ec.ctx) + sr.Operation = etcd.SnapshotOperationList + + b, err := json.Marshal(sr) if err != nil { return err } + r, err := info.Post("/db/snapshot", b) + if err != nil { + return wrapServerError(err) + } - if cfg.EtcdListFormat != "" && !validEtcdListFormat(cfg.EtcdListFormat) { - return errors.New("invalid output format: " + cfg.EtcdListFormat) + sf := &k3s.ETCDSnapshotFileList{} + if err := json.Unmarshal(r, sf); err != nil { + return err } + sort.Slice(sf.Items, func(i, j int) bool { + if sf.Items[i].Status.CreationTime.Equal(sf.Items[j].Status.CreationTime) { + return sf.Items[i].Spec.SnapshotName < sf.Items[j].Spec.SnapshotName + } + return sf.Items[i].Status.CreationTime.Before(sf.Items[j].Status.CreationTime) + }) + switch cfg.EtcdListFormat { case "json": - if err := json.NewEncoder(os.Stdout).Encode(sf); err != nil { + json := printers.JSONPrinter{} + if err := json.PrintObj(sf, os.Stdout); err != nil { return err } return nil case "yaml": - if err := yaml.NewEncoder(os.Stdout).Encode(sf); err != nil { + yaml := printers.YAMLPrinter{} + if err := yaml.PrintObj(sf, os.Stdout); err != nil { return err } return nil @@ -238,23 +240,9 @@ func list(app *cli.Context, cfg *cmds.Server) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) defer w.Flush() - // Sort snapshots by creation time and key - sfKeys := make([]string, 0, len(sf)) - for k := range sf { - sfKeys = append(sfKeys, k) - } - sort.Slice(sfKeys, func(i, j int) bool { - iKey := sfKeys[i] - jKey := sfKeys[j] - if sf[iKey].CreatedAt.Equal(sf[jKey].CreatedAt) { - return iKey < jKey - } - return sf[iKey].CreatedAt.Before(sf[jKey].CreatedAt) - }) - fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n") - for _, k := range sfKeys { - fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", sf[k].Name, sf[k].Location, sf[k].Size, sf[k].CreatedAt.Format(time.RFC3339)) + for _, esf := range sf.Items { + fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", esf.Spec.SnapshotName, esf.Spec.Location, esf.Status.Size.Value(), esf.Status.CreationTime.Format(time.RFC3339)) } } @@ -269,14 +257,30 @@ func Prune(app *cli.Context) error { } func prune(app *cli.Context, cfg *cmds.Server) error { - var serverConfig server.Config + sr, info, err := commandSetup(app, cfg) + if err != nil { + return err + } - ec, err := commandSetup(app, cfg, &serverConfig) + sr.Operation = etcd.SnapshotOperationPrune + sr.Name = []string{cfg.EtcdSnapshotName} + + b, err := json.Marshal(sr) + if err != nil { + return err + } + r, err := info.Post("/db/snapshot", b) if err != nil { + return wrapServerError(err) + } + resp := &managed.SnapshotResult{} + if err := json.Unmarshal(r, resp); err != nil { return err } - serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention + for _, name := range resp.Deleted { + logrus.Infof("Snapshot %s deleted.", name) + } - return ec.etcd.PruneSnapshots(ec.ctx) + return nil } diff --git a/pkg/cli/secretsencrypt/secrets_encrypt.go b/pkg/cli/secretsencrypt/secrets_encrypt.go index d43271b1fe..b0bd852501 100644 --- a/pkg/cli/secretsencrypt/secrets_encrypt.go +++ b/pkg/cli/secretsencrypt/secrets_encrypt.go @@ -66,7 +66,6 @@ func Enable(app *cli.Context) error { } func Disable(app *cli.Context) error { - if err := cmds.InitLogging(); err != nil { return err } diff --git a/pkg/clientaccess/token.go b/pkg/clientaccess/token.go index 73bc6e7322..2995d67507 100644 --- a/pkg/clientaccess/token.go +++ b/pkg/clientaccess/token.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/hex" + "encoding/json" "fmt" "io" "net/http" @@ -18,6 +19,9 @@ import ( "github.com/pkg/errors" certutil "github.com/rancher/dynamiclistener/cert" "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/net" ) const ( @@ -291,7 +295,6 @@ func (i *Info) Get(path string, option ...ClientOption) ([]byte, error) { // Put makes a request to a subpath of info's BaseURL func (i *Info) Put(path string, body []byte, option ...ClientOption) error { - u, err := url.Parse(i.BaseURL) if err != nil { return err @@ -305,6 +308,21 @@ func (i *Info) Put(path string, body []byte, option ...ClientOption) error { return put(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token()) } +// Post makes a request to a subpath of info's BaseURL +func (i *Info) Post(path string, body []byte, option ...ClientOption) ([]byte, error) { + u, err := url.Parse(i.BaseURL) + if err != nil { + return nil, err + } + p, err := url.Parse(path) + if err != nil { + return nil, err + } + p.Scheme = u.Scheme + p.Host = u.Host + return post(p.String(), body, GetHTTPClient(i.CACerts, i.CertFile, i.KeyFile, option...), i.Username, i.Password, i.Token()) +} + // setServer sets the BaseURL and CACerts fields of the Info by connecting to the server // and storing the CA bundle. func (i *Info) setServer(server string) error { @@ -400,13 +418,8 @@ func get(u string, client *http.Client, username, password, token string) ([]byt if err != nil { return nil, err } - defer resp.Body.Close() - - if resp.StatusCode < 200 || resp.StatusCode > 299 { - return nil, fmt.Errorf("%s: %s", u, resp.Status) - } - return io.ReadAll(resp.Body) + return readBody(resp) } // put makes a request to a url using a provided client and credentials, @@ -427,14 +440,59 @@ func put(u string, body []byte, client *http.Client, username, password, token s if err != nil { return err } + + _, err = readBody(resp) + return err +} + +// post makes a request to a url using a provided client and credentials, +// returning the response body and error. +func post(u string, body []byte, client *http.Client, username, password, token string) ([]byte, error) { + req, err := http.NewRequest(http.MethodPost, u, bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + + if token != "" { + req.Header.Add("Authorization", "Bearer "+token) + } else if username != "" { + req.SetBasicAuth(username, password) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + return readBody(resp) +} + +// readBody attempts to get the body from the response. If the response status +// code is not in the 2XX range, an error is returned. An attempt is made to +// decode the error body as a metav1.Status and return a StatusError, if +// possible. +func readBody(resp *http.Response) ([]byte, error) { defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } - respBody, _ := io.ReadAll(resp.Body) - if resp.StatusCode < 200 || resp.StatusCode > 299 { - return fmt.Errorf("%s: %s %s", u, resp.Status, string(respBody)) + warnings, _ := net.ParseWarningHeaders(resp.Header["Warning"]) + for _, warning := range warnings { + if warning.Code == 299 && len(warning.Text) != 0 { + logrus.Warnf(warning.Text) + } } - return nil + if resp.StatusCode < 200 || resp.StatusCode > 299 { + status := metav1.Status{} + if err := json.Unmarshal(b, &status); err == nil && status.Kind == "Status" { + return nil, &apierrors.StatusError{ErrStatus: status} + } + return nil, fmt.Errorf("%s: %s", resp.Request.URL, resp.Status) + } + return b, nil } // FormatToken takes a username:password string or join token, and a path to a certificate bundle, and diff --git a/pkg/cluster/managed/drivers.go b/pkg/cluster/managed/drivers.go index 83f54f7519..421e7509f8 100644 --- a/pkg/cluster/managed/drivers.go +++ b/pkg/cluster/managed/drivers.go @@ -23,7 +23,7 @@ type Driver interface { Test(ctx context.Context) error Restore(ctx context.Context) error EndpointName() string - Snapshot(ctx context.Context) error + Snapshot(ctx context.Context) (*SnapshotResult, error) ReconcileSnapshotData(ctx context.Context) error GetMembersClientURLs(ctx context.Context) ([]string, error) RemoveSelf(ctx context.Context) error @@ -40,3 +40,10 @@ func Registered() []Driver { func Default() Driver { return drivers[0] } + +// SnapshotResult is returned by the Snapshot function, +// and lists the names of created and deleted snapshots. +type SnapshotResult struct { + Created []string `json:"created,omitempty"` + Deleted []string `json:"deleted,omitempty"` +} diff --git a/pkg/configfilearg/defaultparser_test.go b/pkg/configfilearg/defaultparser_test.go index 8ae8decc26..e43a0c1545 100644 --- a/pkg/configfilearg/defaultparser_test.go +++ b/pkg/configfilearg/defaultparser_test.go @@ -48,7 +48,7 @@ func Test_UnitMustParse(t *testing.T) { name: "Etcd-snapshot with config with known and unknown flags", args: []string{"k3s", "etcd-snapshot", "save"}, config: "./testdata/defaultdata.yaml", - want: []string{"k3s", "etcd-snapshot", "save", "--etcd-s3=true", "--etcd-s3-bucket=my-backup"}, + want: []string{"k3s", "etcd-snapshot", "save", "--token=12345", "--etcd-s3=true", "--etcd-s3-bucket=my-backup"}, }, { name: "Agent with known flags", diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index d54004dc42..603828a94b 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -25,6 +25,7 @@ import ( "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/control/deps" "github.com/k3s-io/k3s/pkg/daemons/executor" + "github.com/k3s-io/k3s/pkg/server/auth" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/k3s-io/kine/pkg/client" @@ -664,7 +665,9 @@ func (e *ETCD) setName(force bool) error { // handler wraps the handler with routes for database info func (e *ETCD) handler(next http.Handler) http.Handler { mux := mux.NewRouter().SkipClean(true) + mux.Use(auth.Middleware(e.config, version.Program+":server")) mux.Handle("/db/info", e.infoHandler()) + mux.Handle("/db/snapshot", e.snapshotHandler()) mux.NotFoundHandler = next return mux } @@ -673,6 +676,11 @@ func (e *ETCD) handler(next http.Handler) http.Handler { // If we can't retrieve an actual MemberList from etcd, we return a canned response with only the local node listed. func (e *ETCD) infoHandler() http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + util.SendError(fmt.Errorf("method not allowed"), rw, req, http.StatusMethodNotAllowed) + return + } + ctx, cancel := context.WithTimeout(req.Context(), 2*time.Second) defer cancel() diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index f90a28162d..6af4c4d27a 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -93,7 +93,7 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) { exists, err := c.BucketExists(ctx, config.EtcdS3BucketName) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to test for existence of bucket %s", config.EtcdS3BucketName) } if !exists { return nil, fmt.Errorf("bucket %s does not exist", config.EtcdS3BucketName) @@ -280,9 +280,10 @@ func (s *S3) snapshotPrefix() string { } // snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node. -func (s *S3) snapshotRetention(ctx context.Context) error { +// Returns a list of pruned snapshot names. +func (s *S3) snapshotRetention(ctx context.Context) ([]string, error) { if s.config.EtcdSnapshotRetention < 1 { - return nil + return nil, nil } logrus.Infof("Applying snapshot retention=%d to snapshots stored in s3://%s/%s", s.config.EtcdSnapshotRetention, s.config.EtcdS3BucketName, s.snapshotPrefix()) @@ -297,7 +298,7 @@ func (s *S3) snapshotRetention(ctx context.Context) error { } for info := range s.client.ListObjects(toCtx, s.config.EtcdS3BucketName, opts) { if info.Err != nil { - return info.Err + return nil, info.Err } // skip metadata @@ -309,7 +310,7 @@ func (s *S3) snapshotRetention(ctx context.Context) error { } if len(snapshotFiles) <= s.config.EtcdSnapshotRetention { - return nil + return nil, nil } // sort newest-first so we can prune entries past the retention count @@ -317,21 +318,16 @@ func (s *S3) snapshotRetention(ctx context.Context) error { return snapshotFiles[j].LastModified.Before(snapshotFiles[i].LastModified) }) + deleted := []string{} for _, df := range snapshotFiles[s.config.EtcdSnapshotRetention:] { logrus.Infof("Removing S3 snapshot: s3://%s/%s", s.config.EtcdS3BucketName, df.Key) - if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { - return err - } - metadataKey := path.Join(path.Dir(df.Key), metadataDir, path.Base(df.Key)) - if err := s.client.RemoveObject(toCtx, s.config.EtcdS3BucketName, metadataKey, minio.RemoveObjectOptions{}); err != nil { - if isNotExist(err) { - return nil - } - return err + if err := s.deleteSnapshot(ctx, df.Key); err != nil { + return deleted, err } + deleted = append(deleted, df.Key) } - return nil + return deleted, nil } func (s *S3) deleteSnapshot(ctx context.Context, key string) error { diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 899ffe20c0..664352eab0 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -19,7 +19,8 @@ import ( "strings" "time" - apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" + k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" + "github.com/k3s-io/k3s/pkg/cluster/managed" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" @@ -27,10 +28,7 @@ import ( "github.com/pkg/errors" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" - "go.etcd.io/etcd/client/pkg/v3/logutil" - clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/etcdutl/v3/snapshot" - "go.uber.org/zap" "golang.org/x/sync/semaphore" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -204,14 +202,17 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err } // Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed -// snapshots in excess of the retention limits. This method is used in the internal cron snapshot -// system as well as used to do on-demand snapshots. -func (e *ETCD) Snapshot(ctx context.Context) error { +// snapshots in excess of the retention limits. Note that one snapshot request may result in creation and pruning +// of multiple snapshots, if S3 is enabled. +// Note that the prune step is generally disabled when snapshotting from the CLI, as there is a separate +// subcommand for prune that can be run manually if the user wants to remove old snapshots. +// Returns metadata about the new and pruned snapshots. +func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) { if err := e.preSnapshotSetup(ctx); err != nil { - return err + return nil, err } if !e.snapshotSem.TryAcquire(maxConcurrentSnapshots) { - return fmt.Errorf("%d snapshots already in progress", maxConcurrentSnapshots) + return nil, fmt.Errorf("%d snapshots already in progress", maxConcurrentSnapshots) } defer e.snapshotSem.Release(maxConcurrentSnapshots) @@ -230,61 +231,40 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } endpoints := getEndpoints(e.config) - var client *clientv3.Client - var err error - - // Use the internal client if possible, or create a new one - // if run from the CLI. - if e.client != nil { - client = e.client - } else { - client, err = getClient(ctx, e.config, endpoints...) - if err != nil { - return err - } - defer client.Close() - } - - status, err := client.Status(ctx, endpoints[0]) + status, err := e.client.Status(ctx, endpoints[0]) if err != nil { - return errors.Wrap(err, "failed to check etcd status for snapshot") + return nil, errors.Wrap(err, "failed to check etcd status for snapshot") } if status.IsLearner { logrus.Warnf("Unable to take snapshot: not supported for learner") - return nil + return nil, nil } snapshotDir, err := snapshotDir(e.config, true) if err != nil { - return errors.Wrap(err, "failed to get etcd-snapshot-dir") + return nil, errors.Wrap(err, "failed to get etcd-snapshot-dir") } cfg, err := getClientConfig(ctx, e.config) if err != nil { - return errors.Wrap(err, "failed to get config for etcd snapshot") + return nil, errors.Wrap(err, "failed to get config for etcd snapshot") } tokenHash, err := util.GetTokenHash(e.config) if err != nil { - return errors.Wrap(err, "failed to get server token hash for etcd snapshot") + return nil, errors.Wrap(err, "failed to get server token hash for etcd snapshot") } nodeName := os.Getenv("NODE_NAME") now := time.Now().Round(time.Second) snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, now.Unix()) snapshotPath := filepath.Join(snapshotDir, snapshotName) - logrus.Infof("Saving etcd snapshot to %s", snapshotPath) var sf *snapshotFile - lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel) - if err != nil { - return err - } - - if err := snapshot.NewV3(lg).Save(ctx, *cfg, snapshotPath); err != nil { + if err := snapshot.NewV3(e.client.GetLogger()).Save(ctx, *cfg, snapshotPath); err != nil { sf = &snapshotFile{ Name: snapshotName, Location: "", @@ -299,19 +279,23 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } logrus.Errorf("Failed to take etcd snapshot: %v", err) if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to sync ETCDSnapshotFile") + return nil, errors.Wrap(err, "failed to sync ETCDSnapshotFile") } } + res := &managed.SnapshotResult{} // If the snapshot attempt was successful, sf will be nil as we did not set it to store the error message. if sf == nil { if e.config.EtcdSnapshotCompress { zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now) - if err != nil { - return errors.Wrap(err, "failed to compress snapshot") + + // ensure that the unncompressed snapshot is cleaned up even if compression fails + if err := os.Remove(snapshotPath); err != nil && !os.IsNotExist(err) { + logrus.Warnf("Failed to remove uncompress snapshot file: %v", err) } - if err := os.Remove(snapshotPath); err != nil { - return errors.Wrap(err, "failed to remove uncompressed snapshot") + + if err != nil { + return nil, errors.Wrap(err, "failed to compress snapshot") } snapshotPath = zipPath logrus.Info("Compressed snapshot: " + snapshotPath) @@ -319,8 +303,9 @@ func (e *ETCD) Snapshot(ctx context.Context) error { f, err := os.Stat(snapshotPath) if err != nil { - return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") + return nil, errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") } + sf = &snapshotFile{ Name: f.Name(), Location: "file://" + snapshotPath, @@ -334,24 +319,31 @@ func (e *ETCD) Snapshot(ctx context.Context) error { metadataSource: extraMetadata, tokenHash: tokenHash, } + res.Created = append(res.Created, sf.Name) + // Failing to save snapshot metadata is not fatal, the snapshot can still be used without it. if err := saveSnapshotMetadata(snapshotPath, extraMetadata); err != nil { - return errors.Wrap(err, "failed to save local snapshot metadata") + logrus.Warnf("Failed to save local snapshot metadata: %v", err) } + // If this fails, just log an error - the snapshot file will remain on disk + // and will be recorded next time the snapshot list is reconciled. if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to sync ETCDSnapshotFile") + logrus.Warnf("Failed to sync ETCDSnapshotFile: %v", err) } - if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { - return errors.Wrap(err, "failed to apply local snapshot retention policy") + // Snapshot retention may prune some files before returning an error. Failing to prune is not fatal. + deleted, err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir) + if err != nil { + logrus.Warnf("Failed to apply local snapshot retention policy: %v", err) } + res.Deleted = append(res.Deleted, deleted...) if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) sf = &snapshotFile{ - Name: filepath.Base(snapshotPath), + Name: f.Name(), NodeName: "s3", CreatedAt: &metav1.Time{ Time: now, @@ -378,22 +370,28 @@ func (e *ETCD) Snapshot(ctx context.Context) error { if err != nil { logrus.Errorf("Error received during snapshot upload to S3: %s", err) } else { + res.Created = append(res.Created, sf.Name) logrus.Infof("S3 upload complete for %s", snapshotName) } // Attempt to apply retention even if the upload failed; failure may be due to bucket // being full or some other condition that retention policy would resolve. - if err := e.s3.snapshotRetention(ctx); err != nil { - logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err) + // Snapshot retention may prune some files before returning an error. Failing to prune is not fatal. + deleted, err := e.s3.snapshotRetention(ctx) + res.Deleted = append(res.Deleted, deleted...) + if err != nil { + logrus.Warnf("Failed to apply s3 snapshot retention policy: %v", err) } } - // sf is either s3 snapshot metadata, or s3 failure record + // sf is either s3 snapshot metadata, or s3 init/upload failure record. + // If this fails, just log an error - the snapshot file will remain on s3 + // and will be recorded next time the snapshot list is reconciled. if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to sync ETCDSnapshotFile") + logrus.Warnf("Failed to sync ETCDSnapshotFile: %v", err) } } } - return e.ReconcileSnapshotData(ctx) + return res, e.ReconcileSnapshotData(ctx) } type s3Config struct { @@ -492,7 +490,7 @@ func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) { // initS3IfNil initializes the S3 client // if it hasn't yet been initialized. func (e *ETCD) initS3IfNil(ctx context.Context) error { - if e.s3 == nil { + if e.config.EtcdS3 && e.s3 == nil { s3, err := NewS3(ctx, e.config) if err != nil { return err @@ -503,15 +501,20 @@ func (e *ETCD) initS3IfNil(ctx context.Context) error { return nil } -// PruneSnapshots performs a retention run with the given -// retention duration and removes expired snapshots. -func (e *ETCD) PruneSnapshots(ctx context.Context) error { +// PruneSnapshots deleted old snapshots in excess of the configured retention count. +// Returns a list of deleted snapshots. Note that snapshots may be deleted +// with a non-nil error return. +func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, error) { snapshotDir, err := snapshotDir(e.config, false) if err != nil { - return errors.Wrap(err, "failed to get etcd-snapshot-dir") + return nil, errors.Wrap(err, "failed to get etcd-snapshot-dir") } - if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { + res := &managed.SnapshotResult{} + // Note that snapshotRetention functions may return a list of deleted files, as well as + // an error, if some snapshots are deleted before the error is encountered. + res.Deleted, err = snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir) + if err != nil { logrus.Errorf("Error applying snapshot retention policy: %v", err) } @@ -519,18 +522,24 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) error { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) } else { - if err := e.s3.snapshotRetention(ctx); err != nil { + deleted, err := e.s3.snapshotRetention(ctx) + if err != nil { logrus.Errorf("Error applying S3 snapshot retention policy: %v", err) } + res.Deleted = append(res.Deleted, deleted...) } } - return e.ReconcileSnapshotData(ctx) + return res, e.ReconcileSnapshotData(ctx) } -// ListSnapshots is an exported wrapper method that wraps an -// unexported method of the same name. -func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) { - snapshotFiles := map[string]snapshotFile{} +// ListSnapshots returns a list of snapshots. Local snapshots are always listed, +// s3 snapshots are listed if s3 is enabled. +// Snapshots are listed locally, not listed from the apiserver, so results +// are guaranteed to be in sync with what is on disk. +func (e *ETCD) ListSnapshots(ctx context.Context) (*k3s.ETCDSnapshotFileList, error) { + snapshotFiles := &k3s.ETCDSnapshotFileList{ + TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "List"}, + } if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) @@ -540,7 +549,11 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro if err != nil { return nil, err } - snapshotFiles = sfs + for k, sf := range sfs { + esf := k3s.NewETCDSnapshotFile("", k, k3s.ETCDSnapshotFile{}) + sf.toETCDSnapshotFile(esf) + snapshotFiles.Items = append(snapshotFiles.Items, *esf) + } } sfs, err := e.listLocalSnapshots() @@ -548,25 +561,30 @@ func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, erro return nil, err } for k, sf := range sfs { - snapshotFiles[k] = sf + esf := k3s.NewETCDSnapshotFile("", k, k3s.ETCDSnapshotFile{}) + sf.toETCDSnapshotFile(esf) + snapshotFiles.Items = append(snapshotFiles.Items, *esf) } - return snapshotFiles, err + return snapshotFiles, nil } // DeleteSnapshots removes the given snapshots from local storage and S3. -func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { +// Returns a list of deleted snapshots. Note that snapshots may be deleted +// with a non-nil error return. +func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*managed.SnapshotResult, error) { snapshotDir, err := snapshotDir(e.config, false) if err != nil { - return errors.Wrap(err, "failed to get etcd-snapshot-dir") + return nil, errors.Wrap(err, "failed to get etcd-snapshot-dir") } if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) - return err + return nil, err } } + res := &managed.SnapshotResult{} for _, s := range snapshots { if err := e.deleteSnapshot(filepath.Join(snapshotDir, s)); err != nil { if isNotExist(err) { @@ -575,6 +593,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { logrus.Errorf("Failed to delete local snapshot %s: %v", s, err) } } else { + res.Deleted = append(res.Deleted, s) logrus.Infof("Snapshot %s deleted locally", s) } @@ -586,12 +605,13 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { logrus.Errorf("Failed to delete S3 snapshot %s: %v", s, err) } } else { + res.Deleted = append(res.Deleted, s) logrus.Infof("Snapshot %s deleted from S3", s) } } } - return e.ReconcileSnapshotData(ctx) + return res, e.ReconcileSnapshotData(ctx) } func (e *ETCD) deleteSnapshot(snapshotPath string) error { @@ -631,7 +651,7 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error { snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() esfName := generateSnapshotName(sf) - var esf *apisv1.ETCDSnapshotFile + var esf *k3s.ETCDSnapshotFile return retry.OnError(snapshotDataBackoff, func(err error) bool { return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) }, func() (err error) { @@ -641,7 +661,7 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error { if !apierrors.IsNotFound(err) { return err } - esf = &apisv1.ETCDSnapshotFile{ + esf = &k3s.ETCDSnapshotFile{ ObjectMeta: metav1.ObjectMeta{ Name: esfName, }, @@ -654,7 +674,7 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error { // create or update as necessary if esf.CreationTimestamp.IsZero() { - var created *apisv1.ETCDSnapshotFile + var created *k3s.ETCDSnapshotFile created, err = snapshots.Create(esf) if err == nil { // Only emit an event for the snapshot when creating the resource @@ -707,7 +727,7 @@ func generateSnapshotName(sf snapshotFile) string { // generateETCDSnapshotFileConfigMapKey generates a key that the corresponding // snapshotFile would be stored under in the legacy configmap -func generateETCDSnapshotFileConfigMapKey(esf apisv1.ETCDSnapshotFile) string { +func generateETCDSnapshotFileConfigMapKey(esf k3s.ETCDSnapshotFile) string { name := invalidKeyChars.ReplaceAllString(esf.Spec.SnapshotName, "_") if esf.Spec.S3 != nil { return "s3-" + name @@ -715,7 +735,7 @@ func generateETCDSnapshotFileConfigMapKey(esf apisv1.ETCDSnapshotFile) string { return "local-" + name } -func (e *ETCD) emitEvent(esf *apisv1.ETCDSnapshotFile) { +func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) { switch { case e.config.Runtime.Event == nil: case !esf.DeletionTimestamp.IsZero(): @@ -923,17 +943,17 @@ func (e *ETCD) setSnapshotFunction(ctx context.Context) { // having all the nodes take a snapshot at the exact same time can lead to excessive retry thrashing // when updating the snapshot list configmap. time.Sleep(time.Duration(rand.Float64() * float64(snapshotJitterMax))) - if err := e.Snapshot(ctx); err != nil { + if _, err := e.Snapshot(ctx); err != nil { logrus.Errorf("Failed to take scheduled snapshot: %v", err) } }))) } // snapshotRetention iterates through the snapshots and removes the oldest -// leaving the desired number of snapshots. -func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) error { +// leaving the desired number of snapshots. Returns a list of pruned snapshot names. +func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) ([]string, error) { if retention < 1 { - return nil + return nil, nil } logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir) @@ -953,10 +973,10 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) } return nil }); err != nil { - return err + return nil, err } if len(snapshotFiles) <= retention { - return nil + return nil, nil } // sort newest-first so we can prune entries past the retention count @@ -964,19 +984,21 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt) }) + deleted := []string{} for _, df := range snapshotFiles[retention:] { snapshotPath := filepath.Join(snapshotDir, df.Name) metadataPath := filepath.Join(snapshotDir, "..", metadataDir, df.Name) logrus.Infof("Removing local snapshot %s", snapshotPath) if err := os.Remove(snapshotPath); err != nil { - return err + return deleted, err } if err := os.Remove(metadataPath); err != nil && !os.IsNotExist(err) { - return err + return deleted, err } + deleted = append(deleted, df.Name) } - return nil + return deleted, nil } func isNotExist(err error) bool { @@ -1007,7 +1029,7 @@ func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) erro return os.WriteFile(metadataPath, m, 0700) } -func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { +func (sf *snapshotFile) fromETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) { if esf == nil { panic("cannot convert from nil ETCDSnapshotFile") } @@ -1067,7 +1089,7 @@ func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { } } -func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { +func (sf *snapshotFile) toETCDSnapshotFile(esf *k3s.ETCDSnapshotFile) { if esf == nil { panic("cannot convert to nil ETCDSnapshotFile") } @@ -1092,7 +1114,7 @@ func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { } else { message = string(b) } - esf.Status.Error = &apisv1.ETCDSnapshotError{ + esf.Status.Error = &k3s.ETCDSnapshotError{ Time: sf.CreatedAt, Message: &message, } @@ -1127,7 +1149,7 @@ func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { esf.ObjectMeta.Labels[labelStorageNode] = esf.Spec.NodeName } else { esf.ObjectMeta.Labels[labelStorageNode] = "s3" - esf.Spec.S3 = &apisv1.ETCDSnapshotS3{ + esf.Spec.S3 = &k3s.ETCDSnapshotS3{ Endpoint: sf.S3.Endpoint, EndpointCA: sf.S3.EndpointCA, SkipSSLVerify: sf.S3.SkipSSLVerify, diff --git a/pkg/etcd/snapshot_handler.go b/pkg/etcd/snapshot_handler.go new file mode 100644 index 0000000000..6532f42c0f --- /dev/null +++ b/pkg/etcd/snapshot_handler.go @@ -0,0 +1,221 @@ +package etcd + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + k3s "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" + "github.com/k3s-io/k3s/pkg/cluster/managed" + "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/util" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type SnapshotOperation string + +const ( + SnapshotOperationSave SnapshotOperation = "save" + SnapshotOperationList SnapshotOperation = "list" + SnapshotOperationPrune SnapshotOperation = "prune" + SnapshotOperationDelete SnapshotOperation = "delete" +) + +type SnapshotRequestS3 struct { + s3Config + Timeout metav1.Duration `json:"timeout"` + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` +} + +type SnapshotRequest struct { + Operation SnapshotOperation `json:"operation"` + Name []string `json:"name,omitempty"` + Dir *string `json:"dir,omitempty"` + Compress *bool `json:"compress,omitempty"` + Retention *int `json:"retention,omitempty"` + + S3 *SnapshotRequestS3 `json:"s3,omitempty"` + + ctx context.Context +} + +func (sr *SnapshotRequest) context() context.Context { + if sr.ctx != nil { + return sr.ctx + } + return context.Background() +} + +// snapshotHandler handles snapshot save/list/prune requests from the CLI. +func (e *ETCD) snapshotHandler() http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + sr, err := getSnapshotRequest(req) + if err != nil { + util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError) + } + switch sr.Operation { + case SnapshotOperationList: + err = e.withRequest(sr).handleList(rw, req) + case SnapshotOperationSave: + err = e.withRequest(sr).handleSave(rw, req) + case SnapshotOperationPrune: + err = e.withRequest(sr).handlePrune(rw, req) + case SnapshotOperationDelete: + err = e.withRequest(sr).handleDelete(rw, req, sr.Name) + default: + err = e.handleInvalid(rw, req) + } + if err != nil { + logrus.Warnf("Error in etcd-snapshot handler: %v", err) + } + }) +} + +func (e *ETCD) handleList(rw http.ResponseWriter, req *http.Request) error { + if err := e.initS3IfNil(req.Context()); err != nil { + util.SendError(err, rw, req, http.StatusBadRequest) + return nil + } + sf, err := e.ListSnapshots(req.Context()) + if sf == nil { + util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError) + return nil + } + sendSnapshotList(rw, req, sf) + return err +} + +func (e *ETCD) handleSave(rw http.ResponseWriter, req *http.Request) error { + if err := e.initS3IfNil(req.Context()); err != nil { + util.SendError(err, rw, req, http.StatusBadRequest) + return nil + } + sr, err := e.Snapshot(req.Context()) + if sr == nil { + util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError) + return nil + } + sendSnapshotResponse(rw, req, sr) + return err +} + +func (e *ETCD) handlePrune(rw http.ResponseWriter, req *http.Request) error { + if err := e.initS3IfNil(req.Context()); err != nil { + util.SendError(err, rw, req, http.StatusBadRequest) + return nil + } + sr, err := e.PruneSnapshots(req.Context()) + if sr == nil { + util.SendError(err, rw, req, http.StatusInternalServerError) + return nil + } + sendSnapshotResponse(rw, req, sr) + return err +} + +func (e *ETCD) handleDelete(rw http.ResponseWriter, req *http.Request, snapshots []string) error { + if err := e.initS3IfNil(req.Context()); err != nil { + util.SendError(err, rw, req, http.StatusBadRequest) + return nil + } + sr, err := e.DeleteSnapshots(req.Context(), snapshots) + if sr == nil { + util.SendError(err, rw, req, http.StatusInternalServerError) + return nil + } + sendSnapshotResponse(rw, req, sr) + return err +} + +func (e *ETCD) handleInvalid(rw http.ResponseWriter, req *http.Request) error { + util.SendErrorWithID(fmt.Errorf("invalid snapshot operation"), "etcd-snapshot", rw, req, http.StatusBadRequest) + return nil +} + +// withRequest returns a modified ETCD struct that is overriden +// with etcd snapshot config from the snapshot request. +func (e *ETCD) withRequest(sr *SnapshotRequest) *ETCD { + re := &ETCD{ + client: e.client, + config: &config.Control{ + CriticalControlArgs: e.config.CriticalControlArgs, + Runtime: e.config.Runtime, + DataDir: e.config.DataDir, + Datastore: e.config.Datastore, + EtcdSnapshotCompress: e.config.EtcdSnapshotCompress, + EtcdSnapshotName: e.config.EtcdSnapshotName, + EtcdSnapshotRetention: e.config.EtcdSnapshotRetention, + }, + name: e.name, + address: e.address, + cron: e.cron, + cancel: e.cancel, + snapshotSem: e.snapshotSem, + } + if len(sr.Name) > 0 { + re.config.EtcdSnapshotName = sr.Name[0] + } + if sr.Compress != nil { + re.config.EtcdSnapshotCompress = *sr.Compress + } + if sr.Dir != nil { + re.config.EtcdSnapshotDir = *sr.Dir + } + if sr.Retention != nil { + re.config.EtcdSnapshotRetention = *sr.Retention + } + if sr.S3 != nil { + re.config.EtcdS3 = true + re.config.EtcdS3BucketName = sr.S3.Bucket + re.config.EtcdS3AccessKey = sr.S3.AccessKey + re.config.EtcdS3SecretKey = sr.S3.SecretKey + re.config.EtcdS3Endpoint = sr.S3.Endpoint + re.config.EtcdS3EndpointCA = sr.S3.EndpointCA + re.config.EtcdS3SkipSSLVerify = sr.S3.SkipSSLVerify + re.config.EtcdS3Insecure = sr.S3.Insecure + re.config.EtcdS3Region = sr.S3.Region + re.config.EtcdS3Timeout = sr.S3.Timeout.Duration + } + return re +} + +// getSnapshotRequest unmarshalls the snapshot operation request from a client. +func getSnapshotRequest(req *http.Request) (*SnapshotRequest, error) { + if req.Method != http.MethodPost { + return nil, http.ErrNotSupported + } + sr := &SnapshotRequest{} + b, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + if err := json.Unmarshal(b, &sr); err != nil { + return nil, err + } + sr.ctx = req.Context() + return sr, nil +} + +func sendSnapshotResponse(rw http.ResponseWriter, req *http.Request, sr *managed.SnapshotResult) { + b, err := json.Marshal(sr) + if err != nil { + util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError) + return + } + rw.Header().Set("Content-Type", "application/json") + rw.Write(b) +} + +func sendSnapshotList(rw http.ResponseWriter, req *http.Request, sf *k3s.ETCDSnapshotFileList) { + b, err := json.Marshal(sf) + if err != nil { + util.SendErrorWithID(err, "etcd-snapshot", rw, req, http.StatusInternalServerError) + return + } + rw.Header().Set("Content-Type", "application/json") + rw.Write(b) +} diff --git a/pkg/server/auth.go b/pkg/server/auth/auth.go similarity index 94% rename from pkg/server/auth.go rename to pkg/server/auth/auth.go index 306b2d77b2..49056573f7 100644 --- a/pkg/server/auth.go +++ b/pkg/server/auth/auth.go @@ -1,4 +1,4 @@ -package server +package auth import ( "net/http" @@ -51,7 +51,7 @@ func doAuth(roles []string, serverConfig *config.Control, next http.Handler, rw next.ServeHTTP(rw, req) } -func authMiddleware(serverConfig *config.Control, roles ...string) mux.MiddlewareFunc { +func Middleware(serverConfig *config.Control, roles ...string) mux.MiddlewareFunc { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { doAuth(roles, serverConfig, next, rw, req) diff --git a/pkg/server/cert.go b/pkg/server/cert.go index db26f632cd..8b20dc99c1 100644 --- a/pkg/server/cert.go +++ b/pkg/server/cert.go @@ -19,6 +19,7 @@ import ( "github.com/k3s-io/k3s/pkg/cluster" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/daemons/control/deps" + "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/pkg/errors" certutil "github.com/rancher/dynamiclistener/cert" @@ -35,7 +36,7 @@ func caCertReplaceHandler(server *config.Control) http.HandlerFunc { } force, _ := strconv.ParseBool(req.FormValue("force")) if err := caCertReplace(server, req.Body, force); err != nil { - genErrorMessage(resp, http.StatusInternalServerError, err, "certificate") + util.SendErrorWithID(err, "certificate", resp, req, http.StatusInternalServerError) return } logrus.Infof("certificate: Cluster Certificate Authority data has been updated, %s must be restarted.", version.Program) diff --git a/pkg/server/router.go b/pkg/server/router.go index 984f9d71a7..72c92e92ed 100644 --- a/pkg/server/router.go +++ b/pkg/server/router.go @@ -20,6 +20,7 @@ import ( "github.com/k3s-io/k3s/pkg/cli/cmds" "github.com/k3s-io/k3s/pkg/daemons/config" "github.com/k3s-io/k3s/pkg/nodepassword" + "github.com/k3s-io/k3s/pkg/server/auth" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/pkg/errors" @@ -51,7 +52,7 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler prefix := "/v1-" + version.Program authed := mux.NewRouter().SkipClean(true) - authed.Use(authMiddleware(serverConfig, version.Program+":agent", user.NodesGroup, bootstrapapi.BootstrapDefaultGroup)) + authed.Use(auth.Middleware(serverConfig, version.Program+":agent", user.NodesGroup, bootstrapapi.BootstrapDefaultGroup)) authed.Path(prefix + "/serving-kubelet.crt").Handler(servingKubeletCert(serverConfig, serverConfig.Runtime.ServingKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kubelet.crt").Handler(clientKubeletCert(serverConfig, serverConfig.Runtime.ClientKubeletKey, nodeAuth)) authed.Path(prefix + "/client-kube-proxy.crt").Handler(fileHandler(serverConfig.Runtime.ClientKubeProxyCert, serverConfig.Runtime.ClientKubeProxyKey)) @@ -70,23 +71,22 @@ func router(ctx context.Context, config *Config, cfg *cmds.Server) http.Handler nodeAuthed := mux.NewRouter().SkipClean(true) nodeAuthed.NotFoundHandler = authed - nodeAuthed.Use(authMiddleware(serverConfig, user.NodesGroup)) + nodeAuthed.Use(auth.Middleware(serverConfig, user.NodesGroup)) nodeAuthed.Path(prefix + "/connect").Handler(serverConfig.Runtime.Tunnel) serverAuthed := mux.NewRouter().SkipClean(true) serverAuthed.NotFoundHandler = nodeAuthed - serverAuthed.Use(authMiddleware(serverConfig, version.Program+":server")) + serverAuthed.Use(auth.Middleware(serverConfig, version.Program+":server")) serverAuthed.Path(prefix + "/encrypt/status").Handler(encryptionStatusHandler(serverConfig)) serverAuthed.Path(prefix + "/encrypt/config").Handler(encryptionConfigHandler(ctx, serverConfig)) serverAuthed.Path(prefix + "/cert/cacerts").Handler(caCertReplaceHandler(serverConfig)) - serverAuthed.Path("/db/info").Handler(nodeAuthed) serverAuthed.Path(prefix + "/server-bootstrap").Handler(bootstrapHandler(serverConfig.Runtime)) serverAuthed.Path(prefix + "/token").Handler(tokenRequestHandler(ctx, serverConfig)) systemAuthed := mux.NewRouter().SkipClean(true) systemAuthed.NotFoundHandler = serverAuthed systemAuthed.MethodNotAllowedHandler = serverAuthed - systemAuthed.Use(authMiddleware(serverConfig, user.SystemPrivilegedGroup)) + systemAuthed.Use(auth.Middleware(serverConfig, user.SystemPrivilegedGroup)) systemAuthed.Methods(http.MethodConnect).Handler(serverConfig.Runtime.Tunnel) staticDir := filepath.Join(serverConfig.DataDir, "static") diff --git a/pkg/server/secrets-encrypt.go b/pkg/server/secrets-encrypt.go index 0ec8ac266a..0a5e12d3a2 100644 --- a/pkg/server/secrets-encrypt.go +++ b/pkg/server/secrets-encrypt.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" "io" - "math/big" "net/http" "os" "strings" @@ -45,12 +44,12 @@ type EncryptionRequest struct { Skip bool `json:"skip"` } -func getEncryptionRequest(req *http.Request) (EncryptionRequest, error) { +func getEncryptionRequest(req *http.Request) (*EncryptionRequest, error) { b, err := io.ReadAll(req.Body) if err != nil { - return EncryptionRequest{}, err + return nil, err } - result := EncryptionRequest{} + result := &EncryptionRequest{} err = json.Unmarshal(b, &result) return result, err } @@ -63,14 +62,15 @@ func encryptionStatusHandler(server *config.Control) http.Handler { } status, err := encryptionStatus(server) if err != nil { - genErrorMessage(resp, http.StatusInternalServerError, err, "secrets-encrypt") + util.SendErrorWithID(err, "secret-encrypt", resp, req, http.StatusInternalServerError) return } b, err := json.Marshal(status) if err != nil { - genErrorMessage(resp, http.StatusInternalServerError, err, "secrets-encrypt") + util.SendErrorWithID(err, "secret-encrypt", resp, req, http.StatusInternalServerError) return } + resp.Header().Set("Content-Type", "application/json") resp.Write(b) }) } @@ -192,7 +192,7 @@ func encryptionConfigHandler(ctx context.Context, server *config.Control) http.H } if err != nil { - genErrorMessage(resp, http.StatusBadRequest, err, "secrets-encrypt") + util.SendErrorWithID(err, "secret-encrypt", resp, req, http.StatusBadRequest) return } // If a user kills the k3s server immediately after this call, we run into issues where the files @@ -464,18 +464,3 @@ func verifyEncryptionHashAnnotation(runtime *config.ControlRuntime, core core.In return nil } - -// genErrorMessage sends and logs a random error ID so that logs can be correlated -// between the REST API (which does not provide any detailed error output, to avoid -// information disclosure) and the server logs. -func genErrorMessage(resp http.ResponseWriter, statusCode int, passedErr error, component string) { - errID, err := rand.Int(rand.Reader, big.NewInt(99999)) - if err != nil { - resp.WriteHeader(http.StatusInternalServerError) - resp.Write([]byte(err.Error())) - return - } - logrus.Warnf("%s error ID %05d: %s", component, errID, passedErr.Error()) - resp.WriteHeader(statusCode) - resp.Write([]byte(fmt.Sprintf("%s error ID %05d", component, errID))) -} diff --git a/pkg/server/token.go b/pkg/server/token.go index d107bbd0ed..c5da332fa6 100644 --- a/pkg/server/token.go +++ b/pkg/server/token.go @@ -45,7 +45,7 @@ func tokenRequestHandler(ctx context.Context, server *config.Control) http.Handl return } if err = tokenRotate(ctx, server, *sTokenReq.NewToken); err != nil { - genErrorMessage(resp, http.StatusInternalServerError, err, "token") + util.SendErrorWithID(err, "token", resp, req, http.StatusInternalServerError) return } resp.WriteHeader(http.StatusOK) diff --git a/pkg/util/apierrors.go b/pkg/util/apierrors.go index 7666a835f7..aaba05b5db 100644 --- a/pkg/util/apierrors.go +++ b/pkg/util/apierrors.go @@ -1,6 +1,9 @@ package util import ( + "crypto/rand" + "fmt" + "math/big" "net/http" "github.com/k3s-io/k3s/pkg/generated/clientset/versioned/scheme" @@ -14,6 +17,15 @@ import ( var ErrNotReady = errors.New("apiserver not ready") +// SendErrorWithID sends and logs a random error ID so that logs can be correlated +// between the REST API (which does not provide any detailed error output, to avoid +// information disclosure) and the server logs. +func SendErrorWithID(err error, component string, resp http.ResponseWriter, req *http.Request, status ...int) { + errID, _ := rand.Int(rand.Reader, big.NewInt(99999)) + logrus.Errorf("%s error ID %05d: %v", component, errID, err) + SendError(fmt.Errorf("%s error ID %05d", component, errID), resp, req, status...) +} + // SendError sends a properly formatted error response func SendError(err error, resp http.ResponseWriter, req *http.Request, status ...int) { var code int diff --git a/tests/e2e/s3/s3_test.go b/tests/e2e/s3/s3_test.go index 2f0dcdb537..92a287c70c 100644 --- a/tests/e2e/s3/s3_test.go +++ b/tests/e2e/s3/s3_test.go @@ -89,21 +89,18 @@ var _ = Describe("Verify Create", Ordered, func() { It("save s3 snapshot", func() { res, err := e2e.RunCmdOnNode("k3s etcd-snapshot save", serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) - Expect(res).To(ContainSubstring("S3 bucket test exists")) - Expect(res).To(ContainSubstring("Uploading snapshot")) - Expect(res).To(ContainSubstring("S3 upload complete for")) + Expect(res).To(ContainSubstring("Snapshot on-demand-server-0")) }) It("lists saved s3 snapshot", func() { res, err := e2e.RunCmdOnNode("k3s etcd-snapshot list", serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) - Expect(res).To(ContainSubstring("on-demand-server-0")) + Expect(res).To(ContainSubstring("file:///var/lib/rancher/k3s/server/db/snapshots/on-demand-server-0")) }) It("save 3 more s3 snapshots", func() { for _, i := range []string{"1", "2", "3"} { res, err := e2e.RunCmdOnNode("k3s etcd-snapshot save --name special-"+i, serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) - Expect(res).To(ContainSubstring("Uploading snapshot")) - Expect(res).To(ContainSubstring("S3 upload complete for special-" + i)) + Expect(res).To(ContainSubstring("Snapshot special-" + i + "-server-0")) } }) It("lists saved s3 snapshot", func() { @@ -117,18 +114,12 @@ var _ = Describe("Verify Create", Ordered, func() { It("delete first on-demand s3 snapshot", func() { _, err := e2e.RunCmdOnNode("sudo k3s etcd-snapshot ls >> ./snapshotname.txt", serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) - snapshotName, err := e2e.RunCmdOnNode("grep -Eo 'on-demand-server-0-([0-9]+)' ./snapshotname.txt |head -1", serverNodeNames[0]) + snapshotName, err := e2e.RunCmdOnNode("grep -Eo 'on-demand-server-0-([0-9]+)' ./snapshotname.txt | head -1", serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) res, err := e2e.RunCmdOnNode("sudo k3s etcd-snapshot delete "+snapshotName, serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) - Expect(res).To(ContainSubstring("Reconciling ETCDSnapshotFile resources")) - Expect(res).To(ContainSubstring("Snapshot " + strings.TrimSpace(snapshotName) + " deleted from S3")) - Expect(res).To(ContainSubstring("Reconciliation of ETCDSnapshotFile resources complete")) + Expect(res).To(ContainSubstring("Snapshot " + strings.TrimSpace(snapshotName) + " deleted")) }) - - // TODO, there is currently a bug that prevents pruning on s3 snapshots that are not prefixed with "on-demand" - // https://github.com/rancher/rke2/issues/3714 - // Once fixed, ensure that the snapshots list are actually reduced to 2 It("prunes s3 snapshots", func() { _, err := e2e.RunCmdOnNode("k3s etcd-snapshot save", serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) @@ -136,11 +127,8 @@ var _ = Describe("Verify Create", Ordered, func() { Expect(err).NotTo(HaveOccurred()) res, err := e2e.RunCmdOnNode("k3s etcd-snapshot prune --snapshot-retention 2", serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) - Expect(res).To(ContainSubstring("Reconciliation of ETCDSnapshotFile resources complete")) - - _, err = e2e.RunCmdOnNode("k3s etcd-snapshot ls|grep 'on-demand'|wc -l>count", serverNodeNames[0]) - Expect(err).NotTo(HaveOccurred()) - res, err = e2e.RunCmdOnNode("grep '^[4]$' ./count", serverNodeNames[0]) + // There should now be 4 on-demand snapshots - 2 local, and 2 on s3 + res, err = e2e.RunCmdOnNode("k3s etcd-snapshot ls 2>/dev/null | grep on-demand | wc -l", serverNodeNames[0]) Expect(err).NotTo(HaveOccurred()) Expect(strings.TrimSpace(res)).To(Equal("4")) }) diff --git a/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go b/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go index 1d7c9b5ea2..b4977639b6 100644 --- a/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go +++ b/tests/integration/etcdsnapshot/etcdsnapshot_int_test.go @@ -58,13 +58,13 @@ var _ = Describe("etcd snapshots", Ordered, func() { Expect(err).ToNot(HaveOccurred()) snapshotName := reg.FindString(lsResult) Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)). - To(ContainSubstring("Snapshot " + snapshotName + " deleted locally")) + To(ContainSubstring("Snapshot " + snapshotName + " deleted")) }) }) When("saving a custom name", func() { It("saves an etcd snapshot with a custom name", func() { Expect(testutil.K3sCmd("etcd-snapshot", "save --name ALIVEBEEF")). - To(ContainSubstring("Saving etcd snapshot to /var/lib/rancher/k3s/server/db/snapshots/ALIVEBEEF")) + To(ContainSubstring("Snapshot ALIVEBEEF-")) }) It("deletes that snapshot", func() { lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls") @@ -73,7 +73,7 @@ var _ = Describe("etcd snapshots", Ordered, func() { Expect(err).ToNot(HaveOccurred()) snapshotName := reg.FindString(lsResult) Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)). - To(ContainSubstring("Snapshot " + snapshotName + " deleted locally")) + To(ContainSubstring("Snapshot " + snapshotName + " deleted")) }) }) When("using etcd snapshot prune", func() { @@ -98,7 +98,7 @@ var _ = Describe("etcd snapshots", Ordered, func() { }) It("prunes snapshots down to 2", func() { Expect(testutil.K3sCmd("etcd-snapshot", "prune --snapshot-retention 2 --name PRUNE_TEST")). - To(ContainSubstring("Removing local snapshot")) + To(ContainSubstring(" deleted.")) lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls") Expect(err).ToNot(HaveOccurred()) reg, err := regexp.Compile(`(?m):///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`) @@ -113,7 +113,7 @@ var _ = Describe("etcd snapshots", Ordered, func() { Expect(err).ToNot(HaveOccurred()) for _, snapshotName := range reg.FindAllString(lsResult, -1) { Expect(testutil.K3sCmd("etcd-snapshot", "delete", snapshotName)). - To(ContainSubstring("Snapshot " + snapshotName + " deleted locally")) + To(ContainSubstring("Snapshot " + snapshotName + " deleted")) } }) })