package etcd

import (
	"archive/zip"
	"context"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"time"

	apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1"
	"github.com/k3s-io/k3s/pkg/daemons/config"
	"github.com/k3s-io/k3s/pkg/util"
	"github.com/k3s-io/k3s/pkg/version"
	"github.com/minio/minio-go/v7"
	"github.com/pkg/errors"
	"github.com/robfig/cron/v3"
	"github.com/sirupsen/logrus"
	"go.etcd.io/etcd/client/pkg/v3/logutil"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/etcdutl/v3/snapshot"
	"go.uber.org/zap"
	"golang.org/x/sync/semaphore"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/validation"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/retry"
	"k8s.io/utils/pointer"
)

const (
	// maxConcurrentSnapshots is the weight of the semaphore guarding Snapshot.
	maxConcurrentSnapshots = 1
	// compressedExtension is the filename suffix of compressed snapshots.
	compressedExtension = ".zip"
	// metadataDir is the name of the directory, sibling to the snapshot
	// directory, that holds per-snapshot extra metadata files.
	metadataDir = ".metadata"
	// errorTTL is how long ETCDSnapshotFile resources recording a failed
	// snapshot are retained by reconciliation.
	errorTTL = 24 * time.Hour
)

var (
	snapshotExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata"
	labelStorageNode                   = "etcd." + version.Program + ".cattle.io/snapshot-storage-node"
	annotationLocalReconciled          = "etcd." + version.Program + ".cattle.io/local-snapshots-timestamp"
	annotationS3Reconciled             = "etcd." + version.Program + ".cattle.io/s3-snapshots-timestamp"
	annotationTokenHash                = "etcd." + version.Program + ".cattle.io/snapshot-token-hash"

	// snapshotDataBackoff will retry at increasing steps for up to ~30 seconds.
	// If the ConfigMap update fails, the list won't be reconciled again until next time
	// the server starts, so we should be fairly persistent in retrying.
	snapshotDataBackoff = wait.Backoff{
		Steps:    9,
		Duration: 10 * time.Millisecond,
		Factor:   3.0,
		Jitter:   0.1,
	}

	// cronLogger wraps logrus's Printf output as cron-compatible logger
	cronLogger = cron.VerbosePrintfLogger(logrus.StandardLogger())
)

// snapshotDir returns the path of the directory that snapshots are stored in.
//
// If config.EtcdSnapshotDir is set it is returned as-is, without checking that
// it exists. Otherwise the default directory (<DataDir>/db/snapshots) is used:
// when it is missing and create is true it is created with mode 0700; when it
// is missing and create is false the stat error is returned. An error is also
// returned if the default path exists but is not a directory, rather than
// handing callers an empty path that would resolve relative to the working
// directory.
func snapshotDir(config *config.Control, create bool) (string, error) {
	if config.EtcdSnapshotDir != "" {
		return config.EtcdSnapshotDir, nil
	}

	// we have to create the snapshot dir if we are using
	// the default snapshot dir if it doesn't exist
	defaultSnapshotDir := filepath.Join(config.DataDir, "db", "snapshots")
	s, err := os.Stat(defaultSnapshotDir)
	if err != nil {
		if create && os.IsNotExist(err) {
			if err := os.MkdirAll(defaultSnapshotDir, 0700); err != nil {
				return "", err
			}
			return defaultSnapshotDir, nil
		}
		return "", err
	}
	if !s.IsDir() {
		return "", fmt.Errorf("default snapshot path %s exists but is not a directory", defaultSnapshotDir)
	}
	return defaultSnapshotDir, nil
}

// preSnapshotSetup checks to see if the necessary components are in place
// to perform an Etcd snapshot. This is necessary primarily for on-demand
// snapshots since they're performed before normal Etcd setup is completed.
// Currently this only creates the snapshot semaphore if it does not exist yet.
func (e *ETCD) preSnapshotSetup(ctx context.Context) error {
	if e.snapshotSem == nil {
		e.snapshotSem = semaphore.NewWeighted(maxConcurrentSnapshots)
	}
	return nil
}

// compressSnapshot compresses the given snapshot and provides the
// caller with the path to the file.
func (e *ETCD) compressSnapshot(snapshotDir, snapshotName, snapshotPath string, now time.Time) (string, error) { logrus.Info("Compressing etcd snapshot file: " + snapshotName) zippedSnapshotName := snapshotName + compressedExtension zipPath := filepath.Join(snapshotDir, zippedSnapshotName) zf, err := os.Create(zipPath) if err != nil { return "", err } defer zf.Close() zipWriter := zip.NewWriter(zf) defer zipWriter.Close() uncompressedPath := filepath.Join(snapshotDir, snapshotName) fileToZip, err := os.Open(uncompressedPath) if err != nil { os.Remove(zipPath) return "", err } defer fileToZip.Close() info, err := fileToZip.Stat() if err != nil { os.Remove(zipPath) return "", err } header, err := zip.FileInfoHeader(info) if err != nil { os.Remove(zipPath) return "", err } header.Name = snapshotName header.Method = zip.Deflate header.Modified = now writer, err := zipWriter.CreateHeader(header) if err != nil { os.Remove(zipPath) return "", err } _, err = io.Copy(writer, fileToZip) return zipPath, err } // decompressSnapshot decompresses the given snapshot and provides the caller // with the full path to the uncompressed snapshot. func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, error) { logrus.Info("Decompressing etcd snapshot file: " + snapshotFile) r, err := zip.OpenReader(snapshotFile) if err != nil { return "", err } defer r.Close() var decompressed *os.File for _, sf := range r.File { decompressed, err = os.OpenFile(strings.Replace(sf.Name, compressedExtension, "", -1), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, sf.Mode()) if err != nil { return "", err } defer decompressed.Close() ss, err := sf.Open() if err != nil { return "", err } defer ss.Close() if _, err := io.Copy(decompressed, ss); err != nil { os.Remove(decompressed.Name()) return "", err } } return decompressed.Name(), nil } // Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed // snapshots in excess of the retention limits. 
This method is used in the internal cron snapshot // system as well as used to do on-demand snapshots. func (e *ETCD) Snapshot(ctx context.Context) error { if err := e.preSnapshotSetup(ctx); err != nil { return err } if !e.snapshotSem.TryAcquire(maxConcurrentSnapshots) { return fmt.Errorf("%d snapshots already in progress", maxConcurrentSnapshots) } defer e.snapshotSem.Release(maxConcurrentSnapshots) // make sure the core.Factory is initialized before attempting to add snapshot metadata var extraMetadata *v1.ConfigMap if e.config.Runtime.Core == nil { logrus.Debugf("Cannot retrieve extra metadata from %s ConfigMap: runtime core not ready", snapshotExtraMetadataConfigMapName) } else { logrus.Debugf("Attempting to retrieve extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName) if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil { logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err) } else { logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName) extraMetadata = snapshotExtraMetadataConfigMap } } endpoints := getEndpoints(e.config) var client *clientv3.Client var err error // Use the internal client if possible, or create a new one // if run from the CLI. if e.client != nil { client = e.client } else { client, err = getClient(ctx, e.config, endpoints...) 
if err != nil { return err } defer client.Close() } status, err := client.Status(ctx, endpoints[0]) if err != nil { return errors.Wrap(err, "failed to check etcd status for snapshot") } if status.IsLearner { logrus.Warnf("Unable to take snapshot: not supported for learner") return nil } snapshotDir, err := snapshotDir(e.config, true) if err != nil { return errors.Wrap(err, "failed to get the snapshot dir") } cfg, err := getClientConfig(ctx, e.config) if err != nil { return errors.Wrap(err, "failed to get config for etcd snapshot") } tokenHash, err := util.GetTokenHash(e.config) if err != nil { return errors.Wrap(err, "failed to get server token hash for etcd snapshot") } nodeName := os.Getenv("NODE_NAME") now := time.Now().Round(time.Second) snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, now.Unix()) snapshotPath := filepath.Join(snapshotDir, snapshotName) logrus.Infof("Saving etcd snapshot to %s", snapshotPath) var sf *snapshotFile lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel) if err != nil { return err } if err := snapshot.NewV3(lg).Save(ctx, *cfg, snapshotPath); err != nil { sf = &snapshotFile{ Name: snapshotName, Location: "", NodeName: nodeName, CreatedAt: &metav1.Time{ Time: now, }, Status: failedSnapshotStatus, Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), Size: 0, metadataSource: extraMetadata, } logrus.Errorf("Failed to take etcd snapshot: %v", err) if err := e.addSnapshotData(*sf); err != nil { return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } } // If the snapshot attempt was successful, sf will be nil as we did not set it to store the error message. 
if sf == nil { if e.config.EtcdSnapshotCompress { zipPath, err := e.compressSnapshot(snapshotDir, snapshotName, snapshotPath, now) if err != nil { return errors.Wrap(err, "failed to compress snapshot") } if err := os.Remove(snapshotPath); err != nil { return errors.Wrap(err, "failed to remove uncompressed snapshot") } snapshotPath = zipPath logrus.Info("Compressed snapshot: " + snapshotPath) } f, err := os.Stat(snapshotPath) if err != nil { return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") } sf = &snapshotFile{ Name: f.Name(), Location: "file://" + snapshotPath, NodeName: nodeName, CreatedAt: &metav1.Time{ Time: now, }, Status: successfulSnapshotStatus, Size: f.Size(), Compressed: e.config.EtcdSnapshotCompress, metadataSource: extraMetadata, tokenHash: tokenHash, } if err := saveSnapshotMetadata(snapshotPath, extraMetadata); err != nil { return errors.Wrap(err, "failed to save local snapshot metadata") } if err := e.addSnapshotData(*sf); err != nil { return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { return errors.Wrap(err, "failed to apply local snapshot retention policy") } if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) sf = &snapshotFile{ Name: filepath.Base(snapshotPath), NodeName: "s3", CreatedAt: &metav1.Time{ Time: now, }, Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), Size: 0, Status: failedSnapshotStatus, S3: &s3Config{ Endpoint: e.config.EtcdS3Endpoint, EndpointCA: e.config.EtcdS3EndpointCA, SkipSSLVerify: e.config.EtcdS3SkipSSLVerify, Bucket: e.config.EtcdS3BucketName, Region: e.config.EtcdS3Region, Folder: e.config.EtcdS3Folder, Insecure: e.config.EtcdS3Insecure, }, metadataSource: extraMetadata, } } else { logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) // upload will return a snapshotFile 
even on error - if there was an // error, it will be reflected in the status and message. sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now) if err != nil { logrus.Errorf("Error received during snapshot upload to S3: %s", err) } else { logrus.Infof("S3 upload complete for %s", snapshotName) } // Attempt to apply retention even if the upload failed; failure may be due to bucket // being full or some other condition that retention policy would resolve. if err := e.s3.snapshotRetention(ctx); err != nil { logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err) } } // sf is either s3 snapshot metadata, or s3 failure record if err := e.addSnapshotData(*sf); err != nil { return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } } } return e.ReconcileSnapshotData(ctx) } type s3Config struct { Endpoint string `json:"endpoint,omitempty"` EndpointCA string `json:"endpointCA,omitempty"` SkipSSLVerify bool `json:"skipSSLVerify,omitempty"` Bucket string `json:"bucket,omitempty"` Region string `json:"region,omitempty"` Folder string `json:"folder,omitempty"` Insecure bool `json:"insecure,omitempty"` } type snapshotStatus string const ( successfulSnapshotStatus snapshotStatus = "successful" failedSnapshotStatus snapshotStatus = "failed" ) // snapshotFile represents a single snapshot and it's // metadata. type snapshotFile struct { Name string `json:"name"` // Location contains the full path of the snapshot. For // local paths, the location will be prefixed with "file://". 
Location string `json:"location,omitempty"` Metadata string `json:"metadata,omitempty"` Message string `json:"message,omitempty"` NodeName string `json:"nodeName,omitempty"` CreatedAt *metav1.Time `json:"createdAt,omitempty"` Size int64 `json:"size,omitempty"` Status snapshotStatus `json:"status,omitempty"` S3 *s3Config `json:"s3Config,omitempty"` Compressed bool `json:"compressed"` // these fields are used for the internal representation of the snapshot // to populate other fields before serialization to the legacy configmap. metadataSource *v1.ConfigMap `json:"-"` nodeSource string `json:"-"` tokenHash string `json:"-"` } // listLocalSnapshots provides a list of the currently stored // snapshots on disk along with their relevant // metadata. func (e *ETCD) listLocalSnapshots() (map[string]snapshotFile, error) { nodeName := os.Getenv("NODE_NAME") snapshots := make(map[string]snapshotFile) snapshotDir, err := snapshotDir(e.config, true) if err != nil { return snapshots, errors.Wrap(err, "failed to get the snapshot dir") } if err := filepath.Walk(snapshotDir, func(path string, file os.FileInfo, err error) error { if file.IsDir() || err != nil { return err } basename, compressed := strings.CutSuffix(file.Name(), compressedExtension) ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) if err != nil { ts = file.ModTime().Unix() } // try to read metadata from disk; don't warn if it is missing as it will not exist // for snapshot files from old releases or if there was no metadata provided. 
var metadata string metadataFile := filepath.Join(filepath.Dir(path), "..", metadataDir, file.Name()) if m, err := os.ReadFile(metadataFile); err == nil { logrus.Debugf("Loading snapshot metadata from %s", metadataFile) metadata = base64.StdEncoding.EncodeToString(m) } sf := snapshotFile{ Name: file.Name(), Location: "file://" + filepath.Join(snapshotDir, file.Name()), NodeName: nodeName, Metadata: metadata, CreatedAt: &metav1.Time{ Time: time.Unix(ts, 0), }, Size: file.Size(), Status: successfulSnapshotStatus, Compressed: compressed, } sfKey := generateSnapshotConfigMapKey(sf) snapshots[sfKey] = sf return nil }); err != nil { return nil, err } return snapshots, nil } // initS3IfNil initializes the S3 client // if it hasn't yet been initialized. func (e *ETCD) initS3IfNil(ctx context.Context) error { if e.s3 == nil { s3, err := NewS3(ctx, e.config) if err != nil { return err } e.s3 = s3 } return nil } // PruneSnapshots performs a retention run with the given // retention duration and removes expired snapshots. func (e *ETCD) PruneSnapshots(ctx context.Context) error { snapshotDir, err := snapshotDir(e.config, false) if err != nil { return errors.Wrap(err, "failed to get the snapshot dir") } if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { logrus.Errorf("Error applying snapshot retention policy: %v", err) } if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) } else { if err := e.s3.snapshotRetention(ctx); err != nil { logrus.Errorf("Error applying S3 snapshot retention policy: %v", err) } } } return e.ReconcileSnapshotData(ctx) } // ListSnapshots is an exported wrapper method that wraps an // unexported method of the same name. 
func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]snapshotFile, error) { snapshotFiles := map[string]snapshotFile{} if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) return nil, err } sfs, err := e.s3.listSnapshots(ctx) if err != nil { return nil, err } snapshotFiles = sfs } sfs, err := e.listLocalSnapshots() if err != nil { return nil, err } for k, sf := range sfs { snapshotFiles[k] = sf } return snapshotFiles, err } // DeleteSnapshots removes the given snapshots from local storage and S3. func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { snapshotDir, err := snapshotDir(e.config, false) if err != nil { return errors.Wrap(err, "failed to get the snapshot dir") } if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) return err } } for _, s := range snapshots { if err := e.deleteSnapshot(filepath.Join(snapshotDir, s)); err != nil { if isNotExist(err) { logrus.Infof("Snapshot %s not found locally", s) } else { logrus.Errorf("Failed to delete local snapshot %s: %v", s, err) } } else { logrus.Infof("Snapshot %s deleted locally", s) } if e.config.EtcdS3 { if err := e.s3.deleteSnapshot(ctx, s); err != nil { if isNotExist(err) { logrus.Infof("Snapshot %s not found in S3", s) } else { logrus.Errorf("Failed to delete S3 snapshot %s: %v", s, err) } } else { logrus.Infof("Snapshot %s deleted from S3", s) } } } return e.ReconcileSnapshotData(ctx) } func (e *ETCD) deleteSnapshot(snapshotPath string) error { dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir) filename := filepath.Base(snapshotPath) metadataPath := filepath.Join(dir, filename) err := os.Remove(snapshotPath) if err == nil || os.IsNotExist(err) { if merr := os.Remove(metadataPath); err != nil && !isNotExist(err) { err = merr } } return err } func marshalSnapshotFile(sf snapshotFile) ([]byte, error) { if sf.metadataSource != 
nil { if m, err := json.Marshal(sf.metadataSource.Data); err != nil { logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err) } else { sf.Metadata = base64.StdEncoding.EncodeToString(m) } } return json.Marshal(sf) } // addSnapshotData syncs an internal snapshotFile representation to an ETCDSnapshotFile resource // of the same name. Resources will be created or updated as necessary. func (e *ETCD) addSnapshotData(sf snapshotFile) error { // make sure the K3s factory is initialized. for e.config.Runtime.K3s == nil { runtime.Gosched() } snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() esfName := generateSnapshotName(sf) var esf *apisv1.ETCDSnapshotFile return retry.OnError(snapshotDataBackoff, func(err error) bool { return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) }, func() (err error) { // Get current object or create new one esf, err = snapshots.Get(esfName, metav1.GetOptions{}) if err != nil { if !apierrors.IsNotFound(err) { return err } esf = &apisv1.ETCDSnapshotFile{ ObjectMeta: metav1.ObjectMeta{ Name: esfName, }, } } // mutate object existing := esf.DeepCopyObject() sf.toETCDSnapshotFile(esf) // create or update as necessary if esf.CreationTimestamp.IsZero() { var created *apisv1.ETCDSnapshotFile created, err = snapshots.Create(esf) if err == nil { // Only emit an event for the snapshot when creating the resource e.emitEvent(created) } } else if !equality.Semantic.DeepEqual(existing, esf) { _, err = snapshots.Update(esf) } return err }) } // generateSnapshotConfigMapKey generates a derived name for the snapshot that is safe for use // as a configmap key. func generateSnapshotConfigMapKey(sf snapshotFile) string { name := invalidKeyChars.ReplaceAllString(sf.Name, "_") if sf.NodeName == "s3" { return "s3-" + name } return "local-" + name } // generateSnapshotName generates a derived name for the snapshot that is safe for use // as a resource name. 
func generateSnapshotName(sf snapshotFile) string { name := strings.ToLower(sf.Name) nodename := sf.nodeSource if nodename == "" { nodename = sf.NodeName } // Include a digest of the hostname and location to ensure unique resource // names. Snapshots should already include the hostname, but this ensures we // don't accidentally hide records if a snapshot with the same name somehow // exists on multiple nodes. digest := sha256.Sum256([]byte(nodename + sf.Location)) // If the lowercase filename isn't usable as a resource name, and short enough that we can include a prefix and suffix, // generate a safe name derived from the hostname and timestamp. if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 || len(name)+13 > validation.DNS1123SubdomainMaxLength { nodename, _, _ := strings.Cut(nodename, ".") name = fmt.Sprintf("etcd-snapshot-%s-%d", nodename, sf.CreatedAt.Unix()) if sf.Compressed { name += compressedExtension } } if sf.NodeName == "s3" { return "s3-" + name + "-" + hex.EncodeToString(digest[0:])[0:6] } return "local-" + name + "-" + hex.EncodeToString(digest[0:])[0:6] } // generateETCDSnapshotFileConfigMapKey generates a key that the corresponding // snapshotFile would be stored under in the legacy configmap func generateETCDSnapshotFileConfigMapKey(esf apisv1.ETCDSnapshotFile) string { name := invalidKeyChars.ReplaceAllString(esf.Spec.SnapshotName, "_") if esf.Spec.S3 != nil { return "s3-" + name } return "local-" + name } func (e *ETCD) emitEvent(esf *apisv1.ETCDSnapshotFile) { switch { case e.config.Runtime.Event == nil: case !esf.DeletionTimestamp.IsZero(): e.config.Runtime.Event.Eventf(esf, v1.EventTypeNormal, "ETCDSnapshotDeleted", "Snapshot %s deleted", esf.Spec.SnapshotName) case esf.Status.Error != nil: message := fmt.Sprintf("Failed to save snapshot %s on %s", esf.Spec.SnapshotName, esf.Spec.NodeName) if esf.Status.Error.Message != nil { message += ": " + *esf.Status.Error.Message } e.config.Runtime.Event.Event(esf, v1.EventTypeWarning, 
"ETCDSnapshotFailed", message) default: e.config.Runtime.Event.Eventf(esf, v1.EventTypeNormal, "ETCDSnapshotCreated", "Snapshot %s saved on %s", esf.Spec.SnapshotName, esf.Spec.NodeName) } } // ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources. // It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots // and reconcile snapshots from S3. func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { // make sure the core.Factory is initialized. There can // be a race between this core code startup. for e.config.Runtime.Core == nil { runtime.Gosched() } logrus.Infof("Reconciling ETCDSnapshotFile resources") defer logrus.Infof("Reconciliation of ETCDSnapshotFile resources complete") // Get snapshots from local filesystem snapshotFiles, err := e.listLocalSnapshots() if err != nil { return err } nodeNames := []string{os.Getenv("NODE_NAME")} // Get snapshots from S3 if e.config.EtcdS3 { if err := e.initS3IfNil(ctx); err != nil { logrus.Warnf("Unable to initialize S3 client: %v", err) return err } if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) } else { for k, v := range s3Snapshots { snapshotFiles[k] = v } nodeNames = append(nodeNames, "s3") } } // Try to load metadata from the legacy configmap, in case any local or s3 snapshots // were created by an old release that does not write the metadata alongside the snapshot file. 
snapshotConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err } if snapshotConfigMap != nil { for sfKey, sf := range snapshotFiles { logrus.Debugf("Found snapshotFile for %s with key %s", sf.Name, sfKey) // if the configmap has data for this snapshot, and local metadata is empty, // deserialize the value from the configmap and attempt to load it. if cmSnapshotValue := snapshotConfigMap.Data[sfKey]; cmSnapshotValue != "" && sf.Metadata == "" && sf.metadataSource == nil { sfTemp := &snapshotFile{} if err := json.Unmarshal([]byte(cmSnapshotValue), sfTemp); err != nil { logrus.Warnf("Failed to unmarshal configmap data for snapshot %s: %v", sfKey, err) continue } sf.Metadata = sfTemp.Metadata snapshotFiles[sfKey] = sf } } } labelSelector := &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{{ Key: labelStorageNode, Operator: metav1.LabelSelectorOpIn, Values: nodeNames, }}, } selector, err := metav1.LabelSelectorAsSelector(labelSelector) if err != nil { return err } // List all snapshots matching the selector snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()}) if err != nil { return err } // If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync. // If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes. // The one exception to the last rule is failed snapshots - these must be retained for a period of time. 
for _, esf := range esfList.Items { sfKey := generateETCDSnapshotFileConfigMapKey(esf) logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey) if sf, ok := snapshotFiles[sfKey]; ok && generateSnapshotName(sf) == esf.Name { // exists in both and names match, don't need to sync delete(snapshotFiles, sfKey) } else { // doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it if esf.Status.Error != nil && esf.Status.Error.Time != nil { expires := esf.Status.Error.Time.Add(errorTTL) if time.Now().Before(expires) { continue } } if ok { logrus.Debugf("Name of ETCDSnapshotFile for snapshotFile with key %s does not match: %s vs %s", sfKey, generateSnapshotName(sf), esf.Name) } else { logrus.Debugf("Key %s not found in snapshotFile list", sfKey) } logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName) if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err) } } } // Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created for _, sf := range snapshotFiles { logrus.Infof("Creating ETCDSnapshotFile for %s", sf.Name) if err := e.addSnapshotData(sf); err != nil { logrus.Errorf("Failed to create ETCDSnapshotFile: %v", err) } } // List all snapshots in Kubernetes not stored on S3 or a current etcd node. // These snapshots are local to a node that no longer runs etcd and cannot be restored. // If the node rejoins later and has local snapshots, it will reconcile them itself. 
labelSelector.MatchExpressions[0].Operator = metav1.LabelSelectorOpNotIn labelSelector.MatchExpressions[0].Values = []string{"s3"} // Get a list of all etcd nodes currently in the cluster and add them to the selector nodes := e.config.Runtime.Core.Core().V1().Node() etcdSelector := labels.Set{util.ETCDRoleLabelKey: "true"} nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: etcdSelector.String()}) if err != nil { return err } for _, node := range nodeList.Items { labelSelector.MatchExpressions[0].Values = append(labelSelector.MatchExpressions[0].Values, node.Name) } selector, err = metav1.LabelSelectorAsSelector(labelSelector) if err != nil { return err } // List and remove all snapshots stored on nodes that do not match the selector esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()}) if err != nil { return err } for _, esf := range esfList.Items { if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err) } } // Update our Node object to note the timestamp of the snapshot storages that have been reconciled now := time.Now().Round(time.Second).Format(time.RFC3339) patch := []map[string]string{ { "op": "add", "value": now, "path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"), }, } if e.config.EtcdS3 { patch = append(patch, map[string]string{ "op": "add", "value": now, "path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"), }) } b, err := json.Marshal(patch) if err != nil { return err } _, err = nodes.Patch(nodeNames[0], types.JSONPatchType, b) return err } // setSnapshotFunction schedules snapshots at the configured interval. 
func (e *ETCD) setSnapshotFunction(ctx context.Context) { skipJob := cron.SkipIfStillRunning(cronLogger) e.cron.AddJob(e.config.EtcdSnapshotCron, skipJob(cron.FuncJob(func() { // Add a small amount of jitter to the actual snapshot execution. On clusters with multiple servers, // having all the nodes take a snapshot at the exact same time can lead to excessive retry thrashing // when updating the snapshot list configmap. time.Sleep(time.Duration(rand.Float64() * float64(snapshotJitterMax))) if err := e.Snapshot(ctx); err != nil { logrus.Errorf("Failed to take scheduled snapshot: %v", err) } }))) } // snapshotRetention iterates through the snapshots and removes the oldest // leaving the desired number of snapshots. func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) error { if retention < 1 { return nil } logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir) var snapshotFiles []snapshotFile if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error { if info.IsDir() || err != nil { return err } if strings.HasPrefix(info.Name(), snapshotPrefix) { basename, compressed := strings.CutSuffix(info.Name(), compressedExtension) ts, err := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) if err != nil { ts = info.ModTime().Unix() } snapshotFiles = append(snapshotFiles, snapshotFile{Name: info.Name(), CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed}) } return nil }); err != nil { return err } if len(snapshotFiles) <= retention { return nil } // sort newest-first so we can prune entries past the retention count sort.Slice(snapshotFiles, func(i, j int) bool { return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt) }) for _, df := range snapshotFiles[retention:] { snapshotPath := filepath.Join(snapshotDir, df.Name) metadataPath := filepath.Join(snapshotDir, "..", metadataDir, df.Name) 
logrus.Infof("Removing local snapshot %s", snapshotPath) if err := os.Remove(snapshotPath); err != nil { return err } if err := os.Remove(metadataPath); err != nil && !os.IsNotExist(err) { return err } } return nil } func isNotExist(err error) bool { if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) { return true } return false } // saveSnapshotMetadata writes extra metadata to disk. // The upload is silently skipped if no extra metadata is provided. func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) error { if extraMetadata == nil || len(extraMetadata.Data) == 0 { return nil } dir := filepath.Join(filepath.Dir(snapshotPath), "..", metadataDir) filename := filepath.Base(snapshotPath) metadataPath := filepath.Join(dir, filename) logrus.Infof("Saving snapshot metadata to %s", metadataPath) m, err := json.Marshal(extraMetadata.Data) if err != nil { return err } if err := os.MkdirAll(dir, 0700); err != nil { return err } return os.WriteFile(metadataPath, m, 0700) } func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { if esf == nil { panic("cannot convert from nil ETCDSnapshotFile") } sf.Name = esf.Spec.SnapshotName sf.Location = esf.Spec.Location sf.CreatedAt = esf.Status.CreationTime sf.nodeSource = esf.Spec.NodeName sf.Compressed = strings.HasSuffix(esf.Spec.SnapshotName, compressedExtension) if esf.Status.ReadyToUse != nil && *esf.Status.ReadyToUse { sf.Status = successfulSnapshotStatus } else { sf.Status = failedSnapshotStatus } if esf.Status.Size != nil { sf.Size = esf.Status.Size.Value() } if esf.Status.Error != nil { if esf.Status.Error.Time != nil { sf.CreatedAt = esf.Status.Error.Time } message := "etcd snapshot failed" if esf.Status.Error.Message != nil { message = *esf.Status.Error.Message } sf.Message = base64.StdEncoding.EncodeToString([]byte(message)) } if len(esf.Spec.Metadata) > 0 { if b, err := json.Marshal(esf.Spec.Metadata); err != nil { 
logrus.Warnf("Failed to marshal metadata for %s: %v", esf.Name, err) } else { sf.Metadata = base64.StdEncoding.EncodeToString(b) } } if tokenHash := esf.Annotations[annotationTokenHash]; tokenHash != "" { sf.tokenHash = tokenHash } if esf.Spec.S3 == nil { sf.NodeName = esf.Spec.NodeName } else { sf.NodeName = "s3" sf.S3 = &s3Config{ Endpoint: esf.Spec.S3.Endpoint, EndpointCA: esf.Spec.S3.EndpointCA, SkipSSLVerify: esf.Spec.S3.SkipSSLVerify, Bucket: esf.Spec.S3.Bucket, Region: esf.Spec.S3.Region, Folder: esf.Spec.S3.Prefix, Insecure: esf.Spec.S3.Insecure, } } } func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { if esf == nil { panic("cannot convert to nil ETCDSnapshotFile") } esf.Spec.SnapshotName = sf.Name esf.Spec.Location = sf.Location esf.Status.CreationTime = sf.CreatedAt esf.Status.ReadyToUse = pointer.Bool(sf.Status == successfulSnapshotStatus) esf.Status.Size = resource.NewQuantity(sf.Size, resource.DecimalSI) if sf.nodeSource != "" { esf.Spec.NodeName = sf.nodeSource } else { esf.Spec.NodeName = sf.NodeName } if sf.Message != "" { var message string b, err := base64.StdEncoding.DecodeString(sf.Message) if err != nil { logrus.Warnf("Failed to decode error message for %s: %v", sf.Name, err) message = "etcd snapshot failed" } else { message = string(b) } esf.Status.Error = &apisv1.ETCDSnapshotError{ Time: sf.CreatedAt, Message: &message, } } if sf.metadataSource != nil { esf.Spec.Metadata = sf.metadataSource.Data } else if sf.Metadata != "" { metadata, err := base64.StdEncoding.DecodeString(sf.Metadata) if err != nil { logrus.Warnf("Failed to decode metadata for %s: %v", sf.Name, err) } else { if err := json.Unmarshal(metadata, &esf.Spec.Metadata); err != nil { logrus.Warnf("Failed to unmarshal metadata for %s: %v", sf.Name, err) } } } if esf.ObjectMeta.Labels == nil { esf.ObjectMeta.Labels = map[string]string{} } if esf.ObjectMeta.Annotations == nil { esf.ObjectMeta.Annotations = map[string]string{} } if sf.tokenHash != "" { 
esf.ObjectMeta.Annotations[annotationTokenHash] = sf.tokenHash } if sf.S3 == nil { esf.ObjectMeta.Labels[labelStorageNode] = esf.Spec.NodeName } else { esf.ObjectMeta.Labels[labelStorageNode] = "s3" esf.Spec.S3 = &apisv1.ETCDSnapshotS3{ Endpoint: sf.S3.Endpoint, EndpointCA: sf.S3.EndpointCA, SkipSSLVerify: sf.S3.SkipSSLVerify, Bucket: sf.S3.Bucket, Region: sf.S3.Region, Prefix: sf.S3.Folder, Insecure: sf.S3.Insecure, } } }