feat: Allow customizing TSDB postings decoder (#13567)

* allow customizing TSDB postings decoder

---------

Signed-off-by: Ben Ye <benye@amazon.com>
fionaliao/3.0-migration-guide-fixes
Ben Ye 2 weeks ago committed by GitHub
parent f9057544cb
commit 140f4aa9ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -405,7 +405,7 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error)
} }
} }
b, err := db.Block(blockID) b, err := db.Block(blockID, tsdb.DefaultPostingsDecoderFactory)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

@ -330,7 +330,7 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs. // to instantiate chunk structs.
func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) { func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (pb *Block, err error) {
if logger == nil { if logger == nil {
logger = promslog.NewNopLogger() logger = promslog.NewNopLogger()
} }
@ -351,7 +351,11 @@ func OpenBlock(logger *slog.Logger, dir string, pool chunkenc.Pool) (pb *Block,
} }
closers = append(closers, cr) closers = append(closers, cr)
ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) decoder := index.DecodePostingsRaw
if postingsDecoderFactory != nil {
decoder = postingsDecoderFactory(meta)
}
ir, err := index.NewFileReader(filepath.Join(dir, indexFilename), decoder)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -59,14 +59,14 @@ func TestSetCompactionFailed(t *testing.T) {
tmpdir := t.TempDir() tmpdir := t.TempDir()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1)) blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
b, err := OpenBlock(nil, blockDir, nil) b, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.False(t, b.meta.Compaction.Failed) require.False(t, b.meta.Compaction.Failed)
require.NoError(t, b.setCompactionFailed()) require.NoError(t, b.setCompactionFailed())
require.True(t, b.meta.Compaction.Failed) require.True(t, b.meta.Compaction.Failed)
require.NoError(t, b.Close()) require.NoError(t, b.Close())
b, err = OpenBlock(nil, blockDir, nil) b, err = OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.True(t, b.meta.Compaction.Failed) require.True(t, b.meta.Compaction.Failed)
require.NoError(t, b.Close()) require.NoError(t, b.Close())
@ -74,7 +74,7 @@ func TestSetCompactionFailed(t *testing.T) {
func TestCreateBlock(t *testing.T) { func TestCreateBlock(t *testing.T) {
tmpdir := t.TempDir() tmpdir := t.TempDir()
b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil) b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, b.Close()) require.NoError(t, b.Close())
} }
@ -84,7 +84,7 @@ func BenchmarkOpenBlock(b *testing.B) {
blockDir := createBlock(b, tmpdir, genSeries(1e6, 20, 0, 10)) blockDir := createBlock(b, tmpdir, genSeries(1e6, 20, 0, 10))
b.Run("benchmark", func(b *testing.B) { b.Run("benchmark", func(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(b, err) require.NoError(b, err)
require.NoError(b, block.Close()) require.NoError(b, block.Close())
} }
@ -190,7 +190,7 @@ func TestCorruptedChunk(t *testing.T) {
require.NoError(t, f.Close()) require.NoError(t, f.Close())
// Check open err. // Check open err.
b, err := OpenBlock(nil, blockDir, nil) b, err := OpenBlock(nil, blockDir, nil, nil)
if tc.openErr != nil { if tc.openErr != nil {
require.EqualError(t, err, tc.openErr.Error()) require.EqualError(t, err, tc.openErr.Error())
return return
@ -245,7 +245,7 @@ func TestLabelValuesWithMatchers(t *testing.T) {
require.NotEmpty(t, files, "No chunk created.") require.NotEmpty(t, files, "No chunk created.")
// Check open err. // Check open err.
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
defer func() { require.NoError(t, block.Close()) }() defer func() { require.NoError(t, block.Close()) }()
@ -325,7 +325,7 @@ func TestBlockQuerierReturnsSortedLabelValues(t *testing.T) {
blockDir := createBlock(t, tmpdir, seriesEntries) blockDir := createBlock(t, tmpdir, seriesEntries)
// Check open err. // Check open err.
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, block.Close()) }) t.Cleanup(func() { require.NoError(t, block.Close()) })
@ -352,7 +352,7 @@ func TestBlockSize(t *testing.T) {
// Create a block and compare the reported size vs actual disk size. // Create a block and compare the reported size vs actual disk size.
{ {
blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100)) blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
blockInit, err = OpenBlock(nil, blockDirInit, nil) blockInit, err = OpenBlock(nil, blockDirInit, nil, nil)
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, blockInit.Close()) require.NoError(t, blockInit.Close())
@ -377,7 +377,7 @@ func TestBlockSize(t *testing.T) {
blockDirsAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) blockDirsAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, blockDirsAfterCompact, 1) require.Len(t, blockDirsAfterCompact, 1)
blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirsAfterCompact[0].String()), nil) blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirsAfterCompact[0].String()), nil, nil)
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, blockAfterCompact.Close()) require.NoError(t, blockAfterCompact.Close())
@ -408,7 +408,7 @@ func TestReadIndexFormatV1(t *testing.T) {
*/ */
blockDir := filepath.Join("testdata", "index_format_v1") blockDir := filepath.Join("testdata", "index_format_v1")
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
q, err := NewBlockQuerier(block, 0, 1000) q, err := NewBlockQuerier(block, 0, 1000)
@ -445,7 +445,7 @@ func BenchmarkLabelValuesWithMatchers(b *testing.B) {
require.NotEmpty(b, files, "No chunk created.") require.NotEmpty(b, files, "No chunk created.")
// Check open err. // Check open err.
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(b, err) require.NoError(b, err)
defer func() { require.NoError(b, block.Close()) }() defer func() { require.NoError(b, block.Close()) }()
@ -497,7 +497,7 @@ func TestLabelNamesWithMatchers(t *testing.T) {
require.NotEmpty(t, files, "No chunk created.") require.NotEmpty(t, files, "No chunk created.")
// Check open err. // Check open err.
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, block.Close()) }) t.Cleanup(func() { require.NoError(t, block.Close()) })
@ -551,7 +551,7 @@ func TestBlockIndexReader_PostingsForLabelMatching(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NotEmpty(t, files, "No chunk created.") require.NotEmpty(t, files, "No chunk created.")
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, block.Close()) }) t.Cleanup(func() { require.NoError(t, block.Close()) })

