diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index eb3e5cdac7..fc3894b37f 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -576,6 +576,7 @@ func (e *ETCD) Register(handler http.Handler) (http.Handler, error) { e.config.Runtime.LeaderElectedClusterControllerStarts[version.Program+"-etcd"] = func(ctx context.Context) { registerEndpointsHandlers(ctx, e) registerMemberHandlers(ctx, e) + registerSnapshotHandlers(ctx, e) } } diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index ebe5abaf1d..d96b536d29 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -144,6 +144,7 @@ func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.Conf }, Compressed: strings.HasSuffix(snapshot, compressedExtension), metadataSource: extraMetadata, + nodeSource: s.nodeName, } uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot) @@ -338,8 +339,9 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) defer cancel() opts := minio.ListObjectsOptions{ - Prefix: s.config.EtcdS3Folder, - Recursive: true, + Prefix: s.config.EtcdS3Folder, + Recursive: true, + WithMetadata: true, } objects := s.client.ListObjects(ctx, s.config.EtcdS3BucketName, opts) @@ -389,6 +391,7 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) }, Status: successfulSnapshotStatus, Compressed: compressed, + nodeSource: obj.UserMetadata[nodeNameKey], } sfKey := generateSnapshotConfigMapKey(sf) snapshots[sfKey] = sf diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 3b436e26ac..4c710d7b51 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -3,7 +3,9 @@ package etcd import ( "archive/zip" "context" + "crypto/sha256" "encoding/base64" + "encoding/hex" "encoding/json" "fmt" "io" @@ -17,7 +19,9 @@ import ( "strings" "time" + apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/minio/minio-go/v7" "github.com/pkg/errors" @@ -29,22 +33,30 @@ import ( "go.uber.org/zap" "golang.org/x/sync/semaphore" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" + "k8s.io/utils/pointer" ) const ( maxConcurrentSnapshots = 1 - pruneStepSize = 5 compressedExtension = ".zip" metadataDir = ".metadata" + errorTTL = 24 * time.Hour ) var ( snapshotExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata" - snapshotConfigMapName = version.Program + "-etcd-snapshots" + labelStorageNode = "etcd." + version.Program + ".cattle.io/snapshot-storage-node" + annotationLocalReconciled = "etcd." + version.Program + ".cattle.io/local-snapshots-timestamp" + annotationS3Reconciled = "etcd." + version.Program + ".cattle.io/s3-snapshots-timestamp" // snapshotDataBackoff will retry at increasing steps for up to ~30 seconds. 
// If the ConfigMap update fails, the list won't be reconciled again until next time @@ -170,7 +182,7 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err defer ss.Close() if _, err := io.Copy(decompressed, ss); err != nil { - os.Remove("") + os.Remove(decompressed.Name()) return "", err } } @@ -265,12 +277,11 @@ func (e *ETCD) Snapshot(ctx context.Context) error { Status: failedSnapshotStatus, Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), Size: 0, - Compressed: e.config.EtcdSnapshotCompress, metadataSource: extraMetadata, } logrus.Errorf("Failed to take etcd snapshot: %v", err) if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save local snapshot failure data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } } @@ -310,7 +321,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save local snapshot data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { @@ -352,7 +363,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } } if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save snapshot data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } if err := e.s3.snapshotRetention(ctx); err != nil { logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err) @@ -397,7 +408,10 @@ type snapshotFile struct { S3 *s3Config `json:"s3Config,omitempty"` Compressed bool `json:"compressed"` + // these fields are used for the internal representation of the snapshot + // to populate other fields before serialization to the legacy configmap. metadataSource *v1.ConfigMap `json:"-"` + nodeSource string `json:"-"` } // listLocalSnapshots provides a list of the currently stored @@ -541,7 +555,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { } if e.config.EtcdS3 { - if err := e.s3.deleteSnapshot(s); err != nil { + if err := e.s3.deleteSnapshot(ctx, s); err != nil { if isNotExist(err) { logrus.Infof("Snapshot %s not found in S3", s) } else { @@ -582,62 +596,55 @@ func marshalSnapshotFile(sf snapshotFile) ([]byte, error) { return json.Marshal(sf) } -// AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata -// available at the time. +// addSnapshotData syncs an internal snapshotFile representation to an ETCDSnapshotFile resource +// of the same name. Resources will be created or updated as necessary. func (e *ETCD) addSnapshotData(sf snapshotFile) error { - // make sure the core.Factory is initialized. There can - // be a race between this core code startup. - for e.config.Runtime.Core == nil { + // make sure the K3s factory is initialized. 
+ for e.config.Runtime.K3s == nil { runtime.Gosched() } - sfKey := generateSnapshotConfigMapKey(sf) - marshalledSnapshotFile, err := marshalSnapshotFile(sf) - if err != nil { - return err - } + snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() + esfName := generateSnapshotName(sf) - pruneCount := pruneStepSize - var lastErr error + var esf *apisv1.ETCDSnapshotFile return retry.OnError(snapshotDataBackoff, func(err error) bool { - return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err) - }, func() error { - snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) - - if apierrors.IsNotFound(getErr) { - cm := v1.ConfigMap{ + return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) + }, func() (err error) { + // Get current object or create new one + esf, err = snapshots.Get(esfName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return err + } + esf = &apisv1.ETCDSnapshotFile{ ObjectMeta: metav1.ObjectMeta{ - Name: snapshotConfigMapName, - Namespace: metav1.NamespaceSystem, + Name: esfName, }, - Data: map[string]string{sfKey: string(marshalledSnapshotFile)}, } - _, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(&cm) - return err - } - - if snapshotConfigMap.Data == nil { - snapshotConfigMap.Data = make(map[string]string) } - // If the configmap update was rejected due to size, drop the oldest entries from the map. - // We will continue to remove an increasing number of old snapshots from the map until the request succeeds, - // or the number we would attempt to remove exceeds the number stored. - if isTooLargeError(lastErr) { - logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount) - if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil { - return err + // mutate object + existing := esf.DeepCopyObject() + sf.toETCDSnapshotFile(esf) + + // create or update as necessary + if esf.CreationTimestamp.IsZero() { + var created *apisv1.ETCDSnapshotFile + created, err = snapshots.Create(esf) + if err == nil { + // Only emit an event for the snapshot when creating the resource + e.emitEvent(created) } - pruneCount += pruneStepSize + } else if !equality.Semantic.DeepEqual(existing, esf) { + _, err = snapshots.Update(esf) } - - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshotFile) - - _, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) - return lastErr + return err }) } +// generateSnapshotConfigMapKey generates a derived name for the snapshot that is safe for use +// as a configmap key. func generateSnapshotConfigMapKey(sf snapshotFile) string { name := invalidKeyChars.ReplaceAllString(sf.Name, "_") if sf.NodeName == "s3" { @@ -646,33 +653,61 @@ func generateSnapshotConfigMapKey(sf snapshotFile) string { return "local-" + name } -// pruneConfigMap drops the oldest entries from the configMap. -// Note that the actual snapshot files are not removed, just the entries that track them in the configmap. -func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error { - if pruneCount > len(snapshotConfigMap.Data) { - return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots") +// generateSnapshotName generates a derived name for the snapshot that is safe for use +// as a resource name. 
+func generateSnapshotName(sf snapshotFile) string {
+	name := strings.ToLower(sf.Name)
+	nodename := sf.nodeSource
+	if nodename == "" {
+		nodename = sf.NodeName
 	}
-
-	var snapshotFiles []snapshotFile
-	retention := len(snapshotConfigMap.Data) - pruneCount
-	for name := range snapshotConfigMap.Data {
-		basename, compressed := strings.CutSuffix(name, compressedExtension)
-		ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
-		snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
+	// Include a digest of the hostname and location to ensure unique resource
+	// names. Snapshots should already include the hostname, but this ensures we
+	// don't accidentally hide records if a snapshot with the same name somehow
+	// exists on multiple nodes.
+	digest := sha256.Sum256([]byte(nodename + sf.Location))
+	// If the lowercase filename isn't usable as a resource name, or is too long
+	// to leave room for a prefix and suffix, generate a safe name derived from
+	// the hostname and timestamp.
+	if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 || len(name)+13 > validation.DNS1123SubdomainMaxLength {
+		nodename, _, _ := strings.Cut(nodename, ".")
+		name = fmt.Sprintf("etcd-snapshot-%s-%d", nodename, sf.CreatedAt.Unix())
+		if sf.Compressed {
+			name += compressedExtension
+		}
 	}
+	if sf.NodeName == "s3" {
+		return "s3-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
+	}
+	return "local-" + name + "-" + hex.EncodeToString(digest[0:])[0:6]
+}
 
-	// sort newest-first so we can prune entries past the retention count
-	sort.Slice(snapshotFiles, func(i, j int) bool {
-		return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
-	})
+// generateETCDSnapshotFileConfigMapKey generates a key that the corresponding
+// snapshotFile would be stored under in the legacy configmap
+func generateETCDSnapshotFileConfigMapKey(esf apisv1.ETCDSnapshotFile) string {
+	name := invalidKeyChars.ReplaceAllString(esf.Spec.SnapshotName, "_")
+	if esf.Spec.S3 != nil {
+		return "s3-" + name
+	}
+	return "local-" + name
+}
 
-	for _, snapshotFile := range snapshotFiles[retention:] {
-		delete(snapshotConfigMap.Data, snapshotFile.Name)
+func (e *ETCD) emitEvent(esf *apisv1.ETCDSnapshotFile) {
+	switch {
+	case e.config.Runtime.Event == nil:
+	case !esf.DeletionTimestamp.IsZero():
+		e.config.Runtime.Event.Eventf(esf, v1.EventTypeNormal, "ETCDSnapshotDeleted", "Snapshot %s deleted", esf.Spec.SnapshotName)
+	case esf.Status.Error != nil:
+		message := fmt.Sprintf("Failed to save snapshot %s on %s", esf.Spec.SnapshotName, esf.Spec.NodeName)
+		if esf.Status.Error.Message != nil {
+			message += ": " + *esf.Status.Error.Message
+		}
+		e.config.Runtime.Event.Event(esf, v1.EventTypeWarning, "ETCDSnapshotFailed", message)
+	default:
+		e.config.Runtime.Event.Eventf(esf, v1.EventTypeNormal, "ETCDSnapshotCreated", "Snapshot %s saved on %s", esf.Spec.SnapshotName, esf.Spec.NodeName)
 	}
-	return nil
 }
 
-// ReconcileSnapshotData reconciles snapshot data in the snapshot ConfigMap.
+// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
 // It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
 // and reconcile snapshots from S3.
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { @@ -682,167 +717,171 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { runtime.Gosched() } - logrus.Infof("Reconciling etcd snapshot data in %s ConfigMap", snapshotConfigMapName) - defer logrus.Infof("Reconciliation of snapshot data in %s ConfigMap complete", snapshotConfigMapName) + logrus.Infof("Reconciling ETCDSnapshotFile resources") + defer logrus.Infof("Reconciliation of ETCDSnapshotFile resources complete") - pruneCount := pruneStepSize - var lastErr error - return retry.OnError(retry.DefaultBackoff, func(err error) bool { - return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err) - }, func() error { - snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) - if apierrors.IsNotFound(getErr) { - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: snapshotConfigMapName, - Namespace: metav1.NamespaceSystem, - }, - } - cm, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(cm) - if err != nil { - return err - } - snapshotConfigMap = cm - } + // Get snapshots from local filesystem + snapshotFiles, err := e.listLocalSnapshots() + if err != nil { + return err + } - logrus.Debugf("Attempting to reconcile etcd snapshot data for configmap generation %d", snapshotConfigMap.Generation) - if snapshotConfigMap.Data == nil { - snapshotConfigMap.Data = map[string]string{} - } + nodeNames := []string{os.Getenv("NODE_NAME")} - snapshotFiles, err := e.listLocalSnapshots() - if err != nil { + // Get snapshots from S3 + if e.config.EtcdS3 { + if err := e.initS3IfNil(ctx); err != nil { return err } - // s3ListSuccessful is set to true if we are successful at listing snapshots from S3 to eliminate accidental - // clobbering of S3 snapshots in the configmap due to misconfigured S3 credentials/details - var s3ListSuccessful bool - - if e.config.EtcdS3 { - if err := e.initS3IfNil(ctx); err != nil { - logrus.Warnf("Unable to initialize S3 client: %v", err) - return err + if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { + logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) + } else { + for k, v := range s3Snapshots { + snapshotFiles[k] = v } + nodeNames = append(nodeNames, "s3") + } + } - if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { - logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) - } else { - for k, v := range s3Snapshots { - snapshotFiles[k] = v + // Try to load metadata from the legacy configmap, in case any local or s3 snapshots + // were created by an old release that does not write the metadata alongside the snapshot file. + snapshotConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + if snapshotConfigMap != nil { + for sfKey, sf := range snapshotFiles { + logrus.Debugf("Found snapshotFile for %s with key %s", sf.Name, sfKey) + // if the configmap has data for this snapshot, and local metadata is empty, + // deserialize the value from the configmap and attempt to load it. 
+			if cmSnapshotValue := snapshotConfigMap.Data[sfKey]; cmSnapshotValue != "" && sf.Metadata == "" && sf.metadataSource == nil {
+				sfTemp := &snapshotFile{}
+				if err := json.Unmarshal([]byte(cmSnapshotValue), sfTemp); err != nil {
+					logrus.Warnf("Failed to unmarshal configmap data for snapshot %s: %v", sfKey, err)
+					continue
 				}
-				s3ListSuccessful = true
+				sf.Metadata = sfTemp.Metadata
+				snapshotFiles[sfKey] = sf
 			}
 		}
 	}
 
-	nodeName := os.Getenv("NODE_NAME")
+	labelSelector := &metav1.LabelSelector{
+		MatchExpressions: []metav1.LabelSelectorRequirement{{
+			Key:      labelStorageNode,
+			Operator: metav1.LabelSelectorOpIn,
+			Values:   nodeNames,
+		}},
+	}
 
-	// deletedSnapshots is a map[string]string where key is the configmap key and the value is the marshalled snapshot file
-	// it will be populated below with snapshots that are either from S3 or on the local node. Notably, deletedSnapshots will
-	// not contain snapshots that are in the "failed" status
-	deletedSnapshots := make(map[string]string)
-	// failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap
-	// These are stored unmarshaled so we can sort based on name.
-	var failedSnapshots []snapshotFile
-	var failedS3Snapshots []snapshotFile
+	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
+	if err != nil {
+		return err
+	}
 
-	// remove entries for this node and s3 (if S3 is enabled) only
-	for k, v := range snapshotConfigMap.Data {
-		var sf snapshotFile
-		if err := json.Unmarshal([]byte(v), &sf); err != nil {
-			return err
+	// List all snapshots matching the selector
+	snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
+	esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()})
+	if err != nil {
+		return err
+	}
+
+	// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
+	// If a snapshot from Kubernetes was not found on disk/s3, it is gone and can be removed from Kubernetes.
+	// The one exception to the last rule is failed snapshots - these must be retained for a period of time.
+ for _, esf := range esfList.Items { + sfKey := generateETCDSnapshotFileConfigMapKey(esf) + logrus.Debugf("Found ETCDSnapshotFile for %s with key %s", esf.Spec.SnapshotName, sfKey) + if sf, ok := snapshotFiles[sfKey]; ok && generateSnapshotName(sf) == esf.Name { + // exists in both and names match, don't need to sync + delete(snapshotFiles, sfKey) + } else { + // doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it + if esf.Status.Error != nil && esf.Status.Error.Time != nil { + expires := esf.Status.Error.Time.Add(errorTTL) + if time.Now().Before(expires) { + continue + } + } + if ok { + logrus.Debugf("Name of ETCDSnapshotFile for snapshotFile with key %s does not match: %s vs %s", sfKey, generateSnapshotName(sf), esf.Name) + } else { + logrus.Debugf("Key %s not found in snapshotFile list", sfKey) } - if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != failedSnapshotStatus { - // Only delete the snapshot if the snapshot was not failed - // sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status - deletedSnapshots[generateSnapshotConfigMapKey(sf)] = v // store a copy of the snapshot - delete(snapshotConfigMap.Data, k) - } else if sf.Status == failedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 { - // Handle locally failed snapshots. - failedSnapshots = append(failedSnapshots, sf) - delete(snapshotConfigMap.Data, k) - } else if sf.Status == failedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 { - // If we're operating against S3, we can clean up failed S3 snapshots that failed on this node. 
- failedS3Snapshots = append(failedS3Snapshots, sf) - delete(snapshotConfigMap.Data, k) + logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName) + if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { + logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err) } } + } - // Apply the failed snapshot retention policy to locally failed snapshots - if len(failedSnapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 { - // sort newest-first so we can record only the retention count - sort.Slice(failedSnapshots, func(i, j int) bool { - return failedSnapshots[j].CreatedAt.Before(failedSnapshots[i].CreatedAt) - }) - - for _, dfs := range failedSnapshots[:e.config.EtcdSnapshotRetention] { - sfKey := generateSnapshotConfigMapKey(dfs) - marshalledSnapshot, err := marshalSnapshotFile(dfs) - if err != nil { - logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err) - } else { - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) - } - } + // Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created + for _, sf := range snapshotFiles { + logrus.Infof("Creating ETCDSnapshotFile for %s", sf.Name) + if err := e.addSnapshotData(sf); err != nil { + logrus.Errorf("Failed to create ETCDSnapshotFile: %v", err) } + } - // Apply the failed snapshot retention policy to the S3 snapshots - if len(failedS3Snapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 { - // sort newest-first so we can record only the retention count - sort.Slice(failedS3Snapshots, func(i, j int) bool { - return failedS3Snapshots[j].CreatedAt.Before(failedS3Snapshots[i].CreatedAt) - }) + // List all snapshots in Kubernetes not stored on S3 or a current etcd node. + // These snapshots are local to a node that no longer runs etcd and cannot be restored. + // If the node rejoins later and has local snapshots, it will reconcile them itself. + labelSelector.MatchExpressions[0].Operator = metav1.LabelSelectorOpNotIn + labelSelector.MatchExpressions[0].Values = []string{"s3"} - for _, dfs := range failedS3Snapshots[:e.config.EtcdSnapshotRetention] { - sfKey := generateSnapshotConfigMapKey(dfs) - marshalledSnapshot, err := marshalSnapshotFile(dfs) - if err != nil { - logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err) - } else { - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) - } - } - } + // Get a list of all etcd nodes currently in the cluster and add them to the selector + nodes := e.config.Runtime.Core.Core().V1().Node() + etcdSelector := labels.Set{util.ETCDRoleLabelKey: "true"} + nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: etcdSelector.String()}) + if err != nil { + return err + } - // save the local entries to the ConfigMap if they are still on disk or in S3. 
- for _, snapshot := range snapshotFiles { - var sf snapshotFile - sfKey := generateSnapshotConfigMapKey(snapshot) - if v, ok := deletedSnapshots[sfKey]; ok { - // use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it - if err := json.Unmarshal([]byte(v), &sf); err != nil { - logrus.Errorf("Error unmarshaling snapshot file: %v", err) - // use the snapshot with info we sourced from disk/S3 (will be missing metadata, but something is better than nothing) - sf = snapshot - } - } else { - sf = snapshot - } + for _, node := range nodeList.Items { + labelSelector.MatchExpressions[0].Values = append(labelSelector.MatchExpressions[0].Values, node.Name) + } - sf.Status = successfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful. - marshalledSnapshot, err := marshalSnapshotFile(sf) - if err != nil { - logrus.Warnf("Failed to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err) - } else { - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) - } - } + selector, err = metav1.LabelSelectorAsSelector(labelSelector) + if err != nil { + return err + } - // If the configmap update was rejected due to size, drop the oldest entries from the map. - // We will continue to remove an increasing number of old snapshots from the map until the request succeeds, - // or the number we would attempt to remove exceeds the number stored. - if isTooLargeError(lastErr) { - logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount) - if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil { - return err - } - pruneCount += pruneStepSize + // List and remove all snapshots stored on nodes that do not match the selector + esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return err + } + + for _, esf := range esfList.Items { + if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { + logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err) } + } - logrus.Debugf("Updating snapshot ConfigMap (%s) with %d entries", snapshotConfigMapName, len(snapshotConfigMap.Data)) - _, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) - return lastErr - }) + // Update our Node object to note the timestamp of the snapshot storages that have been reconciled + now := time.Now().Round(time.Second).Format(time.RFC3339) + patch := []map[string]string{ + { + "op": "add", + "value": now, + "path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"), + }, + } + if e.config.EtcdS3 { + patch = append(patch, map[string]string{ + "op": "add", + "value": now, + "path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"), + }) + } + b, err := json.Marshal(patch) + if err != nil { + return err + } + _, err = nodes.Patch(nodeNames[0], types.JSONPatchType, b) + return err } // setSnapshotFunction schedules snapshots at the configured interval. 
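The node patch above uses a JSON Patch (types.JSONPatchType), whose paths are JSON Pointers per RFC 6901, which is why the `/` in each annotation key is escaped as `~1`. A minimal, self-contained sketch of the patch document this produces, assuming version.Program resolves to "k3s" (the timestamp is illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

func main() {
	// Annotation keys contain "/", which must be escaped as "~1" in a
	// JSON Pointer path (a literal "~" would be escaped as "~0").
	key := "etcd.k3s.cattle.io/local-snapshots-timestamp"
	now := time.Now().Round(time.Second).Format(time.RFC3339)
	patch := []map[string]string{{
		"op":    "add",
		"value": now,
		"path":  "/metadata/annotations/" + strings.ReplaceAll(key, "/", "~1"),
	}}
	b, _ := json.Marshal(patch)
	// Prints something like:
	// [{"op":"add","path":"/metadata/annotations/etcd.k3s.cattle.io~1local-snapshots-timestamp","value":"2023-10-01T00:00:00Z"}]
	fmt.Println(string(b))
}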
@@ -866,7 +905,7 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) return nil } - logrus.Infof("Applying local snapshot retention policy: retention: %d, snapshotPrefix: %s, directory: %s", retention, snapshotPrefix, snapshotDir) + logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir) var snapshotFiles []snapshotFile if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error { @@ -909,11 +948,6 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) return nil } -func isTooLargeError(err error) bool { - // There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string. - return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) -} - func isNotExist(err error) bool { if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) { return true @@ -941,3 +975,123 @@ func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) erro } return os.WriteFile(metadataPath, m, 0700) } + +func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { + if esf == nil { + panic("cannot convert from nil ETCDSnapshotFile") + } + + sf.Name = esf.Spec.SnapshotName + sf.Location = esf.Spec.Location + sf.CreatedAt = esf.Status.CreationTime + sf.nodeSource = esf.Spec.NodeName + sf.Compressed = strings.HasSuffix(esf.Spec.SnapshotName, compressedExtension) + + if esf.Status.ReadyToUse != nil && *esf.Status.ReadyToUse { + sf.Status = successfulSnapshotStatus + } else { + sf.Status = failedSnapshotStatus + } + + if esf.Status.Size != nil { + sf.Size = esf.Status.Size.Value() + } + + if esf.Status.Error != nil { + if esf.Status.Error.Time != nil { + sf.CreatedAt = esf.Status.Error.Time + } + message := "etcd snapshot failed" + if esf.Status.Error.Message != nil { + message = *esf.Status.Error.Message + } + sf.Message = base64.StdEncoding.EncodeToString([]byte(message)) + } + + if len(esf.Spec.Metadata) > 0 { + if b, err := json.Marshal(esf.Spec.Metadata); err != nil { + logrus.Warnf("Failed to marshal metadata for %s: %v", esf.Name, err) + } else { + sf.Metadata = base64.StdEncoding.EncodeToString(b) + } + } + + if esf.Spec.S3 == nil { + sf.NodeName = esf.Spec.NodeName + } else { + sf.NodeName = "s3" + sf.S3 = &s3Config{ + Endpoint: esf.Spec.S3.Endpoint, + EndpointCA: esf.Spec.S3.EndpointCA, + SkipSSLVerify: esf.Spec.S3.SkipSSLVerify, + Bucket: esf.Spec.S3.Bucket, + Region: esf.Spec.S3.Region, + Folder: esf.Spec.S3.Prefix, + Insecure: esf.Spec.S3.Insecure, + } + } +} + +func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { + if esf == nil { + panic("cannot convert to nil ETCDSnapshotFile") + } + esf.Spec.SnapshotName = sf.Name + esf.Spec.Location = sf.Location + esf.Status.CreationTime = sf.CreatedAt + esf.Status.ReadyToUse = pointer.Bool(sf.Status == successfulSnapshotStatus) + esf.Status.Size = resource.NewQuantity(sf.Size, resource.DecimalSI) + + if sf.nodeSource != "" { + esf.Spec.NodeName = sf.nodeSource + } else { + esf.Spec.NodeName = sf.NodeName + } + + if sf.Message != "" { + var message string + b, err := base64.StdEncoding.DecodeString(sf.Message) + if err != nil { + logrus.Warnf("Failed to decode error message for %s: %v", sf.Name, err) + message = "etcd snapshot failed" + } else { + message = string(b) + } + esf.Status.Error = 
&apisv1.ETCDSnapshotError{ + Time: sf.CreatedAt, + Message: &message, + } + } + + if sf.metadataSource != nil { + esf.Spec.Metadata = sf.metadataSource.Data + } else if sf.Metadata != "" { + metadata, err := base64.StdEncoding.DecodeString(sf.Metadata) + if err != nil { + logrus.Warnf("Failed to decode metadata for %s: %v", sf.Name, err) + } else { + if err := json.Unmarshal(metadata, &esf.Spec.Metadata); err != nil { + logrus.Warnf("Failed to unmarshal metadata for %s: %v", sf.Name, err) + } + } + } + + if esf.ObjectMeta.Labels == nil { + esf.ObjectMeta.Labels = map[string]string{} + } + + if sf.S3 == nil { + esf.ObjectMeta.Labels[labelStorageNode] = esf.Spec.NodeName + } else { + esf.ObjectMeta.Labels[labelStorageNode] = "s3" + esf.Spec.S3 = &apisv1.ETCDSnapshotS3{ + Endpoint: sf.S3.Endpoint, + EndpointCA: sf.S3.EndpointCA, + SkipSSLVerify: sf.S3.SkipSSLVerify, + Bucket: sf.S3.Bucket, + Region: sf.S3.Region, + Prefix: sf.S3.Folder, + Insecure: sf.S3.Insecure, + } + } +} diff --git a/pkg/etcd/snapshot_controller.go b/pkg/etcd/snapshot_controller.go new file mode 100644 index 0000000000..7da376741b --- /dev/null +++ b/pkg/etcd/snapshot_controller.go @@ -0,0 +1,312 @@ +package etcd + +import ( + "context" + "sort" + "strconv" + "strings" + "time" + + apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" + controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1" + "github.com/k3s-io/k3s/pkg/util" + "github.com/k3s-io/k3s/pkg/version" + "github.com/pkg/errors" + controllerv1 "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + + "github.com/sirupsen/logrus" +) + +const ( + pruneStepSize = 4 + reconcileKey = "_reconcile_" + reconcileInterval = 600 * time.Minute +) + +var ( + snapshotConfigMapName = version.Program + "-etcd-snapshots" +) + +type etcdSnapshotHandler struct { + ctx context.Context + etcd *ETCD + snapshots controllersv1.ETCDSnapshotFileController + configmaps controllerv1.ConfigMapController +} + +func registerSnapshotHandlers(ctx context.Context, etcd *ETCD) { + snapshots := etcd.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() + e := &etcdSnapshotHandler{ + ctx: ctx, + etcd: etcd, + snapshots: snapshots, + configmaps: etcd.config.Runtime.Core.Core().V1().ConfigMap(), + } + + logrus.Infof("Starting managed etcd snapshot ConfigMap controller") + snapshots.OnChange(ctx, "managed-etcd-snapshots-controller", e.sync) + snapshots.OnRemove(ctx, "managed-etcd-snapshots-controller", e.onRemove) + go wait.JitterUntil(func() { snapshots.Enqueue(reconcileKey) }, reconcileInterval, 0.04, false, ctx.Done()) +} + +func (e *etcdSnapshotHandler) sync(key string, esf *apisv1.ETCDSnapshotFile) (*apisv1.ETCDSnapshotFile, error) { + if key == reconcileKey { + return nil, e.reconcile() + } + if esf == nil || !esf.DeletionTimestamp.IsZero() { + return nil, nil + } + + sf := snapshotFile{} + sf.fromETCDSnapshotFile(esf) + sfKey := generateSnapshotConfigMapKey(sf) + m, err := marshalSnapshotFile(sf) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal snapshot ConfigMap data") + } + marshalledSnapshot := string(m) + + snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return 
nil, errors.Wrap(err, "failed to get snapshot ConfigMap") + } + snapshotConfigMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: snapshotConfigMapName, + Namespace: metav1.NamespaceSystem, + }, + } + } + + if snapshotConfigMap.Data[sfKey] != marshalledSnapshot { + if snapshotConfigMap.Data == nil { + snapshotConfigMap.Data = map[string]string{} + } + snapshotConfigMap.Data[sfKey] = marshalledSnapshot + + // Try to create or update the ConfigMap. If it is too large, prune old entries + // until it fits, or until it cannot be pruned any further. + pruneCount := pruneStepSize + err = retry.OnError(snapshotDataBackoff, isTooLargeError, func() (err error) { + if snapshotConfigMap.CreationTimestamp.IsZero() { + _, err = e.configmaps.Create(snapshotConfigMap) + } else { + _, err = e.configmaps.Update(snapshotConfigMap) + } + + if isTooLargeError(err) { + logrus.Warnf("Snapshot ConfigMap is too large, attempting to elide %d of %d entries to reduce size", pruneCount, len(snapshotConfigMap.Data)) + if perr := pruneConfigMap(snapshotConfigMap, pruneCount); perr != nil { + err = perr + } + // if the entry we're trying to add just got pruned, give up on adding it, + // as it is always going to get pushed off due to being too old to keep. + if _, ok := snapshotConfigMap.Data[sfKey]; !ok { + logrus.Warnf("Snapshot %s has been elided from ConfigMap to reduce size; not requeuing", key) + return nil + } + + pruneCount += pruneStepSize + } + return err + }) + } + + if err != nil { + err = errors.Wrap(err, "failed to sync snapshot to ConfigMap") + } + + return nil, err +} + +func (e *etcdSnapshotHandler) onRemove(key string, esf *apisv1.ETCDSnapshotFile) (*apisv1.ETCDSnapshotFile, error) { + if esf == nil { + return nil, nil + } + snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, errors.Wrap(err, "failed to get snapshot ConfigMap") + } + + sfKey := generateETCDSnapshotFileConfigMapKey(*esf) + if _, ok := snapshotConfigMap.Data[sfKey]; ok { + delete(snapshotConfigMap.Data, sfKey) + if _, err := e.configmaps.Update(snapshotConfigMap); err != nil { + return nil, errors.Wrap(err, "failed to remove snapshot from ConfigMap") + } + } + e.etcd.emitEvent(esf) + return nil, nil +} + +func (e *etcdSnapshotHandler) reconcile() error { + logrus.Infof("Reconciling snapshot ConfigMap data") + + snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrap(err, "failed to get snapshot ConfigMap") + } + snapshotConfigMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: snapshotConfigMapName, + Namespace: metav1.NamespaceSystem, + }, + } + } + + // Get a list of all etcd nodes currently in the cluster. + // We will use this list to prune local entries for any node that does not exist. + nodes := e.etcd.config.Runtime.Core.Core().V1().Node() + etcdSelector := labels.Set{util.ETCDRoleLabelKey: "true"} + nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: etcdSelector.String()}) + if err != nil { + return err + } + + // Once a node has set the reconcile annotation, it is considered to have + // migrated to using ETCDSnapshotFile resources, and any old configmap + // entries for it can be pruned. Until the annotation is set, we will leave + // its entries alone. 
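+	// For example, in a cluster where only node-1 has set the local reconcile
+	// annotation and no node has set the S3 annotation, syncedNodes will contain
+	// only node-1, so the pruning below may remove node-1's "local-" entries and
+	// entries for nodes that no longer exist, but will leave "s3-" entries and
+	// other nodes' entries untouched.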
+	syncedNodes := map[string]bool{}
+	for _, node := range nodeList.Items {
+		if _, ok := node.Annotations[annotationLocalReconciled]; ok {
+			syncedNodes[node.Name] = true
+		}
+		if _, ok := node.Annotations[annotationS3Reconciled]; ok {
+			syncedNodes["s3"] = true
+		}
+	}
+
+	if len(syncedNodes) == 0 {
+		return errors.New("no nodes have reconciled ETCDSnapshotFile resources")
+	}
+
+	// Get a list of existing snapshots
+	snapshotList, err := e.snapshots.List(metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+
+	snapshots := map[string]*apisv1.ETCDSnapshotFile{}
+	for i := range snapshotList.Items {
+		esf := &snapshotList.Items[i]
+		if esf.DeletionTimestamp.IsZero() {
+			sfKey := generateETCDSnapshotFileConfigMapKey(*esf)
+			snapshots[sfKey] = esf
+		}
+	}
+
+	// Make a copy of the configmap for change detection
+	existing := snapshotConfigMap.DeepCopyObject()
+
+	// Delete any keys missing from synced storages, or associated with missing nodes
+	for key := range snapshotConfigMap.Data {
+		if strings.HasPrefix(key, "s3-") {
+			// If a node has synced s3 and the key is missing then delete it
+			if syncedNodes["s3"] && snapshots[key] == nil {
+				delete(snapshotConfigMap.Data, key)
+			}
+		} else if s, ok := strings.CutPrefix(key, "local-"); ok {
+			// If a matching node has synced and the key is missing then delete it
+			// If a matching node does not exist, delete the key
+			// A node is considered to match the snapshot if the snapshot name matches the node name
+			// after trimming the leading local- prefix and trailing timestamp and extension.
+			s, _ = strings.CutSuffix(s, compressedExtension)
+			s = strings.TrimRight(s, "-0123456789")
+			var matchingNode bool
+			for _, node := range nodeList.Items {
+				if strings.HasSuffix(s, node.Name) {
+					if syncedNodes[node.Name] && snapshots[key] == nil {
+						delete(snapshotConfigMap.Data, key)
+					}
+					matchingNode = true
+					break
+				}
+			}
+			if !matchingNode {
+				delete(snapshotConfigMap.Data, key)
+			}
+		}
+	}
+
+	// Ensure the data map is initialized before adding entries for existing snapshots
+	if snapshotConfigMap.Data == nil {
+		snapshotConfigMap.Data = map[string]string{}
+	}
+
+	// Ensure keys for existing snapshots
+	for sfKey, esf := range snapshots {
+		sf := snapshotFile{}
+		sf.fromETCDSnapshotFile(esf)
+		m, err := marshalSnapshotFile(sf)
+		if err != nil {
+			logrus.Warnf("Failed to marshal snapshot ConfigMap data for %s", sfKey)
+			continue
+		}
+		marshalledSnapshot := string(m)
+		snapshotConfigMap.Data[sfKey] = marshalledSnapshot
+	}
+
+	// If the configmap didn't change, don't bother updating it
+	if equality.Semantic.DeepEqual(existing, snapshotConfigMap) {
+		return nil
+	}
+
+	// Try to create or update the ConfigMap. If it is too large, prune old entries
+	// until it fits, or until it cannot be pruned any further.
+	pruneCount := pruneStepSize
+	return retry.OnError(snapshotDataBackoff, isTooLargeError, func() (err error) {
+		if snapshotConfigMap.CreationTimestamp.IsZero() {
+			_, err = e.configmaps.Create(snapshotConfigMap)
+		} else {
+			_, err = e.configmaps.Update(snapshotConfigMap)
+		}
+
+		if isTooLargeError(err) {
+			logrus.Warnf("Snapshot ConfigMap is too large, attempting to elide %d of %d entries to reduce size", pruneCount, len(snapshotConfigMap.Data))
+			if perr := pruneConfigMap(snapshotConfigMap, pruneCount); perr != nil {
+				err = perr
+			}
+			pruneCount += pruneStepSize
+		}
+		return err
+	})
+}
+
+// pruneConfigMap drops the oldest entries from the configMap.
+// Note that the actual snapshot files are not removed, just the entries that track them in the configmap.
+func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error { + if pruneCount >= len(snapshotConfigMap.Data) { + return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots") + } + + var snapshotFiles []snapshotFile + retention := len(snapshotConfigMap.Data) - pruneCount + for name := range snapshotConfigMap.Data { + basename, compressed := strings.CutSuffix(name, compressedExtension) + ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) + snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed}) + } + + // sort newest-first so we can prune entries past the retention count + sort.Slice(snapshotFiles, func(i, j int) bool { + return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt) + }) + + for _, snapshotFile := range snapshotFiles[retention:] { + delete(snapshotConfigMap.Data, snapshotFile.Name) + } + return nil +} + +func isTooLargeError(err error) bool { + // There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string. + return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) +}
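pruneConfigMap orders entries by the trailing Unix timestamp embedded in each configmap key rather than unmarshaling every stored value. A small, self-contained sketch of that key parsing, using hypothetical keys in the <prefix>-<node>-<unix-timestamp>[.zip] shape produced above:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ageOf mirrors pruneConfigMap's parsing: strip the optional compressed
// extension, then treat everything after the last '-' as a Unix timestamp.
// Keys without a trailing timestamp parse as 0 and therefore sort oldest,
// making them the first candidates for pruning.
func ageOf(key string) int64 {
	basename, _ := strings.CutSuffix(key, ".zip")
	ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
	return ts
}

func main() {
	for _, key := range []string{
		"local-etcd-snapshot-node1-1696000000",
		"s3-etcd-snapshot-node2-1696003600.zip",
		"local-odd-key", // no timestamp; parses as 0
	} {
		fmt.Printf("%s -> %d\n", key, ageOf(key))
	}
}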