From 8944520cccd173e5d7143913758e54c2a8bced15 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 8 Sep 2021 19:53:44 +0530 Subject: [PATCH] Fix deletion of old snapshots (#9314) Signed-off-by: Ganesh Vernekar --- tsdb/head_test.go | 106 ++++++++++++++++++++++++++++------------------ tsdb/head_wal.go | 25 ++++++++--- 2 files changed, 83 insertions(+), 48 deletions(-) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b57a076bc..b09e59ae9 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -25,6 +25,7 @@ import ( "path/filepath" "sort" "strconv" + "strings" "sync" "testing" "time" @@ -2586,6 +2587,32 @@ func TestChunkSnapshot(t *testing.T) { require.Equal(t, expExemplars, actExemplars) } + var ( + wlast, woffset int + err error + ) + + closeHeadAndCheckSnapshot := func() { + require.NoError(t, head.Close()) + + _, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot) + require.NoError(t, err) + require.Equal(t, wlast, sidx) + require.Equal(t, woffset, soffset) + } + + openHeadAndCheckReplay := func() { + w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) + require.NoError(t, err) + head, err = NewHead(nil, nil, w, head.opts, nil) + require.NoError(t, err) + require.NoError(t, head.Init(math.MinInt64)) + + checkSamples() + checkTombstones() + checkExemplars() + } + { // Initial data that goes into snapshot. // Add some initial samples with >=1 m-map chunk. app := head.Appender(context.Background()) @@ -2630,31 +2657,16 @@ func TestChunkSnapshot(t *testing.T) { } // These references should be the ones used for the snapshot. - wlast, woffset, err := head.wal.LastSegmentAndOffset() + wlast, woffset, err = head.wal.LastSegmentAndOffset() require.NoError(t, err) - { // Creating snapshot and verifying it. + { + // Creating snapshot and verifying it. head.opts.EnableMemorySnapshotOnShutdown = true - require.NoError(t, head.Close()) // This will create a snapshot. + closeHeadAndCheckSnapshot() // This will create a snapshot. - _, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot) - require.NoError(t, err) - require.Equal(t, wlast, sidx) - require.Equal(t, woffset, soffset) - } - - { // Test the replay of snapshot. - // Create new Head which should replay this snapshot. - w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) - require.NoError(t, err) - head, err = NewHead(nil, nil, w, head.opts, nil) - require.NoError(t, err) - require.NoError(t, head.Init(math.MinInt64)) - - // Test query for snapshot replay. - checkSamples() - checkTombstones() - checkExemplars() + // Test the replay of snapshot. + openHeadAndCheckReplay() } { // Additional data to only include in WAL and m-mapped chunks and not snapshot. This mimics having an old snapshot on disk. @@ -2700,30 +2712,42 @@ func TestChunkSnapshot(t *testing.T) { } } - { // Close Head and verify that new snapshot was not created. + { + // Close Head and verify that new snapshot was not created. head.opts.EnableMemorySnapshotOnShutdown = false - require.NoError(t, head.Close()) // This should not create a snapshot. + closeHeadAndCheckSnapshot() // This should not create a snapshot. - _, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot) - require.NoError(t, err) - require.Equal(t, wlast, sidx) - require.Equal(t, woffset, soffset) - } - - { // Test the replay of snapshot, m-map chunks, and WAL. - // Create new Head to replay snapshot, m-map chunks, and WAL. - w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false) - require.NoError(t, err) + // Test the replay of snapshot, m-map chunks, and WAL. head.opts.EnableMemorySnapshotOnShutdown = true // Enabled to read from snapshot. - head, err = NewHead(nil, nil, w, head.opts, nil) - require.NoError(t, err) - require.NoError(t, head.Init(math.MinInt64)) - - // Test query when data is replayed from snapshot, m-map chunks, and WAL. - checkSamples() - checkTombstones() - checkExemplars() + openHeadAndCheckReplay() } + + // Creating another snapshot should delete the older snapshot and replay still works fine. + wlast, woffset, err = head.wal.LastSegmentAndOffset() + require.NoError(t, err) + + { + // Close Head and verify that new snapshot was created. + closeHeadAndCheckSnapshot() + + // Verify that there is only 1 snapshot. + files, err := ioutil.ReadDir(head.opts.ChunkDirRoot) + require.NoError(t, err) + snapshots := 0 + for i := len(files) - 1; i >= 0; i-- { + fi := files[i] + if strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { + snapshots++ + require.Equal(t, chunkSnapshotDir(wlast, woffset), fi.Name()) + } + } + require.Equal(t, 1, snapshots) + + // Test the replay of snapshot. + head.opts.EnableMemorySnapshotOnShutdown = true // Enabled to read from snapshot. + openHeadAndCheckReplay() + } + } func TestSnapshotError(t *testing.T) { diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index ead81d468..6a647dd85 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -568,7 +568,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { return stats, nil } - snapshotName := fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset) + snapshotName := chunkSnapshotDir(wlast, woffset) cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName) cpdirtmp := cpdir + ".tmp" @@ -690,7 +690,7 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { return stats, errors.Wrap(err, "rename chunk snapshot directory") } - if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, cslast, csoffset); err != nil { + if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil { // Leftover old chunk snapshots do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher chunk snapshot exists. @@ -699,6 +699,10 @@ func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) { return stats, nil } +func chunkSnapshotDir(wlast, woffset int) string { + return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset) +} + func (h *Head) performChunkSnapshot() error { level.Info(h.logger).Log("msg", "creating chunk snapshot") startTime := time.Now() @@ -723,8 +727,9 @@ func LastChunkSnapshot(dir string) (string, int, int, error) { if err != nil { return "", 0, 0, err } - // Traverse list backwards since there may be multiple chunk snapshots left. - for i := len(files) - 1; i >= 0; i-- { + maxIdx, maxOffset := -1, -1 + maxFileName := "" + for i := 0; i < len(files); i++ { fi := files[i] if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) { @@ -749,9 +754,15 @@ func LastChunkSnapshot(dir string) (string, int, int, error) { continue } - return filepath.Join(dir, fi.Name()), idx, offset, nil + if idx > maxIdx || (idx == maxIdx && offset > maxOffset) { + maxIdx, maxOffset = idx, offset + maxFileName = filepath.Join(dir, fi.Name()) + } } - return "", 0, 0, record.ErrNotFound + if maxFileName == "" { + return "", 0, 0, record.ErrNotFound + } + return maxFileName, maxIdx, maxOffset, nil } // DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index. @@ -782,7 +793,7 @@ func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error { continue } - if idx <= maxIndex && offset < maxOffset { + if idx < maxIndex || (idx == maxIndex && offset < maxOffset) { if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil { errs.Add(err) }