storage: don't wrap single querier in merge-queriers

If given a single querier, just return it instead of constructing a
complicated wrapper. The code in `mergeGenericQuerier` which skipped
merging when there was only one is not needed any more.

This change required a few tests to be tweaked, because they relied on
the specific behaviour of `mergeGenericQuerier.Select()`.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/13434/head
Bryan Boreham 2024-01-19 21:25:30 +00:00
parent 34875ae8c7
commit fbca054af6
2 changed files with 27 additions and 21 deletions

View File

@ -46,9 +46,15 @@ type mergeGenericQuerier struct {
//
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
if len(primaries)+len(secondaries) == 0 {
return NoopQuerier()
switch {
case len(primaries)+len(secondaries) == 0:
return noopQuerier{}
case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0]
case len(primaries) == 0 && len(secondaries) == 1:
return secondaries[0]
}
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
@ -78,6 +84,15 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
switch {
case len(primaries) == 0 && len(secondaries) == 0:
return noopChunkQuerier{}
case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0]
case len(primaries) == 0 && len(secondaries) == 1:
return secondaries[0]
}
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
@ -103,13 +118,6 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
// Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
if len(q.queriers) == 0 {
return noopGenericSeriesSet{}
}
if len(q.queriers) == 1 {
return q.queriers[0].Select(ctx, sortSeries, hints, matchers...)
}
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
if !q.concurrentSelect {
for _, querier := range q.queriers {

View File

@ -180,9 +180,9 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
var p Querier
var p []Querier
if tc.primaryQuerierSeries != nil {
p = &mockQuerier{toReturn: tc.primaryQuerierSeries}
p = append(p, &mockQuerier{toReturn: tc.primaryQuerierSeries})
}
var qs []Querier
for _, in := range tc.querierSeries {
@ -190,7 +190,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
}
qs = append(qs, tc.extraQueriers...)
mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(context.Background(), false, nil)
mergedQuerier := NewMergeQuerier(p, qs, ChainedSeriesMerge).Select(context.Background(), false, nil)
// Get all merged series upfront to make sure there are no incorrectly retained shared
// buffers causing bugs.
@ -355,9 +355,9 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
var p ChunkQuerier
var p []ChunkQuerier
if tc.primaryChkQuerierSeries != nil {
p = &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries}
p = append(p, &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries})
}
var qs []ChunkQuerier
@ -366,7 +366,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
}
qs = append(qs, tc.extraQueriers...)
merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil)
merged := NewMergeChunkQuerier(p, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil)
for merged.Next() {
require.True(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
@ -1444,6 +1444,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedErrs [4]error
}{
{
// NewMergeQuerier will not create a mergeGenericQuerier
// with just one querier inside, but we can test it anyway.
name: "one successful primary querier",
queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
expectedSelectsSeries: []labels.Labels{
@ -1552,12 +1554,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
exp := []bool{true}
if len(q.queriers) == 1 {
exp[0] = false
}
require.Equal(t, exp, m.sortedSeriesRequested)
// mergeGenericQuerier forces all Selects to be sorted.
require.Equal(t, []bool{true}, m.sortedSeriesRequested)
}
})
t.Run("LabelNames", func(t *testing.T) {