|
|
|
@ -326,6 +326,13 @@ func (a dbAppender) Commit() error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compact data if possible. After successful compaction blocks are reloaded
|
|
|
|
|
// which will also trigger blocks to be deleted that fall out of the retention
|
|
|
|
|
// window.
|
|
|
|
|
// If no blocks are compacted, the retention window state doesn't change. Thus,
|
|
|
|
|
// this is sufficient to reliably delete old data.
|
|
|
|
|
// Old blocks are only deleted on reload based on the new block's parent information.
|
|
|
|
|
// See DB.reload documentation for further information.
|
|
|
|
|
func (db *DB) compact() (changes bool, err error) {
|
|
|
|
|
db.cmtx.Lock()
|
|
|
|
|
defer db.cmtx.Unlock()
|
|
|
|
@ -426,8 +433,13 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
// We delete old blocks that have been superseded by new ones by gathering all parents
|
|
|
|
|
// from existing blocks. Those parents all have newer replacements and can be safely deleted
|
|
|
|
|
// after we loaded the other blocks.
|
|
|
|
|
// This makes us resilient against the process crashing towards the end of a compaction.
|
|
|
|
|
// Creation of a new block and deletion of its parents cannot happen atomically. By creating
|
|
|
|
|
// blocks atomically with their parents, we can pick up the deletion where it left off during
|
|
|
|
|
// a crash.
|
|
|
|
|
var (
|
|
|
|
|
blocks []*Block
|
|
|
|
|
corrupted = map[ulid.ULID]error{}
|
|
|
|
|
opened = map[ulid.ULID]struct{}{}
|
|
|
|
|
deleteable = map[ulid.ULID]struct{}{}
|
|
|
|
|
)
|
|
|
|
@ -437,6 +449,13 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
// The block was potentially in the middle of being deleted during a crash.
|
|
|
|
|
// Skip it since we may delete it properly further down again.
|
|
|
|
|
level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
|
|
|
|
|
|
|
|
|
|
ulid, err2 := ulid.Parse(filepath.Base(dir))
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
corrupted[ulid] = err
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if db.beyondRetention(meta) {
|
|
|
|
@ -447,11 +466,17 @@ func (db *DB) reload() (err error) {
|
|
|
|
|
deleteable[b.ULID] = struct{}{}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Blocks we failed to open should all be those we are want to delete anyway.
|
|
|
|
|
for c, err := range corrupted {
|
|
|
|
|
if _, ok := deleteable[c]; !ok {
|
|
|
|
|
return errors.Wrapf(err, "unexpected corrupted block %s", c)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Load new blocks into memory.
|
|
|
|
|
for _, dir := range dirs {
|
|
|
|
|
meta, err := readMetaFile(dir)
|
|
|
|
|
if err != nil {
|
|
|
|
|
level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
|
|
|
|
|
continue
|
|
|
|
|
return errors.Wrapf(err, "read meta information %s", dir)
|
|
|
|
|
}
|
|
|
|
|
// Don't load blocks that are scheduled for deletion.
|
|
|
|
|
if _, ok := deleteable[meta.ULID]; ok {
|
|
|
|
|