fix(storage/mergeQuerier): add a reproducer for data race that occurs when one of the queriers alters the passed matchers and propose a fix

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
pull/14983/head
machine424 2 months ago
parent ad4857de52
commit eb523a6b29
No known key found for this signature in database
GPG Key ID: A4B001A4FDEE017D

@ -153,13 +153,21 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints
) )
// Schedule all Selects for all queriers we know about. // Schedule all Selects for all queriers we know about.
for _, querier := range q.queriers { for _, querier := range q.queriers {
// copy the matchers as some queriers may alter the slice.
// See https://github.com/prometheus/prometheus/issues/14723
// matchersCopy := make([]*labels.Matcher, len(matchers))
// copy(matchersCopy, matchers)
wg.Add(1) wg.Add(1)
go func(qr genericQuerier) { go func(qr genericQuerier) {
// go func(qr genericQuerier, m []*labels.Matcher) {
defer wg.Done() defer wg.Done()
// We need to sort for NewMergeSeriesSet to work. // We need to sort for NewMergeSeriesSet to work.
// seriesSetChan <- qr.Select(ctx, true, hints, m...)
seriesSetChan <- qr.Select(ctx, true, hints, matchers...) seriesSetChan <- qr.Select(ctx, true, hints, matchers...)
}(querier) }(querier)
// }(querier, matchersCopy)
} }
go func() { go func() {
wg.Wait() wg.Wait()

@ -3787,3 +3787,29 @@ func (m mockReaderOfLabels) Series(storage.SeriesRef, *labels.ScratchBuilder, *[
func (m mockReaderOfLabels) Symbols() index.StringIter { func (m mockReaderOfLabels) Symbols() index.StringIter {
panic("Series called") panic("Series called")
} }
// TestMergeQuerierConcurrentSelectMatchers reproduces the data race bug from
// https://github.com/prometheus/prometheus/issues/14723, when one of the queriers (blockQuerier in this case)
// alters the passed matchers.
func TestMergeQuerierConcurrentSelectMatchers(t *testing.T) {
block, err := OpenBlock(nil, createBlock(t, t.TempDir(), genSeries(1, 1, 0, 1)), nil)
require.NoError(t, err)
p, err := NewBlockQuerier(block, 0, 1)
require.NoError(t, err)
// A secondary querier is required to enable concurrent select; a blockQuerier is used for simplicity.
s, err := NewBlockQuerier(block, 0, 1)
require.NoError(t, err)
originalMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, "baz", ".*"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
}
matchers := append([]*labels.Matcher{}, originalMatchers...)
mergedQuerier := storage.NewMergeQuerier([]storage.Querier{p}, []storage.Querier{s}, storage.ChainedSeriesMerge)
defer mergedQuerier.Close()
mergedQuerier.Select(context.Background(), false, nil, matchers...)
require.Equal(t, originalMatchers, matchers)
}

Loading…
Cancel
Save