From a61a31a5d7ba4e70f0eb55c13a33b16e6db2e073 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 30 Jan 2017 09:42:38 +0100 Subject: [PATCH] compaction: add fast-path for compacting mem blocks --- compact.go | 47 ++++++++++++++++++++++++++++++++++------------- db.go | 13 ++++++------- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/compact.go b/compact.go index b4aad2c7d..412a24788 100644 --- a/compact.go +++ b/compact.go @@ -50,7 +50,6 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { type compactorOptions struct { maxBlockRange uint64 - maxSize uint64 } func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { @@ -65,29 +64,51 @@ type compactionInfo struct { mint, maxt int64 } -// pick returns a range [i, j] in the blocks that are suitable to be compacted +const compactionBlocksLen = 4 + +// pick returns a range [i, j) in the blocks that are suitable to be compacted // into a single block at position i. func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) { - - last := len(bs) - 1 if len(bs) == 0 { return 0, 0, false } - // Make sure we always compact the last block if unpersisted. - if bs[last].generation == 0 { - if len(bs) >= 3 && c.match(bs[last-2:last+1]) { - return last - 2, last, true + // First, we always compact pending in-memory blocks – oldest first. + for i, b := range bs { + if b.generation > 0 { + continue } - return last, last, true + // Directly compact into 2nd generation with previous generation 1 blocks. + if i+1 >= compactionBlocksLen { + match := true + for _, pb := range bs[i-compactionBlocksLen+1 : i] { + match = match && pb.generation == 1 + } + if match { + return i - compactionBlocksLen + 1, i + 1, true + } + } + // If we have enough generation 0 blocks to directly move to the + // 2nd generation, skip generation 1. + if len(bs)-i >= compactionBlocksLen { + // Guard against the newly compacted block becoming larger than + // the previous one. + if i == 0 || bs[i-1].generation >= 2 { + return i, i + compactionBlocksLen, true + } + } + + // No optimizations possible, naiively compact the new block. + return i, i + 1, true } - for i := len(bs); i-3 >= 0; i -= 3 { - tpl := bs[i-3 : i] - if c.match(tpl) { - return i - 3, i - 1, true + // Then we care about compacting multiple blocks, starting with the oldest. + for i := 0; i < len(bs)-compactionBlocksLen; i += compactionBlocksLen { + if c.match(bs[i : i+2]) { + return i, i + compactionBlocksLen, true } } + return 0, 0, false } diff --git a/db.go b/db.go index 19b7873e1..f65b673f2 100644 --- a/db.go +++ b/db.go @@ -140,7 +140,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) { } db.compactor = newCompactor(r, &compactorOptions{ maxBlockRange: opts.MaxBlockRange, - maxSize: 1 << 29, // 512MB }) if err := db.initBlocks(); err != nil { @@ -202,7 +201,7 @@ func (db *DB) run() { continue } db.logger.Log("msg", "picked", "i", i, "j", j) - for k := i; k <= j; k++ { + for k := i; k < j; k++ { db.logger.Log("k", k, "generation", infos[k].generation) } @@ -230,10 +229,10 @@ func (db *DB) getBlock(i int) Block { return db.heads[i-len(db.persisted)] } -// removeBlocks removes the blocks in range [i, j] from the list of persisted +// removeBlocks removes the blocks in range [i, j) from the list of persisted // and head blocks. The blocks are not closed and their files not deleted. func (db *DB) removeBlocks(i, j int) { - for k := i; k <= j; k++ { + for k := i; k < j; k++ { if i < len(db.persisted) { db.persisted = append(db.persisted[:i], db.persisted[i+1:]...) } else { @@ -253,14 +252,14 @@ func (db *DB) blocks() (bs []Block) { return bs } -// compact block in range [i, j] into a temporary directory and atomically +// compact block in range [i, j) into a temporary directory and atomically // swap the blocks out on successful completion. func (db *DB) compact(i, j int) error { - if j < i { + if j <= i { return errors.New("invalid compaction block range") } var blocks []Block - for k := i; k <= j; k++ { + for k := i; k < j; k++ { blocks = append(blocks, db.getBlock(k)) } var (