PostingsForMatchers race with creating new series (#12558)

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
pull/12715/head
Dimitar Dimitrov 1 year ago committed by GitHub
parent 1b9a53b5ee
commit b40865833d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,6 +21,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@ -244,6 +245,41 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
labelMustBeSet[m.Name] = true
}
}
isSubtractingMatcher := func(m *labels.Matcher) bool {
if !labelMustBeSet[m.Name] {
return true
}
return (m.Type == labels.MatchNotEqual || m.Type == labels.MatchNotRegexp) && m.Matches("")
}
hasSubtractingMatchers, hasIntersectingMatchers := false, false
for _, m := range ms {
if isSubtractingMatcher(m) {
hasSubtractingMatchers = true
} else {
hasIntersectingMatchers = true
}
}
if hasSubtractingMatchers && !hasIntersectingMatchers {
// If there's nothing to subtract from, add in everything and remove the notIts later.
// We prefer to get AllPostings so that the base of subtraction (i.e. allPostings)
// doesn't include series that may be added to the index reader during this function call.
k, v := index.AllPostingsKey()
allPostings, err := ix.Postings(k, v)
if err != nil {
return nil, err
}
its = append(its, allPostings)
}
// Sort matchers to have the intersecting matchers first.
// This way the base for subtraction is smaller and
// there is no chance that the set we subtract from
// contains postings of series that didn't exist when
// we constructed the set we subtract by.
slices.SortStableFunc(ms, func(i, j *labels.Matcher) bool {
return !isSubtractingMatcher(i) && isSubtractingMatcher(j)
})
for _, m := range ms {
switch {
@ -312,16 +348,6 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
}
}
// If there's nothing to subtract from, add in everything and remove the notIts later.
if len(its) == 0 && len(notIts) != 0 {
k, v := index.AllPostingsKey()
allPostings, err := ix.Postings(k, v)
if err != nil {
return nil, err
}
its = append(its, allPostings)
}
it := index.Intersect(its...)
for _, n := range notIts {

@ -22,6 +22,7 @@ import (
"path/filepath"
"sort"
"strconv"
"sync"
"testing"
"time"
@ -2194,6 +2195,71 @@ func TestPostingsForMatchers(t *testing.T) {
}
}
// TestQuerierIndexQueriesRace tests the index queries with racing appends.
func TestQuerierIndexQueriesRace(t *testing.T) {
const testRepeats = 1000
testCases := []struct {
matchers []*labels.Matcher
}{
{
matchers: []*labels.Matcher{
// This matcher should involve the AllPostings posting list in calculating the posting lists.
labels.MustNewMatcher(labels.MatchNotEqual, labels.MetricName, "metric"),
},
},
{
matchers: []*labels.Matcher{
// The first matcher should be effectively the same as AllPostings, because all series have always_0=0
// If it is evaluated first, then __name__=metric will contain more series than always_0=0.
labels.MustNewMatcher(labels.MatchNotEqual, "always_0", "0"),
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric"),
},
},
}
for _, c := range testCases {
c := c
t.Run(fmt.Sprintf("%v", c.matchers), func(t *testing.T) {
db := openTestDB(t, DefaultOptions(), nil)
h := db.Head()
t.Cleanup(func() {
require.NoError(t, db.Close())
})
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go appendSeries(t, ctx, wg, h)
t.Cleanup(wg.Wait)
t.Cleanup(cancel)
for i := 0; i < testRepeats; i++ {
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
values, _, err := q.LabelValues("seq", c.matchers...)
require.NoError(t, err)
require.Emptyf(t, values, `label values for label "seq" should be empty`)
}
})
}
}
func appendSeries(t *testing.T, ctx context.Context, wg *sync.WaitGroup, h *Head) {
defer wg.Done()
for i := 0; ctx.Err() != nil; i++ {
app := h.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings(labels.MetricName, "metric", "seq", strconv.Itoa(i), "always_0", "0"), 0, 0)
require.NoError(t, err)
err = app.Commit()
require.NoError(t, err)
// Throttle down the appends to keep the test somewhat nimble.
time.Sleep(time.Millisecond)
}
}
// TestClose ensures that calling Close more than once doesn't block and doesn't panic.
func TestClose(t *testing.T) {
dir := t.TempDir()

Loading…
Cancel
Save