diff --git a/block.go b/block.go
index b5b2f3ee5..1d5ebfb10 100644
--- a/block.go
+++ b/block.go
@@ -178,6 +178,8 @@ type BlockMetaCompaction struct {
 	Level int `json:"level"`
 	// ULIDs of all source head blocks that went into the block.
 	Sources []ulid.ULID `json:"sources,omitempty"`
+	// Short descriptions of the direct blocks that were used to create
+	// this block.
 	Parents []BlockDesc `json:"parents,omitempty"`
 	Failed  bool        `json:"failed,omitempty"`
 }
diff --git a/db.go b/db.go
index f81c2e8fa..394181089 100644
--- a/db.go
+++ b/db.go
@@ -326,6 +326,13 @@ func (a dbAppender) Commit() error {
 	return err
 }
 
+// Compact data if possible. After a successful compaction, blocks are reloaded,
+// which also triggers deletion of blocks that have fallen out of the retention
+// window.
+// If no blocks are compacted, the retention window state doesn't change. Thus,
+// this is sufficient to reliably delete old data.
+// Old blocks are only deleted on reload, based on the new blocks' parent information.
+// See the DB.reload documentation for further information.
 func (db *DB) compact() (changes bool, err error) {
 	db.cmtx.Lock()
 	defer db.cmtx.Unlock()
@@ -426,8 +433,13 @@ func (db *DB) reload() (err error) {
 	// We delete old blocks that have been superseded by new ones by gathering all parents
 	// from existing blocks. Those parents all have newer replacements and can be safely deleted
 	// after we loaded the other blocks.
+	// This makes us resilient against the process crashing towards the end of a compaction.
+	// Creation of a new block and deletion of its parents cannot happen atomically. Since each
+	// new block records its parents, we can pick up the deletion where it left off during
+	// a crash.
 	var (
 		blocks     []*Block
+		corrupted  = map[ulid.ULID]error{}
 		opened     = map[ulid.ULID]struct{}{}
 		deleteable = map[ulid.ULID]struct{}{}
 	)
@@ -437,6 +449,13 @@ func (db *DB) reload() (err error) {
 			// The block was potentially in the middle of being deleted during a crash.
 			// Skip it since we may delete it properly further down again.
 			level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
+
+			ulid, err2 := ulid.Parse(filepath.Base(dir))
+			if err2 != nil {
+				level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
+				continue
+			}
+			corrupted[ulid] = err
 			continue
 		}
 		if db.beyondRetention(meta) {
@@ -447,11 +466,17 @@ func (db *DB) reload() (err error) {
 			deleteable[b.ULID] = struct{}{}
 		}
 	}
+	// Blocks we failed to open should all be those we want to delete anyway.
+	for c, err := range corrupted {
+		if _, ok := deleteable[c]; !ok {
+			return errors.Wrapf(err, "unexpected corrupted block %s", c)
+		}
+	}
+	// Load new blocks into memory.
 	for _, dir := range dirs {
 		meta, err := readMetaFile(dir)
 		if err != nil {
-			level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
-			continue
+			return errors.Wrapf(err, "read meta information %s", dir)
 		}
 		// Don't load blocks that are scheduled for deletion.
 		if _, ok := deleteable[meta.ULID]; ok {
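
For context, below is a minimal, self-contained sketch (not part of the patch) of the deletion rule the reload path relies on: a block directory, even one whose meta file is unreadable, may only be removed if its ULID is recorded as a parent of a surviving block. The BlockDesc/BlockMeta types and the deletableBlocks helper are simplified stand-ins for the real tsdb types; only the set logic mirrors the diff.

package main

import "fmt"

// BlockDesc is a simplified stand-in for the tsdb BlockDesc type:
// a short description of a block, identified here by its ULID string.
type BlockDesc struct {
	ULID string
}

// BlockMeta is a simplified stand-in for a block's meta: the block's
// own ULID plus the direct parents it was compacted from.
type BlockMeta struct {
	ULID    string
	Parents []BlockDesc
}

// deletableBlocks gathers every ULID that is recorded as a parent of some
// surviving block. Only these directories are safe to delete; a corrupted
// directory outside this set signals unexpected corruption rather than a
// half-finished compaction.
func deletableBlocks(metas []BlockMeta) map[string]struct{} {
	deleteable := map[string]struct{}{}
	for _, m := range metas {
		for _, p := range m.Parents {
			deleteable[p.ULID] = struct{}{}
		}
	}
	return deleteable
}

func main() {
	// One compacted block that was created from parents 01A and 01B.
	metas := []BlockMeta{
		{ULID: "01C", Parents: []BlockDesc{{ULID: "01A"}, {ULID: "01B"}}},
	}
	del := deletableBlocks(metas)

	// 01A is superseded and may be deleted even if its meta is unreadable;
	// 01Z is not a known parent, so its corruption must surface as an error.
	for _, corruptedDir := range []string{"01A", "01Z"} {
		if _, ok := del[corruptedDir]; ok {
			fmt.Printf("block %s is superseded; safe to delete\n", corruptedDir)
		} else {
			fmt.Printf("block %s is unexpectedly corrupted; fail reload\n", corruptedDir)
		}
	}
}

Because the parent list is written into the new block's meta before the old directories are removed, this rule lets a reload finish a deletion that a crash interrupted, which is the property the patch relies on.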