Create a checkpoint only at the end of Compact call (#8067)

* Create a checkpoint only at the end of Compact call

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix Bartek's offline reviews

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Introduce TruncateInMemory and TruncateWAL

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Small enhancements and test fixing attempts

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix tests

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add TestOneCheckpointPerCompactCall

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Don't truncate WAL on block compaction

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Simplified the algo.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Better protection around calling truncateWAL, truncate WAL on Head compaction error

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

Co-authored-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
pull/8083/head
Bartlomiej Plotka 4 years ago committed by GitHub
parent d9cd913219
commit 2fe1e9fa93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -172,7 +172,7 @@ type BlockMetaCompaction struct {
// ULIDs of all source head blocks that went into the block. // ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"` Sources []ulid.ULID `json:"sources,omitempty"`
// Indicates that during compaction it resulted in a block without any samples // Indicates that during compaction it resulted in a block without any samples
// so it should be deleted on the next reload. // so it should be deleted on the next reloadBlocks.
Deletable bool `json:"deletable,omitempty"` Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create // Short descriptions of the direct blocks that were used to create
// this block. // this block.

@ -1235,7 +1235,7 @@ func TestCancelCompactions(t *testing.T) {
} }
} }
// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction // TestDeleteCompactionBlockAfterFailedReload ensures that a failed reloadBlocks immediately after a compaction
// deletes the resulting block to avoid creatings blocks with the same time range. // deletes the resulting block to avoid creatings blocks with the same time range.
func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
tests := map[string]func(*DB) int{ tests := map[string]func(*DB) int{
@ -1265,7 +1265,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) testutil.Ok(t, db.reload())
testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload") testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reloadBlocks")
return len(blocks) return len(blocks)
}, },
@ -1281,7 +1281,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
expBlocks := bootStrap(db) expBlocks := bootStrap(db)
// Create a block that will trigger the reload to fail. // Create a block that will trigger the reloadBlocks to fail.
blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300))
lastBlockIndex := path.Join(blockPath, indexFilename) lastBlockIndex := path.Join(blockPath, indexFilename)
actBlocks, err := blockDirs(db.Dir()) actBlocks, err := blockDirs(db.Dir())
@ -1289,15 +1289,15 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block. testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block.
testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file.
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reloadBlocks' count metrics mismatch")
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch")
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "initial `compactions failed` count metric mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "initial `compactions failed` count metric mismatch")
// Do the compaction and check the metrics. // Do the compaction and check the metrics.
// Compaction should succeed, but the reload should fail and // Compaction should succeed, but the reloadBlocks should fail and
// the new block created from the compaction should be deleted. // the new block created from the compaction should be deleted.
testutil.NotOk(t, db.Compact()) testutil.NotOk(t, db.Compact())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reloadBlocks' count metrics mismatch")
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch")
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch")

