Browse Source

Fix race condition when multiple nodes reconcile S3 snapshots

Don't delete s3 etcdsnapshotfiles if they are missing from s3 but less than a minute old, its possible the other node just finished uploading it and the object key has not yet become visible.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 0826ebc142)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/11075/head
Brad Davidson 2 months ago committed by Brad Davidson
parent
commit
e20fb2a766
  1. 25
      pkg/etcd/s3/s3.go
  2. 43
      pkg/etcd/snapshot.go

25
pkg/etcd/s3/s3.go

@ -416,7 +416,7 @@ func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix st
logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)
key := path.Base(df.Key)
if err := c.DeleteSnapshot(ctx, key); err != nil {
if err := c.DeleteSnapshot(ctx, key); err != nil && !snapshot.IsNotExist(err) {
return deleted, err
}
deleted = append(deleted, key)
@ -431,14 +431,27 @@ func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
defer cancel()
key = path.Join(c.etcdS3.Folder, key)
err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{})
if err == nil || snapshot.IsNotExist(err) {
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
if merr := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !snapshot.IsNotExist(merr) {
err = merr
_, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, key, minio.StatObjectOptions{})
if err == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{}); err != nil {
return err
}
}
// check for and try to delete the metadata regardless of whether or not the
// snapshot existed, just to ensure that things are cleaned up in the case of
// ephemeral errors. Metadata delete errors are only exposed if the object
// exists and fails to delete.
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
_, merr := c.mc.StatObject(ctx, c.etcdS3.Bucket, metadataKey, minio.StatObjectOptions{})
if merr == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); err != nil {
return err
}
}
// return error from snapshot StatObject call, so that callers can determine
// if the object was actually deleted or not by checking for a NotFound error.
return err
}

43
pkg/etcd/snapshot.go

@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"runtime"
"slices"
"sort"
"strconv"
"strings"
@ -41,6 +42,7 @@ import (
const (
errorTTL = 24 * time.Hour
s3ReconcileTTL = time.Minute
snapshotListPageSize = 20
)
@ -363,7 +365,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
}
}
return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}
// listLocalSnapshots provides a list of the currently stored
@ -464,7 +466,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, err
res.Deleted = append(res.Deleted, deleted...)
}
}
return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}
// ListSnapshots returns a list of snapshots. Local snapshots are always listed,
@ -555,7 +557,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage
}
}
return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}
func (e *ETCD) deleteSnapshot(snapshotPath string) error {
@ -647,9 +649,17 @@ func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) {
}
// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
// and reconcile snapshots from S3.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3.
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
return e.reconcileSnapshotData(ctx, nil)
}
// reconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3. Any snapshots listed in the Deleted field of
// the provided SnapshotResult are deleted, even if they are within a retention window.
func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) error {
// make sure the core.Factory is initialized. There can
// be a race between this core code startup.
for e.config.Runtime.Core == nil {
@ -726,6 +736,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
snapshotPager.PageSize = snapshotListPageSize
now := time.Now().Round(time.Second)
// List all snapshots matching the selector
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
@ -742,10 +753,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
// exists in both and names match, don't need to sync
delete(snapshotFiles, sfKey)
} else {
// doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
// doesn't exist on disk/s3
if res != nil && slices.Contains(res.Deleted, esf.Spec.SnapshotName) {
// snapshot has been intentionally deleted, skip checking for expiration
} else if esf.Status.Error != nil && esf.Status.Error.Time != nil {
expires := esf.Status.Error.Time.Add(errorTTL)
if time.Now().Before(expires) {
if now.Before(expires) {
// it's an error that hasn't expired yet, leave it
return nil
}
} else if esf.Spec.S3 != nil {
expires := esf.ObjectMeta.CreationTimestamp.Add(s3ReconcileTTL)
if now.Before(expires) {
// it's an s3 snapshot that's only just been created, leave it to prevent a race condition
// when multiple nodes are uploading snapshots at the same time.
return nil
}
}
@ -754,6 +775,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
} else {
logrus.Debugf("Key %s not found in snapshotFile list", sfKey)
}
// otherwise remove it
logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName)
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
@ -817,18 +839,17 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
}
// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
now := time.Now().Round(time.Second).Format(time.RFC3339)
patch := []map[string]string{
{
"op": "add",
"value": now,
"value": now.Format(time.RFC3339),
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"),
},
}
if e.config.EtcdS3 != nil {
patch = append(patch, map[string]string{
"op": "add",
"value": now,
"value": now.Format(time.RFC3339),
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"),
})
}

Loading…
Cancel
Save