diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index 5ed41f769..e909a3717 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -26,6 +26,7 @@ import (
 	"sync"
 
 	"github.com/bboreham/go-loser"
+	"github.com/cespare/xxhash/v2"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -292,30 +293,76 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
 func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
+	if len(p.m) == 0 || len(deleted) == 0 {
+		return
+	}
 
-	process := func(l labels.Label) {
-		orig := p.m[l.Name][l.Value]
-		repl := make([]storage.SeriesRef, 0, len(orig))
-		for _, id := range orig {
-			if _, ok := deleted[id]; !ok {
-				repl = append(repl, id)
+	// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
+	deleteLabelNames := make(chan string, len(p.m))
+
+	process, wait := processWithBoundedParallelismAndConsistentWorkers(
+		runtime.GOMAXPROCS(0),
+		func(l labels.Label) uint64 { return xxhash.Sum64String(l.Name) },
+		func(l labels.Label) {
+			orig := p.m[l.Name][l.Value]
+			repl := make([]storage.SeriesRef, 0, len(orig))
+			for _, id := range orig {
+				if _, ok := deleted[id]; !ok {
+					repl = append(repl, id)
+				}
 			}
-		}
-		if len(repl) > 0 {
-			p.m[l.Name][l.Value] = repl
-		} else {
-			delete(p.m[l.Name], l.Value)
-			// Delete the key if we removed all values.
-			if len(p.m[l.Name]) == 0 {
-				delete(p.m, l.Name)
+			if len(repl) > 0 {
+				p.m[l.Name][l.Value] = repl
+			} else {
+				delete(p.m[l.Name], l.Value)
+				if len(p.m[l.Name]) == 0 {
+					// Delete the key if we removed all values.
+					deleteLabelNames <- l.Name
+				}
 			}
-		}
-	}
+		},
+	)
 
 	for l := range affected {
 		process(l)
 	}
 	process(allPostingsKey)
+	wait()
+
+	// Close deleteLabelNames channel and delete the label names requested.
+	close(deleteLabelNames)
+	for name := range deleteLabelNames {
+		delete(p.m, name)
+	}
+}
+
+// processWithBoundedParallelismAndConsistentWorkers will call f() with bounded parallelism,
+// making sure that elements with same hash(T) will always be processed by the same worker.
+// Call process() to add more jobs to process, and once finished adding, call wait() to ensure that all jobs are processed.
+func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
+	wg := &sync.WaitGroup{}
+	jobs := make([]chan T, workers)
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		jobs[i] = make(chan T, 128)
+		go func(jobs <-chan T) {
+			defer wg.Done()
+			for l := range jobs {
+				f(l)
+			}
+		}(jobs[i])
+	}
+
+	process = func(job T) {
+		jobs[hash(job)%uint64(workers)] <- job
+	}
+	wait = func() {
+		for i := range jobs {
+			close(jobs[i])
+		}
+		wg.Wait()
+	}
+	return process, wait
 }
 
 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
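Note on the new helper (not part of the patch itself): processWithBoundedParallelismAndConsistentWorkers routes each job to the worker chosen by hash(job) % workers, so every label sharing a name is handled by the same goroutine and the inner map p.m[l.Name] is never mutated concurrently, while deletions of whole label names are funneled back to the single calling goroutine through the deleteLabelNames channel. The standalone sketch below is illustrative only; the processWithConsistentWorkers copy and the word-count usage are hypothetical, assuming nothing beyond the standard library and github.com/cespare/xxhash/v2.

package main

import (
	"fmt"
	"runtime"
	"sync"

	"github.com/cespare/xxhash/v2"
)

// processWithConsistentWorkers is a trimmed-down copy of the pattern above:
// each job is sent to the channel picked by hash(job) % workers, so jobs with
// equal hashes are always handled by the same goroutine.
func processWithConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
	wg := &sync.WaitGroup{}
	jobs := make([]chan T, workers)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		jobs[i] = make(chan T, 128)
		go func(ch <-chan T) {
			defer wg.Done()
			for job := range ch {
				f(job)
			}
		}(jobs[i])
	}
	process = func(job T) { jobs[hash(job)%uint64(workers)] <- job }
	wait = func() {
		// Closing the channels lets the workers drain and exit; then wait for them.
		for i := range jobs {
			close(jobs[i])
		}
		wg.Wait()
	}
	return process, wait
}

func main() {
	// Hypothetical usage: count words sharded by the word itself. Because equal
	// words always land on the same worker, each shard map is written by exactly
	// one goroutine and needs no locking.
	workers := runtime.GOMAXPROCS(0)
	shards := make([]map[string]int, workers)
	for i := range shards {
		shards[i] = map[string]int{}
	}
	hash := func(w string) uint64 { return xxhash.Sum64String(w) }
	process, wait := processWithConsistentWorkers(workers, hash, func(w string) {
		shards[hash(w)%uint64(workers)][w]++
	})
	for _, w := range []string{"a", "b", "a", "c", "b", "a"} {
		process(w)
	}
	wait()
	fmt.Println(shards)
}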
diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go
index 96c9ed124..b41fb54e6 100644
--- a/tsdb/index/postings_test.go
+++ b/tsdb/index/postings_test.go
@@ -973,37 +973,69 @@ func TestMemPostingsStats(t *testing.T) {
 }
 
 func TestMemPostings_Delete(t *testing.T) {
-	p := NewMemPostings()
-	p.Add(1, labels.FromStrings("lbl1", "a"))
-	p.Add(2, labels.FromStrings("lbl1", "b"))
-	p.Add(3, labels.FromStrings("lbl2", "a"))
+	t.Run("some postings", func(t *testing.T) {
+		p := NewMemPostings()
+		p.Add(1, labels.FromStrings("lbl1", "a"))
+		p.Add(2, labels.FromStrings("lbl1", "b"))
+		p.Add(3, labels.FromStrings("lbl2", "a"))
+
+		before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		deletedRefs := map[storage.SeriesRef]struct{}{
+			2: {},
+		}
+		affectedLabels := map[labels.Label]struct{}{
+			{Name: "lbl1", Value: "b"}: {},
+		}
+		p.Delete(deletedRefs, affectedLabels)
+		after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
 
-	before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
-	deletedRefs := map[storage.SeriesRef]struct{}{
-		2: {},
-	}
-	affectedLabels := map[labels.Label]struct{}{
-		{Name: "lbl1", Value: "b"}: {},
-	}
-	p.Delete(deletedRefs, affectedLabels)
-	after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		// Make sure postings gotten before the delete have the old data when
+		// iterated over.
+		expanded, err := ExpandPostings(before)
+		require.NoError(t, err)
+		require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded)
 
-	// Make sure postings gotten before the delete have the old data when
-	// iterated over.
-	expanded, err := ExpandPostings(before)
-	require.NoError(t, err)
-	require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded)
+		// Make sure postings gotten after the delete have the new data when
+		// iterated over.
+		expanded, err = ExpandPostings(after)
+		require.NoError(t, err)
+		require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
 
-	// Make sure postings gotten after the delete have the new data when
-	// iterated over.
-	expanded, err = ExpandPostings(after)
-	require.NoError(t, err)
-	require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
+		deleted := p.Get("lbl1", "b")
+		expanded, err = ExpandPostings(deleted)
+		require.NoError(t, err)
+		require.Empty(t, expanded, "expected empty postings, got %v", expanded)
+	})
 
-	deleted := p.Get("lbl1", "b")
-	expanded, err = ExpandPostings(deleted)
-	require.NoError(t, err)
-	require.Empty(t, expanded, "expected empty postings, got %v", expanded)
+	t.Run("all postings", func(t *testing.T) {
+		p := NewMemPostings()
+		p.Add(1, labels.FromStrings("lbl1", "a"))
+		p.Add(2, labels.FromStrings("lbl1", "b"))
+		p.Add(3, labels.FromStrings("lbl2", "a"))
+
+		deletedRefs := map[storage.SeriesRef]struct{}{1: {}, 2: {}, 3: {}}
+		affectedLabels := map[labels.Label]struct{}{
+			{Name: "lbl1", Value: "a"}: {},
+			{Name: "lbl1", Value: "b"}: {},
+			{Name: "lbl1", Value: "c"}: {},
+		}
+		p.Delete(deletedRefs, affectedLabels)
+		after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		expanded, err := ExpandPostings(after)
+		require.NoError(t, err)
+		require.Empty(t, expanded)
+	})
+
+	t.Run("nothing on empty mempostings", func(t *testing.T) {
+		p := NewMemPostings()
+		deletedRefs := map[storage.SeriesRef]struct{}{}
+		affectedLabels := map[labels.Label]struct{}{}
+		p.Delete(deletedRefs, affectedLabels)
+		after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		expanded, err := ExpandPostings(after)
+		require.NoError(t, err)
+		require.Empty(t, expanded)
+	})
 }
 
 // BenchmarkMemPostings_Delete is quite heavy, so consider running it with
@@ -1025,7 +1057,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
 		return s
 	}
 
-	const total = 1e6
+	const total = 2e6
 	allSeries := [total]labels.Labels{}
 	nameValues := make([]string, 0, 100)
 	for i := 0; i < total; i++ {