storage: Implement limit in mergeGenericQuerier

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
pull/14489/head
🌲 Harry 🌊 John 🏔 3 weeks ago
parent fc3d19725d
commit f9bc50b247

@ -733,7 +733,7 @@ func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt i
for _, mset := range matcherSets {
sets = append(sets, q.Select(ctx, true, nil, mset...))
}
ss = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
ss = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
} else {
ss = q.Select(ctx, false, nil, matcherSets[0]...)
}

@ -19,7 +19,6 @@ import (
"context"
"fmt"
"math"
"slices"
"sync"
"github.com/prometheus/prometheus/model/histogram"
@ -136,13 +135,17 @@ func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
var limit int
if hints != nil {
limit = hints.Limit
}
if !q.concurrentSelect {
for _, querier := range q.queriers {
// We need to sort for merge to work.
seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
return s, s.Next()
}}
}
@ -175,7 +178,7 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints
seriesSets = append(seriesSets, r)
}
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
return s, s.Next()
}}
}
@ -193,35 +196,44 @@ func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQ
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
res, ws, err := q.lvals(ctx, q.queriers, name, hints, matchers...)
res, ws, err := q.mergeResults(q.queriers, hints, func(q LabelQuerier) ([]string, annotations.Annotations, error) {
return q.LabelValues(ctx, name, hints, matchers...)
})
if err != nil {
return nil, nil, fmt.Errorf("LabelValues() from merge generic querier for label %s: %w", name, err)
}
return res, ws, nil
}
// lvals performs merge sort for LabelValues from multiple queriers.
func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers, n string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// mergeResults performs merge sort on the results of invoking the resultsFn against multiple queriers.
func (q *mergeGenericQuerier) mergeResults(lq labelGenericQueriers, hints *LabelHints, resultsFn func(q LabelQuerier) ([]string, annotations.Annotations, error)) ([]string, annotations.Annotations, error) {
if lq.Len() == 0 {
return nil, nil, nil
}
if lq.Len() == 1 {
return lq.Get(0).LabelValues(ctx, n, hints, matchers...)
return resultsFn(lq.Get(0))
}
a, b := lq.SplitByHalf()
var ws annotations.Annotations
s1, w, err := q.lvals(ctx, a, n, hints, matchers...)
s1, w, err := q.mergeResults(a, hints, resultsFn)
ws.Merge(w)
if err != nil {
return nil, ws, err
}
s2, ws, err := q.lvals(ctx, b, n, hints, matchers...)
s2, w, err := q.mergeResults(b, hints, resultsFn)
ws.Merge(w)
if err != nil {
return nil, ws, err
}
return mergeStrings(s1, s2), ws, nil
s1 = truncateToLimit(s1, hints)
s2 = truncateToLimit(s2, hints)
merged := mergeStrings(s1, s2)
merged = truncateToLimit(merged, hints)
return merged, ws, nil
}
func mergeStrings(a, b []string) []string {
@ -253,33 +265,13 @@ func mergeStrings(a, b []string) []string {
// LabelNames returns all the unique label names present in all queriers in sorted order.
func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
var (
labelNamesMap = make(map[string]struct{})
warnings annotations.Annotations
)
for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames(ctx, hints, matchers...)
if wrn != nil {
// TODO(bwplotka): We could potentially wrap warnings.
warnings.Merge(wrn)
}
if err != nil {
return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err)
}
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
}
if len(labelNamesMap) == 0 {
return nil, warnings, nil
}
labelNames := make([]string, 0, len(labelNamesMap))
for name := range labelNamesMap {
labelNames = append(labelNames, name)
res, ws, err := q.mergeResults(q.queriers, hints, func(q LabelQuerier) ([]string, annotations.Annotations, error) {
return q.LabelNames(ctx, hints, matchers...)
})
if err != nil {
return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err)
}
slices.Sort(labelNames)
return labelNames, warnings, nil
return res, ws, nil
}
// Close releases the resources of the generic querier.
@ -293,17 +285,25 @@ func (q *mergeGenericQuerier) Close() error {
return errs.Err()
}
func truncateToLimit(s []string, hints *LabelHints) []string {
if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
s = s[:hints.Limit]
}
return s
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series
// NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
// If limit is set, the SeriesSet will be limited up-to the limit. 0 means disabled.
func NewMergeSeriesSet(sets []SeriesSet, limit int, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericSeriesSetAdapter{s})
}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
}
// VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
@ -313,12 +313,12 @@ func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) Seri
type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, limit int, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
}
// genericMergeSeriesSet implements genericSeriesSet.
@ -326,9 +326,11 @@ type genericMergeSeriesSet struct {
currentLabels labels.Labels
mergeFunc genericSeriesMergeFunc
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
seriesLimit int
mergedSeries int // tracks the total number of series merged and returned.
}
// newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
@ -336,7 +338,8 @@ type genericMergeSeriesSet struct {
// Each series set must return its series in labels order, otherwise
// merged series set will be incorrect.
// Overlapped situations are merged using provided mergeFunc.
func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
// If seriesLimit is set, only limited series are returned.
func newGenericMergeSeriesSet(sets []genericSeriesSet, seriesLimit int, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
if len(sets) == 1 {
return sets[0]
}
@ -356,13 +359,19 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMe
}
}
return &genericMergeSeriesSet{
mergeFunc: mergeFunc,
sets: sets,
heap: h,
mergeFunc: mergeFunc,
sets: sets,
heap: h,
seriesLimit: seriesLimit,
}
}
func (c *genericMergeSeriesSet) Next() bool {
if c.seriesLimit > 0 && c.mergedSeries >= c.seriesLimit {
// Exit early if seriesLimit is set.
return false
}
// Run in a loop because the "next" series sets may not be valid anymore.
// If, for the current label set, all the next series sets come from
// failed remote storage sources, we want to keep trying with the next label set.
@ -393,6 +402,7 @@ func (c *genericMergeSeriesSet) Next() bool {
break
}
}
c.mergedSeries++
return true
}

