mirror of https://github.com/k3s-io/k3s
Fix race condition when multiple nodes reconcile S3 snapshots
Don't delete S3 etcdsnapshotfiles if they are missing from S3 but less than a minute old; it's possible that another node has just finished uploading them and the object keys have not yet become visible.
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
(cherry picked from commit 0826ebc142)
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/11074/head
parent
4d0a6134a0
commit
12b9e7d4b1
|
@ -416,7 +416,7 @@ func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix st
|
||||||
logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)
|
logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)
|
||||||
|
|
||||||
key := path.Base(df.Key)
|
key := path.Base(df.Key)
|
||||||
if err := c.DeleteSnapshot(ctx, key); err != nil {
|
if err := c.DeleteSnapshot(ctx, key); err != nil && !snapshot.IsNotExist(err) {
|
||||||
return deleted, err
|
return deleted, err
|
||||||
}
|
}
|
||||||
deleted = append(deleted, key)
|
deleted = append(deleted, key)
|
||||||
|
@ -431,14 +431,27 @@ func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
key = path.Join(c.etcdS3.Folder, key)
|
key = path.Join(c.etcdS3.Folder, key)
|
||||||
err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{})
|
_, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, key, minio.StatObjectOptions{})
|
||||||
if err == nil || snapshot.IsNotExist(err) {
|
if err == nil {
|
||||||
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
|
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{}); err != nil {
|
||||||
if merr := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !snapshot.IsNotExist(merr) {
|
return err
|
||||||
err = merr
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check for and try to delete the metadata regardless of whether or not the
|
||||||
|
// snapshot existed, just to ensure that things are cleaned up in the case of
|
||||||
|
// ephemeral errors. Metadata delete errors are only exposed if the object
|
||||||
|
// exists and fails to delete.
|
||||||
|
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
|
||||||
|
_, merr := c.mc.StatObject(ctx, c.etcdS3.Bucket, metadataKey, minio.StatObjectOptions{})
|
||||||
|
if merr == nil {
|
||||||
|
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// return error from snapshot StatObject call, so that callers can determine
|
||||||
|
// if the object was actually deleted or not by checking for a NotFound error.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"slices"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -41,6 +42,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
errorTTL = 24 * time.Hour
|
errorTTL = 24 * time.Hour
|
||||||
|
s3ReconcileTTL = time.Minute
|
||||||
snapshotListPageSize = 20
|
snapshotListPageSize = 20
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -363,7 +365,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, e.ReconcileSnapshotData(ctx)
|
return res, e.reconcileSnapshotData(ctx, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
// listLocalSnapshots provides a list of the currently stored
|
// listLocalSnapshots provides a list of the currently stored
|
||||||
|
@ -464,7 +466,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, err
|
||||||
res.Deleted = append(res.Deleted, deleted...)
|
res.Deleted = append(res.Deleted, deleted...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return res, e.ReconcileSnapshotData(ctx)
|
return res, e.reconcileSnapshotData(ctx, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListSnapshots returns a list of snapshots. Local snapshots are always listed,
|
// ListSnapshots returns a list of snapshots. Local snapshots are always listed,
|
||||||
|
@ -555,7 +557,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, e.ReconcileSnapshotData(ctx)
|
return res, e.reconcileSnapshotData(ctx, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ETCD) deleteSnapshot(snapshotPath string) error {
|
func (e *ETCD) deleteSnapshot(snapshotPath string) error {
|
||||||
|
@ -647,9 +649,17 @@ func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
|
// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
|
||||||
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
|
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
|
||||||
// and reconcile snapshots from S3.
|
// list S3 snapshots and reconcile snapshots from S3.
|
||||||
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
|
return e.reconcileSnapshotData(ctx, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
|
||||||
|
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
|
||||||
|
// list S3 snapshots and reconcile snapshots from S3. Any snapshots listed in the Deleted field of
|
||||||
|
// the provided SnapshotResult are deleted, even if they are within a retention window.
|
||||||
|
func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) error {
|
||||||
// make sure the core.Factory is initialized. There can
|
// make sure the core.Factory is initialized. There can
|
||||||
// be a race between this core code startup.
|
// be a race between this core code startup.
|
||||||
for e.config.Runtime.Core == nil {
|
for e.config.Runtime.Core == nil {
|
||||||
|
@ -726,6 +736,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
|
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
|
||||||
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
|
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
|
||||||
snapshotPager.PageSize = snapshotListPageSize
|
snapshotPager.PageSize = snapshotListPageSize
|
||||||
|
now := time.Now().Round(time.Second)
|
||||||
|
|
||||||
// List all snapshots matching the selector
|
// List all snapshots matching the selector
|
||||||
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
|
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
|
||||||
|
@ -742,10 +753,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
// exists in both and names match, don't need to sync
|
// exists in both and names match, don't need to sync
|
||||||
delete(snapshotFiles, sfKey)
|
delete(snapshotFiles, sfKey)
|
||||||
} else {
|
} else {
|
||||||
// doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it
|
// doesn't exist on disk/s3
|
||||||
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
|
if res != nil && slices.Contains(res.Deleted, esf.Spec.SnapshotName) {
|
||||||
|
// snapshot has been intentionally deleted, skip checking for expiration
|
||||||
|
} else if esf.Status.Error != nil && esf.Status.Error.Time != nil {
|
||||||
expires := esf.Status.Error.Time.Add(errorTTL)
|
expires := esf.Status.Error.Time.Add(errorTTL)
|
||||||
if time.Now().Before(expires) {
|
if now.Before(expires) {
|
||||||
|
// it's an error that hasn't expired yet, leave it
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else if esf.Spec.S3 != nil {
|
||||||
|
expires := esf.ObjectMeta.CreationTimestamp.Add(s3ReconcileTTL)
|
||||||
|
if now.Before(expires) {
|
||||||
|
// it's an s3 snapshot that's only just been created, leave it to prevent a race condition
|
||||||
|
// when multiple nodes are uploading snapshots at the same time.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -754,6 +775,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
} else {
|
} else {
|
||||||
logrus.Debugf("Key %s not found in snapshotFile list", sfKey)
|
logrus.Debugf("Key %s not found in snapshotFile list", sfKey)
|
||||||
}
|
}
|
||||||
|
// otherwise remove it
|
||||||
logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName)
|
logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName)
|
||||||
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
|
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
|
||||||
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
|
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
|
||||||
|
@ -817,18 +839,17 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
|
// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
|
||||||
now := time.Now().Round(time.Second).Format(time.RFC3339)
|
|
||||||
patch := []map[string]string{
|
patch := []map[string]string{
|
||||||
{
|
{
|
||||||
"op": "add",
|
"op": "add",
|
||||||
"value": now,
|
"value": now.Format(time.RFC3339),
|
||||||
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"),
|
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if e.config.EtcdS3 != nil {
|
if e.config.EtcdS3 != nil {
|
||||||
patch = append(patch, map[string]string{
|
patch = append(patch, map[string]string{
|
||||||
"op": "add",
|
"op": "add",
|
||||||
"value": now,
|
"value": now.Format(time.RFC3339),
|
||||||
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"),
|
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue