From 031d22df9e43f1c3014b8344337e133099902ae8 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Fri, 30 Jun 2023 14:59:59 +0200
Subject: [PATCH] Fix race condition in ChunkDiskMapper.Truncate() (#12500)

* Fix race condition in ChunkDiskMapper.Truncate()

Signed-off-by: Marco Pracucci

* Added unit test

Signed-off-by: Marco Pracucci

* Update tsdb/chunks/head_chunks.go

Co-authored-by: Ganesh Vernekar
Signed-off-by: Marco Pracucci

---------

Signed-off-by: Marco Pracucci
Co-authored-by: Ganesh Vernekar
---
 tsdb/chunks/head_chunks.go      | 20 +++++++++----
 tsdb/chunks/head_chunks_test.go | 52 +++++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index bcdab2125..d73eb36f8 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -948,12 +948,22 @@ func (cdm *ChunkDiskMapper) Truncate(fileNo uint32) error {
 	if len(chkFileIndices) == len(removedFiles) {
 		// All files were deleted. Reset the current sequence.
 		cdm.evtlPosMtx.Lock()
-		if err == nil {
-			cdm.evtlPos.setSeq(0)
-		} else {
-			// In case of error, set it to the last file number on the disk that was not deleted.
-			cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
+
+		// We can safely reset the sequence only if the write queue is empty. If it's not empty,
+		// then there may be a job in the queue that will create a new segment file with an ID
+		// generated before the sequence reset.
+		//
+		// The queueIsEmpty() function must be called while holding the cdm.evtlPosMtx to avoid
+		// a race condition with WriteChunk().
+		if cdm.writeQueue == nil || cdm.writeQueue.queueIsEmpty() {
+			if err == nil {
+				cdm.evtlPos.setSeq(0)
+			} else {
+				// In case of error, set it to the last file number on the disk that was not deleted.
+				cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
+			}
 		}
+
 		cdm.evtlPosMtx.Unlock()
 	}
 
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index 20a4c2064..68c133088 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -19,7 +19,9 @@ import (
 	"math/rand"
 	"os"
 	"strconv"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -356,6 +358,56 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
 	verifyFiles([]int{5, 6, 7})
 }
 
+func TestChunkDiskMapper_Truncate_WriteQueueRaceCondition(t *testing.T) {
+	hrw := createChunkDiskMapper(t, "")
+	t.Cleanup(func() {
+		require.NoError(t, hrw.Close())
+	})
+
+	// This test should only run when the queue is enabled.
+	if hrw.writeQueue == nil {
+		t.Skip("This test should only run when the queue is enabled")
+	}
+
+	// Add an artificial delay in the writeChunk function to easily trigger the race condition.
+	origWriteChunk := hrw.writeQueue.writeChunk
+	hrw.writeQueue.writeChunk = func(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, isOOO, cutFile bool) error {
+		time.Sleep(100 * time.Millisecond)
+		return origWriteChunk(seriesRef, mint, maxt, chk, ref, isOOO, cutFile)
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	// Write a chunk. Since the queue is enabled, the chunk will be written asynchronously (with the artificial delay).
+	ref := hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) {
+		defer wg.Done()
+		require.NoError(t, err)
+	})
+
+	seq, _ := ref.Unpack()
+	require.Equal(t, 1, seq)
+
+	// Truncate, simulating that all chunks from segment files before 1 can be dropped.
+	require.NoError(t, hrw.Truncate(1))
+
+	// Request to cut a new file when writing the next chunk. If there's a race condition, cutting a new file will
+	// allow us to detect there's actually an issue with the sequence number (because it's checked when a new segment
+	// file is created).
+	hrw.CutNewFile()
+
+	// Write another chunk. This will cut a new file.
+	ref = hrw.WriteChunk(1, 0, 10, randomChunk(t), false, func(err error) {
+		defer wg.Done()
+		require.NoError(t, err)
+	})
+
+	seq, _ = ref.Unpack()
+	require.Equal(t, 2, seq)
+
+	wg.Wait()
+}
+
 // TestHeadReadWriter_TruncateAfterIterateChunksError tests for
 // https://github.com/prometheus/prometheus/issues/7753
 func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {