From cfe30a7b6238f1e1e8c937da387ca9f3603cda03 Mon Sep 17 00:00:00 2001 From: Julien Pivotto Date: Tue, 21 Jul 2020 10:39:02 +0200 Subject: [PATCH] TSDB: Use t.Cleanup to delete temporary files (#7620) Signed-off-by: Julien Pivotto --- tsdb/compact_test.go | 6 +-- tsdb/db_test.go | 96 ++++++++++++++------------------------- tsdb/head_test.go | 105 +++++++++++++++++-------------------------- 3 files changed, 77 insertions(+), 130 deletions(-) diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 9ed29b413..fdad0fd8c 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -900,10 +900,9 @@ func BenchmarkCompactionFromHead(b *testing.B) { // This is needed for unit tests that rely on // checking state before and after a compaction. func TestDisableAutoCompactions(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() blockRange := db.compactor.(*LeveledCompactor).ranges[0] @@ -1057,10 +1056,9 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { for title, bootStrap := range tests { t.Run(title, func(t *testing.T) { - db, closeFn := openTestDB(t, nil, []int64{1, 100}) + db := openTestDB(t, nil, []int64{1, 100}) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() db.DisableCompactions() diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 236061889..5e43cbd1b 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -53,7 +53,7 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } -func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func()) { +func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) { tmpdir, err := ioutil.TempDir("", "test") testutil.Ok(t, err) @@ -65,10 +65,11 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func() } testutil.Ok(t, err) - // Do not close the test database by default as it will deadlock on test failures. - return db, func() { + // Do not Close() the test database by default as it will deadlock on test failures. + t.Cleanup(func() { testutil.Ok(t, os.RemoveAll(tmpdir)) - } + }) + return db } // query runs a matcher query against the querier and fully expands its data. @@ -106,10 +107,9 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str // Ensure that blocks are held in memory in their time order // and not in ULID order as they are read from the directory. func TestDB_reloadOrder(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() metas := []BlockMeta{ @@ -133,10 +133,9 @@ func TestDB_reloadOrder(t *testing.T) { } func TestDataAvailableOnlyAfterCommit(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -164,8 +163,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { // TestNoPanicAfterWALCorrutpion ensures that querying the db after a WAL corruption doesn't cause a panic. // https://github.com/prometheus/prometheus/issues/7548 func TestNoPanicAfterWALCorrutpion(t *testing.T) { - db, closeFn := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil) - t.Cleanup(closeFn) + db := openTestDB(t, &Options{WALSegmentSize: 32 * 1024}, nil) // Append until the the first mmaped head chunk. // This is to ensure that all samples can be read from the mmaped chunks when the WAL is corrupted. @@ -223,10 +221,9 @@ func TestNoPanicAfterWALCorrutpion(t *testing.T) { } func TestDataNotAvailableAfterRollback(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -246,10 +243,9 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { } func TestDBAppenderAddRef(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app1 := db.Appender() @@ -301,10 +297,9 @@ func TestDBAppenderAddRef(t *testing.T) { } func TestAppendEmptyLabelsIgnored(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app1 := db.Appender() @@ -354,10 +349,9 @@ func TestDeleteSimple(t *testing.T) { Outer: for _, c := range cases { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -414,10 +408,9 @@ Outer: } func TestAmendDatapointCausesError(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -432,10 +425,9 @@ func TestAmendDatapointCausesError(t *testing.T) { } func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -449,10 +441,9 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { } func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -466,10 +457,9 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { } func TestEmptyLabelsetCausesError(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -479,10 +469,9 @@ func TestEmptyLabelsetCausesError(t *testing.T) { } func TestSkippingInvalidValuesInSameTxn(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() // Append AmendedValue. @@ -522,8 +511,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { } func TestDB_Snapshot(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) - defer closeFn() + db := openTestDB(t, nil, nil) // append data app := db.Appender() @@ -574,8 +562,7 @@ func TestDB_Snapshot(t *testing.T) { // that are outside the set block time range. // See https://github.com/prometheus/prometheus/issues/5105 func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) - defer closeFn() + db := openTestDB(t, nil, nil) app := db.Appender() mint := int64(1414141414000) @@ -628,8 +615,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { func TestDB_SnapshotWithDelete(t *testing.T) { numSamples := int64(10) - db, closeFn := openTestDB(t, nil, nil) - defer closeFn() + db := openTestDB(t, nil, nil) app := db.Appender() @@ -771,10 +757,9 @@ func TestDB_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -875,8 +860,7 @@ func TestDB_e2e(t *testing.T) { } func TestWALFlushedOnDBClose(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) - defer closeFn() + db := openTestDB(t, nil, nil) dirDb := db.Dir() @@ -953,8 +937,7 @@ func TestWALSegmentSizeOptions(t *testing.T) { t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) { opts := DefaultOptions() opts.WALSegmentSize = segmentSize - db, closeFn := openTestDB(t, opts, nil) - defer closeFn() + db := openTestDB(t, opts, nil) app := db.Appender() for i := int64(0); i < 155; i++ { @@ -973,8 +956,7 @@ func TestWALSegmentSizeOptions(t *testing.T) { func TestTombstoneClean(t *testing.T) { numSamples := int64(10) - db, closeFn := openTestDB(t, nil, nil) - defer closeFn() + db := openTestDB(t, nil, nil) app := db.Appender() @@ -1071,10 +1053,9 @@ func TestTombstoneClean(t *testing.T) { // When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so // if TombstoneClean leaves any blocks behind these will overlap. func TestTombstoneCleanFail(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() var expectedBlockDirs []string @@ -1151,10 +1132,9 @@ func (*mockCompactorFailing) Compact(string, []string, []*Block) (ulid.ULID, err } func TestTimeRetention(t *testing.T) { - db, closeFn := openTestDB(t, nil, []int64{1000}) + db := openTestDB(t, nil, []int64{1000}) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() blocks := []*BlockMeta{ @@ -1183,10 +1163,9 @@ func TestTimeRetention(t *testing.T) { } func TestSizeRetention(t *testing.T) { - db, closeFn := openTestDB(t, nil, []int64{100}) + db := openTestDB(t, nil, []int64{100}) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() blocks := []*BlockMeta{ @@ -1283,12 +1262,11 @@ func TestSizeRetentionMetric(t *testing.T) { } for _, c := range cases { - db, closeFn := openTestDB(t, &Options{ + db := openTestDB(t, &Options{ MaxBytes: c.maxBytes, }, []int64{100}) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() actMaxBytes := int64(prom_testutil.ToFloat64(db.metrics.maxBytes)) @@ -1297,10 +1275,9 @@ func TestSizeRetentionMetric(t *testing.T) { } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() labelpairs := []labels.Labels{ @@ -1483,10 +1460,9 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { // Regression test for https://github.com/prometheus/tsdb/issues/347 func TestChunkAtBlockBoundary(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -1540,10 +1516,9 @@ func TestChunkAtBlockBoundary(t *testing.T) { } func TestQuerierWithBoundaryChunks(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() app := db.Appender() @@ -1691,10 +1666,9 @@ func TestInitializeHeadTimestamp(t *testing.T) { } func TestNoEmptyBlocks(t *testing.T) { - db, closeFn := openTestDB(t, nil, []int64{100}) + db := openTestDB(t, nil, []int64{100}) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() db.DisableCompactions() @@ -1849,10 +1823,9 @@ func TestDB_LabelNames(t *testing.T) { testutil.Ok(t, err) } for _, tst := range tests { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() appendSamples(db, 0, 4, tst.sampleLabels1) @@ -1896,10 +1869,9 @@ func TestDB_LabelNames(t *testing.T) { } func TestCorrectNumTombstones(t *testing.T) { - db, closeFn := openTestDB(t, nil, nil) + db := openTestDB(t, nil, nil) defer func() { testutil.Ok(t, db.Close()) - closeFn() }() blockRange := db.compactor.(*LeveledCompactor).ranges[0] diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6c5e46658..2432ae060 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -39,10 +39,26 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) +func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL) { + dir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) + testutil.Ok(t, err) + + h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize, nil) + testutil.Ok(t, err) + + testutil.Ok(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) + + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(dir)) + }) + return h, wlog +} + func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, _, closer := newTestHead(b, 10000, false) - defer closer() + h, _ := newTestHead(b, 10000, false) defer func() { testutil.Ok(b, h.Close()) }() @@ -210,8 +226,7 @@ func TestHead_ReadWAL(t *testing.T) { }, } - head, w, closer := newTestHead(t, 1000, compress) - defer closer() + head, w := newTestHead(t, 1000, compress) defer func() { testutil.Ok(t, head.Close()) }() @@ -247,8 +262,7 @@ func TestHead_ReadWAL(t *testing.T) { } func TestHead_WALMultiRef(t *testing.T) { - head, w, closer := newTestHead(t, 1000, false) - defer closer() + head, w := newTestHead(t, 1000, false) testutil.Ok(t, head.Init(0)) @@ -305,8 +319,7 @@ func TestHead_WALMultiRef(t *testing.T) { } func TestHead_Truncate(t *testing.T) { - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -457,8 +470,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { {Ref: 50, T: 90, V: 1}, }, } - head, w, closer := newTestHead(t, 1000, compress) - defer closer() + head, w := newTestHead(t, 1000, compress) defer func() { testutil.Ok(t, head.Close()) }() @@ -525,8 +537,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { for _, c := range cases { - head, w, closer := newTestHead(t, 1000, compress) - defer closer() + head, w := newTestHead(t, 1000, compress) app := head.Appender() for _, smpl := range smplsAll { @@ -603,8 +614,7 @@ func TestHeadDeleteSimple(t *testing.T) { } func TestDeleteUntilCurMax(t *testing.T) { - hb, _, closer := newTestHead(t, 1000000, false) - defer closer() + hb, _ := newTestHead(t, 1000000, false) defer func() { testutil.Ok(t, hb.Close()) }() @@ -657,8 +667,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { numSamples := 10000 // Enough samples to cause a checkpoint. - hb, w, closer := newTestHead(t, int64(numSamples)*10, false) - defer closer() + hb, w := newTestHead(t, int64(numSamples)*10, false) for i := 0; i < numSamples; i++ { app := hb.Appender() @@ -748,8 +757,7 @@ func TestDelete_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } - hb, _, closer := newTestHead(t, 100000, false) - defer closer() + hb, _ := newTestHead(t, 100000, false) defer func() { testutil.Ok(t, hb.Close()) }() @@ -989,8 +997,7 @@ func TestMemSeries_append(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1044,8 +1051,7 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1100,8 +1106,7 @@ func TestGCSeriesAccess(t *testing.T) { } func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1131,8 +1136,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { } func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1165,8 +1169,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { func TestHead_LogRollback(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { - h, w, closer := newTestHead(t, 1000, compress) - defer closer() + h, w := newTestHead(t, 1000, compress) defer func() { testutil.Ok(t, h.Close()) }() @@ -1364,8 +1367,7 @@ func TestHeadReadWriterRepair(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - h, wlog, closer := newTestHead(t, 1000, false) - defer closer() + h, wlog := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1395,8 +1397,7 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { } func TestAddDuplicateLabelName(t *testing.T) { - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1472,7 +1473,7 @@ func TestMemSeriesIsolation(t *testing.T) { } // Test isolation without restart of Head. - hb, _, closer := newTestHead(t, 1000, false) + hb, _ := newTestHead(t, 1000, false) i := addSamples(hb) testIsolation(hb, i) @@ -1532,11 +1533,9 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Equals(t, 1002, lastValue(hb, 1003)) testutil.Ok(t, hb.Close()) - closer() // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. - hb, w, closer := newTestHead(t, 1000, false) - defer closer() + hb, w := newTestHead(t, 1000, false) i = addSamples(hb) testutil.Ok(t, hb.Close()) @@ -1582,8 +1581,7 @@ func TestMemSeriesIsolation(t *testing.T) { func TestIsolationRollback(t *testing.T) { // Rollback after a failed append and test if the low watermark has progressed anyway. - hb, _, closer := newTestHead(t, 1000, false) - defer closer() + hb, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, hb.Close()) }() @@ -1610,8 +1608,7 @@ func TestIsolationRollback(t *testing.T) { } func TestIsolationLowWatermarkMonotonous(t *testing.T) { - hb, _, closer := newTestHead(t, 1000, false) - defer closer() + hb, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, hb.Close()) }() @@ -1644,8 +1641,7 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) { } func TestIsolationAppendIDZeroIsNoop(t *testing.T) { - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1666,8 +1662,7 @@ func TestHeadSeriesChunkRace(t *testing.T) { } func TestIsolationWithoutAdd(t *testing.T) { - hb, _, closer := newTestHead(t, 1000, false) - defer closer() + hb, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, hb.Close()) }() @@ -1765,8 +1760,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { } func testHeadSeriesChunkRace(t *testing.T) { - h, _, closer := newTestHead(t, 1000, false) - defer closer() + h, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, h.Close()) }() @@ -1800,25 +1794,8 @@ func testHeadSeriesChunkRace(t *testing.T) { wg.Wait() } -func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL, func()) { - dir, err := ioutil.TempDir("", "test") - testutil.Ok(t, err) - wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) - testutil.Ok(t, err) - - h, err := NewHead(nil, nil, wlog, chunkRange, dir, nil, DefaultStripeSize, nil) - testutil.Ok(t, err) - - testutil.Ok(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) - - return h, wlog, func() { - testutil.Ok(t, os.RemoveAll(dir)) - } -} - func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { - head, _, closer := newTestHead(t, 1000, false) - defer closer() + head, _ := newTestHead(t, 1000, false) defer func() { testutil.Ok(t, head.Close()) }()