From e196b977afdfd3cc72ac15de97845bec056a8a3d Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Wed, 25 Sep 2024 10:38:47 +0200
Subject: [PATCH 1/5] Process MemPostings.Delete() with GOMAXPROCS workers

We are still seeing lock contention on MemPostings.mtx, and
MemPostings.Delete() is by far the most expensive operation on that
mutex. This adds parallelism to that method, trying to reduce the
amount of time we spend with the mutex held.

Signed-off-by: Oleg Zaytsev
---
 tsdb/index/postings.go      | 41 +++++++++++++++++++++++++++++++++----
 tsdb/index/postings_test.go |  2 +-
 2 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index bfe74c323..25780e4ad 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -26,6 +26,7 @@ import (
 	"sync"
 
 	"github.com/bboreham/go-loser"
+	"github.com/cespare/xxhash/v2"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -293,6 +294,9 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
 
+	// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
+	deleteLabelNames := make(chan string, len(p.m))
+
 	process := func(l labels.Label) {
 		orig := p.m[l.Name][l.Value]
 		repl := make([]storage.SeriesRef, 0, len(orig))
 		for _, id := range orig {
 			if _, ok := deleted[id]; !ok {
 				repl = append(repl, id)
@@ -305,17 +309,46 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 			p.m[l.Name][l.Value] = repl
 		} else {
 			delete(p.m[l.Name], l.Value)
-			// Delete the key if we removed all values.
 			if len(p.m[l.Name]) == 0 {
-				delete(p.m, l.Name)
+				// Delete the key if we removed all values.
+				deleteLabelNames <- l.Name
 			}
 		}
 	}
 
+	// Create GOMAXPROCS workers.
+	wg := sync.WaitGroup{}
+	jobs := make([]chan labels.Label, runtime.GOMAXPROCS(0))
+	for i := range jobs {
+		jobs[i] = make(chan labels.Label, 128)
+		wg.Add(1)
+		go func(jobs chan labels.Label) {
+			defer wg.Done()
+			for l := range jobs {
+				process(l)
+			}
+		}(jobs[i])
+	}
+
+	// Process all affected labels and the allPostingsKey.
 	for l := range affected {
-		process(l)
+		j := int(xxhash.Sum64String(l.Name) % uint64(len(jobs)))
+		jobs[j] <- l
+	}
+	j := int(xxhash.Sum64String(allPostingsKey.Name) % uint64(len(jobs)))
+	jobs[j] <- allPostingsKey
+
+	// Close jobs channels and wait all workers to finish.
+	for i := range jobs {
+		close(jobs[i])
+	}
+	wg.Wait()
+
+	// Close deleteLabelNames channel and delete the label names requested.
+	close(deleteLabelNames)
+	for name := range deleteLabelNames {
+		delete(p.m, name)
 	}
-	process(allPostingsKey)
 }
 
 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
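The idea behind this first commit: each affected label is routed to a worker chosen by hashing the label name, so all values of a given name are processed by the same goroutine and each inner p.m[name] map has a single writer, while deletions of outer map keys are only collected on the deleteLabelNames channel and applied after the workers finish. Below is a standalone sketch of that hash-routing pattern; it is not part of the patch, the package name, sample label names and printed output are invented for illustration, and only the xxhash dependency mirrors the diff above.

// Standalone sketch (not from the patch): route jobs to a fixed set of
// workers by hash so that items sharing a key are always handled by the
// same goroutine, mirroring how Delete() shards affected labels by name.
package main

import (
	"fmt"
	"runtime"
	"sync"

	"github.com/cespare/xxhash/v2"
)

func main() {
	workers := runtime.GOMAXPROCS(0)
	jobs := make([]chan string, workers)
	var wg sync.WaitGroup
	for i := range jobs {
		jobs[i] = make(chan string, 128)
		wg.Add(1)
		go func(ch <-chan string) {
			defer wg.Done()
			for name := range ch {
				// Everything sharing a name arrives on the same channel,
				// so per-name state needs no extra locking here.
				fmt.Println("processing", name)
			}
		}(jobs[i])
	}

	for _, name := range []string{"lbl1", "lbl2", "lbl1"} {
		j := int(xxhash.Sum64String(name) % uint64(workers))
		jobs[j] <- name
	}
	for i := range jobs {
		close(jobs[i])
	}
	wg.Wait()
}

As in the patch, each worker gets a small buffered channel so producers rarely block on a momentarily busy worker; the exact capacity (128 here, as in the diff) is a tuning choice rather than a correctness requirement.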
diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go
index 96c9ed124..1802c9e89 100644
--- a/tsdb/index/postings_test.go
+++ b/tsdb/index/postings_test.go
@@ -1025,7 +1025,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
 		return s
 	}
 
-	const total = 1e6
+	const total = 2e6
 	allSeries := [total]labels.Labels{}
 	nameValues := make([]string, 0, 100)
 	for i := 0; i < total; i++ {

From 9c417aa71045e36d9fad66e1e77c1d942cbacc17 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Wed, 25 Sep 2024 14:08:50 +0200
Subject: [PATCH 2/5] Fix deadlock with empty MemPostings

Signed-off-by: Oleg Zaytsev
---
 tsdb/index/postings.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index 25780e4ad..3164d8c2f 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -295,7 +295,8 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	defer p.mtx.Unlock()
 
 	// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
-	deleteLabelNames := make(chan string, len(p.m))
+	// Adding +1 to length to account for allPostingsKey processing when MemPostings is empty.
+	deleteLabelNames := make(chan string, len(p.m)+1)
 
 	process := func(l labels.Label) {
 		orig := p.m[l.Name][l.Value]

From ccd0308abcb98505797161b9142da1fe9ddbe88c Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Wed, 25 Sep 2024 14:59:16 +0200
Subject: [PATCH 3/5] Don't do anything if MemPostings are empty

Signed-off-by: Oleg Zaytsev
---
 tsdb/index/postings.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index 3164d8c2f..e6a6c708f 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -293,10 +293,12 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
 func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
+	if len(p.m) == 0 || len(deleted) == 0 {
+		return
+	}
 
 	// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
-	// Adding +1 to length to account for allPostingsKey processing when MemPostings is empty.
-	deleteLabelNames := make(chan string, len(p.m)+1)
+	deleteLabelNames := make(chan string, len(p.m))
 
 	process := func(l labels.Label) {
 		orig := p.m[l.Name][l.Value]

From 4fd2556baa8bc11d49529abb92163feca33d1a58 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Thu, 26 Sep 2024 15:43:19 +0200
Subject: [PATCH 4/5] Extract processWithBoundedParallelismAndConsistentWorkers

Signed-off-by: Oleg Zaytsev
---
 tsdb/index/postings.go | 89 ++++++++++++++++++++++++------------------
 1 file changed, 50 insertions(+), 39 deletions(-)

diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go
index e6a6c708f..f8415407e 100644
--- a/tsdb/index/postings.go
+++ b/tsdb/index/postings.go
@@ -300,58 +300,69 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
 	// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
 	deleteLabelNames := make(chan string, len(p.m))
 
-	process := func(l labels.Label) {
-		orig := p.m[l.Name][l.Value]
-		repl := make([]storage.SeriesRef, 0, len(orig))
-		for _, id := range orig {
-			if _, ok := deleted[id]; !ok {
-				repl = append(repl, id)
+	process, wait := processWithBoundedParallelismAndConsistentWorkers(
+		runtime.GOMAXPROCS(0),
+		func(l labels.Label) uint64 { return xxhash.Sum64String(l.Name) },
+		func(l labels.Label) {
+			orig := p.m[l.Name][l.Value]
+			repl := make([]storage.SeriesRef, 0, len(orig))
+			for _, id := range orig {
+				if _, ok := deleted[id]; !ok {
+					repl = append(repl, id)
+				}
 			}
-		}
-		if len(repl) > 0 {
-			p.m[l.Name][l.Value] = repl
-		} else {
-			delete(p.m[l.Name], l.Value)
-			if len(p.m[l.Name]) == 0 {
-				// Delete the key if we removed all values.
-				deleteLabelNames <- l.Name
+			if len(repl) > 0 {
+				p.m[l.Name][l.Value] = repl
+			} else {
+				delete(p.m[l.Name], l.Value)
+				if len(p.m[l.Name]) == 0 {
+					// Delete the key if we removed all values.
+					deleteLabelNames <- l.Name
+				}
 			}
-		}
+		},
+	)
+
+	for l := range affected {
+		process(l)
 	}
+	process(allPostingsKey)
+	wait()
 
-	// Create GOMAXPROCS workers.
-	wg := sync.WaitGroup{}
-	jobs := make([]chan labels.Label, runtime.GOMAXPROCS(0))
-	for i := range jobs {
-		jobs[i] = make(chan labels.Label, 128)
+	// Close deleteLabelNames channel and delete the label names requested.
+	close(deleteLabelNames)
+	for name := range deleteLabelNames {
+		delete(p.m, name)
+	}
+}
+
+// processWithBoundedParallelismAndConsistentWorkers will call f() with bounded parallelism,
+// making sure that elements with same hash(T) will always be processed by the same worker.
+// Call process() to add more jobs to process, and once finished adding, call wait() to ensure that all jobs are processed.
+func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
+	wg := &sync.WaitGroup{}
+	jobs := make([]chan T, workers)
+	for i := 0; i < workers; i++ {
 		wg.Add(1)
-		go func(jobs chan labels.Label) {
+		jobs[i] = make(chan T, 128)
+		go func(jobs <-chan T) {
 			defer wg.Done()
 			for l := range jobs {
-				process(l)
+				f(l)
 			}
 		}(jobs[i])
 	}
 
-	// Process all affected labels and the allPostingsKey.
-	for l := range affected {
-		j := int(xxhash.Sum64String(l.Name) % uint64(len(jobs)))
-		jobs[j] <- l
-	}
-	j := int(xxhash.Sum64String(allPostingsKey.Name) % uint64(len(jobs)))
-	jobs[j] <- allPostingsKey
-
-	// Close jobs channels and wait all workers to finish.
-	for i := range jobs {
-		close(jobs[i])
+	process = func(job T) {
+		jobs[hash(job)%uint64(workers)] <- job
 	}
-	wg.Wait()
-
-	// Close deleteLabelNames channel and delete the label names requested.
-	close(deleteLabelNames)
-	for name := range deleteLabelNames {
-		delete(p.m, name)
+	wait = func() {
+		for i := range jobs {
+			close(jobs[i])
+		}
+		wg.Wait()
 	}
+	return process, wait
 }
 
 // Iter calls f for each postings list. It aborts if f returns an error and returns it.
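To make the extracted helper's contract concrete, here is a hypothetical, self-contained usage sketch; it is not part of the patch. Because items with equal hashes always land on the same worker, a caller can mutate per-key state without locks as long as the enclosing structure is only read concurrently, which is the same invariant Delete() relies on for p.m. The helper body is copied verbatim from the diff above so the example compiles on its own; the surrounding program, keys and values are made up.

// Hypothetical usage sketch of the helper extracted in this commit.
package main

import (
	"fmt"
	"runtime"
	"sync"

	"github.com/cespare/xxhash/v2"
)

// Copied verbatim from the diff above so the sketch is self-contained.
func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
	wg := &sync.WaitGroup{}
	jobs := make([]chan T, workers)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		jobs[i] = make(chan T, 128)
		go func(jobs <-chan T) {
			defer wg.Done()
			for l := range jobs {
				f(l)
			}
		}(jobs[i])
	}

	process = func(job T) {
		jobs[hash(job)%uint64(workers)] <- job
	}
	wait = func() {
		for i := range jobs {
			close(jobs[i])
		}
		wg.Wait()
	}
	return process, wait
}

func main() {
	// The outer map is pre-populated and only read by the workers; each inner
	// map is written by exactly one worker because jobs are sharded by key,
	// which mirrors how Delete() treats p.m and its per-name maps.
	byKey := map[string]map[int]struct{}{"a": {}, "b": {}, "c": {}}

	type kv struct {
		key string
		val int
	}
	process, wait := processWithBoundedParallelismAndConsistentWorkers(
		runtime.GOMAXPROCS(0),
		func(j kv) uint64 { return xxhash.Sum64String(j.key) },
		func(j kv) { byKey[j.key][j.val] = struct{}{} },
	)
	for i := 0; i < 9; i++ {
		process(kv{key: string(rune('a' + i%3)), val: i})
	}
	wait()
	fmt.Println(byKey)
}

Note that wait() closes every worker channel and blocks until they are drained, so the returned pair is single-use: once wait() has returned, process() must not be called again.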

From ada8a6ef10c37ec0ea37b2e0c21e4ec2187a6fa8 Mon Sep 17 00:00:00 2001
From: Oleg Zaytsev
Date: Fri, 27 Sep 2024 10:14:39 +0200
Subject: [PATCH 5/5] Add some more tests for MemPostings_Delete

Signed-off-by: Oleg Zaytsev
---
 tsdb/index/postings_test.go | 86 +++++++++++++++++++++++++------------
 1 file changed, 59 insertions(+), 27 deletions(-)

diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go
index 1802c9e89..b41fb54e6 100644
--- a/tsdb/index/postings_test.go
+++ b/tsdb/index/postings_test.go
@@ -973,37 +973,69 @@ func TestMemPostingsStats(t *testing.T) {
 }
 
 func TestMemPostings_Delete(t *testing.T) {
-	p := NewMemPostings()
-	p.Add(1, labels.FromStrings("lbl1", "a"))
-	p.Add(2, labels.FromStrings("lbl1", "b"))
-	p.Add(3, labels.FromStrings("lbl2", "a"))
+	t.Run("some postings", func(t *testing.T) {
+		p := NewMemPostings()
+		p.Add(1, labels.FromStrings("lbl1", "a"))
+		p.Add(2, labels.FromStrings("lbl1", "b"))
+		p.Add(3, labels.FromStrings("lbl2", "a"))
+
+		before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		deletedRefs := map[storage.SeriesRef]struct{}{
+			2: {},
+		}
+		affectedLabels := map[labels.Label]struct{}{
+			{Name: "lbl1", Value: "b"}: {},
+		}
+		p.Delete(deletedRefs, affectedLabels)
+		after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
 
-	before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
-	deletedRefs := map[storage.SeriesRef]struct{}{
-		2: {},
-	}
-	affectedLabels := map[labels.Label]struct{}{
-		{Name: "lbl1", Value: "b"}: {},
-	}
-	p.Delete(deletedRefs, affectedLabels)
-	after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		// Make sure postings gotten before the delete have the old data when
+		// iterated over.
+		expanded, err := ExpandPostings(before)
+		require.NoError(t, err)
+		require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded)
 
-	// Make sure postings gotten before the delete have the old data when
-	// iterated over.
-	expanded, err := ExpandPostings(before)
-	require.NoError(t, err)
-	require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded)
+		// Make sure postings gotten after the delete have the new data when
+		// iterated over.
+		expanded, err = ExpandPostings(after)
+		require.NoError(t, err)
+		require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
 
-	// Make sure postings gotten after the delete have the new data when
-	// iterated over.
-	expanded, err = ExpandPostings(after)
-	require.NoError(t, err)
-	require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
+		deleted := p.Get("lbl1", "b")
+		expanded, err = ExpandPostings(deleted)
+		require.NoError(t, err)
+		require.Empty(t, expanded, "expected empty postings, got %v", expanded)
+	})
 
-	deleted := p.Get("lbl1", "b")
-	expanded, err = ExpandPostings(deleted)
-	require.NoError(t, err)
-	require.Empty(t, expanded, "expected empty postings, got %v", expanded)
+	t.Run("all postings", func(t *testing.T) {
+		p := NewMemPostings()
+		p.Add(1, labels.FromStrings("lbl1", "a"))
+		p.Add(2, labels.FromStrings("lbl1", "b"))
+		p.Add(3, labels.FromStrings("lbl2", "a"))
+
+		deletedRefs := map[storage.SeriesRef]struct{}{1: {}, 2: {}, 3: {}}
+		affectedLabels := map[labels.Label]struct{}{
+			{Name: "lbl1", Value: "a"}: {},
+			{Name: "lbl1", Value: "b"}: {},
+			{Name: "lbl1", Value: "c"}: {},
+		}
+		p.Delete(deletedRefs, affectedLabels)
+		after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		expanded, err := ExpandPostings(after)
+		require.NoError(t, err)
+		require.Empty(t, expanded)
+	})
+
+	t.Run("nothing on empty mempostings", func(t *testing.T) {
+		p := NewMemPostings()
+		deletedRefs := map[storage.SeriesRef]struct{}{}
+		affectedLabels := map[labels.Label]struct{}{}
+		p.Delete(deletedRefs, affectedLabels)
+		after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
+		expanded, err := ExpandPostings(after)
+		require.NoError(t, err)
+		require.Empty(t, expanded)
+	})
 }
 
 // BenchmarkMemPostings_Delete is quite heavy, so consider running it with
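Finally, a minimal usage sketch of the Delete() signature these tests exercise, assuming the exported API of the tsdb/index package as it appears above; the import paths, series refs and label values are illustrative and not taken from the patch.

// Hypothetical caller of the post-patch Delete() API (values are made up).
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	p := index.NewMemPostings()
	p.Add(1, labels.FromStrings("lbl1", "a"))
	p.Add(2, labels.FromStrings("lbl1", "b"))

	// Delete takes both the deleted series refs and the labels affected by
	// those deletions; internally it shards the affected labels by name.
	deleted := map[storage.SeriesRef]struct{}{2: {}}
	affected := map[labels.Label]struct{}{{Name: "lbl1", Value: "b"}: {}}
	p.Delete(deleted, affected)

	refs, err := index.ExpandPostings(p.Get("lbl1", "b"))
	fmt.Println(refs, err) // expect an empty result for the deleted value
}

Because callers pass the set of labels the deleted series carried, Delete() only walks the affected postings lists (plus allPostingsKey) rather than the whole index.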