Browse Source

BlocksToDelete function in DB options (#7638)

* Optional retention filter for DB

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Specify len for the map creation

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
pull/7645/head
Ganesh Vernekar 4 years ago committed by GitHub
parent
commit
4a8531a64b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 82
      tsdb/db.go

82
tsdb/db.go

@ -118,19 +118,27 @@ type Options struct {
// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
SeriesLifecycleCallback SeriesLifecycleCallback
// BlocksToDelete is a function which returns the blocks which can be deleted.
// It is always the default time and size based retention in Prometheus and
// mainly meant for external users who import TSDB.
BlocksToDelete BlocksToDeleteFunc
}
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
dir string
lockf fileutil.Releaser
logger log.Logger
metrics *dbMetrics
opts *Options
chunkPool chunkenc.Pool
compactor Compactor
logger log.Logger
metrics *dbMetrics
opts *Options
chunkPool chunkenc.Pool
compactor Compactor
blocksToDelete BlocksToDeleteFunc
// Mutex for that must be held when modifying the general block layout.
mtx sync.RWMutex
@ -560,14 +568,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}
db = &DB{
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
autoCompact: true,
chunkPool: chunkenc.NewPool(),
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
autoCompact: true,
chunkPool: chunkenc.NewPool(),
blocksToDelete: opts.BlocksToDelete,
}
if db.blocksToDelete == nil {
db.blocksToDelete = DefaultBlocksToDelete(db)
}
if !opts.NoLockfile {
@ -871,13 +883,17 @@ func (db *DB) reload() (err error) {
return err
}
deletable := db.deletableBlocks(loadable)
deletableULIDs := db.blocksToDelete(loadable)
deletable := make(map[ulid.ULID]*Block, len(deletableULIDs))
// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
// This makes it resilient against the process crashing towards the end of a compaction.
// Creation of a new block and deletion of its parents cannot happen atomically.
// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
for _, block := range loadable {
if _, ok := deletableULIDs[block.meta.ULID]; ok {
deletable[block.meta.ULID] = block
}
for _, b := range block.Meta().Compaction.Parents {
delete(corrupted, b.ULID)
deletable[b.ULID] = nil
@ -986,9 +1002,17 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
return blocks, corrupted, nil
}
// DefaultBlocksToDelete returns a filter which decides time based and size based
// retention from the options of the db.
func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc {
return func(blocks []*Block) map[ulid.ULID]struct{} {
return deletableBlocks(db, blocks)
}
}
// deletableBlocks returns all blocks past retention policy.
func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
deletable := make(map[ulid.ULID]*Block)
func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} {
deletable := make(map[ulid.ULID]struct{})
// Sort the blocks by time - newest to oldest (largest to smallest timestamp).
// This ensures that the retentions will remove the oldest blocks.
@ -998,34 +1022,36 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
for _, block := range blocks {
if block.Meta().Compaction.Deletable {
deletable[block.Meta().ULID] = block
deletable[block.Meta().ULID] = struct{}{}
}
}
for ulid, block := range db.beyondTimeRetention(blocks) {
deletable[ulid] = block
for ulid := range BeyondTimeRetention(db, blocks) {
deletable[ulid] = struct{}{}
}
for ulid, block := range db.beyondSizeRetention(blocks) {
deletable[ulid] = block
for ulid := range BeyondSizeRetention(db, blocks) {
deletable[ulid] = struct{}{}
}
return deletable
}
func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
// BeyondTimeRetention returns those blocks which are beyond the time retention
// set in the db options.
func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {
// Time retention is disabled or no blocks to work with.
if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {
return
}
deletable = make(map[ulid.ULID]*Block)
deletable = make(map[ulid.ULID]struct{})
for i, block := range blocks {
// The difference between the first block and this block is larger than
// the retention period so any blocks after that are added as deletable.
if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {
for _, b := range blocks[i:] {
deletable[b.meta.ULID] = b
deletable[b.meta.ULID] = struct{}{}
}
db.metrics.timeRetentionCount.Inc()
break
@ -1034,13 +1060,15 @@ func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo
return deletable
}
func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
// BeyondSizeRetention returns those blocks which are beyond the size retention
// set in the db options.
func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {
// Size retention is disabled or no blocks to work with.
if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {
return
}
deletable = make(map[ulid.ULID]*Block)
deletable = make(map[ulid.ULID]struct{})
walSize, _ := db.Head().wal.Size()
headChunksSize := db.Head().chunkDiskMapper.Size()
@ -1052,7 +1080,7 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo
if blocksSize > int64(db.opts.MaxBytes) {
// Add this and all following blocks for deletion.
for _, b := range blocks[i:] {
deletable[b.meta.ULID] = b
deletable[b.meta.ULID] = struct{}{}
}
db.metrics.sizeRetentionCount.Inc()
break

Loading…
Cancel
Save