Browse Source

Merge pull request #14975 from colega/process-mempostings-delete-with-gomaxprocs-workers

Process `MemPostings.Delete()` with `GOMAXPROCS` workers
pull/15005/head
Bryan Boreham 2 months ago committed by GitHub
parent
commit
54de4fb780
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 79
      tsdb/index/postings.go
  2. 88
      tsdb/index/postings_test.go

79
tsdb/index/postings.go

@ -26,6 +26,7 @@ import (
"sync"
"github.com/bboreham/go-loser"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -292,30 +293,76 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
p.mtx.Lock()
defer p.mtx.Unlock()
if len(p.m) == 0 || len(deleted) == 0 {
return
}
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
for _, id := range orig {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
deleteLabelNames := make(chan string, len(p.m))
process, wait := processWithBoundedParallelismAndConsistentWorkers(
runtime.GOMAXPROCS(0),
func(l labels.Label) uint64 { return xxhash.Sum64String(l.Name) },
func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
for _, id := range orig {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
}
if len(repl) > 0 {
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name)
if len(repl) > 0 {
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
if len(p.m[l.Name]) == 0 {
// Delete the key if we removed all values.
deleteLabelNames <- l.Name
}
}
}
}
},
)
for l := range affected {
process(l)
}
process(allPostingsKey)
wait()
// Close deleteLabelNames channel and delete the label names requested.
close(deleteLabelNames)
for name := range deleteLabelNames {
delete(p.m, name)
}
}
// processWithBoundedParallelismAndConsistentWorkers will call f() with bounded parallelism,
// making sure that elements with same hash(T) will always be processed by the same worker.
// Call process() to add more jobs to process, and once finished adding, call wait() to ensure that all jobs are processed.
func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
wg := &sync.WaitGroup{}
jobs := make([]chan T, workers)
for i := 0; i < workers; i++ {
wg.Add(1)
jobs[i] = make(chan T, 128)
go func(jobs <-chan T) {
defer wg.Done()
for l := range jobs {
f(l)
}
}(jobs[i])
}
process = func(job T) {
jobs[hash(job)%uint64(workers)] <- job
}
wait = func() {
for i := range jobs {
close(jobs[i])
}
wg.Wait()
}
return process, wait
}
// Iter calls f for each postings list. It aborts if f returns an error and returns it.

88
tsdb/index/postings_test.go

@ -973,37 +973,69 @@ func TestMemPostingsStats(t *testing.T) {
}
func TestMemPostings_Delete(t *testing.T) {
p := NewMemPostings()
p.Add(1, labels.FromStrings("lbl1", "a"))
p.Add(2, labels.FromStrings("lbl1", "b"))
p.Add(3, labels.FromStrings("lbl2", "a"))
t.Run("some postings", func(t *testing.T) {
p := NewMemPostings()
p.Add(1, labels.FromStrings("lbl1", "a"))
p.Add(2, labels.FromStrings("lbl1", "b"))
p.Add(3, labels.FromStrings("lbl2", "a"))
before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
deletedRefs := map[storage.SeriesRef]struct{}{
2: {},
}
affectedLabels := map[labels.Label]struct{}{
{Name: "lbl1", Value: "b"}: {},
}
p.Delete(deletedRefs, affectedLabels)
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
deletedRefs := map[storage.SeriesRef]struct{}{
2: {},
}
affectedLabels := map[labels.Label]struct{}{
{Name: "lbl1", Value: "b"}: {},
}
p.Delete(deletedRefs, affectedLabels)
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
// Make sure postings gotten before the delete have the old data when
// iterated over.
expanded, err := ExpandPostings(before)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded)
// Make sure postings gotten before the delete have the old data when
// iterated over.
expanded, err := ExpandPostings(before)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{1, 2, 3}, expanded)
// Make sure postings gotten after the delete have the new data when
// iterated over.
expanded, err = ExpandPostings(after)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
// Make sure postings gotten after the delete have the new data when
// iterated over.
expanded, err = ExpandPostings(after)
require.NoError(t, err)
require.Equal(t, []storage.SeriesRef{1, 3}, expanded)
deleted := p.Get("lbl1", "b")
expanded, err = ExpandPostings(deleted)
require.NoError(t, err)
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
})
deleted := p.Get("lbl1", "b")
expanded, err = ExpandPostings(deleted)
require.NoError(t, err)
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
t.Run("all postings", func(t *testing.T) {
p := NewMemPostings()
p.Add(1, labels.FromStrings("lbl1", "a"))
p.Add(2, labels.FromStrings("lbl1", "b"))
p.Add(3, labels.FromStrings("lbl2", "a"))
deletedRefs := map[storage.SeriesRef]struct{}{1: {}, 2: {}, 3: {}}
affectedLabels := map[labels.Label]struct{}{
{Name: "lbl1", Value: "a"}: {},
{Name: "lbl1", Value: "b"}: {},
{Name: "lbl1", Value: "c"}: {},
}
p.Delete(deletedRefs, affectedLabels)
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
expanded, err := ExpandPostings(after)
require.NoError(t, err)
require.Empty(t, expanded)
})
t.Run("nothing on empty mempostings", func(t *testing.T) {
p := NewMemPostings()
deletedRefs := map[storage.SeriesRef]struct{}{}
affectedLabels := map[labels.Label]struct{}{}
p.Delete(deletedRefs, affectedLabels)
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
expanded, err := ExpandPostings(after)
require.NoError(t, err)
require.Empty(t, expanded)
})
}
// BenchmarkMemPostings_Delete is quite heavy, so consider running it with
@ -1025,7 +1057,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
return s
}
const total = 1e6
const total = 2e6
allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {

Loading…
Cancel
Save