@ -47,7 +47,7 @@ func TestBlockWriter(t *testing.T) {
// Confirm the block has the correct data. // Confirm the block has the correct data.
blockpath := filepath.Join(outputDir, id.String()) blockpath := filepath.Join(outputDir, id.String())
b, err := OpenBlock(nil, blockpath, nil) b, err := OpenBlock(nil, blockpath, nil, nil)
require.NoError(t, err) require.NoError(t, err)
defer func() { require.NoError(t, b.Close()) }() defer func() { require.NoError(t, b.Close()) }()
q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64) q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64)

@ -87,6 +87,7 @@ type LeveledCompactor struct {
maxBlockChunkSegmentSize int64 maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder postingsEncoder index.PostingsEncoder
postingsDecoderFactory PostingsDecoderFactory
enableOverlappingCompaction bool enableOverlappingCompaction bool
} }
@ -158,6 +159,9 @@ type LeveledCompactorOptions struct {
// PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction. // PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction.
// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more. // If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more.
PE index.PostingsEncoder PE index.PostingsEncoder
// PD specifies the postings decoder factory to return different postings decoder based on BlockMeta. It is called when opening a block or opening the index file.
// If it is nil then a default decoder is used, compatible with Prometheus v2.
PD PostingsDecoderFactory
// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used. // MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used.
MaxBlockChunkSegmentSize int64 MaxBlockChunkSegmentSize int64
// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
@ -167,6 +171,12 @@ type LeveledCompactorOptions struct {
EnableOverlappingCompaction bool EnableOverlappingCompaction bool
} }
type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder
func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder {
return index.DecodePostingsRaw
}
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
@ -213,6 +223,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc, mergeFunc: mergeFunc,
postingsEncoder: pe, postingsEncoder: pe,
postingsDecoderFactory: opts.PD,
enableOverlappingCompaction: opts.EnableOverlappingCompaction, enableOverlappingCompaction: opts.EnableOverlappingCompaction,
}, nil }, nil
} }
@ -477,7 +488,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
if b == nil { if b == nil {
var err error var err error
b, err = OpenBlock(c.logger, d, c.chunkPool) b, err = OpenBlock(c.logger, d, c.chunkPool, c.postingsDecoderFactory)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -1153,7 +1153,7 @@ func BenchmarkCompaction(b *testing.B) {
blockDirs := make([]string, 0, len(c.ranges)) blockDirs := make([]string, 0, len(c.ranges))
var blocks []*Block var blocks []*Block
for _, r := range c.ranges { for _, r := range c.ranges {
block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil) block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil, nil)
require.NoError(b, err) require.NoError(b, err)
blocks = append(blocks, block) blocks = append(blocks, block)
defer func() { defer func() {
@ -1549,7 +1549,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
require.Len(t, ids, 1) require.Len(t, ids, 1)
// Open the block and query it and check the histograms. // Open the block and query it and check the histograms.
block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, ids[0].String()), nil) block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, ids[0].String()), nil, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, block.Close()) require.NoError(t, block.Close())
@ -1911,7 +1911,7 @@ func TestCompactEmptyResultBlockWithTombstone(t *testing.T) {
ctx := context.Background() ctx := context.Background()
tmpdir := t.TempDir() tmpdir := t.TempDir()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 10)) blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 10))
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
// Write tombstone covering the whole block. // Write tombstone covering the whole block.
err = block.Delete(ctx, 0, 10, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "0")) err = block.Delete(ctx, 0, 10, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "0"))

@ -91,6 +91,7 @@ func DefaultOptions() *Options {
EnableDelayedCompaction: false, EnableDelayedCompaction: false,
CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent,
CompactionDelay: time.Duration(0), CompactionDelay: time.Duration(0),
PostingsDecoderFactory: DefaultPostingsDecoderFactory,
} }
} }
@ -219,6 +220,10 @@ type Options struct {
// BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader. // BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader.
BlockChunkQuerierFunc BlockChunkQuerierFunc BlockChunkQuerierFunc BlockChunkQuerierFunc
// PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta.
// By default, DefaultPostingsDecoderFactory will be used to create raw posting decoder.
PostingsDecoderFactory PostingsDecoderFactory
} }
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@ -633,7 +638,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return nil, ErrClosed return nil, ErrClosed
default: default:
} }
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -731,7 +736,7 @@ func (db *DBReadOnly) LastBlockID() (string, error) {
} }
// Block returns a block reader by given block id. // Block returns a block reader by given block id.
func (db *DBReadOnly) Block(blockID string) (BlockReader, error) { func (db *DBReadOnly) Block(blockID string, postingsDecoderFactory PostingsDecoderFactory) (BlockReader, error) {
select { select {
case <-db.closed: case <-db.closed:
return nil, ErrClosed return nil, ErrClosed
@ -743,7 +748,7 @@ func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
return nil, fmt.Errorf("invalid block ID %s", blockID) return nil, fmt.Errorf("invalid block ID %s", blockID)
} }
block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil) block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil, postingsDecoderFactory)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -902,6 +907,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{ db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction, EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
PD: opts.PostingsDecoderFactory,
}) })
} }
if err != nil { if err != nil {
@ -1568,7 +1574,7 @@ func (db *DB) reloadBlocks() (err error) {
db.mtx.Lock() db.mtx.Lock()
defer db.mtx.Unlock() defer db.mtx.Unlock()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory)
if err != nil { if err != nil {
return err return err
} }
@ -1663,7 +1669,7 @@ func (db *DB) reloadBlocks() (err error) {
return nil return nil
} }
func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir) bDirs, err := blockDirs(dir)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("find blocks: %w", err) return nil, nil, fmt.Errorf("find blocks: %w", err)
@ -1680,7 +1686,7 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.
// See if we already have the block in memory or open it otherwise. // See if we already have the block in memory or open it otherwise.
block, open := getBlock(loaded, meta.ULID) block, open := getBlock(loaded, meta.ULID)
if !open { if !open {
block, err = OpenBlock(l, bDir, chunkPool) block, err = OpenBlock(l, bDir, chunkPool, postingsDecoderFactory)
if err != nil { if err != nil {
corrupted[meta.ULID] = err corrupted[meta.ULID] = err
continue continue

@ -1320,7 +1320,7 @@ func TestTombstoneCleanFail(t *testing.T) {
totalBlocks := 2 totalBlocks := 2
for i := 0; i < totalBlocks; i++ { for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1)) blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1))
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
// Add some fake tombstones to trigger the compaction. // Add some fake tombstones to trigger the compaction.
tomb := tombstones.NewMemTombstones() tomb := tombstones.NewMemTombstones()
@ -1375,7 +1375,7 @@ func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
// Generate some blocks with old mint (near epoch). // Generate some blocks with old mint (near epoch).
for j := 0; j < totalBlocks; j++ { for j := 0; j < totalBlocks; j++ {
blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1)) blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1))
block, err := OpenBlock(nil, blockDir, nil) block, err := OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err) require.NoError(t, err)
// Cover block with tombstones so it can be deleted with CleanTombstones() as well. // Cover block with tombstones so it can be deleted with CleanTombstones() as well.
tomb := tombstones.NewMemTombstones() tomb := tombstones.NewMemTombstones()
@ -1436,7 +1436,7 @@ func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *
return []ulid.ULID{}, errors.New("the compactor already did the maximum allowed blocks so it is time to fail") return []ulid.ULID{}, errors.New("the compactor already did the maximum allowed blocks so it is time to fail")
} }
block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil, nil)
require.NoError(c.t, err) require.NoError(c.t, err)
require.NoError(c.t, block.Close()) // Close block as we won't be using anywhere. require.NoError(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
@ -2509,13 +2509,13 @@ func TestDBReadOnly(t *testing.T) {
}) })
t.Run("block", func(t *testing.T) { t.Run("block", func(t *testing.T) {
blockID := expBlock.meta.ULID.String() blockID := expBlock.meta.ULID.String()
block, err := dbReadOnly.Block(blockID) block, err := dbReadOnly.Block(blockID, nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, expBlock.Meta(), block.Meta(), "block meta mismatch") require.Equal(t, expBlock.Meta(), block.Meta(), "block meta mismatch")
}) })
t.Run("invalid block ID", func(t *testing.T) { t.Run("invalid block ID", func(t *testing.T) {
blockID := "01GTDVZZF52NSWB5SXQF0P2PGF" blockID := "01GTDVZZF52NSWB5SXQF0P2PGF"
_, err := dbReadOnly.Block(blockID) _, err := dbReadOnly.Block(blockID, nil)
require.Error(t, err) require.Error(t, err)
}) })
t.Run("last block ID", func(t *testing.T) { t.Run("last block ID", func(t *testing.T) {
@ -8851,7 +8851,7 @@ func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) {
// Include blockID into series to identify which block got touched. // Include blockID into series to identify which block got touched.
serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})} serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})}
blockDir := createBlock(t, db.Dir(), serieses) blockDir := createBlock(t, db.Dir(), serieses)
b, err := OpenBlock(db.logger, blockDir, db.chunkPool) b, err := OpenBlock(db.logger, blockDir, db.chunkPool, nil)
require.NoError(t, err) require.NoError(t, err)
// Overwrite meta.json with compaction section for testing purpose. // Overwrite meta.json with compaction section for testing purpose.

