diff --git a/promql/engine_test.go b/promql/engine_test.go index c49d92623..6074a48a6 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -181,9 +181,11 @@ type errQuerier struct { func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return errSeriesSet{err: q.err} } -func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) Close() error { return nil } +func (*errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, nil +} +func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*errQuerier) Close() error { return nil } // errSeriesSet implements storage.SeriesSet which always returns error. type errSeriesSet struct { diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 0811785e6..cd8877019 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -230,7 +230,7 @@ func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage return storage.ErrSeriesSet(errSelect) } -func (errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { +func (errQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { return nil, nil, errors.New("label values error") } diff --git a/storage/interface.go b/storage/interface.go index e697d57d1..711c253a7 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -95,8 +95,9 @@ type ChunkQuerier interface { type LabelQuerier interface { // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. - // TODO(yeya24): support matchers or hints. - LabelValues(name string) ([]string, Warnings, error) + // If matchers are specified the returned result set is reduced + // to label values of metrics matching the matchers. + LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) // LabelNames returns all the unique label names present in the block in sorted order. // TODO(yeya24): support matchers or hints. diff --git a/storage/merge.go b/storage/merge.go index 27e701883..c57a24c76 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -155,8 +155,10 @@ func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQ } // LabelValues returns all potential values for a label name. -func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) { - res, ws, err := q.lvals(q.queriers, name) +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (q *mergeGenericQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) { + res, ws, err := q.lvals(q.queriers, name, matchers...) if err != nil { return nil, nil, errors.Wrapf(err, "LabelValues() from merge generic querier for label %s", name) } @@ -164,22 +166,22 @@ func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, erro } // lvals performs merge sort for LabelValues from multiple queriers. -func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string) ([]string, Warnings, error) { +func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string, matchers ...*labels.Matcher) ([]string, Warnings, error) { if lq.Len() == 0 { return nil, nil, nil } if lq.Len() == 1 { - return lq.Get(0).LabelValues(n) + return lq.Get(0).LabelValues(n, matchers...) } a, b := lq.SplitByHalf() var ws Warnings - s1, w, err := q.lvals(a, n) + s1, w, err := q.lvals(a, n, matchers...) ws = append(ws, w...) if err != nil { return nil, ws, err } - s2, ws, err := q.lvals(b, n) + s2, ws, err := q.lvals(b, n, matchers...) ws = append(ws, w...) if err != nil { return nil, ws, err diff --git a/storage/merge_test.go b/storage/merge_test.go index cf8b4634f..415b7fb60 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -719,7 +719,7 @@ type mockGenericQuerier struct { closed bool labelNamesCalls int - labelNamesRequested []string + labelNamesRequested []labelNameRequest sortedSeriesRequested []bool resp []string @@ -727,6 +727,11 @@ type mockGenericQuerier struct { err error } +type labelNameRequest struct { + name string + matchers []*labels.Matcher +} + func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet { m.mtx.Lock() m.sortedSeriesRequested = append(m.sortedSeriesRequested, b) @@ -734,9 +739,12 @@ func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err} } -func (m *mockGenericQuerier) LabelValues(name string) ([]string, Warnings, error) { +func (m *mockGenericQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) { m.mtx.Lock() - m.labelNamesRequested = append(m.labelNamesRequested, name) + m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{ + name: name, + matchers: matchers, + }) m.mtx.Unlock() return m.resp, m.warnings, m.err } @@ -808,8 +816,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedSelectsSeries []labels.Labels expectedLabels []string - expectedWarnings [3]Warnings - expectedErrs [3]error + expectedWarnings [4]Warnings + expectedErrs [4]error }{ {}, { @@ -837,7 +845,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { { name: "one failed primary querier", queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}}, - expectedErrs: [3]error{errStorage, errStorage, errStorage}, + expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with successful secondaries", @@ -873,7 +881,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}}, &secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}}, }, - expectedErrs: [3]error{errStorage, errStorage, errStorage}, + expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage}, }, { name: "one successful primary querier with failed secondaries", @@ -886,7 +894,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { labels.FromStrings("test", "a"), }, expectedLabels: []string{"a"}, - expectedWarnings: [3]Warnings{ + expectedWarnings: [4]Warnings{ + []error{errStorage, errStorage}, []error{errStorage, errStorage}, []error{errStorage, errStorage}, []error{errStorage, errStorage}, @@ -903,7 +912,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { labels.FromStrings("test", "b"), }, expectedLabels: []string{"a", "b"}, - expectedWarnings: [3]Warnings{ + expectedWarnings: [4]Warnings{ + []error{warnStorage, warnStorage}, []error{warnStorage, warnStorage}, []error{warnStorage, warnStorage}, []error{warnStorage, warnStorage}, @@ -964,7 +974,26 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { for _, qr := range q.queriers { m := unwrapMockGenericQuerier(t, qr) - require.Equal(t, []string{"test"}, m.labelNamesRequested) + require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested) + } + }) + t.Run("LabelValuesWithMatchers", func(t *testing.T) { + matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue") + res, w, err := q.LabelValues("test2", matcher) + require.Equal(t, tcase.expectedWarnings[3], w) + require.True(t, errors.Is(err, tcase.expectedErrs[3]), "expected error doesn't match") + require.Equal(t, tcase.expectedLabels, res) + + if err != nil { + return + } + for _, qr := range q.queriers { + m := unwrapMockGenericQuerier(t, qr) + + require.Equal(t, []labelNameRequest{ + {name: "test"}, + {name: "test2", matchers: []*labels.Matcher{matcher}}, + }, m.labelNamesRequested) } }) }) diff --git a/storage/noop.go b/storage/noop.go index 00599aba7..3f800e76c 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -28,7 +28,7 @@ func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) SeriesSet { return NoopSeriesSet() } -func (noopQuerier) LabelValues(string) ([]string, Warnings, error) { +func (noopQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, error) { return nil, nil, nil } @@ -51,7 +51,7 @@ func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) ChunkSeri return NoopChunkedSeriesSet() } -func (noopChunkQuerier) LabelValues(string) ([]string, Warnings, error) { +func (noopChunkQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, error) { return nil, nil, nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index 4718b4797..94eab01cf 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -206,7 +206,7 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, lab } // LabelValues implements storage.Querier and is a noop. -func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) { +func (q *querier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) { // TODO: Implement: https://github.com/prometheus/prometheus/issues/3351 return nil, nil, errors.New("not implemented") } diff --git a/storage/secondary.go b/storage/secondary.go index 9e768b349..2586a7744 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -47,8 +47,8 @@ func newSecondaryQuerierFromChunk(cq ChunkQuerier) genericQuerier { return &secondaryQuerier{genericQuerier: newGenericQuerierFromChunk(cq)} } -func (s *secondaryQuerier) LabelValues(name string) ([]string, Warnings, error) { - vals, w, err := s.genericQuerier.LabelValues(name) +func (s *secondaryQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) { + vals, w, err := s.genericQuerier.LabelValues(name, matchers...) if err != nil { return nil, append([]error{err}, w...), nil } diff --git a/tsdb/block.go b/tsdb/block.go index 3ec226197..7ae8d5bbf 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sort" "sync" "github.com/go-kit/kit/log" @@ -63,10 +64,10 @@ type IndexReader interface { Symbols() index.StringIter // SortedLabelValues returns sorted possible label values. - SortedLabelValues(name string) ([]string, error) + SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) // LabelValues returns possible label values which may not be sorted. - LabelValues(name string) ([]string, error) + LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) // Postings returns the postings list iterator for the label pairs. // The Postings here contain the offsets to the series inside the index. @@ -86,6 +87,11 @@ type IndexReader interface { // LabelNames returns all the unique label names present in the index in sorted order. LabelNames() ([]string, error) + // LabelValueFor returns label value for the given label name in the series referred to by ID. + // If the series couldn't be found or the series doesn't have the requested label a + // storage.ErrNotFound is returned as error. + LabelValueFor(id uint64, label string) (string, error) + // Close releases the underlying resources of the reader. Close() error } @@ -415,14 +421,29 @@ func (r blockIndexReader) Symbols() index.StringIter { return r.ir.Symbols() } -func (r blockIndexReader) SortedLabelValues(name string) ([]string, error) { - st, err := r.ir.SortedLabelValues(name) +func (r blockIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + var st []string + var err error + + if len(matchers) == 0 { + st, err = r.ir.SortedLabelValues(name) + } else { + st, err = r.LabelValues(name, matchers...) + if err == nil { + sort.Strings(st) + } + } + return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) } -func (r blockIndexReader) LabelValues(name string) ([]string, error) { - st, err := r.ir.LabelValues(name) - return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) +func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + if len(matchers) == 0 { + st, err := r.ir.LabelValues(name) + return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID) + } + + return labelValuesWithMatchers(r, name, matchers...) } func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) { @@ -453,6 +474,11 @@ func (r blockIndexReader) Close() error { return nil } +// LabelValueFor returns label value for the given label name in the series referred to by ID. +func (r blockIndexReader) LabelValueFor(id uint64, label string) (string, error) { + return r.ir.LabelValueFor(id, label) +} + type blockTombstoneReader struct { tombstones.Reader b *Block diff --git a/tsdb/block_test.go b/tsdb/block_test.go index ffaa6b7c5..b3fae03d2 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -17,11 +17,13 @@ import ( "context" "encoding/binary" "errors" + "fmt" "hash/crc32" "io/ioutil" "math/rand" "os" "path/filepath" + "sort" "strconv" "testing" @@ -212,6 +214,78 @@ func TestCorruptedChunk(t *testing.T) { } } +func TestLabelValuesWithMatchers(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "test_block_label_values_with_matchers") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(tmpdir)) + }() + + var seriesEntries []storage.Series + for i := 0; i < 100; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.Labels{ + {Name: "unique", Value: fmt.Sprintf("value%d", i)}, + {Name: "tens", Value: fmt.Sprintf("value%d", i/10)}, + }, []tsdbutil.Sample{sample{100, 0}})) + } + + blockDir := createBlock(t, tmpdir, seriesEntries) + files, err := sequenceFiles(chunkDir(blockDir)) + require.NoError(t, err) + require.Greater(t, len(files), 0, "No chunk created.") + + // Check open err. + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + defer func() { require.NoError(t, block.Close()) }() + + indexReader, err := block.Index() + require.NoError(t, err) + defer func() { require.NoError(t, indexReader.Close()) }() + + testCases := []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique ids based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for absence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + actualValues, err := indexReader.SortedLabelValues(tt.labelName, tt.matchers...) + require.NoError(t, err) + require.Equal(t, tt.expectedValues, actualValues) + + actualValues, err = indexReader.LabelValues(tt.labelName, tt.matchers...) + sort.Strings(actualValues) + require.NoError(t, err) + require.Equal(t, tt.expectedValues, actualValues) + }) + } +} + // TestBlockSize ensures that the block size is calculated correctly. func TestBlockSize(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test_blockSize") @@ -301,6 +375,49 @@ func TestReadIndexFormatV1(t *testing.T) { }) } +func BenchmarkLabelValuesWithMatchers(b *testing.B) { + tmpdir, err := ioutil.TempDir("", "bench_block_label_values_with_matchers") + require.NoError(b, err) + defer func() { + require.NoError(b, os.RemoveAll(tmpdir)) + }() + + var seriesEntries []storage.Series + metricCount := 1000000 + for i := 0; i < metricCount; i++ { + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.Labels{ + {Name: "unique", Value: fmt.Sprintf("value%d", i)}, + {Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))}, + {Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1" + }, []tsdbutil.Sample{sample{100, 0}})) + } + + blockDir := createBlock(b, tmpdir, seriesEntries) + files, err := sequenceFiles(chunkDir(blockDir)) + require.NoError(b, err) + require.Greater(b, len(files), 0, "No chunk created.") + + // Check open err. + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(b, err) + defer func() { require.NoError(b, block.Close()) }() + + indexReader, err := block.Index() + require.NoError(b, err) + defer func() { require.NoError(b, indexReader.Close()) }() + + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "ninety", "value0")} + + b.ResetTimer() + b.ReportAllocs() + + for benchIdx := 0; benchIdx < b.N; benchIdx++ { + actualValues, err := indexReader.LabelValues("tens", matchers...) + require.NoError(b, err) + require.Equal(b, 9, len(actualValues)) + } +} + // createBlock creates a block with given set of series and returns its dir. func createBlock(tb testing.TB, dir string, series []storage.Series) string { blockDir, err := CreateBlock(series, dir, 0, log.NewNopLogger()) diff --git a/tsdb/head.go b/tsdb/head.go index 138f8adfe..4352d86d9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1635,8 +1635,10 @@ func (h *headIndexReader) Symbols() index.StringIter { // SortedLabelValues returns label values present in the head for the // specific label name that are within the time range mint to maxt. -func (h *headIndexReader) SortedLabelValues(name string) ([]string, error) { - values, err := h.LabelValues(name) +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + values, err := h.LabelValues(name, matchers...) if err == nil { sort.Strings(values) } @@ -1645,15 +1647,20 @@ func (h *headIndexReader) SortedLabelValues(name string) ([]string, error) { // LabelValues returns label values present in the head for the // specific label name that are within the time range mint to maxt. -func (h *headIndexReader) LabelValues(name string) ([]string, error) { - h.head.symMtx.RLock() - defer h.head.symMtx.RUnlock() +// If matchers are specified the returned result set is reduced +// to label values of metrics matching the matchers. +func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() { return []string{}, nil } - values := h.head.postings.LabelValues(name) - return values, nil + if len(matchers) == 0 { + h.head.symMtx.RLock() + defer h.head.symMtx.RUnlock() + return h.head.postings.LabelValues(name), nil + } + + return labelValuesWithMatchers(h, name, matchers...) } // LabelNames returns all the unique label names present in the head @@ -1746,6 +1753,21 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks return nil } +// LabelValueFor returns label value for the given label name in the series referred to by ID. +func (h *headIndexReader) LabelValueFor(id uint64, label string) (string, error) { + memSeries := h.head.series.getByID(id) + if memSeries == nil { + return "", storage.ErrNotFound + } + + value := memSeries.lset.Get(label) + if value == "" { + return "", storage.ErrNotFound + } + + return value, nil +} + func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) { // Just using `getOrSet` below would be semantically sufficient, but we'd create // a new series on every sample inserted via Add(), which causes allocations diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 7f254e6c7..f534adc59 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1884,6 +1884,67 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { } } +func TestHeadLabelValuesWithMatchers(t *testing.T) { + head, _ := newTestHead(t, 1000, false) + defer func() { + require.NoError(t, head.Close()) + }() + + app := head.Appender(context.Background()) + for i := 0; i < 100; i++ { + _, err := app.Add(labels.Labels{ + {Name: "unique", Value: fmt.Sprintf("value%d", i)}, + {Name: "tens", Value: fmt.Sprintf("value%d", i/10)}, + }, 100, 0) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + var testCases = []struct { + name string + labelName string + matchers []*labels.Matcher + expectedValues []string + }{ + { + name: "get tens based on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, + expectedValues: []string{"value3"}, + }, { + name: "get unique ids based on a ten", + labelName: "unique", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, + expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, + }, { + name: "get tens by pattern matching on unique id", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, + expectedValues: []string{"value5", "value6", "value7"}, + }, { + name: "get tens by matching for absence of unique label", + labelName: "tens", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, + expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + headIdxReader := head.indexRange(0, 200) + + actualValues, err := headIdxReader.SortedLabelValues(tt.labelName, tt.matchers...) + require.NoError(t, err) + require.Equal(t, tt.expectedValues, actualValues) + + actualValues, err = headIdxReader.LabelValues(tt.labelName, tt.matchers...) + sort.Strings(actualValues) + require.NoError(t, err) + require.Equal(t, tt.expectedValues, actualValues) + }) + } +} + func TestErrReuseAppender(t *testing.T) { head, _ := newTestHead(t, 1000, false) defer func() { @@ -1952,3 +2013,34 @@ func TestHeadMintAfterTruncation(t *testing.T) { require.NoError(t, head.Close()) } + +func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { + chunkRange := int64(2000) + head, _ := newTestHead(b, chunkRange, false) + b.Cleanup(func() { require.NoError(b, head.Close()) }) + + app := head.Appender(context.Background()) + + metricCount := 1000000 + for i := 0; i < metricCount; i++ { + _, err := app.Add(labels.Labels{ + {Name: "unique", Value: fmt.Sprintf("value%d", i)}, + {Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))}, + {Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1" + }, 100, 0) + require.NoError(b, err) + } + require.NoError(b, app.Commit()) + + headIdxReader := head.indexRange(0, 200) + matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "ninety", "value0")} + + b.ResetTimer() + b.ReportAllocs() + + for benchIdx := 0; benchIdx < b.N; benchIdx++ { + actualValues, err := headIdxReader.LabelValues("tens", matchers...) + require.NoError(b, err) + require.Equal(b, 9, len(actualValues)) + } +} diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 70184cbe1..a6ade9455 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -31,6 +31,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -1443,8 +1444,8 @@ func (r *Reader) SymbolTableSize() uint64 { // SortedLabelValues returns value tuples that exist for the given label name. // It is not safe to use the return value beyond the lifetime of the byte slice // passed into the Reader. -func (r *Reader) SortedLabelValues(name string) ([]string, error) { - values, err := r.LabelValues(name) +func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + values, err := r.LabelValues(name, matchers...) if err == nil && r.version == FormatV1 { sort.Strings(values) } @@ -1454,7 +1455,12 @@ func (r *Reader) SortedLabelValues(name string) ([]string, error) { // LabelValues returns value tuples that exist for the given label name. // It is not safe to use the return value beyond the lifetime of the byte slice // passed into the Reader. -func (r *Reader) LabelValues(name string) ([]string, error) { +// TODO(replay): Support filtering by matchers +func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + if len(matchers) > 0 { + return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers) + } + if r.version == FormatV1 { e, ok := r.postingsV1[name] if !ok { @@ -1505,6 +1511,32 @@ func (r *Reader) LabelValues(name string) ([]string, error) { return values, nil } +// LabelValueFor returns label value for the given label name in the series referred to by ID. +func (r *Reader) LabelValueFor(id uint64, label string) (string, error) { + offset := id + // In version 2 series IDs are no longer exact references but series are 16-byte padded + // and the ID is the multiple of 16 of the actual position. + if r.version == FormatV2 { + offset = id * 16 + } + d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable) + buf := d.Get() + if d.Err() != nil { + return "", errors.Wrap(d.Err(), "label values for") + } + + value, err := r.dec.LabelValueFor(buf, label) + if err != nil { + return "", storage.ErrNotFound + } + + if value == "" { + return "", storage.ErrNotFound + } + + return value, nil +} + // Series reads the series with the given ID and writes its labels and chunks into lbls and chks. func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { offset := id @@ -1683,6 +1715,37 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) { return n, newBigEndianPostings(l), d.Err() } +// LabelValueFor decodes a label for a given series. +func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) { + d := encoding.Decbuf{B: b} + k := d.Uvarint() + + for i := 0; i < k; i++ { + lno := uint32(d.Uvarint()) + lvo := uint32(d.Uvarint()) + + if d.Err() != nil { + return "", errors.Wrap(d.Err(), "read series label offsets") + } + + ln, err := dec.LookupSymbol(lno) + if err != nil { + return "", errors.Wrap(err, "lookup label name") + } + + if ln == label { + lv, err := dec.LookupSymbol(lvo) + if err != nil { + return "", errors.Wrap(err, "lookup label value") + } + + return lv, nil + } + } + + return "", d.Err() +} + // Series decodes a series entry from the given byte slice into lset and chks. func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { *lbls = (*lbls)[:0] diff --git a/tsdb/querier.go b/tsdb/querier.go index e27c9cf5f..99038cc3f 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -83,8 +83,8 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er }, nil } -func (q *blockBaseQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { - res, err := q.index.SortedLabelValues(name) +func (q *blockBaseQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + res, err := q.index.SortedLabelValues(name, matchers...) return res, nil, err } @@ -369,6 +369,44 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting return ix.Postings(m.Name, res...) } +func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) { + // We're only interested in metrics which have the label . + requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, name, "") + if err != nil { + return nil, errors.Wrapf(err, "Failed to instantiate label matcher") + } + + var p index.Postings + p, err = PostingsForMatchers(r, append(matchers, requireLabel)...) + if err != nil { + return nil, err + } + + dedupe := map[string]interface{}{} + for p.Next() { + v, err := r.LabelValueFor(p.At(), name) + if err != nil { + if err == storage.ErrNotFound { + continue + } + + return nil, err + } + dedupe[v] = nil + } + + if err = p.Err(); err != nil { + return nil, err + } + + values := make([]string, 0, len(dedupe)) + for value := range dedupe { + values = append(values, value) + } + + return values, nil +} + // blockBaseSeriesSet allows to iterate over all series in the single block. // Iterated series are trimmed with given min and max time as well as tombstones. // See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating. diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index c0c26dbd4..f5697201b 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -1147,22 +1147,39 @@ func (m mockIndex) Close() error { return nil } -func (m mockIndex) SortedLabelValues(name string) ([]string, error) { - values, _ := m.LabelValues(name) +func (m mockIndex) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { + values, _ := m.LabelValues(name, matchers...) sort.Strings(values) return values, nil } -func (m mockIndex) LabelValues(name string) ([]string, error) { +func (m mockIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { values := []string{} - for l := range m.postings { - if l.Name == name { - values = append(values, l.Value) + + if len(matchers) == 0 { + for l := range m.postings { + if l.Name == name { + values = append(values, l.Value) + } + } + return values, nil + } + + for _, series := range m.series { + for _, matcher := range matchers { + if matcher.Matches(series.l.Get(matcher.Name)) { + values = append(values, series.l.Get(name)) + } } } + return values, nil } +func (m mockIndex) LabelValueFor(id uint64, label string) (string, error) { + return m.series[id].l.Get(label), nil +} + func (m mockIndex) Postings(name string, values ...string) (index.Postings, error) { res := make([]index.Postings, 0, len(values)) for _, value := range values { @@ -1970,15 +1987,19 @@ func (m mockMatcherIndex) Symbols() index.StringIter { return nil } func (m mockMatcherIndex) Close() error { return nil } // SortedLabelValues will return error if it is called. -func (m mockMatcherIndex) SortedLabelValues(name string) ([]string, error) { +func (m mockMatcherIndex) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { return []string{}, errors.New("sorted label values called") } // LabelValues will return error if it is called. -func (m mockMatcherIndex) LabelValues(name string) ([]string, error) { +func (m mockMatcherIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) { return []string{}, errors.New("label values called") } +func (m mockMatcherIndex) LabelValueFor(id uint64, label string) (string, error) { + return "", errors.New("label value for called") +} + func (m mockMatcherIndex) Postings(name string, values ...string) (index.Postings, error) { return index.EmptyPostings(), nil } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index c3a900611..3210feb04 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -608,48 +608,36 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) { warnings storage.Warnings ) if len(matcherSets) > 0 { - hints := &storage.SelectHints{ - Start: timestamp.FromTime(start), - End: timestamp.FromTime(end), - Func: "series", // There is no series function, this token is used for lookups that don't need samples. - } - + var callWarnings storage.Warnings labelValuesSet := make(map[string]struct{}) - // Get all series which match matchers. - for _, mset := range matcherSets { - s := q.Select(false, hints, mset...) - for s.Next() { - series := s.At() - labelValue := series.Labels().Get(name) - // Filter out empty value. - if labelValue == "" { - continue - } - labelValuesSet[labelValue] = struct{}{} + for _, matchers := range matcherSets { + vals, callWarnings, err = q.LabelValues(name, matchers...) + if err != nil { + return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer} } - warnings = append(warnings, s.Warnings()...) - if err := s.Err(); err != nil { - return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil} + warnings = append(warnings, callWarnings...) + for _, val := range vals { + labelValuesSet[val] = struct{}{} } } - // Convert the map to an array. vals = make([]string, 0, len(labelValuesSet)) - for key := range labelValuesSet { - vals = append(vals, key) + for val := range labelValuesSet { + vals = append(vals, val) } - sort.Strings(vals) } else { vals, warnings, err = q.LabelValues(name) if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer} } - } - if vals == nil { - vals = []string{} + if vals == nil { + vals = []string{} + } } + sort.Strings(vals) + return apiFuncResult{vals, nil, warnings, closer} } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 8240ff289..edcaf95c7 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -1699,6 +1699,20 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI "boo", }, }, + // Try to overlap the selected series set as much as possible to test that the value de-duplication works. + { + endpoint: api.labelValues, + params: map[string]string{ + "name": "foo", + }, + query: url.Values{ + "match[]": []string{`test_metric4{dup=~"^1"}`, `test_metric4{foo=~".+o$"}`}, + }, + response: []string{ + "bar", + "boo", + }, + }, // Label names. { endpoint: api.labelNames,