Extract processWithBoundedParallelismAndConsistentWorkers

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
pull/14975/head
Oleg Zaytsev 2 months ago
parent ccd0308abc
commit 4fd2556baa
No known key found for this signature in database
GPG Key ID: 7E9FE9FD48F512EF

@ -300,58 +300,69 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma
// Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it. // Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it.
deleteLabelNames := make(chan string, len(p.m)) deleteLabelNames := make(chan string, len(p.m))
process := func(l labels.Label) { process, wait := processWithBoundedParallelismAndConsistentWorkers(
orig := p.m[l.Name][l.Value] runtime.GOMAXPROCS(0),
repl := make([]storage.SeriesRef, 0, len(orig)) func(l labels.Label) uint64 { return xxhash.Sum64String(l.Name) },
for _, id := range orig { func(l labels.Label) {
if _, ok := deleted[id]; !ok { orig := p.m[l.Name][l.Value]
repl = append(repl, id) repl := make([]storage.SeriesRef, 0, len(orig))
for _, id := range orig {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
} }
} if len(repl) > 0 {
if len(repl) > 0 { p.m[l.Name][l.Value] = repl
p.m[l.Name][l.Value] = repl } else {
} else { delete(p.m[l.Name], l.Value)
delete(p.m[l.Name], l.Value) if len(p.m[l.Name]) == 0 {
if len(p.m[l.Name]) == 0 { // Delete the key if we removed all values.
// Delete the key if we removed all values. deleteLabelNames <- l.Name
deleteLabelNames <- l.Name }
} }
} },
)
for l := range affected {
process(l)
} }
process(allPostingsKey)
wait()
// Create GOMAXPROCS workers. // Close deleteLabelNames channel and delete the label names requested.
wg := sync.WaitGroup{} close(deleteLabelNames)
jobs := make([]chan labels.Label, runtime.GOMAXPROCS(0)) for name := range deleteLabelNames {
for i := range jobs { delete(p.m, name)
jobs[i] = make(chan labels.Label, 128) }
}
// processWithBoundedParallelismAndConsistentWorkers will call f() with bounded parallelism,
// making sure that elements with same hash(T) will always be processed by the same worker.
// Call process() to add more jobs to process, and once finished adding, call wait() to ensure that all jobs are processed.
func processWithBoundedParallelismAndConsistentWorkers[T any](workers int, hash func(T) uint64, f func(T)) (process func(T), wait func()) {
wg := &sync.WaitGroup{}
jobs := make([]chan T, workers)
for i := 0; i < workers; i++ {
wg.Add(1) wg.Add(1)
go func(jobs chan labels.Label) { jobs[i] = make(chan T, 128)
go func(jobs <-chan T) {
defer wg.Done() defer wg.Done()
for l := range jobs { for l := range jobs {
process(l) f(l)
} }
}(jobs[i]) }(jobs[i])
} }
// Process all affected labels and the allPostingsKey. process = func(job T) {
for l := range affected { jobs[hash(job)%uint64(workers)] <- job
j := int(xxhash.Sum64String(l.Name) % uint64(len(jobs)))
jobs[j] <- l
}
j := int(xxhash.Sum64String(allPostingsKey.Name) % uint64(len(jobs)))
jobs[j] <- allPostingsKey
// Close jobs channels and wait all workers to finish.
for i := range jobs {
close(jobs[i])
} }
wg.Wait() wait = func() {
for i := range jobs {
// Close deleteLabelNames channel and delete the label names requested. close(jobs[i])
close(deleteLabelNames) }
for name := range deleteLabelNames { wg.Wait()
delete(p.m, name)
} }
return process, wait
} }
// Iter calls f for each postings list. It aborts if f returns an error and returns it. // Iter calls f for each postings list. It aborts if f returns an error and returns it.

Loading…
Cancel
Save