@ -118,6 +118,8 @@ type symbolCacheEntry struct {
type PostingsEncoder func(*encoding.Encbuf, []uint32) error type PostingsEncoder func(*encoding.Encbuf, []uint32) error
type PostingsDecoder func(encoding.Decbuf) (int, Postings, error)
// Writer implements the IndexWriter interface for the standard // Writer implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type Writer struct { type Writer struct {
@ -1157,17 +1159,17 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// NewReader returns a new index reader on the given byte slice. It automatically // NewReader returns a new index reader on the given byte slice. It automatically
// handles different format versions. // handles different format versions.
func NewReader(b ByteSlice) (*Reader, error) { func NewReader(b ByteSlice, decoder PostingsDecoder) (*Reader, error) {
return newReader(b, io.NopCloser(nil)) return newReader(b, io.NopCloser(nil), decoder)
} }
// NewFileReader returns a new index reader against the given index file. // NewFileReader returns a new index reader against the given index file.
func NewFileReader(path string) (*Reader, error) { func NewFileReader(path string, decoder PostingsDecoder) (*Reader, error) {
f, err := fileutil.OpenMmapFile(path) f, err := fileutil.OpenMmapFile(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
r, err := newReader(realByteSlice(f.Bytes()), f) r, err := newReader(realByteSlice(f.Bytes()), f, decoder)
if err != nil { if err != nil {
return nil, tsdb_errors.NewMulti( return nil, tsdb_errors.NewMulti(
err, err,
@ -1178,7 +1180,7 @@ func NewFileReader(path string) (*Reader, error) {
return r, nil return r, nil
} }
func newReader(b ByteSlice, c io.Closer) (*Reader, error) { func newReader(b ByteSlice, c io.Closer, postingsDecoder PostingsDecoder) (*Reader, error) {
r := &Reader{ r := &Reader{
b: b, b: b,
c: c, c: c,
@ -1277,7 +1279,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
r.nameSymbols[off] = k r.nameSymbols[off] = k
} }
r.dec = &Decoder{LookupSymbol: r.lookupSymbol} r.dec = &Decoder{LookupSymbol: r.lookupSymbol, DecodePostings: postingsDecoder}
return r, nil return r, nil
} }
@ -1706,7 +1708,7 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
} }
// Read from the postings table. // Read from the postings table.
d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) d := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d.Get()) _, p, err := r.dec.DecodePostings(d)
if err != nil { if err != nil {
return nil, fmt.Errorf("decode postings: %w", err) return nil, fmt.Errorf("decode postings: %w", err)
} }
@ -1749,7 +1751,7 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
if val == value { if val == value {
// Read from the postings table. // Read from the postings table.
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d2.Get()) _, p, err := r.dec.DecodePostings(d2)
if err != nil { if err != nil {
return false, fmt.Errorf("decode postings: %w", err) return false, fmt.Errorf("decode postings: %w", err)
} }
@ -1790,7 +1792,7 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc
if match(val) { if match(val) {
// We want this postings iterator since the value is a match // We want this postings iterator since the value is a match
postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(postingsDec) _, p, err := r.dec.DecodePostings(postingsDec)
if err != nil { if err != nil {
return false, fmt.Errorf("decode postings: %w", err) return false, fmt.Errorf("decode postings: %w", err)
} }
@ -1823,7 +1825,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma
// Read from the postings table. // Read from the postings table.
d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable) d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(d) _, p, err := r.dec.DecodePostings(d)
if err != nil { if err != nil {
return ErrPostings(fmt.Errorf("decode postings: %w", err)) return ErrPostings(fmt.Errorf("decode postings: %w", err))
} }
@ -1918,17 +1920,12 @@ func (s stringListIter) Err() error { return nil }
// It currently does not contain decoding methods for all entry types but can be extended // It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand. // by them if there's demand.
type Decoder struct { type Decoder struct {
LookupSymbol func(context.Context, uint32) (string, error) LookupSymbol func(context.Context, uint32) (string, error)
} DecodePostings PostingsDecoder
// Postings returns a postings list for b and its number of elements.
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
d := encoding.Decbuf{B: b}
return dec.PostingsFromDecbuf(d)
} }
// PostingsFromDecbuf returns a postings list for d and its number of elements. // DecodePostingsRaw returns a postings list for d and its number of elements.
func (dec *Decoder) PostingsFromDecbuf(d encoding.Decbuf) (int, Postings, error) { func DecodePostingsRaw(d encoding.Decbuf) (int, Postings, error) {
n := d.Be32int() n := d.Be32int()
l := d.Get() l := d.Get()
if d.Err() != nil { if d.Err() != nil {

@ -146,7 +146,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, iw.Close()) require.NoError(t, iw.Close())
ir, err := NewFileReader(fn) ir, err := NewFileReader(fn, DecodePostingsRaw)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, ir.Close()) require.NoError(t, ir.Close())
@ -157,7 +157,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
f.Close() f.Close()
_, err = NewFileReader(dir) _, err = NewFileReader(dir, DecodePostingsRaw)
require.Error(t, err) require.Error(t, err)
} }
@ -218,7 +218,7 @@ func TestIndexRW_Postings(t *testing.T) {
}, labelIndices) }, labelIndices)
t.Run("ShardedPostings()", func(t *testing.T) { t.Run("ShardedPostings()", func(t *testing.T) {
ir, err := NewFileReader(fn) ir, err := NewFileReader(fn, DecodePostingsRaw)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, ir.Close()) require.NoError(t, ir.Close())
@ -469,7 +469,7 @@ func TestDecbufUvarintWithInvalidBuffer(t *testing.T) {
func TestReaderWithInvalidBuffer(t *testing.T) { func TestReaderWithInvalidBuffer(t *testing.T) {
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
_, err := NewReader(b) _, err := NewReader(b, DecodePostingsRaw)
require.Error(t, err) require.Error(t, err)
} }
@ -481,7 +481,7 @@ func TestNewFileReaderErrorNoOpenFiles(t *testing.T) {
err := os.WriteFile(idxName, []byte("corrupted contents"), 0o666) err := os.WriteFile(idxName, []byte("corrupted contents"), 0o666)
require.NoError(t, err) require.NoError(t, err)
_, err = NewFileReader(idxName) _, err = NewFileReader(idxName, DecodePostingsRaw)
require.Error(t, err) require.Error(t, err)
// dir.Close will fail on Win if idxName fd is not closed on error path. // dir.Close will fail on Win if idxName fd is not closed on error path.
@ -560,7 +560,8 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
} }
func TestDecoder_Postings_WrongInput(t *testing.T) { func TestDecoder_Postings_WrongInput(t *testing.T) {
_, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) d := encoding.Decbuf{B: []byte("the cake is a lie")}
_, _, err := (&Decoder{DecodePostings: DecodePostingsRaw}).DecodePostings(d)
require.Error(t, err) require.Error(t, err)
} }
@ -690,7 +691,7 @@ func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSerie
} }
require.NoError(tb, iw.Close()) require.NoError(tb, iw.Close())
ir, err := NewFileReader(fn) ir, err := NewFileReader(fn, DecodePostingsRaw)
require.NoError(tb, err) require.NoError(tb, err)
tb.Cleanup(func() { tb.Cleanup(func() {
require.NoError(tb, ir.Close()) require.NoError(tb, ir.Close())

@ -75,7 +75,7 @@ func BenchmarkQuerier(b *testing.B) {
b.Run("Block", func(b *testing.B) { b.Run("Block", func(b *testing.B) {
blockdir := createBlockFromHead(b, b.TempDir(), h) blockdir := createBlockFromHead(b, b.TempDir(), h)
block, err := OpenBlock(nil, blockdir, nil) block, err := OpenBlock(nil, blockdir, nil, nil)
require.NoError(b, err) require.NoError(b, err)
defer func() { defer func() {
require.NoError(b, block.Close()) require.NoError(b, block.Close())
@ -315,7 +315,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
tmpdir := b.TempDir() tmpdir := b.TempDir()
blockdir := createBlockFromHead(b, tmpdir, h) blockdir := createBlockFromHead(b, tmpdir, h)
block, err := OpenBlock(nil, blockdir, nil) block, err := OpenBlock(nil, blockdir, nil, nil)
require.NoError(b, err) require.NoError(b, err)
defer func() { defer func() {
require.NoError(b, block.Close()) require.NoError(b, block.Close())

@ -2443,7 +2443,7 @@ func BenchmarkQueryIterator(b *testing.B) {
} else { } else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt) generatedSeries = populateSeries(prefilledLabels, mint, maxt)
} }
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err) require.NoError(b, err)
blocks = append(blocks, block) blocks = append(blocks, block)
defer block.Close() defer block.Close()
@ -2506,7 +2506,7 @@ func BenchmarkQuerySeek(b *testing.B) {
} else { } else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt) generatedSeries = populateSeries(prefilledLabels, mint, maxt)
} }
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err) require.NoError(b, err)
blocks = append(blocks, block) blocks = append(blocks, block)
defer block.Close() defer block.Close()
@ -2641,7 +2641,7 @@ func BenchmarkSetMatcher(b *testing.B) {
} else { } else {
generatedSeries = populateSeries(prefilledLabels, mint, maxt) generatedSeries = populateSeries(prefilledLabels, mint, maxt)
} }
block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil, nil)
require.NoError(b, err) require.NoError(b, err)
blocks = append(blocks, block) blocks = append(blocks, block)
defer block.Close() defer block.Close()
@ -3209,7 +3209,7 @@ func BenchmarkQueries(b *testing.B) {
qs := make([]storage.Querier, 0, 10) qs := make([]storage.Querier, 0, 10)
for x := 0; x <= 10; x++ { for x := 0; x <= 10; x++ {
block, err := OpenBlock(nil, createBlock(b, dir, series), nil) block, err := OpenBlock(nil, createBlock(b, dir, series), nil, nil)
require.NoError(b, err) require.NoError(b, err)
q, err := NewBlockQuerier(block, 1, nSamples) q, err := NewBlockQuerier(block, 1, nSamples)
require.NoError(b, err) require.NoError(b, err)
@ -3792,7 +3792,7 @@ func (m mockReaderOfLabels) Symbols() index.StringIter {
// https://github.com/prometheus/prometheus/issues/14723, when one of the queriers (blockQuerier in this case) // https://github.com/prometheus/prometheus/issues/14723, when one of the queriers (blockQuerier in this case)
// alters the passed matchers. // alters the passed matchers.
func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) { func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) {
block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil) block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil, nil)
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, block.Close()) require.NoError(t, block.Close())

@ -79,7 +79,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
require.NoError(t, os.MkdirAll(filepath.Join(tmpDbDir, "chunks"), 0o777)) require.NoError(t, os.MkdirAll(filepath.Join(tmpDbDir, "chunks"), 0o777))
// Read current index to check integrity. // Read current index to check integrity.
r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) r, err := index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), index.DecodePostingsRaw)
require.NoError(t, err) require.NoError(t, err)
p, err := r.Postings(ctx, "b", "1") p, err := r.Postings(ctx, "b", "1")
require.NoError(t, err) require.NoError(t, err)
@ -97,7 +97,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
db.Close() db.Close()
r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename), index.DecodePostingsRaw)
require.NoError(t, err) require.NoError(t, err)
defer r.Close() defer r.Close()
p, err = r.Postings(ctx, "b", "1") p, err = r.Postings(ctx, "b", "1")

Loading…
Cancel
Save