Browse Source

tsdb: expose hook to customize block querier (#14114)

* expose hook for block querier

Signed-off-by: Ben Ye <benye@amazon.com>

* update comment

Signed-off-by: Ben Ye <benye@amazon.com>

* use defined type

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
pull/14345/head
Ben Ye 5 months ago committed by GitHub
parent
commit
5585a3c7e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 54
      tsdb/db.go
  2. 75
      tsdb/db_test.go

54
tsdb/db.go

@ -192,12 +192,22 @@ type Options struct {
// NewCompactorFunc is a function that returns a TSDB compactor.
NewCompactorFunc NewCompactorFunc
// BlockQuerierFunc is a function to return storage.Querier from a BlockReader.
BlockQuerierFunc BlockQuerierFunc
// BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader.
BlockChunkQuerierFunc BlockChunkQuerierFunc
}
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
type BlockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error)
type BlockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)
// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
@ -244,6 +254,10 @@ type DB struct {
writeNotified wlog.WriteNotified
registerer prometheus.Registerer
blockQuerierFunc BlockQuerierFunc
blockChunkQuerierFunc BlockChunkQuerierFunc
}
type dbMetrics struct {
@ -559,10 +573,12 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
db.closers = append(db.closers, head)
return &DB{
dir: db.dir,
logger: db.logger,
blocks: blocks,
head: head,
dir: db.dir,
logger: db.logger,
blocks: blocks,
head: head,
blockQuerierFunc: NewBlockQuerier,
blockChunkQuerierFunc: NewBlockChunkQuerier,
}, nil
}
@ -870,6 +886,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}
db.compactCancel = cancel
if opts.BlockQuerierFunc == nil {
db.blockQuerierFunc = NewBlockQuerier
} else {
db.blockQuerierFunc = opts.BlockQuerierFunc
}
if opts.BlockChunkQuerierFunc == nil {
db.blockChunkQuerierFunc = NewBlockChunkQuerier
} else {
db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc
}
var wal, wbl *wlog.WL
segmentSize := wlog.DefaultSegmentSize
// Wal is enabled.
@ -1964,7 +1992,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
if maxt >= db.head.MinTime() {
rh := NewRangeHead(db.head, mint, maxt)
var err error
inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt)
inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open block querier for head %s: %w", rh, err)
}
@ -1981,7 +2009,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
}
if getNew {
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = NewBlockQuerier(rh, newMint, maxt)
inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
if err != nil {
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
}
@ -1995,9 +2023,9 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
var err error
outOfOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt)
outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
if err != nil {
// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
// If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead.
rh.isoState.Close()
return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err)
@ -2007,7 +2035,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
}
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
q, err := db.blockQuerierFunc(b, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for block %s: %w", b, err)
}
@ -2045,7 +2073,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
if maxt >= db.head.MinTime() {
rh := NewRangeHead(db.head, mint, maxt)
inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for head %s: %w", rh, err)
}
@ -2062,7 +2090,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
}
if getNew {
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt)
inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
}
@ -2075,7 +2103,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err)
}
@ -2084,7 +2112,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
}
for _, b := range blocks {
q, err := NewBlockChunkQuerier(b, mint, maxt)
q, err := db.blockChunkQuerierFunc(b, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for block %s: %w", b, err)
}

75
tsdb/db_test.go

@ -7159,3 +7159,78 @@ func TestNewCompactorFunc(t *testing.T) {
require.Len(t, ulids, 1)
require.Equal(t, block2, ulids[0])
}
func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) {
opts := DefaultOptions()
opts.BlockQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.Querier, error) {
// Only block with hints can be queried.
if len(b.Meta().Compaction.Hints) > 0 {
return NewBlockQuerier(b, mint, maxt)
}
return storage.NoopQuerier(), nil
}
opts.BlockChunkQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
// Only level 4 compaction block can be queried.
if b.Meta().Compaction.Level == 4 {
return NewBlockChunkQuerier(b, mint, maxt)
}
return storage.NoopChunkedQuerier(), nil
}
db := openTestDB(t, opts, nil)
defer func() {
require.NoError(t, db.Close())
}()
metas := []BlockMeta{
{Compaction: BlockMetaCompaction{Hints: []string{"test-hint"}}},
{Compaction: BlockMetaCompaction{Level: 4}},
}
for i := range metas {
// Include blockID into series to identify which block got touched.
serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})}
blockDir := createBlock(t, db.Dir(), serieses)
b, err := OpenBlock(db.logger, blockDir, db.chunkPool)
require.NoError(t, err)
// Overwrite meta.json with compaction section for testing purpose.
b.meta.Compaction = metas[i].Compaction
_, err = writeMetaFile(db.logger, blockDir, &b.meta)
require.NoError(t, err)
require.NoError(t, b.Close())
}
require.NoError(t, db.reloadBlocks())
require.Len(t, db.Blocks(), 2)
querier, err := db.Querier(0, 500)
require.NoError(t, err)
defer querier.Close()
matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")
seriesSet := querier.Select(context.Background(), false, nil, matcher)
count := 0
var lbls labels.Labels
for seriesSet.Next() {
count++
lbls = seriesSet.At().Labels()
}
require.NoError(t, seriesSet.Err())
require.Equal(t, 1, count)
// Make sure only block-0 is queried.
require.Equal(t, "block-0", lbls.Get("block"))
chunkQuerier, err := db.ChunkQuerier(0, 500)
require.NoError(t, err)
defer chunkQuerier.Close()
css := chunkQuerier.Select(context.Background(), false, nil, matcher)
count = 0
// Reset lbls variable.
lbls = labels.EmptyLabels()
for css.Next() {
count++
lbls = css.At().Labels()
}
require.NoError(t, css.Err())
require.Equal(t, 1, count)
// Make sure only block-1 is queried.
require.Equal(t, "block-1", lbls.Get("block"))
}

Loading…
Cancel
Save