diff --git a/compact.go b/compact.go
index b58acba43..af6ab72df 100644
--- a/compact.go
+++ b/compact.go
@@ -13,7 +13,16 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+type Compactor interface {
+	Plan() ([][]string, error)
+
+	Write(dir string, bs ...Block) error
+
+	Compact(dirs ...string) error
+}
+
 type compactor struct {
+	dir     string
 	metrics *compactorMetrics
 	opts    *compactorOptions
 }
@@ -54,8 +63,9 @@ type compactorOptions struct {
 	maxBlockRange uint64
 }
 
-func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
+func newCompactor(dir string, r prometheus.Registerer, opts *compactorOptions) *compactor {
 	return &compactor{
+		dir:     dir,
 		opts:    opts,
 		metrics: newCompactorMetrics(r),
 	}
@@ -69,61 +79,55 @@ type compactionInfo struct {
 
 const compactionBlocksLen = 3
 
-// pick returns a range [i, j) in the blocks that are suitable to be compacted
-// into a single block at position i.
-func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
-	if len(bs) == 0 {
-		return 0, 0, false
+func (c *compactor) Plan() ([][]string, error) {
+	dirs, err := blockDirs(c.dir)
+	if err != nil {
+		return nil, err
 	}
 
-	// First, we always compact pending in-memory blocks – oldest first.
-	for i, b := range bs {
-		if b.generation > 0 {
-			continue
-		}
-		// Directly compact into 2nd generation with previous generation 1 blocks.
-		if i+1 >= compactionBlocksLen {
-			match := true
-			for _, pb := range bs[i-compactionBlocksLen+1 : i] {
-				match = match && pb.generation == 1
-			}
-			if match {
-				return i - compactionBlocksLen + 1, i + 1, true
-			}
+	var bs []*BlockMeta
+
+	for _, dir := range dirs {
+		meta, err := readMetaFile(dir)
+		if err != nil {
+			return nil, err
 		}
-		// If we have enough generation 0 blocks to directly move to the
-		// 2nd generation, skip generation 1.
-		if len(bs)-i >= compactionBlocksLen {
-			// Guard against the newly compacted block becoming larger than
-			// the previous one.
-			if i == 0 || bs[i-1].generation >= 2 {
-				return i, i + compactionBlocksLen, true
-			}
+		if meta.Compaction.Generation > 0 {
+			bs = append(bs, meta)
 		}
+	}
 
-		// No optimizations possible, naiively compact the new block.
-		return i, i + 1, true
+	if len(bs) == 0 {
+		return nil, nil
+	}
+
+	sliceDirs := func(i, j int) [][]string {
+		var res []string
+		for k := i; k < j; k++ {
+			res = append(res, dirs[k])
+		}
+		return [][]string{res}
 	}
 
 	// Then we care about compacting multiple blocks, starting with the oldest.
 	for i := 0; i < len(bs)-compactionBlocksLen+1; i += compactionBlocksLen {
 		if c.match(bs[i : i+3]) {
-			return i, i + compactionBlocksLen, true
+			return sliceDirs(i, i+compactionBlocksLen), nil
 		}
 	}
 
-	return 0, 0, false
+	return nil, nil
 }
 
-func (c *compactor) match(bs []compactionInfo) bool {
-	g := bs[0].generation
+func (c *compactor) match(bs []*BlockMeta) bool {
+	g := bs[0].Compaction.Generation
 
 	for _, b := range bs {
-		if b.generation != g {
+		if b.Compaction.Generation != g {
 			return false
 		}
 	}
-	return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange
+	return uint64(bs[len(bs)-1].MaxTime-bs[0].MinTime) <= c.opts.maxBlockRange
 }
 
 var entropy = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -136,11 +140,7 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
 	res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
 	res.ULID = ulid.MustNew(ulid.Now(), entropy)
 
-	g := m0.Compaction.Generation
-	if g == 0 && len(blocks) > 1 {
-		g++
-	}
-	res.Compaction.Generation = g + 1
+	res.Compaction.Generation = m0.Compaction.Generation + 1
 
 	for _, b := range blocks {
 		res.Stats.NumSamples += b.Meta().Stats.NumSamples
@@ -148,14 +148,26 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
 	return res
 }
 
-func (c *compactor) compact(dir string, blocks ...Block) (err error) {
-	start := time.Now()
-	defer func() {
+func (c *compactor) Compact(dirs ...string) (err error) {
+	var blocks []Block
+	for _, d := range dirs {
+		b, err := newPersistedBlock(d)
+		if err != nil {
+			return err
+		}
+		blocks = append(blocks, b)
+	}
+
+	return c.Write(dirs[0], blocks...)
+}
+
+func (c *compactor) Write(dir string, blocks ...Block) (err error) {
+	defer func(t time.Time) {
 		if err != nil {
 			c.metrics.failed.Inc()
 		}
-		c.metrics.duration.Observe(time.Since(start).Seconds())
-	}()
+		c.metrics.duration.Observe(time.Since(t).Seconds())
+	}(time.Now())
 
 	if err = os.RemoveAll(dir); err != nil {
 		return err
diff --git a/db.go b/db.go
index f967b044c..463433bc8 100644
--- a/db.go
+++ b/db.go
@@ -96,9 +96,10 @@ type DB struct {
 	mtx       sync.RWMutex
 	persisted []*persistedBlock
 	heads     []*headBlock
+	seqBlocks map[int]Block
 	headGen   uint8
 
-	compactor *compactor
+	compactor Compactor
 
 	compactc chan struct{}
 	donec    chan struct{}
@@ -171,14 +172,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		donec:    make(chan struct{}),
 		stopc:    make(chan struct{}),
 	}
-	db.compactor = newCompactor(r, &compactorOptions{
+	db.compactor = newCompactor(dir, r, &compactorOptions{
 		maxBlockRange: opts.MaxBlockDuration,
 	})
 
-	if err := db.initBlocks(); err != nil {
+	if err := db.reloadBlocks(); err != nil {
 		return nil, err
 	}
-
 	go db.run()
 
 	return db, nil
@@ -200,35 +200,8 @@ func (db *DB) run() {
 		case <-db.compactc:
 			db.metrics.compactionsTriggered.Inc()
 
-			var seqs []int
-			var infos []compactionInfo
-			for _, b := range db.compactable() {
-				m := b.Meta()
-
-				infos = append(infos, compactionInfo{
-					generation: m.Compaction.Generation,
-					mint:       m.MinTime,
-					maxt:       m.MaxTime,
-					seq:        m.Sequence,
-				})
-				seqs = append(seqs, m.Sequence)
-			}
-
-			i, j, ok := db.compactor.pick(infos)
-			if !ok {
-				continue
-			}
-			db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j]))
-
-			if err := db.compact(i, j); err != nil {
+			if err := db.compact(); err != nil {
 				db.logger.Log("msg", "compaction failed", "err", err)
-				continue
-			}
-			db.logger.Log("msg", "compaction completed")
-			// Trigger another compaction in case there's more work to do.
-			select {
-			case db.compactc <- struct{}{}:
-			default:
 			}
 
 		case <-db.stopc:
@@ -237,150 +210,165 @@ func (db *DB) run() {
 	}
 }
 
-func (db *DB) getBlock(i int) Block {
-	if i < len(db.persisted) {
-		return db.persisted[i]
-	}
-	return db.heads[i-len(db.persisted)]
-}
-
-// removeBlocks removes the blocks in range [i, j) from the list of persisted
-// and head blocks. The blocks are not closed and their files not deleted.
-func (db *DB) removeBlocks(i, j int) {
-	for k := i; k < j; k++ {
-		if i < len(db.persisted) {
-			db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
-		} else {
-			l := i - len(db.persisted)
-			db.heads = append(db.heads[:l], db.heads[l+1:]...)
-		}
-	}
-}
-
-func (db *DB) blocks() (bs []Block) {
-	for _, b := range db.persisted {
-		bs = append(bs, b)
-	}
-	for _, b := range db.heads {
-		bs = append(bs, b)
-	}
-	return bs
-}
-
-// compact block in range [i, j) into a temporary directory and atomically
-// swap the blocks out on successful completion.
-func (db *DB) compact(i, j int) error {
-	if j <= i {
-		return errors.New("invalid compaction block range")
-	}
-	var blocks []Block
-	for k := i; k < j; k++ {
-		blocks = append(blocks, db.getBlock(k))
-	}
-	var (
-		dir    = blocks[0].Dir()
-		tmpdir = dir + ".tmp"
-	)
+func (db *DB) compact() error {
+	changes := false
+	// Check whether we have pending head blocks that are ready to be persisted.
+	// They have the highest priority.
+	db.mtx.RLock()
 
-	if err := db.compactor.compact(tmpdir, blocks...); err != nil {
-		return err
-	}
+	if len(db.heads) > db.opts.AppendableBlocks {
+		for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
+			// Blocks that won't be appendable when instantiating a new appender
+			// might still have active appenders on them.
+			// Abort at the first one we encounter.
+			if atomic.LoadUint64(&h.activeWriters) > 0 {
+				break
+			}
 
-	pb, err := newPersistedBlock(tmpdir)
-	if err != nil {
-		return err
-	}
+			db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
 
-	db.mtx.Lock()
-	defer db.mtx.Unlock()
+			select {
+			case <-db.stopc:
+				return nil
+			default:
+			}
 
-	for _, b := range blocks {
-		if err := b.Close(); err != nil {
-			return errors.Wrapf(err, "close old block %s", b.Dir())
+			if err := db.compactor.Write(h.Dir(), h); err != nil {
+				db.mtx.RUnlock()
+				return errors.Wrap(err, "persist head block")
+			}
+			changes = true
 		}
 	}
 
-	if err := renameFile(tmpdir, dir); err != nil {
-		return errors.Wrap(err, "rename dir")
-	}
-	pb.dir = dir
+	db.mtx.RUnlock()
 
-	db.removeBlocks(i, j)
-	db.persisted = append(db.persisted, pb)
+	// Check for compactions of multiple blocks.
+	for {
+		plans, err := db.compactor.Plan()
+		if err != nil {
+			return errors.Wrap(err, "plan compaction")
+		}
 
-	for _, b := range blocks[1:] {
-		db.logger.Log("msg", "remove old dir", "dir", b.Dir())
-		if err := os.RemoveAll(b.Dir()); err != nil {
-			return errors.Wrap(err, "removing old block")
+		select {
+		case <-db.stopc:
+			return nil
+		default:
+		}
+		// We just execute compactions sequentially to not cause too extreme
+		// CPU and memory spikes.
+		// TODO(fabxc): return more descriptive plans in the future that allow
+		// estimation of resource usage and conditional parallelization?
+		for _, p := range plans {
+			db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p))
+
+			if err := db.compactor.Compact(p...); err != nil {
+				return errors.Wrapf(err, "compact %s", p)
+			}
+			changes = true
+		}
+		// If we didn't compact anything, there's nothing left to do.
+		if len(plans) == 0 {
+			break
 		}
-	}
 
-	if err := db.retentionCutoff(); err != nil {
-		return err
 	}
 
+	if changes {
+		return errors.Wrap(db.reloadBlocks(), "reload blocks")
+	}
 	return nil
 }
 
-func (db *DB) retentionCutoff() error {
-	if db.opts.RetentionDuration == 0 {
-		return nil
-	}
-	h := db.heads[len(db.heads)-1]
-	t := h.meta.MinTime - int64(db.opts.RetentionDuration)
+// func (db *DB) retentionCutoff() error {
+// 	if db.opts.RetentionDuration == 0 {
+// 		return nil
+// 	}
+// 	h := db.heads[len(db.heads)-1]
+// 	t := h.meta.MinTime - int64(db.opts.RetentionDuration)
+
+// 	var (
+// 		blocks = db.blocks()
+// 		i      int
+// 		b      Block
+// 	)
+// 	for i, b = range blocks {
+// 		if b.Meta().MinTime >= t {
+// 			break
+// 		}
+// 	}
+// 	if i <= 1 {
+// 		return nil
+// 	}
+// 	db.logger.Log("msg", "retention cutoff", "idx", i-1)
+// 	db.removeBlocks(0, i)
+
+// 	for _, b := range blocks[:i] {
+// 		if err := os.RemoveAll(b.Dir()); err != nil {
+// 			return errors.Wrap(err, "removing old block")
+// 		}
+// 	}
+// 	return nil
+// }
+
+func (db *DB) reloadBlocks() error {
+	db.mtx.Lock()
+	defer db.mtx.Unlock()
 
+	dirs, err := blockDirs(db.dir)
+	if err != nil {
+		return errors.Wrap(err, "find blocks")
+	}
 	var (
-		blocks = db.blocks()
-		i      int
-		b      Block
+		metas     []*BlockMeta
+		persisted []*persistedBlock
+		heads     []*headBlock
+		seqBlocks = make(map[int]Block, len(dirs))
 	)
-	for i, b = range blocks {
-		if b.Meta().MinTime >= t {
-			break
+
+	for _, dir := range dirs {
+		meta, err := readMetaFile(dir)
+		if err != nil {
+			return errors.Wrapf(err, "read meta information %s", dir)
 		}
+		metas = append(metas, meta)
 	}
-	if i <= 1 {
-		return nil
-	}
-	db.logger.Log("msg", "retention cutoff", "idx", i-1)
-	db.removeBlocks(0, i)
 
-	for _, b := range blocks[:i] {
-		if err := os.RemoveAll(b.Dir()); err != nil {
-			return errors.Wrap(err, "removing old block")
+	for i, meta := range metas {
+		b, ok := db.seqBlocks[meta.Sequence]
+		if !ok {
+			return errors.Errorf("missing block for sequence %d", meta.Sequence)
 		}
-	}
-	return nil
-}
 
-func (db *DB) initBlocks() error {
-	var (
-		persisted []*persistedBlock
-		heads     []*headBlock
-	)
+		if meta.Compaction.Generation == 0 {
+			if meta.ULID != b.Meta().ULID {
+				return errors.Errorf("head block ULID changed unexpectedly")
+			}
+			heads = append(heads, b.(*headBlock))
+		} else {
+			if meta.ULID != b.Meta().ULID {
+				if err := b.Close(); err != nil {
+					return err
+				}
+				b, err = newPersistedBlock(dirs[i])
+				if err != nil {
+					return errors.Wrapf(err, "open persisted block %s", dirs[i])
+				}
+			}
+			persisted = append(persisted, b.(*persistedBlock))
+		}
 
-	dirs, err := blockDirs(db.dir)
-	if err != nil {
-		return err
+		seqBlocks[meta.Sequence] = b
 	}
 
-	for _, dir := range dirs {
-		if fileutil.Exist(filepath.Join(dir, walDirName)) {
-			h, err := openHeadBlock(dir, db.logger)
-			if err != nil {
+	for seq, b := range db.seqBlocks {
+		if _, ok := seqBlocks[seq]; !ok {
+			if err := b.Close(); err != nil {
 				return err
 			}
-			h.generation = db.headGen
-			db.headGen++
-
-			heads = append(heads, h)
-			continue
-		}
-		b, err := newPersistedBlock(dir)
-		if err != nil {
-			return err
 		}
-		persisted = append(persisted, b)
 	}
 
+	db.seqBlocks = seqBlocks
 	db.persisted = persisted
 	db.heads = heads
 
@@ -643,6 +631,7 @@ func (db *DB) cut(mint int64) (*headBlock, error) {
 	}
 
 	db.heads = append(db.heads, newHead)
+	db.seqBlocks[seq] = newHead
 	db.headGen++
 
 	newHead.generation = db.headGen
diff --git a/head.go b/head.go
index 260fb962e..4ddd1957c 100644
--- a/head.go
+++ b/head.go
@@ -63,7 +63,10 @@ type headBlock struct {
 }
 
 func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) {
-	if err := os.MkdirAll(dir, 0777); err != nil {
+	// Make head block creation appear atomic.
+	tmp := dir + ".tmp"
+
+	if err := os.MkdirAll(tmp, 0777); err != nil {
 		return nil, err
 	}
 	ulid, err := ulid.New(ulid.Now(), entropy)
@@ -71,7 +74,7 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
 		return nil, err
 	}
 
-	if err := writeMetaFile(dir, &BlockMeta{
+	if err := writeMetaFile(tmp, &BlockMeta{
 		ULID:     ulid,
 		Sequence: seq,
 		MinTime:  mint,
@@ -79,6 +82,9 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head
 	}); err != nil {
 		return nil, err
 	}
+	if err := renameFile(tmp, dir); err != nil {
+		return nil, err
+	}
 
 	return openHeadBlock(dir, l)
 }
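
Note on the new control flow: `DB.compact` now repeatedly asks the `Compactor` for a plan and executes it until `Plan` returns nothing. The following is a minimal standalone sketch of that driver loop; the `Compactor` interface mirrors the one added in compact.go (minus `Write`, since `Block` is elided here), and `fakeCompactor` plus the block directory names are hypothetical stand-ins, not part of this change.

```go
package main

import (
	"fmt"
	"log"
)

// Compactor mirrors the planning/execution split introduced in compact.go.
type Compactor interface {
	// Plan returns groups of block directories that should be compacted together.
	Plan() ([][]string, error)
	// Compact merges the blocks in the given directories into a single block.
	Compact(dirs ...string) error
}

// fakeCompactor hands out one canned plan and then nothing, emulating a
// single round of work followed by the empty plan that ends the loop.
type fakeCompactor struct{ plans [][]string }

func (c *fakeCompactor) Plan() ([][]string, error) {
	p := c.plans
	c.plans = nil
	return p, nil
}

func (c *fakeCompactor) Compact(dirs ...string) error {
	fmt.Println("compacting", dirs)
	return nil
}

func main() {
	var c Compactor = &fakeCompactor{
		plans: [][]string{{"b-000001", "b-000002", "b-000003"}},
	}
	// Same shape as DB.compact: re-plan after every round and stop once the
	// planner has nothing left to suggest.
	for {
		plans, err := c.Plan()
		if err != nil {
			log.Fatal(err)
		}
		if len(plans) == 0 {
			break
		}
		// Execute plans sequentially to avoid CPU and memory spikes, as the
		// comment in DB.compact explains.
		for _, p := range plans {
			if err := c.Compact(p...); err != nil {
				log.Fatal(err)
			}
		}
	}
}
```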
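The rewritten `Write` also switches to passing `time.Now()` as a deferred-call argument, so the start time is captured when the defer is registered while the named error return is inspected when the function exits. A small sketch of that idiom, with plain stand-ins for the `compactorMetrics` counters:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var failedTotal int // stand-in for metrics.failed

func observe(d time.Duration) { fmt.Println("duration:", d) } // stand-in for metrics.duration.Observe

func write() (err error) {
	// t is evaluated now; the closure body runs on return and sees the final
	// value of the named return err.
	defer func(t time.Time) {
		if err != nil {
			failedTotal++
		}
		observe(time.Since(t))
	}(time.Now())

	time.Sleep(10 * time.Millisecond)
	return errors.New("disk full") // simulated failure
}

func main() {
	write()
	fmt.Println("failed:", failedTotal)
}
```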
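The head.go hunk makes block creation crash-safe by populating a `.tmp` sibling and renaming it into place, so readers never observe a half-initialized block directory. A self-contained sketch of the pattern follows; `createDirAtomic` and the file names are hypothetical, and it uses plain `os.Rename` where the change uses the package's `renameFile` helper (which presumably also fsyncs the parent directory, omitted here).

```go
package main

import (
	"log"
	"os"
	"path/filepath"
)

// createDirAtomic populates dir via a temporary sibling and renames it into
// place. A crash before the rename leaves only the .tmp directory behind,
// which can be cleaned up on the next start.
func createDirAtomic(dir string, meta []byte) error {
	tmp := dir + ".tmp"

	if err := os.MkdirAll(tmp, 0o777); err != nil {
		return err
	}
	// Write everything the directory must contain before committing.
	if err := os.WriteFile(filepath.Join(tmp, "meta.json"), meta, 0o666); err != nil {
		return err
	}
	// The rename is the commit point: readers see either no directory or a
	// fully initialized one.
	return os.Rename(tmp, dir)
}

func main() {
	if err := createDirAtomic("block-000001", []byte(`{"version":1}`)); err != nil {
		log.Fatal(err)
	}
}
```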