@ -216,7 +216,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
}) })
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{ m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_reloads_failures_total", Name: "prometheus_tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload block data from disk.", Help: "Number of times the database failed to reloadBlocks block data from disk.",
}) })
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{ m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_triggered_total", Name: "prometheus_tsdb_compactions_triggered_total",
@ -769,20 +769,26 @@ func (a dbAppender) Commit() error {
} }
// Compact data if possible. After successful compaction blocks are reloaded // Compact data if possible. After successful compaction blocks are reloaded
// which will also trigger blocks to be deleted that fall out of the retention // which will also delete the blocks that fall out of the retention window.
// window. // Old blocks are only deleted on reloadBlocks based on the new block's parent information.
// If no blocks are compacted, the retention window state doesn't change. Thus, // See DB.reloadBlocks documentation for further information.
// this is sufficient to reliably delete old data. func (db *DB) Compact() (returnErr error) {
// Old blocks are only deleted on reload based on the new block's parent information.
// See DB.reload documentation for further information.
func (db *DB) Compact() (err error) {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
defer func() { defer func() {
if err != nil { if returnErr != nil {
db.metrics.compactionsFailed.Inc() db.metrics.compactionsFailed.Inc()
} }
}() }()
lastBlockMaxt := int64(math.MinInt64)
defer func() {
var merr tsdb_errors.MultiError
merr.Add(returnErr)
merr.Add(errors.Wrap(db.head.truncateWAL(lastBlockMaxt), "WAL truncation in Compact defer"))
returnErr = merr.Err()
}()
// Check whether we have pending head blocks that are ready to be persisted. // Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority. // They have the highest priority.
for { for {
@ -804,55 +810,59 @@ func (db *DB) Compact() (err error) {
// so in order to make sure that overlaps are evaluated // so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value // consistently, we explicitly remove the last value
// from the block interval here. // from the block interval here.
head := NewRangeHead(db.head, mint, maxt-1) if err := db.compactHead(NewRangeHead(db.head, mint, maxt-1)); err != nil {
if err := db.compactHead(head); err != nil { return errors.Wrap(err, "compact head")
return err
} }
// Consider only successful compactions for WAL truncation.
lastBlockMaxt = maxt
}
// Clear some disk space before compacting blocks, especially important
// when Head compaction happened over a long time range.
if err := db.head.truncateWAL(lastBlockMaxt); err != nil {
return errors.Wrap(err, "WAL truncation in Compact")
} }
return db.compactBlocks() return db.compactBlocks()
} }
// CompactHead compacts the given the RangeHead. // CompactHead compacts the given RangeHead.
func (db *DB) CompactHead(head *RangeHead) (err error) { func (db *DB) CompactHead(head *RangeHead) error {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
return db.compactHead(head) if err := db.compactHead(head); err != nil {
return errors.Wrap(err, "compact head")
}
if err := db.head.truncateWAL(head.BlockMaxTime()); err != nil {
return errors.Wrap(err, "WAL truncation")
}
return nil
} }
// compactHead compacts the given the RangeHead. // compactHead compacts the given RangeHead.
// The compaction mutex should be held before calling this method. // The compaction mutex should be held before calling this method.
func (db *DB) compactHead(head *RangeHead) (err error) { func (db *DB) compactHead(head *RangeHead) error {
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). uid, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
// Because of this block intervals are always +1 than the total samples it includes.
maxt := head.MaxTime() + 1
uid, err := db.compactor.Write(db.dir, head, head.MinTime(), maxt, nil)
if err != nil { if err != nil {
return errors.Wrap(err, "persist head block") return errors.Wrap(err, "persist head block")
} }
runtime.GC() runtime.GC()
if err := db.reloadBlocks(); err != nil {
if err := db.reload(); err != nil {
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil { if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
var merr tsdb_errors.MultiError var merr tsdb_errors.MultiError
merr.Add(errors.Wrap(err, "reload blocks")) merr.Add(errors.Wrap(err, "reloadBlocks blocks"))
merr.Add(errors.Wrapf(errRemoveAll, "delete persisted head block after failed db reload:%s", uid)) merr.Add(errors.Wrapf(errRemoveAll, "delete persisted head block after failed db reloadBlocks:%s", uid))
return merr.Err() return merr.Err()
} }
return errors.Wrap(err, "reload blocks") return errors.Wrap(err, "reloadBlocks blocks")
} }
if (uid == ulid.ULID{}) { if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil {
// Compaction resulted in an empty block. return errors.Wrap(err, "head memory truncate")
// Head truncating during db.reload() depends on the persisted blocks and
// in this case no new block will be persisted so manually truncate the head.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
} }
runtime.GC() runtime.GC()
return nil return nil
} }
@ -881,11 +891,11 @@ func (db *DB) compactBlocks() (err error) {
} }
runtime.GC() runtime.GC()
if err := db.reload(); err != nil { if err := db.reloadBlocks(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil {
return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid) return errors.Wrapf(err, "delete compacted block after failed db reloadBlocks:%s", uid)
} }
return errors.Wrap(err, "reload blocks") return errors.Wrap(err, "reloadBlocks blocks")
} }
runtime.GC() runtime.GC()
} }
@ -904,9 +914,23 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
return nil, false return nil, false
} }
// reload blocks and trigger head truncation if new blocks appeared. // reload reloads blocks and truncates the head and its WAL.
func (db *DB) reload() error {
if err := db.reloadBlocks(); err != nil {
return errors.Wrap(err, "reloadBlocks")
}
if len(db.blocks) == 0 {
return nil
}
if err := db.head.Truncate(db.blocks[len(db.blocks)-1].MaxTime()); err != nil {
return errors.Wrap(err, "head truncate")
}
return nil
}
// reloadBlocks reloads blocks without touching head.
// Blocks that are obsolete due to replacement or retention will be deleted. // Blocks that are obsolete due to replacement or retention will be deleted.
func (db *DB) reload() (err error) { func (db *DB) reloadBlocks() (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
db.metrics.reloadsFailed.Inc() db.metrics.reloadsFailed.Inc()
@ -989,7 +1013,7 @@ func (db *DB) reload() (err error) {
blockMetas = append(blockMetas, b.Meta()) blockMetas = append(blockMetas, b.Meta())
} }
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
level.Warn(db.logger).Log("msg", "Overlapping blocks found during reload", "detail", overlaps.String()) level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String())
} }
// Append blocks to old, deletable blocks, so we can close them. // Append blocks to old, deletable blocks, so we can close them.
@ -999,15 +1023,9 @@ func (db *DB) reload() (err error) {
} }
} }
if err := db.deleteBlocks(deletable); err != nil { if err := db.deleteBlocks(deletable); err != nil {
return err return errors.Wrapf(err, "delete %v blocks", len(deletable))
} }
return nil
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(toLoad) == 0 {
return nil
}
return errors.Wrap(db.head.Truncate(toLoad[len(toLoad)-1].Meta().MaxTime), "head truncate failed")
} }
func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
@ -1020,7 +1038,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
for _, bDir := range bDirs { for _, bDir := range bDirs {
meta, _, err := readMetaFile(bDir) meta, _, err := readMetaFile(bDir)
if err != nil { if err != nil {
level.Error(l).Log("msg", "Failed to read meta.json for a block during reload. Skipping", "dir", bDir, "err", err) level.Error(l).Log("msg", "Failed to read meta.json for a block during reloadBlocks. Skipping", "dir", bDir, "err", err)
continue continue
} }
@ -1502,7 +1520,11 @@ func (db *DB) CleanTombstones() (err error) {
newUIDs = append(newUIDs, *uid) newUIDs = append(newUIDs, *uid)
} }
} }
return errors.Wrap(db.reload(), "reload blocks")
if err := db.reloadBlocks(); err != nil {
return errors.Wrap(err, "reload blocks")
}
return nil
} }
func isBlockDir(fi os.FileInfo) bool { func isBlockDir(fi os.FileInfo) bool {

@ -151,7 +151,7 @@ func TestDB_reloadOrder(t *testing.T) {
createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) testutil.Ok(t, db.reloadBlocks())
blocks := db.Blocks() blocks := db.Blocks()
testutil.Equals(t, 3, len(blocks)) testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime) testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
@ -1195,11 +1195,11 @@ func TestTimeRetention(t *testing.T) {
createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime)) createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime))
} }
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new blocks.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
db.opts.RetentionDuration = blocks[2].MaxTime - blocks[1].MinTime db.opts.RetentionDuration = blocks[2].MaxTime - blocks[1].MinTime
testutil.Ok(t, db.reload()) testutil.Ok(t, db.reloadBlocks())
expBlocks := blocks[1:] expBlocks := blocks[1:]
actBlocks := db.Blocks() actBlocks := db.Blocks()
@ -1249,7 +1249,7 @@ func TestSizeRetention(t *testing.T) {
testutil.Ok(t, headApp.Commit()) testutil.Ok(t, headApp.Commit())
// Test that registered size matches the actual disk size. // Test that registered size matches the actual disk size.
testutil.Ok(t, db.reload()) // Reload the db to register the new db size. testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new db size.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics. blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
walSize, err := db.Head().wal.Size() walSize, err := db.Head().wal.Size()
@ -1277,8 +1277,8 @@ func TestSizeRetention(t *testing.T) {
// Check total size, total count and check that the oldest block was deleted. // Check total size, total count and check that the oldest block was deleted.
firstBlockSize := db.Blocks()[0].Size() firstBlockSize := db.Blocks()[0].Size()
sizeLimit := actSize - firstBlockSize sizeLimit := actSize - firstBlockSize
db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size.
testutil.Ok(t, db.reload()) // Reload the db to register the new db size. testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new db size.
expBlocks := blocks[1:] expBlocks := blocks[1:]
actBlocks := db.Blocks() actBlocks := db.Blocks()
@ -1811,7 +1811,7 @@ func TestNoEmptyBlocks(t *testing.T) {
} }
oldBlocks := db.Blocks() oldBlocks := db.Blocks()
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. testutil.Ok(t, db.reloadBlocks()) // Reload the db to register the new blocks.
testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.Compact()) testutil.Ok(t, db.Compact())
@ -2838,3 +2838,124 @@ func TestOpen_VariousBlockStates(t *testing.T) {
} }
testutil.Equals(t, len(expectedIgnoredDirs), ignored) testutil.Equals(t, len(expectedIgnoredDirs), ignored)
} }
func TestOneCheckpointPerCompactCall(t *testing.T) {
blockRange := int64(1000)
tsdbCfg := &Options{
RetentionDuration: blockRange * 1000,
NoLockfile: true,
MinBlockDuration: blockRange,
MaxBlockDuration: blockRange,
}
tmpDir, err := ioutil.TempDir("", "test")
testutil.Ok(t, err)
t.Cleanup(func() {
testutil.Ok(t, os.RemoveAll(tmpDir))
})
db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
testutil.Ok(t, err)
t.Cleanup(func() {
testutil.Ok(t, db.Close())
})
db.DisableCompactions()
// Case 1: Lot's of uncompacted data in Head.
lbls := labels.Labels{labels.Label{Name: "foo_d", Value: "choco_bar"}}
// Append samples spanning 59 block ranges.
app := db.Appender(context.Background())
for i := int64(0); i < 60; i++ {
_, err := app.Add(lbls, blockRange*i, rand.Float64())
testutil.Ok(t, err)
_, err = app.Add(lbls, (blockRange*i)+blockRange/2, rand.Float64())
testutil.Ok(t, err)
// Rotate the WAL file so that there is >3 files for checkpoint to happen.
testutil.Ok(t, db.head.wal.NextSegment())
}
testutil.Ok(t, app.Commit())
// Check the existing WAL files.
first, last, err := wal.Segments(db.head.wal.Dir())
testutil.Ok(t, err)
testutil.Equals(t, 0, first)
testutil.Equals(t, 60, last)
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))
testutil.Ok(t, db.Compact())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))
// As the data spans for 59 blocks, 58 go to disk and 1 remains in Head.
testutil.Equals(t, 58, len(db.Blocks()))
// Though WAL was truncated only once, head should be truncated after each compaction.
testutil.Equals(t, 58.0, prom_testutil.ToFloat64(db.head.metrics.headTruncateTotal))
// The compaction should have only truncated first 2/3 of WAL (while also rotating the files).
first, last, err = wal.Segments(db.head.wal.Dir())
testutil.Ok(t, err)
testutil.Equals(t, 40, first)
testutil.Equals(t, 61, last)
// The first checkpoint would be for first 2/3rd of WAL, hence till 39.
// That should be the last checkpoint.
_, cno, err := wal.LastCheckpoint(db.head.wal.Dir())
testutil.Ok(t, err)
testutil.Equals(t, 39, cno)
// Case 2: Old blocks on disk.
// The above blocks will act as old blocks.
// Creating a block to cover the data in the Head so that
// Head will skip the data during replay and start fresh.
blocks := db.Blocks()
newBlockMint := blocks[len(blocks)-1].Meta().MaxTime
newBlockMaxt := db.Head().MaxTime() + 1
testutil.Ok(t, db.Close())
createBlock(t, db.dir, genSeries(1, 1, newBlockMint, newBlockMaxt))
db, err = Open(db.dir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg)
testutil.Ok(t, err)
db.DisableCompactions()
// 1 block more.
testutil.Equals(t, 59, len(db.Blocks()))
// No series in Head because of this new block.
testutil.Equals(t, 0, int(db.head.NumSeries()))
// Adding sample way into the future.
app = db.Appender(context.Background())
_, err = app.Add(lbls, blockRange*120, rand.Float64())
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
// The mint of head is the last block maxt, that means the gap between mint and maxt
// of Head is too large. This will trigger many compactions.
testutil.Equals(t, newBlockMaxt, db.head.MinTime())
// Another WAL file was rotated.
first, last, err = wal.Segments(db.head.wal.Dir())
testutil.Ok(t, err)
testutil.Equals(t, 40, first)
testutil.Equals(t, 62, last)
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))
testutil.Ok(t, db.Compact())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.checkpointCreationTotal))
// No new blocks should be created as there was not data in between the new samples and the blocks.
testutil.Equals(t, 59, len(db.Blocks()))
// The compaction should have only truncated first 2/3 of WAL (while also rotating the files).
first, last, err = wal.Segments(db.head.wal.Dir())
testutil.Ok(t, err)
testutil.Equals(t, 55, first)
testutil.Equals(t, 63, last)
// The first checkpoint would be for first 2/3rd of WAL, hence till 54.
// That should be the last checkpoint.
_, cno, err = wal.LastCheckpoint(db.head.wal.Dir())
testutil.Ok(t, err)
testutil.Equals(t, 54, cno)
}

