mirror of https://github.com/prometheus/prometheus
Merge pull request #15239 from colega/revert-14975-process-mempostings-delete-with-gomaxprocs-workers
Revert "Process `MemPostings.Delete()` with `GOMAXPROCS` workers"pull/15248/head
commit
d0eecb1223
|
@ -26,7 +26,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/bboreham/go-loser"
|
"github.com/bboreham/go-loser"
|
||||||
"github.com/cespare/xxhash/v2"
|
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
|
@ -293,17 +292,8 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
|
||||||
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
|
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
|
||||||
p.mtx.Lock()
|
p.mtx.Lock()
|
||||||
defer p.mtx.Unlock()
|
defer p.mtx.Unlock()
|
||||||
if len(p.m) == 0 || len(deleted) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
|
process := func(l labels.Label) {
|
||||||
deleteLabelNames := make(chan string, len(p.m))
|
|
||||||
|
|
||||||
process, wait := processWithBoundedParallelismAndConsistentWorkers(
|
|
||||||
runtime.GOMAXPROCS(0),
|
|
||||||
func(l labels.Label) uint64 { return xxhash.Sum64String(l.Name) },
|
|
||||||
func(l labels.Label) {
|
|
||||||
orig := p.m[l.Name][l.Value]
|
orig := p.m[l.Name][l.Value]
|
||||||
repl := make([]storage.SeriesRef, 0, len(orig))
|
repl := make([]storage.SeriesRef, 0, len(orig))
|
||||||
for _, id := range orig {
|
for _, id := range orig {
|
||||||
|
@ -315,54 +305,17 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
|
||||||
p.m[l.Name][l.Value] = repl
|
p.m[l.Name][l.Value] = repl
|
||||||
} else {
|
} else {
|
||||||
delete(p.m[l.Name], l.Value)
|
delete(p.m[l.Name], l.Value)
|
||||||
if len(p.m[l.Name]) == 0 {
|
|
||||||
// Delete the key if we removed all values.
|
// Delete the key if we removed all values.
|
||||||
deleteLabelNames <- l.Name
|
if len(p.m[l.Name]) == 0 {
|
||||||
|
delete(p.m, l.Name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
for l := range affected {
|
for l := range affected {
|
||||||
process(l)
|
process(l)
|
||||||
}
|
}
|
||||||
process(allPostingsKey)
|
process(allPostingsKey)
|
||||||
wait()
|
|
||||||
|
|
||||||
// Close deleteLabelNames channel and delete the label names requested.
|
|
||||||
close(deleteLabelNames)
|
|
||||||
for name := range deleteLabelNames {
|
|
||||||
delete(p.m, name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// processWithBoundedParallelismAndConsistentWorkers will call f() with bounded parallelism,
|
|
||||||
// making sure that elements with same hash(T) will always be processed by the same worker.
|
|
||||||
// Call process() to add more jobs to process, and once finished adding, call wait() to ensure that all jobs are processed.
|
|
||||||
func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
jobs := make([]chan T, workers)
|
|
||||||
for i := 0; i < workers; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
jobs[i] = make(chan T, 128)
|
|
||||||
go func(jobs <-chan T) {
|
|
||||||
defer wg.Done()
|
|
||||||
for l := range jobs {
|
|
||||||
f(l)
|
|
||||||
}
|
|
||||||
}(jobs[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
process = func(job T) {
|
|
||||||
jobs[hash(job)%uint64(workers)] <- job
|
|
||||||
}
|
|
||||||
wait = func() {
|
|
||||||
for i := range jobs {
|
|
||||||
close(jobs[i])
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
return process, wait
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iter calls f for each postings list. It aborts if f returns an error and returns it.
|
// Iter calls f for each postings list. It aborts if f returns an error and returns it.
|
||||||
|
|
|
@ -973,7 +973,6 @@ func TestMemPostingsStats(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMemPostings_Delete(t *testing.T) {
|
func TestMemPostings_Delete(t *testing.T) {
|
||||||
t.Run("some postings", func(t *testing.T) {
|
|
||||||
p := NewMemPostings()
|
p := NewMemPostings()
|
||||||
p.Add(1, labels.FromStrings("lbl1", "a"))
|
p.Add(1, labels.FromStrings("lbl1", "a"))
|
||||||
p.Add(2, labels.FromStrings("lbl1", "b"))
|
p.Add(2, labels.FromStrings("lbl1", "b"))
|
||||||
|
@ -1005,37 +1004,6 @@ func TestMemPostings_Delete(t *testing.T) {
|
||||||
expanded, err = ExpandPostings(deleted)
|
expanded, err = ExpandPostings(deleted)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
|
require.Empty(t, expanded, "expected empty postings, got %v", expanded)
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("all postings", func(t *testing.T) {
|
|
||||||
p := NewMemPostings()
|
|
||||||
p.Add(1, labels.FromStrings("lbl1", "a"))
|
|
||||||
p.Add(2, labels.FromStrings("lbl1", "b"))
|
|
||||||
p.Add(3, labels.FromStrings("lbl2", "a"))
|
|
||||||
|
|
||||||
deletedRefs := map[storage.SeriesRef]struct{}{1: {}, 2: {}, 3: {}}
|
|
||||||
affectedLabels := map[labels.Label]struct{}{
|
|
||||||
{Name: "lbl1", Value: "a"}: {},
|
|
||||||
{Name: "lbl1", Value: "b"}: {},
|
|
||||||
{Name: "lbl1", Value: "c"}: {},
|
|
||||||
}
|
|
||||||
p.Delete(deletedRefs, affectedLabels)
|
|
||||||
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
|
|
||||||
expanded, err := ExpandPostings(after)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Empty(t, expanded)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("nothing on empty mempostings", func(t *testing.T) {
|
|
||||||
p := NewMemPostings()
|
|
||||||
deletedRefs := map[storage.SeriesRef]struct{}{}
|
|
||||||
affectedLabels := map[labels.Label]struct{}{}
|
|
||||||
p.Delete(deletedRefs, affectedLabels)
|
|
||||||
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
|
|
||||||
expanded, err := ExpandPostings(after)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Empty(t, expanded)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// BenchmarkMemPostings_Delete is quite heavy, so consider running it with
|
// BenchmarkMemPostings_Delete is quite heavy, so consider running it with
|
||||||
|
@ -1057,7 +1025,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
const total = 2e6
|
const total = 1e6
|
||||||
allSeries := [total]labels.Labels{}
|
allSeries := [total]labels.Labels{}
|
||||||
nameValues := make([]string, 0, 100)
|
nameValues := make([]string, 0, 100)
|
||||||
for i := 0; i < total; i++ {
|
for i := 0; i < total; i++ {
|
||||||
|
|
Loading…
Reference in New Issue