From 2fe1e9fa93772fd831b1c040968bb748eac12795 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 19 Oct 2020 17:27:08 +0200 Subject: [PATCH] Create a checkpoint only at the end of Compact call (#8067) * Create a checkpoint only at the end of Compact call Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar * Fix Bartek's offline reviews Signed-off-by: Ganesh Vernekar * Introduce TruncateInMemory and TruncateWAL Signed-off-by: Ganesh Vernekar * Small enhancements and test fixing attempts Signed-off-by: Ganesh Vernekar * Fix tests Signed-off-by: Ganesh Vernekar * Add TestOneCheckpointPerCompactCall Signed-off-by: Ganesh Vernekar * Don't truncate WAL on block compaction Signed-off-by: Ganesh Vernekar * Simplified the algo. Signed-off-by: Bartlomiej Plotka * Better protection around calling truncateWAL, truncate WAL on Head compaction error Signed-off-by: Ganesh Vernekar Co-authored-by: Ganesh Vernekar --- tsdb/block.go | 2 +- tsdb/compact_test.go | 12 ++-- tsdb/db.go | 122 ++++++++++++++++++++++---------------- tsdb/db_test.go | 135 ++++++++++++++++++++++++++++++++++++++++--- tsdb/head.go | 47 +++++++++++---- tsdb/head_test.go | 6 +- 6 files changed, 247 insertions(+), 77 deletions(-) diff --git a/tsdb/block.go b/tsdb/block.go index 0df30e846..c6de088d3 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -172,7 +172,7 @@ type BlockMetaCompaction struct { // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` // Indicates that during compaction it resulted in a block without any samples - // so it should be deleted on the next reload. + // so it should be deleted on the next reloadBlocks. Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 160c25fff..9ba0c6449 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1235,7 +1235,7 @@ func TestCancelCompactions(t *testing.T) { } } -// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction +// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reloadBlocks immediately after a compaction // deletes the resulting block to avoid creatings blocks with the same time range. func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { tests := map[string]func(*DB) int{ @@ -1265,7 +1265,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) - testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload") + testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reloadBlocks") return len(blocks) }, @@ -1281,7 +1281,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { expBlocks := bootStrap(db) - // Create a block that will trigger the reload to fail. + // Create a block that will trigger the reloadBlocks to fail. blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) lastBlockIndex := path.Join(blockPath, indexFilename) actBlocks, err := blockDirs(db.Dir()) @@ -1289,15 +1289,15 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block. testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. 
- testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reloadBlocks' count metrics mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "initial `compactions failed` count metric mismatch") // Do the compaction and check the metrics. - // Compaction should succeed, but the reload should fail and + // Compaction should succeed, but the reloadBlocks should fail and // the new block created from the compaction should be deleted. testutil.NotOk(t, db.Compact()) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch") + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reloadBlocks' count metrics mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch") diff --git a/tsdb/db.go b/tsdb/db.go index 9d8308f06..94cd401bb 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -216,7 +216,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { }) m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_reloads_failures_total", - Help: "Number of times the database failed to reload block data from disk.", + Help: "Number of times the database failed to reloadBlocks block data from disk.", }) m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_triggered_total", @@ -769,20 +769,26 @@ func (a dbAppender) Commit() error { } // Compact data if possible. After successful compaction blocks are reloaded -// which will also trigger blocks to be deleted that fall out of the retention -// window. -// If no blocks are compacted, the retention window state doesn't change. Thus, -// this is sufficient to reliably delete old data. -// Old blocks are only deleted on reload based on the new block's parent information. -// See DB.reload documentation for further information. -func (db *DB) Compact() (err error) { +// which will also delete the blocks that fall out of the retention window. +// Old blocks are only deleted on reloadBlocks based on the new block's parent information. +// See DB.reloadBlocks documentation for further information. +func (db *DB) Compact() (returnErr error) { db.cmtx.Lock() defer db.cmtx.Unlock() defer func() { - if err != nil { + if returnErr != nil { db.metrics.compactionsFailed.Inc() } }() + + lastBlockMaxt := int64(math.MinInt64) + defer func() { + var merr tsdb_errors.MultiError + merr.Add(returnErr) + merr.Add(errors.Wrap(db.head.truncateWAL(lastBlockMaxt), "WAL truncation in Compact defer")) + returnErr = merr.Err() + }() + // Check whether we have pending head blocks that are ready to be persisted. // They have the highest priority. for { @@ -804,55 +810,59 @@ func (db *DB) Compact() (err error) { // so in order to make sure that overlaps are evaluated // consistently, we explicitly remove the last value // from the block interval here. 
- head := NewRangeHead(db.head, mint, maxt-1) - if err := db.compactHead(head); err != nil { - return err + if err := db.compactHead(NewRangeHead(db.head, mint, maxt-1)); err != nil { + return errors.Wrap(err, "compact head") } + // Consider only successful compactions for WAL truncation. + lastBlockMaxt = maxt + } + + // Clear some disk space before compacting blocks, especially important + // when Head compaction happened over a long time range. + if err := db.head.truncateWAL(lastBlockMaxt); err != nil { + return errors.Wrap(err, "WAL truncation in Compact") } return db.compactBlocks() } -// CompactHead compacts the given the RangeHead. -func (db *DB) CompactHead(head *RangeHead) (err error) { +// CompactHead compacts the given RangeHead. +func (db *DB) CompactHead(head *RangeHead) error { db.cmtx.Lock() defer db.cmtx.Unlock() - return db.compactHead(head) + if err := db.compactHead(head); err != nil { + return errors.Wrap(err, "compact head") + } + + if err := db.head.truncateWAL(head.BlockMaxTime()); err != nil { + return errors.Wrap(err, "WAL truncation") + } + return nil } -// compactHead compacts the given the RangeHead. +// compactHead compacts the given RangeHead. // The compaction mutex should be held before calling this method. -func (db *DB) compactHead(head *RangeHead) (err error) { - // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). - // Because of this block intervals are always +1 than the total samples it includes. - maxt := head.MaxTime() + 1 - uid, err := db.compactor.Write(db.dir, head, head.MinTime(), maxt, nil) +func (db *DB) compactHead(head *RangeHead) error { + uid, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil) if err != nil { return errors.Wrap(err, "persist head block") } runtime.GC() - - if err := db.reload(); err != nil { + if err := db.reloadBlocks(); err != nil { if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil { var merr tsdb_errors.MultiError - merr.Add(errors.Wrap(err, "reload blocks")) - merr.Add(errors.Wrapf(errRemoveAll, "delete persisted head block after failed db reload:%s", uid)) + merr.Add(errors.Wrap(err, "reloadBlocks blocks")) + merr.Add(errors.Wrapf(errRemoveAll, "delete persisted head block after failed db reloadBlocks:%s", uid)) return merr.Err() } - return errors.Wrap(err, "reload blocks") + return errors.Wrap(err, "reloadBlocks blocks") } - if (uid == ulid.ULID{}) { - // Compaction resulted in an empty block. - // Head truncating during db.reload() depends on the persisted blocks and - // in this case no new block will be persisted so manually truncate the head. 
- if err = db.head.Truncate(maxt); err != nil { - return errors.Wrap(err, "head truncate failed (in compact)") - } + if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil { + return errors.Wrap(err, "head memory truncate") } runtime.GC() - return nil } @@ -881,11 +891,11 @@ func (db *DB) compactBlocks() (err error) { } runtime.GC() - if err := db.reload(); err != nil { + if err := db.reloadBlocks(); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { - return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid) + return errors.Wrapf(err, "delete compacted block after failed db reloadBlocks:%s", uid) } - return errors.Wrap(err, "reload blocks") + return errors.Wrap(err, "reloadBlocks blocks") } runtime.GC() } @@ -904,9 +914,23 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) { return nil, false } -// reload blocks and trigger head truncation if new blocks appeared. +// reload reloads blocks and truncates the head and its WAL. +func (db *DB) reload() error { + if err := db.reloadBlocks(); err != nil { + return errors.Wrap(err, "reloadBlocks") + } + if len(db.blocks) == 0 { + return nil + } + if err := db.head.Truncate(db.blocks[len(db.blocks)-1].MaxTime()); err != nil { + return errors.Wrap(err, "head truncate") + } + return nil +} + +// reloadBlocks reloads blocks without touching head. // Blocks that are obsolete due to replacement or retention will be deleted. -func (db *DB) reload() (err error) { +func (db *DB) reloadBlocks() (err error) { defer func() { if err != nil { db.metrics.reloadsFailed.Inc() @@ -989,7 +1013,7 @@ func (db *DB) reload() (err error) { blockMetas = append(blockMetas, b.Meta()) } if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { - level.Warn(db.logger).Log("msg", "Overlapping blocks found during reload", "detail", overlaps.String()) + level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) } // Append blocks to old, deletable blocks, so we can close them. @@ -999,15 +1023,9 @@ func (db *DB) reload() (err error) { } } if err := db.deleteBlocks(deletable); err != nil { - return err + return errors.Wrapf(err, "delete %v blocks", len(deletable)) } - - // Garbage collect data in the head if the most recent persisted block - // covers data of its current time range. - if len(toLoad) == 0 { - return nil - } - return errors.Wrap(db.head.Truncate(toLoad[len(toLoad)-1].Meta().MaxTime), "head truncate failed") + return nil } func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { @@ -1020,7 +1038,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po for _, bDir := range bDirs { meta, _, err := readMetaFile(bDir) if err != nil { - level.Error(l).Log("msg", "Failed to read meta.json for a block during reload. Skipping", "dir", bDir, "err", err) + level.Error(l).Log("msg", "Failed to read meta.json for a block during reloadBlocks. 
Skipping", "dir", bDir, "err", err) continue } @@ -1502,7 +1520,11 @@ func (db *DB) CleanTombstones() (err error) { newUIDs = append(newUIDs, *uid) } } - return errors.Wrap(db.reload(), "reload blocks") + + if err := db.reloadBlocks(); err != nil { + return errors.Wrap(err, "reload blocks") + } + return nil } func isBlockDir(fi os.FileInfo) bool { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index e1969c512..023db63af 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -151,7 +151,7 @@ func TestDB_reloadOrder(t *testing.T) { createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) } - testutil.Ok(t, db.reload()) + testutil.Ok(t, db.reloadBlocks()) blocks := db.Blocks() testutil.Equals(t, 3, len(blocks)) testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime) @@ -1195,11 +1195,11 @@ func TestTimeRetention(t *testing.T) { createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime)) } - testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new blocks. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. db.opts.RetentionDuration = blocks[2].MaxTime - blocks[1].MinTime - testutil.Ok(t, db.reload()) + testutil.Ok(t, db.reloadBlocks()) expBlocks := blocks[1:] actBlocks := db.Blocks() @@ -1249,7 +1249,7 @@ func TestSizeRetention(t *testing.T) { testutil.Ok(t, headApp.Commit()) // Test that registered size matches the actual disk size. - testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new db size. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. walSize, err := db.Head().wal.Size() @@ -1277,8 +1277,8 @@ func TestSizeRetention(t *testing.T) { // Check total size, total count and check that the oldest block was deleted. firstBlockSize := db.Blocks()[0].Size() sizeLimit := actSize - firstBlockSize - db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. - testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. + testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new db size. expBlocks := blocks[1:] actBlocks := db.Blocks() @@ -1811,7 +1811,7 @@ func TestNoEmptyBlocks(t *testing.T) { } oldBlocks := db.Blocks() - testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new blocks. testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. 
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) testutil.Ok(t, db.Compact()) @@ -2838,3 +2838,124 @@ func TestOpen_VariousBlockStates(t *testing.T) { } testutil.Equals(t, len(expectedIgnoredDirs), ignored) } + +func TestOneCheckpointPerCompactCall(t *testing.T) { + blockRange := int64(1000) + tsdbCfg := &Options{ + RetentionDuration: blockRange * 1000, + NoLockfile: true, + MinBlockDuration: blockRange, + MaxBlockDuration: blockRange, + } + + tmpDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + testutil.Ok(t, err) + t.Cleanup(func() { + testutil.Ok(t, db.Close()) + }) + db.DisableCompactions() + + // Case 1: Lot's of uncompacted data in Head. + + lbls := labels.Labels{labels.Label{Name: "foo_d", Value: "choco_bar"}} + // Append samples spanning 59 block ranges. + app := db.Appender(context.Background()) + for i := int64(0); i < 60; i++ { + _, err := app.Add(lbls, blockRange*i, rand.Float64()) + testutil.Ok(t, err) + _, err = app.Add(lbls, (blockRange*i)+blockRange/2, rand.Float64()) + testutil.Ok(t, err) + // Rotate the WAL file so that there is >3 files for checkpoint to happen. + testutil.Ok(t, db.head.wal.NextSegment()) + } + testutil.Ok(t, app.Commit()) + + // Check the existing WAL files. + first, last, err := wal.Segments(db.head.wal.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, 0, first) + testutil.Equals(t, 60, last) + + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) + testutil.Ok(t, db.Compact()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) + + // As the data spans for 59 blocks, 58 go to disk and 1 remains in Head. + testutil.Equals(t, 58, len(db.Blocks())) + // Though WAL was truncated only once, head should be truncated after each compaction. + testutil.Equals(t, 58.0, prom_testutil.ToFloat64(db.head.metrics.headTruncateTotal)) + + // The compaction should have only truncated first 2/3 of WAL (while also rotating the files). + first, last, err = wal.Segments(db.head.wal.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, 40, first) + testutil.Equals(t, 61, last) + + // The first checkpoint would be for first 2/3rd of WAL, hence till 39. + // That should be the last checkpoint. + _, cno, err := wal.LastCheckpoint(db.head.wal.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, 39, cno) + + // Case 2: Old blocks on disk. + // The above blocks will act as old blocks. + + // Creating a block to cover the data in the Head so that + // Head will skip the data during replay and start fresh. + blocks := db.Blocks() + newBlockMint := blocks[len(blocks)-1].Meta().MaxTime + newBlockMaxt := db.Head().MaxTime() + 1 + testutil.Ok(t, db.Close()) + + createBlock(t, db.dir, genSeries(1, 1, newBlockMint, newBlockMaxt)) + + db, err = Open(db.dir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + testutil.Ok(t, err) + db.DisableCompactions() + + // 1 block more. + testutil.Equals(t, 59, len(db.Blocks())) + // No series in Head because of this new block. + testutil.Equals(t, 0, int(db.head.NumSeries())) + + // Adding sample way into the future. + app = db.Appender(context.Background()) + _, err = app.Add(lbls, blockRange*120, rand.Float64()) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + // The mint of head is the last block maxt, that means the gap between mint and maxt + // of Head is too large. 
This will trigger many compactions. + testutil.Equals(t, newBlockMaxt, db.head.MinTime()) + + // Another WAL file was rotated. + first, last, err = wal.Segments(db.head.wal.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, 40, first) + testutil.Equals(t, 62, last) + + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) + testutil.Ok(t, db.Compact()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal)) + + // No new blocks should be created as there was not data in between the new samples and the blocks. + testutil.Equals(t, 59, len(db.Blocks())) + + // The compaction should have only truncated first 2/3 of WAL (while also rotating the files). + first, last, err = wal.Segments(db.head.wal.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, 55, first) + testutil.Equals(t, 63, last) + + // The first checkpoint would be for first 2/3rd of WAL, hence till 54. + // That should be the last checkpoint. + _, cno, err = wal.LastCheckpoint(db.head.wal.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, 54, cno) +} diff --git a/tsdb/head.go b/tsdb/head.go index 3a577763d..56b597f3d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -52,11 +52,12 @@ var ( // Head handles reads and writes of time series data within a time window. type Head struct { - chunkRange atomic.Int64 - numSeries atomic.Uint64 - minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. - minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. - lastSeriesID atomic.Uint64 + chunkRange atomic.Int64 + numSeries atomic.Uint64 + minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. + minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. + lastWALTruncationTime atomic.Int64 + lastSeriesID atomic.Uint64 metrics *headMetrics wal *wal.WAL @@ -323,6 +324,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int h.chunkRange.Store(chunkRange) h.minTime.Store(math.MaxInt64) h.maxTime.Store(math.MinInt64) + h.lastWALTruncationTime.Store(math.MinInt64) h.metrics = newHeadMetrics(h, r) if pool == nil { @@ -776,8 +778,20 @@ func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChun return mmappedChunks } -// Truncate removes old data before mint from the head. +// Truncate removes old data before mint from the head and WAL. func (h *Head) Truncate(mint int64) (err error) { + initialize := h.MinTime() == math.MaxInt64 + if err := h.truncateMemory(mint); err != nil { + return err + } + if initialize { + return nil + } + return h.truncateWAL(mint) +} + +// truncateMemory removes old data before mint from the head. +func (h *Head) truncateMemory(mint int64) (err error) { defer func() { if err != nil { h.metrics.headTruncateFail.Inc() @@ -813,11 +827,16 @@ func (h *Head) Truncate(mint int64) (err error) { if err := h.chunkDiskMapper.Truncate(mint); err != nil { return errors.Wrap(err, "truncate chunks.HeadReadWriter") } + return nil +} - if h.wal == nil { +// truncateWAL removes old data before mint from the WAL. 
+func (h *Head) truncateWAL(mint int64) error { + if h.wal == nil || mint <= h.lastWALTruncationTime.Load() { return nil } - start = time.Now() + start := time.Now() + h.lastWALTruncationTime.Store(mint) first, last, err := wal.Segments(h.wal.Dir()) if err != nil { @@ -825,8 +844,7 @@ func (h *Head) Truncate(mint int64) (err error) { } // Start a new segment, so low ingestion volume TSDB don't have more WAL than // needed. - err = h.wal.NextSegment() - if err != nil { + if err := h.wal.NextSegment(); err != nil { return errors.Wrap(err, "next segment") } last-- // Never consider last segment for checkpoint. @@ -950,10 +968,19 @@ func (h *RangeHead) MinTime() int64 { return h.mint } +// MaxTime returns the max time of actual data fetch-able from the head. +// This controls the chunks time range which is closed [b.MinTime, b.MaxTime]. func (h *RangeHead) MaxTime() int64 { return h.maxt } +// BlockMaxTime returns the max time of the potential block created from this head. +// It's different to MaxTime as we need to add +1 millisecond to block maxt because block +// intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. +func (h *RangeHead) BlockMaxTime() int64 { + return h.MaxTime() + 1 +} + func (h *RangeHead) NumSeries() uint64 { return h.head.NumSeries() } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 75e20add9..62ab33baa 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -579,14 +579,14 @@ func TestHeadDeleteSimple(t *testing.T) { } testutil.Ok(t, app.Commit()) - // Compare the samples for both heads - before and after the reload. - reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. + // Compare the samples for both heads - before and after the reloadBlocks. + reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks. testutil.Ok(t, err) reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize, nil) testutil.Ok(t, err) testutil.Ok(t, reloadedHead.Init(0)) - // Compare the query results for both heads - before and after the reload. + // Compare the query results for both heads - before and after the reloadBlocks. Outer: for _, h := range []*Head{head, reloadedHead} { q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
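
Editor's note: to make the shape of this change easier to follow outside the diff, below is a minimal, self-contained Go sketch of the control flow the patch introduces in DB.Compact — persist every pending head range, truncate only the in-memory head per range, and truncate the WAL (which is what creates a checkpoint) at most once per Compact call, guarded by the last WAL truncation time. The fakeHead, compactRange, and compact names are hypothetical stand-ins, not the real tsdb API; the sketch uses the standard library's errors.Join in place of tsdb_errors.MultiError and keeps only the deferred WAL truncation path.

package main

import (
	"errors"
	"fmt"
	"math"
)

// fakeHead is a hypothetical stand-in for tsdb.Head, reduced to the two
// truncation paths this patch separates: in-memory truncation and WAL
// truncation (the step that produces a checkpoint).
type fakeHead struct {
	minValidTime          int64
	lastWALTruncationTime int64
	checkpoints           int
}

// truncateMemory drops in-memory data before mint.
func (h *fakeHead) truncateMemory(mint int64) error {
	if mint <= h.minValidTime {
		return nil
	}
	h.minValidTime = mint
	return nil
}

// truncateWAL drops WAL data before mint. The guard mirrors the patch:
// if there is nothing newer to drop, it is a no-op and no checkpoint is made.
func (h *fakeHead) truncateWAL(mint int64) error {
	if mint <= h.lastWALTruncationTime {
		return nil
	}
	h.lastWALTruncationTime = mint
	h.checkpoints++ // one checkpoint per effective WAL truncation
	return nil
}

// compactRange stands in for compactor.Write plus reloadBlocks for one head
// range: persist [mint, maxt), then drop only the covered in-memory data.
func compactRange(h *fakeHead, mint, maxt int64) error {
	return h.truncateMemory(maxt)
}

// compact sketches the reordered DB.Compact: many head ranges may be
// persisted in one call, but the WAL is truncated at most once, at the end,
// using the maxt of the last block that was successfully persisted.
func compact(h *fakeHead, ranges [][2]int64) (err error) {
	lastBlockMaxt := int64(math.MinInt64)
	defer func() {
		// Even on error, truncate the WAL up to the last successful block.
		err = errors.Join(err, h.truncateWAL(lastBlockMaxt))
	}()

	for _, r := range ranges {
		if cerr := compactRange(h, r[0], r[1]); cerr != nil {
			return fmt.Errorf("compact head: %w", cerr)
		}
		lastBlockMaxt = r[1] // only successful compactions advance the WAL mark
	}
	return nil
}

func main() {
	h := &fakeHead{minValidTime: math.MinInt64, lastWALTruncationTime: math.MinInt64}
	// Three pending head ranges, as when compaction has lagged behind ingestion.
	err := compact(h, [][2]int64{{0, 1000}, {1000, 2000}, {2000, 3000}})
	fmt.Println(err, h.minValidTime, h.checkpoints) // <nil> 3000 1 -> one checkpoint for three blocks
}

The point of the reordering, as the sketch shows, is that checkpoint creation scales with the number of Compact calls rather than with the number of head ranges persisted in one call, which is the behaviour TestOneCheckpointPerCompactCall above asserts.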