@ -52,11 +52,12 @@ var (
// Head handles reads and writes of time series data within a time window. // Head handles reads and writes of time series data within a time window.
type Head struct { type Head struct {
chunkRange atomic.Int64 chunkRange atomic.Int64
numSeries atomic.Uint64 numSeries atomic.Uint64
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head.
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
lastSeriesID atomic.Uint64 lastWALTruncationTime atomic.Int64
lastSeriesID atomic.Uint64
metrics *headMetrics metrics *headMetrics
wal *wal.WAL wal *wal.WAL
@ -323,6 +324,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
h.chunkRange.Store(chunkRange) h.chunkRange.Store(chunkRange)
h.minTime.Store(math.MaxInt64) h.minTime.Store(math.MaxInt64)
h.maxTime.Store(math.MinInt64) h.maxTime.Store(math.MinInt64)
h.lastWALTruncationTime.Store(math.MinInt64)
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
if pool == nil { if pool == nil {
@ -776,8 +778,20 @@ func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChun
return mmappedChunks return mmappedChunks
} }
// Truncate removes old data before mint from the head. // Truncate removes old data before mint from the head and WAL.
func (h *Head) Truncate(mint int64) (err error) { func (h *Head) Truncate(mint int64) (err error) {
initialize := h.MinTime() == math.MaxInt64
if err := h.truncateMemory(mint); err != nil {
return err
}
if initialize {
return nil
}
return h.truncateWAL(mint)
}
// truncateMemory removes old data before mint from the head.
func (h *Head) truncateMemory(mint int64) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
h.metrics.headTruncateFail.Inc() h.metrics.headTruncateFail.Inc()
@ -813,11 +827,16 @@ func (h *Head) Truncate(mint int64) (err error) {
if err := h.chunkDiskMapper.Truncate(mint); err != nil { if err := h.chunkDiskMapper.Truncate(mint); err != nil {
return errors.Wrap(err, "truncate chunks.HeadReadWriter") return errors.Wrap(err, "truncate chunks.HeadReadWriter")
} }
return nil
}
if h.wal == nil { // truncateWAL removes old data before mint from the WAL.
func (h *Head) truncateWAL(mint int64) error {
if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
return nil return nil
} }
start = time.Now() start := time.Now()
h.lastWALTruncationTime.Store(mint)
first, last, err := wal.Segments(h.wal.Dir()) first, last, err := wal.Segments(h.wal.Dir())
if err != nil { if err != nil {
@ -825,8 +844,7 @@ func (h *Head) Truncate(mint int64) (err error) {
} }
// Start a new segment, so low ingestion volume TSDB don't have more WAL than // Start a new segment, so low ingestion volume TSDB don't have more WAL than
// needed. // needed.
err = h.wal.NextSegment() if err := h.wal.NextSegment(); err != nil {
if err != nil {
return errors.Wrap(err, "next segment") return errors.Wrap(err, "next segment")
} }
last-- // Never consider last segment for checkpoint. last-- // Never consider last segment for checkpoint.
@ -950,10 +968,19 @@ func (h *RangeHead) MinTime() int64 {
return h.mint return h.mint
} }
// MaxTime returns the max time of actual data fetch-able from the head.
// This controls the chunks time range which is closed [b.MinTime, b.MaxTime].
func (h *RangeHead) MaxTime() int64 { func (h *RangeHead) MaxTime() int64 {
return h.maxt return h.maxt
} }
// BlockMaxTime returns the max time of the potential block created from this head.
// It's different to MaxTime as we need to add +1 millisecond to block maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
func (h *RangeHead) BlockMaxTime() int64 {
return h.MaxTime() + 1
}
func (h *RangeHead) NumSeries() uint64 { func (h *RangeHead) NumSeries() uint64 {
return h.head.NumSeries() return h.head.NumSeries()
} }

@ -579,14 +579,14 @@ func TestHeadDeleteSimple(t *testing.T) {
} }
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
// Compare the samples for both heads - before and after the reload. // Compare the samples for both heads - before and after the reloadBlocks.
reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
testutil.Ok(t, err) testutil.Ok(t, err)
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize, nil) reloadedHead, err := NewHead(nil, nil, reloadedW, 1000, reloadedW.Dir(), nil, DefaultStripeSize, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, reloadedHead.Init(0)) testutil.Ok(t, reloadedHead.Init(0))
// Compare the query results for both heads - before and after the reload. // Compare the query results for both heads - before and after the reloadBlocks.
Outer: Outer:
for _, h := range []*Head{head, reloadedHead} { for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())

Loading…
Cancel
Save