|
|
|
@ -96,9 +96,10 @@ type DB struct {
|
|
|
|
|
mtx sync.RWMutex
|
|
|
|
|
persisted []*persistedBlock
|
|
|
|
|
heads []*headBlock
|
|
|
|
|
seqBlocks map[int]Block
|
|
|
|
|
headGen uint8
|
|
|
|
|
|
|
|
|
|
compactor *compactor
|
|
|
|
|
compactor Compactor
|
|
|
|
|
|
|
|
|
|
compactc chan struct{}
|
|
|
|
|
donec chan struct{}
|
|
|
|
@ -171,14 +172,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|
|
|
|
donec: make(chan struct{}),
|
|
|
|
|
stopc: make(chan struct{}),
|
|
|
|
|
}
|
|
|
|
|
db.compactor = newCompactor(r, &compactorOptions{
|
|
|
|
|
db.compactor = newCompactor(dir, r, &compactorOptions{
|
|
|
|
|
maxBlockRange: opts.MaxBlockDuration,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err := db.initBlocks(); err != nil {
|
|
|
|
|
if err := db.reloadBlocks(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go db.run()
|
|
|
|
|
|
|
|
|
|
return db, nil
|
|
|
|
@ -200,35 +200,8 @@ func (db *DB) run() {
|
|
|
|
|
case <-db.compactc:
|
|
|
|
|
db.metrics.compactionsTriggered.Inc()
|
|
|
|
|
|
|
|
|
|
var seqs []int
|
|
|
|
|
var infos []compactionInfo
|
|
|
|
|
for _, b := range db.compactable() {
|
|
|
|
|
m := b.Meta()
|
|
|
|
|
|
|
|
|
|
infos = append(infos, compactionInfo{
|
|
|
|
|
generation: m.Compaction.Generation,
|
|
|
|
|
mint: m.MinTime,
|
|
|
|
|
maxt: m.MaxTime,
|
|
|
|
|
seq: m.Sequence,
|
|
|
|
|
})
|
|
|
|
|
seqs = append(seqs, m.Sequence)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
i, j, ok := db.compactor.pick(infos)
|
|
|
|
|
if !ok {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j]))
|
|
|
|
|
|
|
|
|
|
if err := db.compact(i, j); err != nil {
|
|
|
|
|
if err := db.compact(); err != nil {
|
|
|
|
|
db.logger.Log("msg", "compaction failed", "err", err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
db.logger.Log("msg", "compaction completed")
|
|
|
|
|
// Trigger another compaction in case there's more work to do.
|
|
|
|
|
select {
|
|
|
|
|
case db.compactc <- struct{}{}:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case <-db.stopc:
|
|
|
|
@ -237,150 +210,165 @@ func (db *DB) run() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *DB) getBlock(i int) Block {
|
|
|
|
|
if i < len(db.persisted) {
|
|
|
|
|
return db.persisted[i]
|
|
|
|
|
}
|
|
|
|
|
return db.heads[i-len(db.persisted)]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// removeBlocks removes the blocks in range [i, j) from the list of persisted
|
|
|
|
|
// and head blocks. The blocks are not closed and their files not deleted.
|
|
|
|
|
func (db *DB) removeBlocks(i, j int) {
|
|
|
|
|
for k := i; k < j; k++ {
|
|
|
|
|
if i < len(db.persisted) {
|
|
|
|
|
db.persisted = append(db.persisted[:i], db.persisted[i+1:]...)
|
|
|
|
|
} else {
|
|
|
|
|
l := i - len(db.persisted)
|
|
|
|
|
db.heads = append(db.heads[:l], db.heads[l+1:]...)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *DB) blocks() (bs []Block) {
|
|
|
|
|
for _, b := range db.persisted {
|
|
|
|
|
bs = append(bs, b)
|
|
|
|
|
}
|
|
|
|
|
for _, b := range db.heads {
|
|
|
|
|
bs = append(bs, b)
|
|
|
|
|
}
|
|
|
|
|
return bs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// compact block in range [i, j) into a temporary directory and atomically
|
|
|
|
|
// swap the blocks out on successful completion.
|
|
|
|
|
func (db *DB) compact(i, j int) error {
|
|
|
|
|
if j <= i {
|
|
|
|
|
return errors.New("invalid compaction block range")
|
|
|
|
|
}
|
|
|
|
|
var blocks []Block
|
|
|
|
|
for k := i; k < j; k++ {
|
|
|
|
|
blocks = append(blocks, db.getBlock(k))
|
|
|
|
|
}
|
|
|
|
|
var (
|
|
|
|
|
dir = blocks[0].Dir()
|
|
|
|
|
tmpdir = dir + ".tmp"
|
|
|
|
|
)
|
|
|
|
|
func (db *DB) compact() error {
|
|
|
|
|
changes := false
|
|
|
|
|
// Check whether we have pending head blocks that are ready to be persisted.
|
|
|
|
|
// They have the highest priority.
|
|
|
|
|
db.mtx.RLock()
|
|
|
|
|
|
|
|
|
|
if err := db.compactor.compact(tmpdir, blocks...); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if len(db.heads) > db.opts.AppendableBlocks {
|
|
|
|
|
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] {
|
|
|
|
|
// Blocks that won't be appendable when instantiating a new appender
|
|
|
|
|
// might still have active appenders on them.
|
|
|
|
|
// Abort at the first one we encounter.
|
|
|
|
|
if atomic.LoadUint64(&h.activeWriters) > 0 {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pb, err := newPersistedBlock(tmpdir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
db.logger.Log("msg", "write head", "seq", h.Meta().Sequence)
|
|
|
|
|
|
|
|
|
|
db.mtx.Lock()
|
|
|
|
|
defer db.mtx.Unlock()
|
|
|
|
|
select {
|
|
|
|
|
case <-db.stopc:
|
|
|
|
|
return nil
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, b := range blocks {
|
|
|
|
|
if err := b.Close(); err != nil {
|
|
|
|
|
return errors.Wrapf(err, "close old block %s", b.Dir())
|
|
|
|
|
if err := db.compactor.Write(h.Dir(), h); err != nil {
|
|
|
|
|
db.mtx.RUnlock()
|
|
|
|
|
return errors.Wrap(err, "persist head block")
|
|
|
|
|
}
|
|
|
|
|
changes = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := renameFile(tmpdir, dir); err != nil {
|
|
|
|
|
return errors.Wrap(err, "rename dir")
|
|
|
|
|
}
|
|
|
|
|
pb.dir = dir
|
|
|
|
|
db.mtx.RUnlock()
|
|
|
|
|
|
|
|
|
|
db.removeBlocks(i, j)
|
|
|
|
|
db.persisted = append(db.persisted, pb)
|
|
|
|
|
// Check for compactions of multiple blocks.
|
|
|
|
|
for {
|
|
|
|
|
plans, err := db.compactor.Plan()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "plan compaction")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, b := range blocks[1:] {
|
|
|
|
|
db.logger.Log("msg", "remove old dir", "dir", b.Dir())
|
|
|
|
|
if err := os.RemoveAll(b.Dir()); err != nil {
|
|
|
|
|
return errors.Wrap(err, "removing old block")
|
|
|
|
|
select {
|
|
|
|
|
case <-db.stopc:
|
|
|
|
|
return nil
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
// We just execute compactions sequentially to not cause too extreme
|
|
|
|
|
// CPU and memory spikes.
|
|
|
|
|
// TODO(fabxc): return more descriptive plans in the future that allow
|
|
|
|
|
// estimation of resource usage and conditional parallelization?
|
|
|
|
|
for _, p := range plans {
|
|
|
|
|
db.logger.Log("msg", "compact blocks", "seq", fmt.Sprintf("%v", p))
|
|
|
|
|
|
|
|
|
|
if err := db.compactor.Compact(p...); err != nil {
|
|
|
|
|
return errors.Wrapf(err, "compact", p)
|
|
|
|
|
}
|
|
|
|
|
changes = true
|
|
|
|
|
}
|
|
|
|
|
// If we didn't compact anything, there's nothing left to do.
|
|
|
|
|
if len(plans) == 0 {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if err := db.retentionCutoff(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if changes {
|
|
|
|
|
return errors.Wrap(db.reloadBlocks(), "reload blocks")
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *DB) retentionCutoff() error {
|
|
|
|
|
if db.opts.RetentionDuration == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
h := db.heads[len(db.heads)-1]
|
|
|
|
|
t := h.meta.MinTime - int64(db.opts.RetentionDuration)
|
|
|
|
|
// func (db *DB) retentionCutoff() error {
|
|
|
|
|
// if db.opts.RetentionDuration == 0 {
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
// h := db.heads[len(db.heads)-1]
|
|
|
|
|
// t := h.meta.MinTime - int64(db.opts.RetentionDuration)
|
|
|
|
|
|
|
|
|
|
// var (
|
|
|
|
|
// blocks = db.blocks()
|
|
|
|
|
// i int
|
|
|
|
|
// b Block
|
|
|
|
|
// )
|
|
|
|
|
// for i, b = range blocks {
|
|
|
|
|
// if b.Meta().MinTime >= t {
|
|
|
|
|
// break
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// if i <= 1 {
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
// db.logger.Log("msg", "retention cutoff", "idx", i-1)
|
|
|
|
|
// db.removeBlocks(0, i)
|
|
|
|
|
|
|
|
|
|
// for _, b := range blocks[:i] {
|
|
|
|
|
// if err := os.RemoveAll(b.Dir()); err != nil {
|
|
|
|
|
// return errors.Wrap(err, "removing old block")
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
// return nil
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
func (db *DB) reloadBlocks() error {
|
|
|
|
|
db.mtx.Lock()
|
|
|
|
|
defer db.mtx.Unlock()
|
|
|
|
|
|
|
|
|
|
dirs, err := blockDirs(db.dir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "find blocks")
|
|
|
|
|
}
|
|
|
|
|
var (
|
|
|
|
|
blocks = db.blocks()
|
|
|
|
|
i int
|
|
|
|
|
b Block
|
|
|
|
|
metas []*BlockMeta
|
|
|
|
|
persisted []*persistedBlock
|
|
|
|
|
heads []*headBlock
|
|
|
|
|
seqBlocks = make(map[int]Block, len(dirs))
|
|
|
|
|
)
|
|
|
|
|
for i, b = range blocks {
|
|
|
|
|
if b.Meta().MinTime >= t {
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
for _, dir := range dirs {
|
|
|
|
|
meta, err := readMetaFile(dir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrapf(err, "read meta information %s", dir)
|
|
|
|
|
}
|
|
|
|
|
metas = append(metas, meta)
|
|
|
|
|
}
|
|
|
|
|
if i <= 1 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
db.logger.Log("msg", "retention cutoff", "idx", i-1)
|
|
|
|
|
db.removeBlocks(0, i)
|
|
|
|
|
|
|
|
|
|
for _, b := range blocks[:i] {
|
|
|
|
|
if err := os.RemoveAll(b.Dir()); err != nil {
|
|
|
|
|
return errors.Wrap(err, "removing old block")
|
|
|
|
|
for i, meta := range metas {
|
|
|
|
|
b, ok := db.seqBlocks[meta.Sequence]
|
|
|
|
|
if !ok {
|
|
|
|
|
return errors.Errorf("missing block for sequence %d", meta.Sequence)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (db *DB) initBlocks() error {
|
|
|
|
|
var (
|
|
|
|
|
persisted []*persistedBlock
|
|
|
|
|
heads []*headBlock
|
|
|
|
|
)
|
|
|
|
|
if meta.Compaction.Generation == 0 {
|
|
|
|
|
if meta.ULID != b.Meta().ULID {
|
|
|
|
|
return errors.Errorf("head block ULID changed unexpectedly")
|
|
|
|
|
}
|
|
|
|
|
heads = append(heads, b.(*headBlock))
|
|
|
|
|
} else {
|
|
|
|
|
if meta.ULID != b.Meta().ULID {
|
|
|
|
|
if err := b.Close(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
b, err = newPersistedBlock(dirs[i])
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrapf(err, "open persisted block %s", dirs[i])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
persisted = append(persisted, b.(*persistedBlock))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dirs, err := blockDirs(db.dir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
seqBlocks[meta.Sequence] = b
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, dir := range dirs {
|
|
|
|
|
if fileutil.Exist(filepath.Join(dir, walDirName)) {
|
|
|
|
|
h, err := openHeadBlock(dir, db.logger)
|
|
|
|
|
if err != nil {
|
|
|
|
|
for seq, b := range db.seqBlocks {
|
|
|
|
|
if _, ok := seqBlocks[seq]; !ok {
|
|
|
|
|
if err := b.Close(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
h.generation = db.headGen
|
|
|
|
|
db.headGen++
|
|
|
|
|
heads = append(heads, h)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
b, err := newPersistedBlock(dir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
persisted = append(persisted, b)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
db.seqBlocks = seqBlocks
|
|
|
|
|
db.persisted = persisted
|
|
|
|
|
db.heads = heads
|
|
|
|
|
|
|
|
|
@ -643,6 +631,7 @@ func (db *DB) cut(mint int64) (*headBlock, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
db.heads = append(db.heads, newHead)
|
|
|
|
|
db.seqBlocks[seq] = newHead
|
|
|
|
|
db.headGen++
|
|
|
|
|
|
|
|
|
|
newHead.generation = db.headGen
|
|
|
|
|