@ -1345,7 +1345,7 @@ func makeMergeSeriesSet(serieses [][]Series) SeriesSet {
for i, s := range serieses {
seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)}
}
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, 0, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
}
func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) {
@ -1390,6 +1390,34 @@ func BenchmarkMergeSeriesSet(b *testing.B) {
}
}
func BenchmarkMergeLabelValuesWithLimit(b *testing.B) {
var queriers []genericQuerier
for i := 0; i < 5; i++ {
var lbls []string
for j := 0; j < 100000; j++ {
lbls = append(lbls, fmt.Sprintf("querier_%d_label_%d", i, j))
}
q := &mockQuerier{resp: lbls}
queriers = append(queriers, newGenericQuerierFrom(q))
}
mergeQuerier := &mergeGenericQuerier{
queriers: queriers, // Assume querying 5 blocks.
mergeFn: func(l ...Labels) Labels {
return l[0]
},
}
b.Run("benchmark", func(b *testing.B) {
ctx := context.Background()
hints := &LabelHints{
Limit: 1000,
}
mergeQuerier.LabelValues(ctx, "name", hints)
})
}
func visitMockQueriers(t *testing.T, qr Querier, f func(t *testing.T, q *mockQuerier)) int {
count := 0
switch x := qr.(type) {
@ -1428,6 +1456,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
name string
primaries []Querier
secondaries []Querier
limit int
expectedSelectsSeries []labels.Labels
expectedLabels []string
@ -1553,12 +1582,39 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedLabels: []string{"a", "b"},
expectedWarnings: annotations.New().Add(warnStorage),
},
{
name: "successful queriers with limit",
primaries: []Querier{
&mockQuerier{resp: []string{"a", "d"}, warnings: annotations.New().Add(warnStorage), err: nil},
},
secondaries: []Querier{
&mockQuerier{resp: []string{"b", "c"}, warnings: annotations.New().Add(warnStorage), err: nil},
},
limit: 2,
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
},
expectedLabels: []string{"a", "b"},
expectedWarnings: annotations.New().Add(warnStorage),
},
} {
var labelHints *LabelHints
var selectHints *SelectHints
if tcase.limit > 0 {
labelHints = &LabelHints{
Limit: tcase.limit,
}
selectHints = &SelectHints{
Limit: tcase.limit,
}
}
t.Run(tcase.name, func(t *testing.T) {
q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] })
t.Run("Select", func(t *testing.T) {
res := q.Select(context.Background(), false, nil)
res := q.Select(context.Background(), false, selectHints)
var lbls []labels.Labels
for res.Next() {
lbls = append(lbls, res.At().Labels())
@ -1577,7 +1633,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
require.Equal(t, len(tcase.primaries)+len(tcase.secondaries), n)
})
t.Run("LabelNames", func(t *testing.T) {
res, w, err := q.LabelNames(ctx, nil)
res, w, err := q.LabelNames(ctx, labelHints)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[1], "expected error doesn't match")
requireEqualSlice(t, tcase.expectedLabels, res)
@ -1590,7 +1646,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
})
})
t.Run("LabelValues", func(t *testing.T) {
res, w, err := q.LabelValues(ctx, "test", nil)
res, w, err := q.LabelValues(ctx, "test", labelHints)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[2], "expected error doesn't match")
requireEqualSlice(t, tcase.expectedLabels, res)
@ -1604,7 +1660,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
res, w, err := q.LabelValues(ctx, "test2", nil, matcher)
res, w, err := q.LabelValues(ctx, "test2", labelHints, matcher)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
requireEqualSlice(t, tcase.expectedLabels, res)

@ -831,7 +831,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
if len(sets) > 1 {
// Merge series using specified chunk series merger.
// The default one is the compacting series merger.
set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
set = storage.NewMergeChunkSeriesSet(sets, 0, mergeFunc)
}
// Iterate over all sorted chunk series.

@ -2030,7 +2030,7 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) {
// TODO(bwplotka): Merge with storage merged series set benchmark.
func BenchmarkMergedSeriesSet(b *testing.B) {
sel := func(sets []storage.SeriesSet) storage.SeriesSet {
return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
return storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
}
for _, k := range []int{

@ -917,7 +917,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
s := q.Select(ctx, true, hints, mset...)
sets = append(sets, s)
}
set = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
set = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
} else {
// At this point at least one match exists.
set = q.Select(ctx, false, hints, matcherSets[0]...)

@ -100,7 +100,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sets = append(sets, s)
}
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
set := storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
var chkIter chunkenc.Iterator
Loop:

Loading…
Cancel
Save