Fix unsequential m-map files (#7414)

* Fix unsequential m-map files

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
pull/7417/head
Ganesh Vernekar 5 years ago committed by GitHub
parent 5d7e3e9706
commit 48fae12b89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -670,8 +670,8 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
var removedFiles []int var removedFiles []int
for _, seq := range chkFileIndices { for _, seq := range chkFileIndices {
if seq == cdm.curFileSequence { if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint {
continue break
} }
if cdm.mmappedChunkFiles[seq].maxt < mint { if cdm.mmappedChunkFiles[seq].maxt < mint {
removedFiles = append(removedFiles, seq) removedFiles = append(removedFiles, seq)
@ -680,7 +680,10 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
cdm.readPathMtx.RUnlock() cdm.readPathMtx.RUnlock()
var merr tsdb_errors.MultiError var merr tsdb_errors.MultiError
// Cut a new file only if the current file has some chunks.
if cdm.curFileNumBytes > HeadChunkFileHeaderSize {
merr.Add(cdm.CutNewFile()) merr.Add(cdm.CutNewFile())
}
merr.Add(cdm.deleteFiles(removedFiles)) merr.Add(cdm.deleteFiles(removedFiles))
return merr.Err() return merr.Err()
} }

@ -156,6 +156,11 @@ func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
} }
// TestHeadReadWriter_Truncate tests
// * If truncation is happening properly based on the time passed.
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
// * Empty current file does not lead to creation of another file after truncation.
// * Non-empty current file leads to creation of another file after truncation.
func TestHeadReadWriter_Truncate(t *testing.T) { func TestHeadReadWriter_Truncate(t *testing.T) {
hrw, close := testHeadReadWriter(t) hrw, close := testHeadReadWriter(t)
defer func() { defer func() {
@ -163,10 +168,6 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
close() close()
}() }()
testutil.Assert(t, !hrw.fileMaxtSet, "")
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "")
timeRange := 0 timeRange := 0
fileTimeStep := 100 fileTimeStep := 100
totalFiles := 7 totalFiles := 7
@ -204,15 +205,15 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
cutFile(i) cutFile(i)
} }
// Verifying the the remaining files. // Verifying the files.
verifyRemainingFiles := func(remainingFiles, startIndex int) { verifyFiles := func(remainingFiles, startIndex int) {
//t.Helper() t.Helper()
files, err := ioutil.ReadDir(hrw.dir.Name()) files, err := ioutil.ReadDir(hrw.dir.Name())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, remainingFiles, len(files)) testutil.Equals(t, remainingFiles, len(files), "files on disk")
testutil.Equals(t, remainingFiles, len(hrw.mmappedChunkFiles)) testutil.Equals(t, remainingFiles, len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
testutil.Equals(t, remainingFiles, len(hrw.closers)) testutil.Equals(t, remainingFiles, len(hrw.closers), "closers")
for i := 1; i <= totalFiles; i++ { for i := 1; i <= totalFiles; i++ {
_, ok := hrw.mmappedChunkFiles[i] _, ok := hrw.mmappedChunkFiles[i]
@ -225,12 +226,12 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
} }
// Verify the number of segments. // Verify the number of segments.
verifyRemainingFiles(totalFiles, 1) verifyFiles(totalFiles, 1)
// Truncating files. // Truncating files.
testutil.Ok(t, hrw.Truncate(timeToTruncate)) testutil.Ok(t, hrw.Truncate(timeToTruncate))
totalFiles++ // Truncation creates a new file. totalFiles++ // Truncation creates a new file as the last file is not empty.
verifyRemainingFiles(totalFiles-filesDeletedAfter1stTruncation, startIndexAfter1stTruncation) verifyFiles(totalFiles-filesDeletedAfter1stTruncation, startIndexAfter1stTruncation)
addChunk() // Add a chunk so that new file is not truncated. addChunk() // Add a chunk so that new file is not truncated.
dir := hrw.dir.Name() dir := hrw.dir.Name()
@ -245,14 +246,95 @@ func TestHeadReadWriter_Truncate(t *testing.T) {
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "") testutil.Assert(t, hrw.fileMaxtSet, "")
// Truncating files after restart. // Truncating files after restart. As the last file was empty, this creates no new files.
testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart)) testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart))
totalFiles++ // Truncation creates a new file. verifyFiles(totalFiles-filesDeletedAfter2ndTruncation, startIndexAfter2ndTruncation)
verifyRemainingFiles(totalFiles-filesDeletedAfter2ndTruncation, startIndexAfter2ndTruncation)
// First chunk after restart creates a new file.
addChunk()
totalFiles++
// Truncating till current time should not delete the current active file. // Truncating till current time should not delete the current active file.
testutil.Ok(t, hrw.Truncate(int64(timeRange+fileTimeStep))) testutil.Ok(t, hrw.Truncate(int64(timeRange+fileTimeStep)))
verifyRemainingFiles(2, totalFiles) // One file was the active file and one was newly created. verifyFiles(2, totalFiles) // One file is the active file and one was newly created.
}
// TestHeadReadWriter_Truncate_NoUnsequentialFiles tests
// that truncation leaves no unsequential files on disk, mainly under the following case
// * There is an empty file in between the sequence while the truncation
// deletes files only upto a sequence before that (i.e. stops deleting
// after it has found a file that is not deletable).
// This tests https://github.com/prometheus/prometheus/issues/7412 where
// the truncation used to check all the files for deletion and end up
// deleting empty files in between and breaking the sequence.
func TestHeadReadWriter_Truncate_NoUnsequentialFiles(t *testing.T) {
hrw, close := testHeadReadWriter(t)
defer func() {
testutil.Ok(t, hrw.Close())
close()
}()
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
testutil.Ok(t, err)
timeRange += step
}
emptyFile := func() {
testutil.Ok(t, hrw.CutNewFile())
}
nonEmptyFile := func() {
emptyFile()
addChunk()
}
addChunk() // 1. Created with the first chunk.
nonEmptyFile() // 2.
nonEmptyFile() // 3.
emptyFile() // 4.
nonEmptyFile() // 5.
emptyFile() // 6.
// Verifying the files.
verifyFiles := func(remainingFiles []int) {
t.Helper()
files, err := ioutil.ReadDir(hrw.dir.Name())
testutil.Ok(t, err)
testutil.Equals(t, len(remainingFiles), len(files), "files on disk")
testutil.Equals(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
testutil.Equals(t, len(remainingFiles), len(hrw.closers), "closers")
for _, i := range remainingFiles {
_, ok := hrw.mmappedChunkFiles[i]
testutil.Equals(t, true, ok)
}
}
verifyFiles([]int{1, 2, 3, 4, 5, 6})
// Truncating files till 2. It should not delete anything after 3 (inclusive)
// though files 4 and 6 are empty.
file2Maxt := hrw.mmappedChunkFiles[2].maxt
testutil.Ok(t, hrw.Truncate(file2Maxt+1))
// As 6 was empty, it should not create another file.
verifyFiles([]int{3, 4, 5, 6})
addChunk()
// Truncate creates another file as 6 is not empty now.
testutil.Ok(t, hrw.Truncate(file2Maxt+1))
verifyFiles([]int{3, 4, 5, 6, 7})
dir := hrw.dir.Name()
testutil.Ok(t, hrw.Close())
// Restarting checks for unsequential files.
var err error
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
testutil.Ok(t, err)
verifyFiles([]int{3, 4, 5, 6, 7})
} }
func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) { func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) {
@ -260,6 +342,9 @@ func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) {
testutil.Ok(t, err) testutil.Ok(t, err)
hrw, err = NewChunkDiskMapper(tmpdir, chunkenc.NewPool()) hrw, err = NewChunkDiskMapper(tmpdir, chunkenc.NewPool())
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, !hrw.fileMaxtSet, "")
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
testutil.Assert(t, hrw.fileMaxtSet, "")
return hrw, func() { return hrw, func() {
testutil.Ok(t, os.RemoveAll(tmpdir)) testutil.Ok(t, os.RemoveAll(tmpdir))
} }

Loading…
Cancel
Save