Browse Source

[ENHANCEMENT] TSDB: Optimize querying with regexp matchers

Add method `PostingsForLabelMatching` to `tsdb.IndexReader`, to obtain postings for labels with a certain name and values accepted by a provided callback, and use it from `tsdb.PostingsForMatchers`.
The intention is to optimize regexp matcher paths, especially not having to load all label values before matching on them.

Plus tests, and refactor some `tsdb/index.Reader` methods.

Benchmarking shows memory reduction up to ~100%, and speedup of up to ~50%.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/14073/head
Arve Knudsen 7 months ago committed by GitHub
parent
commit
5c4310aa37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      CHANGELOG.md
  2. 8
      tsdb/block.go
  3. 81
      tsdb/block_test.go
  4. 4
      tsdb/head_read.go
  5. 23
      tsdb/head_read_test.go
  6. 170
      tsdb/index/index.go
  7. 29
      tsdb/index/postings.go
  8. 4
      tsdb/ooo_head_read.go
  9. 19
      tsdb/querier.go
  10. 14
      tsdb/querier_test.go

1
CHANGELOG.md

@ -5,6 +5,7 @@
* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
* [BUGFIX] OTLP: Don't generate target_info unless at least one identifying label is defined. #13991
* [BUGFIX] OTLP: Don't generate target_info unless there are metrics. #13991

8
tsdb/block.go

@ -77,6 +77,10 @@ type IndexReader interface {
// during background garbage collections.
Postings(ctx context.Context, name string, values ...string) (index.Postings, error)
// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
// If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings
// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings
@ -518,6 +522,10 @@ func (r blockIndexReader) Postings(ctx context.Context, name string, values ...s
return p, nil
}
func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
return r.ir.PostingsForLabelMatching(ctx, name, match)
}
func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}

81
tsdb/block_test.go

@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/wlog"
)
@ -509,6 +510,86 @@ func TestLabelNamesWithMatchers(t *testing.T) {
}
}
func TestBlockIndexReader_PostingsForLabelMatching(t *testing.T) {
testPostingsForLabelMatching(t, 2, func(t *testing.T, series []labels.Labels) IndexReader {
var seriesEntries []storage.Series
for _, s := range series {
seriesEntries = append(seriesEntries, storage.NewListSeries(s, []chunks.Sample{sample{100, 0, nil, nil}}))
}
blockDir := createBlock(t, t.TempDir(), seriesEntries)
files, err := sequenceFiles(chunkDir(blockDir))
require.NoError(t, err)
require.NotEmpty(t, files, "No chunk created.")
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, block.Close()) })
ir, err := block.Index()
require.NoError(t, err)
return ir
})
}
func testPostingsForLabelMatching(t *testing.T, offset storage.SeriesRef, setUp func(*testing.T, []labels.Labels) IndexReader) {
t.Helper()
ctx := context.Background()
series := []labels.Labels{
labels.FromStrings("n", "1"),
labels.FromStrings("n", "1", "i", "a"),
labels.FromStrings("n", "1", "i", "b"),
labels.FromStrings("n", "2"),
labels.FromStrings("n", "2.5"),
}
ir := setUp(t, series)
t.Cleanup(func() {
require.NoError(t, ir.Close())
})
testCases := []struct {
name string
labelName string
match func(string) bool
exp []storage.SeriesRef
}{
{
name: "n=1",
labelName: "n",
match: func(val string) bool {
return val == "1"
},
exp: []storage.SeriesRef{offset + 1, offset + 2, offset + 3},
},
{
name: "n=2",
labelName: "n",
match: func(val string) bool {
return val == "2"
},
exp: []storage.SeriesRef{offset + 4},
},
{
name: "missing label",
labelName: "missing",
match: func(val string) bool {
return true
},
exp: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
p := ir.PostingsForLabelMatching(ctx, tc.labelName, tc.match)
require.NotNil(t, p)
srs, err := index.ExpandPostings(p)
require.NoError(t, err)
require.Equal(t, tc.exp, srs)
})
}
}
// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []storage.Series) string {
blockDir, err := CreateBlock(series, dir, 0, log.NewNopLogger())

4
tsdb/head_read.go

@ -121,6 +121,10 @@ func (h *headIndexReader) Postings(ctx context.Context, name string, values ...s
}
}
func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
return h.head.postings.PostingsForLabelMatching(ctx, name, match)
}
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)

