diff --git a/block.go b/block.go index 9ae2adbde..1d5ebfb10 100644 --- a/block.go +++ b/block.go @@ -164,6 +164,13 @@ type BlockStats struct { NumTombstones uint64 `json:"numTombstones,omitempty"` } +// BlockDesc describes a block by ULID and time range. +type BlockDesc struct { + ULID ulid.ULID `json:"ulid"` + MinTime int64 `json:"minTime"` + MaxTime int64 `json:"maxTime"` +} + // BlockMetaCompaction holds information about compactions a block went through. type BlockMetaCompaction struct { // Maximum number of compaction cycles any source block has @@ -171,6 +178,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // Short descriptions of the direct blocks that were used to create + // this block. + Parents []BlockDesc `json:"parents,omitempty"` Failed bool `json:"failed,omitempty"` } @@ -475,19 +485,17 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { pb.tombstones.Iter(func(id uint64, ivs Intervals) error { numStones += len(ivs) - return nil }) - if numStones == 0 { return nil, nil } - uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime) + meta := pb.Meta() + uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta) if err != nil { return nil, err } - return &uid, nil } diff --git a/compact.go b/compact.go index 16a3bd747..8743c8ba2 100644 --- a/compact.go +++ b/compact.go @@ -55,7 +55,7 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. - Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) + Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). @@ -297,6 +297,11 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { for _, s := range b.Compaction.Sources { sources[s] = struct{}{} } + res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{ + ULID: b.ULID, + MinTime: b.MinTime, + MaxTime: b.MaxTime, + }) } res.Compaction.Level++ @@ -367,7 +372,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, return uid, merr } -func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { +func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) @@ -379,6 +384,12 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) ( meta.Compaction.Level = 1 meta.Compaction.Sources = []ulid.ULID{uid} + if parent != nil { + meta.Compaction.Parents = []BlockDesc{ + {ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime}, + } + } + err := c.write(dest, meta, b) if err != nil { return uid, err diff --git a/db.go b/db.go index 2b188423f..28fafc09b 100644 --- a/db.go +++ b/db.go @@ -267,17 +267,9 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - _, err1 := db.retentionCutoff() - if err1 != nil { - level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1) - } - - _, err2 := db.compact() - if err2 != nil { - level.Error(db.logger).Log("msg", "compaction failed", "err", err2) - } - - if err1 != nil || err2 != nil { + _, err := db.compact() + if err != nil { + level.Error(db.logger).Log("msg", "compaction failed", "err", err) backoff = exponential(backoff, 1*time.Second, 1*time.Minute) } else { backoff = 0 @@ -289,19 +281,9 @@ func (db *DB) run() { } } -func (db *DB) retentionCutoff() (b bool, err error) { - defer func() { - if !b && err == nil { - // no data had to be cut off. - return - } - db.metrics.cutoffs.Inc() - if err != nil { - db.metrics.cutoffsFailed.Inc() - } - }() +func (db *DB) beyondRetention(meta *BlockMeta) bool { if db.opts.RetentionDuration == 0 { - return false, nil + return false } db.mtx.RLock() @@ -309,23 +291,13 @@ func (db *DB) retentionCutoff() (b bool, err error) { db.mtx.RUnlock() if len(blocks) == 0 { - return false, nil + return false } last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - dirs, err := retentionCutoffDirs(db.dir, mint) - if err != nil { - return false, err - } - - // This will close the dirs and then delete the dirs. - if len(dirs) > 0 { - return true, db.reload(dirs...) - } - return false, nil + return meta.MaxTime < mint } // Appender opens a new appender against the database. @@ -354,6 +326,13 @@ func (a dbAppender) Commit() error { return err } +// Compact data if possible. After successful compaction blocks are reloaded +// which will also trigger blocks to be deleted that fall out of the retention +// window. +// If no blocks are compacted, the retention window state doesn't change. Thus, +// this is sufficient to reliably delete old data. +// Old blocks are only deleted on reload based on the new block's parent information. +// See DB.reload documentation for further information. func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -383,7 +362,7 @@ func (db *DB) compact() (changes bool, err error) { mint: mint, maxt: maxt, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { + if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true @@ -418,7 +397,7 @@ func (db *DB) compact() (changes bool, err error) { changes = true runtime.GC() - if err := db.reload(plan...); err != nil { + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } runtime.GC() @@ -427,39 +406,6 @@ func (db *DB) compact() (changes bool, err error) { return changes, nil } -// retentionCutoffDirs returns all directories of blocks in dir that are strictly -// before mint. -func retentionCutoffDirs(dir string, mint int64) ([]string, error) { - df, err := fileutil.OpenDir(dir) - if err != nil { - return nil, errors.Wrapf(err, "open directory") - } - defer df.Close() - - dirs, err := blockDirs(dir) - if err != nil { - return nil, errors.Wrapf(err, "list block dirs %s", dir) - } - - delDirs := []string{} - - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - return nil, errors.Wrapf(err, "read block meta %s", dir) - } - // The first block we encounter marks that we crossed the boundary - // of deletable blocks. - if meta.MaxTime >= mint { - break - } - - delDirs = append(delDirs, dir) - } - - return delDirs, nil -} - func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { for _, b := range db.blocks { if b.Meta().ULID == id { @@ -469,18 +415,10 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -func stringsContain(set []string, elem string) bool { - for _, e := range set { - if elem == e { - return true - } - } - return false -} - // reload on-disk blocks and trigger head truncation if new blocks appeared. It takes // a list of block directories which should be deleted during reload. -func (db *DB) reload(deleteable ...string) (err error) { +// Blocks that are obsolete due to replacement or retention will be deleted. +func (db *DB) reload() (err error) { defer func() { if err != nil { db.metrics.reloadsFailed.Inc() @@ -492,21 +430,58 @@ func (db *DB) reload(deleteable ...string) (err error) { if err != nil { return errors.Wrap(err, "find blocks") } + // We delete old blocks that have been superseded by new ones by gathering all parents + // from existing blocks. Those parents all have newer replacements and can be safely deleted + // after we loaded the other blocks. + // This makes us resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. By creating + // blocks with their parents, we can pick up the deletion where it left off during a crash. var ( - blocks []*Block - exist = map[ulid.ULID]struct{}{} + blocks []*Block + corrupted = map[ulid.ULID]error{} + opened = map[ulid.ULID]struct{}{} + deleteable = map[ulid.ULID]struct{}{} ) + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + // The block was potentially in the middle of being deleted during a crash. + // Skip it since we may delete it properly further down again. + level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) + ulid, err2 := ulid.Parse(filepath.Base(dir)) + if err2 != nil { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + continue + } + corrupted[ulid] = err + continue + } + if db.beyondRetention(meta) { + deleteable[meta.ULID] = struct{}{} + continue + } + for _, b := range meta.Compaction.Parents { + deleteable[b.ULID] = struct{}{} + } + } + // Blocks we failed to open should all be those we are want to delete anyway. + for c, err := range corrupted { + if _, ok := deleteable[c]; !ok { + return errors.Wrapf(err, "unexpected corrupted block %s", c) + } + } + // Load new blocks into memory. for _, dir := range dirs { meta, err := readMetaFile(dir) if err != nil { return errors.Wrapf(err, "read meta information %s", dir) } - // If the block is pending for deletion, don't add it to the new block set. - if stringsContain(deleteable, dir) { + // Don't load blocks that are scheduled for deletion. + if _, ok := deleteable[meta.ULID]; ok { continue } - + // See if we already have the block in memory or open it otherwise. b, ok := db.getBlock(meta.ULID) if !ok { b, err = OpenBlock(dir, db.chunkPool) @@ -514,9 +489,8 @@ func (db *DB) reload(deleteable ...string) (err error) { return errors.Wrapf(err, "open block %s", dir) } } - blocks = append(blocks, b) - exist[meta.ULID] = struct{}{} + opened[meta.ULID] = struct{}{} } sort.Slice(blocks, func(i, j int) bool { return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime @@ -533,15 +507,19 @@ func (db *DB) reload(deleteable ...string) (err error) { db.blocks = blocks db.mtx.Unlock() + // Drop old blocks from memory. for _, b := range oldBlocks { - if _, ok := exist[b.Meta().ULID]; ok { + if _, ok := opened[b.Meta().ULID]; ok { continue } if err := b.Close(); err != nil { level.Warn(db.logger).Log("msg", "closing block failed", "err", err) } - if err := os.RemoveAll(b.Dir()); err != nil { - level.Warn(db.logger).Log("msg", "deleting block failed", "err", err) + } + // Delete all obsolete blocks. None of them are opened any longer. + for ulid := range deleteable { + if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + return errors.Wrapf(err, "delete obsolete block %s", ulid) } } @@ -765,7 +743,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { if !withHead { return nil } - _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) + _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) return errors.Wrap(err, "snapshot head block") } @@ -859,22 +837,15 @@ func (db *DB) CleanTombstones() (err error) { blocks := db.blocks[:] db.mtx.RUnlock() - deletable := []string{} for _, b := range blocks { if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil { err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) return err } else if uid != nil { // New block was created. - deletable = append(deletable, b.Dir()) newUIDs = append(newUIDs, *uid) } } - - if len(deletable) == 0 { - return nil - } - - return errors.Wrap(db.reload(deletable...), "reload blocks") + return errors.Wrap(db.reload(), "reload blocks") } func intervalOverlap(amin, amax, bmin, bmax int64) bool { diff --git a/db_test.go b/db_test.go index f0f3142c3..72817a109 100644 --- a/db_test.go +++ b/db_test.go @@ -842,7 +842,7 @@ type mockCompactorFailing struct { func (*mockCompactorFailing) Plan(dir string) ([]string, error) { return nil, nil } -func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { +func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { if len(c.blocks) >= c.max { return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } @@ -925,10 +925,8 @@ func TestDB_Retention(t *testing.T) { testutil.Equals(t, 2, len(db.blocks)) - // Now call retention. - changes, err := db.retentionCutoff() - testutil.Ok(t, err) - testutil.Assert(t, changes, "there should be changes") + // Reload blocks, which should drop blocks beyond the retention boundary. + testutil.Ok(t, db.reload()) testutil.Equals(t, 1, len(db.blocks)) testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. }