diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index 6c493be1d..e7f8cef8e 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -30,9 +30,19 @@ func AllPostingsKey() (name, value string) {
 	return allPostingsKey.Name, allPostingsKey.Value
 }
 
+// ensureOrderBatchSize is the max number of postings lists passed to a worker in a single batch in MemPostings.EnsureOrder().
+const ensureOrderBatchSize = 1024
+
+// ensureOrderBatchPool is a pool used to recycle batches passed to workers in MemPostings.EnsureOrder().
+var ensureOrderBatchPool = sync.Pool{
+	New: func() interface{} {
+		return make([][]uint64, 0, ensureOrderBatchSize)
+	},
+}
+
 // MemPostings holds postings list for series ID per label pair. They may be written
 // to out of order.
-// ensureOrder() must be called once before any reads are done. This allows for quick
+// EnsureOrder() must be called once before any reads are done. This allows for quick
 // unordered batch fills on startup.
 type MemPostings struct {
 	mtx     sync.RWMutex
@@ -49,7 +59,7 @@ func NewMemPostings() *MemPostings {
 }
 
 // NewUnorderedMemPostings returns a memPostings that is not safe to be read from
-// until ensureOrder was called once.
+// until EnsureOrder() was called once.
 func NewUnorderedMemPostings() *MemPostings {
 	return &MemPostings{
 		m: make(map[string]map[string][]uint64, 512),
@@ -218,25 +228,42 @@ func (p *MemPostings) EnsureOrder() {
 	}
 
 	n := runtime.GOMAXPROCS(0)
-	workc := make(chan []uint64)
+	workc := make(chan [][]uint64)
 
 	var wg sync.WaitGroup
 	wg.Add(n)
 
 	for i := 0; i < n; i++ {
 		go func() {
-			for l := range workc {
-				sort.Slice(l, func(a, b int) bool { return l[a] < l[b] })
+			for job := range workc {
+				for _, l := range job {
+					sort.Sort(uint64Slice(l))
+				}
+
+				job = job[:0]
+				ensureOrderBatchPool.Put(job) //nolint:staticcheck // SA6002 is safe to ignore here; actually fixing it has some performance penalty.
 			}
 			wg.Done()
 		}()
 	}
 
+	nextJob := ensureOrderBatchPool.Get().([][]uint64)
 	for _, e := range p.m {
 		for _, l := range e {
-			workc <- l
+			nextJob = append(nextJob, l)
+
+			if len(nextJob) >= ensureOrderBatchSize {
+				workc <- nextJob
+				nextJob = ensureOrderBatchPool.Get().([][]uint64)
+			}
 		}
 	}
+
+	// If the last job was partially filled, we need to push it to workers too.
+	if len(nextJob) > 0 {
+		workc <- nextJob
+	}
+
 	close(workc)
 	wg.Wait()
 
@@ -796,3 +823,10 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
 func (it *bigEndianPostings) Err() error {
 	return nil
 }
+
+// uint64Slice attaches the methods of sort.Interface to []uint64, sorting in increasing order.
+type uint64Slice []uint64
+
+func (x uint64Slice) Len() int           { return len(x) }
+func (x uint64Slice) Less(i, j int) bool { return x[i] < x[j] }
+func (x uint64Slice) Swap(i, j int)      { x[i], x[j] = x[j], x[i] }
diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go
index 4ec85aee4..fef788c8e 100644
--- a/tsdb/index/postings_test.go
+++ b/tsdb/index/postings_test.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"math/rand"
 	"sort"
+	"strconv"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -63,6 +64,59 @@ func TestMemPostings_ensureOrder(t *testing.T) {
 	}
 }
 
+func BenchmarkMemPostings_ensureOrder(b *testing.B) {
+	tests := map[string]struct {
+		numLabels         int
+		numValuesPerLabel int
+		numRefsPerValue   int
+	}{
+		"many values per label": {
+			numLabels:         100,
+			numValuesPerLabel: 10000,
+			numRefsPerValue:   100,
+		},
+		"few values per label": {
+			numLabels:         1000000,
+			numValuesPerLabel: 1,
+			numRefsPerValue:   100,
+		},
+		"few refs per label value": {
+			numLabels:         1000,
+			numValuesPerLabel: 1000,
+			numRefsPerValue:   10,
+		},
+	}
+
+	for testName, testData := range tests {
+		b.Run(testName, func(b *testing.B) {
+			p := NewUnorderedMemPostings()
+
+			// Generate postings.
+			for l := 0; l < testData.numLabels; l++ {
+				labelName := strconv.Itoa(l)
+				p.m[labelName] = map[string][]uint64{}
+
+				for v := 0; v < testData.numValuesPerLabel; v++ {
+					refs := make([]uint64, testData.numRefsPerValue)
+					for j := range refs {
+						refs[j] = rand.Uint64()
+					}
+
+					labelValue := strconv.Itoa(v)
+					p.m[labelName][labelValue] = refs
+				}
+			}
+
+			b.ResetTimer() // Exclude the postings generation above from the measured time.
+
+			for n := 0; n < b.N; n++ {
+				p.EnsureOrder()
+				p.ordered = false // Reset the flag before the next iteration (presumably EnsureOrder() skips work while it is set). NOTE(review): the lists were sorted in place by the first call, so later iterations sort already-sorted data - confirm this is intended.
+			}
+		})
+	}
+}
+
 func TestIntersect(t *testing.T) {
 	a := newListPostings(1, 2, 3)
 	b := newListPostings(2, 3, 4)