23
tsdb/head_read_test.go

@ -14,6 +14,7 @@
package tsdb
import (
"context"
"fmt"
"sync"
"testing"
@ -552,3 +553,25 @@ func TestMemSeries_chunk(t *testing.T) {
})
}
}
func TestHeadIndexReader_PostingsForLabelMatching(t *testing.T) {
testPostingsForLabelMatching(t, 0, func(t *testing.T, series []labels.Labels) IndexReader {
opts := DefaultHeadOptions()
opts.ChunkRange = 1000
opts.ChunkDirRoot = t.TempDir()
h, err := NewHead(nil, nil, nil, nil, opts, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, h.Close())
})
app := h.Appender(context.Background())
for _, s := range series {
app.Append(0, s, 0, 0)
}
require.NoError(t, app.Commit())
ir, err := h.Index()
require.NoError(t, err)
return ir
})
}

170
tsdb/index/index.go

@ -1536,36 +1536,14 @@ func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labe
if len(e) == 0 {
return nil, nil
}
values := make([]string, 0, len(e)*symbolFactor)
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
d.Skip(e[0].off)
values := make([]string, 0, len(e)*symbolFactor)
lastVal := e[len(e)-1].value
skip := 0
for d.Err() == nil && ctx.Err() == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
}
s := yoloString(d.UvarintBytes()) // Label value.
values = append(values, s)
if s == lastVal {
break
}
d.Uvarint64() // Offset.
}
if d.Err() != nil {
return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
}
return values, ctx.Err()
err := r.traversePostingOffsets(ctx, e[0].off, func(val string, _ uint64) (bool, error) {
values = append(values, val)
return val != lastVal, nil
})
return values, err
}
// LabelNamesFor returns all the label names for the series referred to by IDs.
@ -1662,6 +1640,44 @@ func (r *Reader) Series(id storage.SeriesRef, builder *labels.ScratchBuilder, ch
return nil
}
// traversePostingOffsets traverses r's posting offsets table, starting at off, and calls cb with every label value and postings offset.
// If cb returns false (or an error), the traversing is interrupted.
func (r *Reader) traversePostingOffsets(ctx context.Context, off int, cb func(string, uint64) (bool, error)) error {
// Don't Crc32 the entire postings offset table, this is very slow
// so hope any issues were caught at startup.
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
d.Skip(off)
skip := 0
ctxErr := ctx.Err()
for d.Err() == nil && ctxErr == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than to parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
}
v := yoloString(d.UvarintBytes()) // Label value.
postingsOff := d.Uvarint64() // Offset.
if ok, err := cb(v, postingsOff); err != nil {
return err
} else if !ok {
break
}
ctxErr = ctx.Err()
}
if d.Err() != nil {
return fmt.Errorf("get postings offset entry: %w", d.Err())
}
if ctxErr != nil {
return fmt.Errorf("get postings offset entry: %w", ctxErr)
}
return nil
}
func (r *Reader) Postings(ctx context.Context, name string, values ...string) (Postings, error) {
if r.version == FormatV1 {
e, ok := r.postingsV1[name]
@ -1696,7 +1712,6 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
slices.Sort(values) // Values must be in order so we can step through the table on disk.
res := make([]Postings, 0, len(values))
skip := 0
valueIndex := 0
for valueIndex < len(values) && values[valueIndex] < e[0].value {
// Discard values before the start.
@ -1714,33 +1729,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
// Need to look from previous entry.
i--
}
// Don't Crc32 the entire postings offset table, this is very slow
// so hope any issues were caught at startup.
d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsTable), nil)
d.Skip(e[i].off)
// Iterate on the offset table.
var postingsOff uint64 // The offset into the postings table.
for d.Err() == nil && ctx.Err() == nil {
if skip == 0 {
// These are always the same number of bytes,
// and it's faster to skip than parse.
skip = d.Len()
d.Uvarint() // Keycount.
d.UvarintBytes() // Label name.
skip -= d.Len()
} else {
d.Skip(skip)
}
v := d.UvarintBytes() // Label value.
postingsOff = d.Uvarint64() // Offset.
for string(v) >= value {
if string(v) == value {
if err := r.traversePostingOffsets(ctx, e[i].off, func(val string, postingsOff uint64) (bool, error) {
for val >= value {
if val == value {
// Read from the postings table.
d2 := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.Postings(d2.Get())
if err != nil {
return nil, fmt.Errorf("decode postings: %w", err)
return false, fmt.Errorf("decode postings: %w", err)
}
res = append(res, p)
}
@ -1752,18 +1749,70 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
}
if i+1 == len(e) || value >= e[i+1].value || valueIndex == len(values) {
// Need to go to a later postings offset entry, if there is one.
break
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}
if d.Err() != nil {
return nil, fmt.Errorf("get postings offset entry: %w", d.Err())
}
return Merge(ctx, res...), nil
}
func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
if r.version == FormatV1 {
return r.postingsForLabelMatchingV1(ctx, name, match)
}
e := r.postings[name]
if len(e) == 0 {
return EmptyPostings()
}
lastVal := e[len(e)-1].value
var its []Postings
if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
if match(val) {
// We want this postings iterator since the value is a match
postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(postingsDec)
if err != nil {
return false, fmt.Errorf("decode postings: %w", err)
}
its = append(its, p)
}
if ctx.Err() != nil {
return nil, fmt.Errorf("get postings offset entry: %w", ctx.Err())
return val != lastVal, nil
}); err != nil {
return ErrPostings(err)
}
return Merge(ctx, its...)
}
func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, match func(string) bool) Postings {
e := r.postingsV1[name]
if len(e) == 0 {
return EmptyPostings()
}
var its []Postings
for val, offset := range e {
if !match(val) {
continue
}
// Read from the postings table.
d := encoding.NewDecbufAt(r.b, int(offset), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(d)
if err != nil {
return ErrPostings(fmt.Errorf("decode postings: %w", err))
}
its = append(its, p)
}
return Merge(ctx, res...), nil
return Merge(ctx, its...)
}
// SortedPostings returns the given postings list reordered so that the backing series
@ -1856,6 +1905,11 @@ type Decoder struct {
// Postings returns a postings list for b and its number of elements.
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
d := encoding.Decbuf{B: b}
return dec.PostingsFromDecbuf(d)
}
// PostingsFromDecbuf returns a postings list for d and its number of elements.
func (dec *Decoder) PostingsFromDecbuf(d encoding.Decbuf) (int, Postings, error) {
n := d.Be32int()
l := d.Get()
if d.Err() != nil {

29
tsdb/index/postings.go

@ -397,6 +397,35 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) {
}
}
func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
p.mtx.RLock()
e := p.m[name]
if len(e) == 0 {
p.mtx.RUnlock()
return EmptyPostings()
}
// Benchmarking shows that first copying the values into a slice and then matching over that is
// faster than matching over the map keys directly, at least on AMD64.
vals := make([]string, 0, len(e))
for v, srs := range e {
if len(srs) > 0 {
vals = append(vals, v)
}
}
var its []Postings
for _, v := range vals {
if match(v) {
its = append(its, NewListPostings(e[v]))
}
}
p.mtx.RUnlock()
return Merge(ctx, its...)
}
// ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []storage.SeriesRef, err error) {
for p.Next() {

4
tsdb/ooo_head_read.go

@ -446,6 +446,10 @@ func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string,
return index.NewListPostings(ir.ch.postings), nil
}
func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
return index.ErrPostings(errors.New("not supported"))
}
func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
// This will already be sorted from the Postings() call above.
return p

19
tsdb/querier.go

@ -326,23 +326,8 @@ func postingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Matcher)
}
}
vals, err := ix.LabelValues(ctx, m.Name)
if err != nil {
return nil, err
}
res := vals[:0]
for _, val := range vals {
if m.Matches(val) {
res = append(res, val)
}
}
if len(res) == 0 {
return index.EmptyPostings(), nil
}
return ix.Postings(ctx, m.Name, res...)
it := ix.PostingsForLabelMatching(ctx, m.Name, m.Matches)
return it, it.Err()
}
// inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.

14
tsdb/querier_test.go

@ -2326,6 +2326,16 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
return index.NewListPostings(ep)
}
func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
var res []index.Postings
for l, srs := range m.postings {
if l.Name == name && match(l.Value) {
res = append(res, index.NewListPostings(srs))
}
}
return index.Merge(ctx, res...)
}
func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
out := make([]storage.SeriesRef, 0, 128)
@ -3238,6 +3248,10 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]str
return []string{}, nil
}
func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called"))
}
func TestPostingsForMatcher(t *testing.T) {
ctx := context.Background()

Loading…
Cancel
Save