Merge pull request #169 from prometheus/muchfasterwal

wal: parallelize sample processing
pull/5805/head
Fabian Reinartz 7 years ago committed by GitHub
commit d7cd5b21ea

@ -15,6 +15,7 @@ package tsdb
import (
"math"
"runtime"
"sort"
"sync"
"sync/atomic"
@ -186,6 +187,37 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
return h, nil
}
// processWALSamples adds a partition of samples it receives to the head and passes
// them on to other workers.
// Samples before the mint timestamp are discarded.
func (h *Head) processWALSamples(
	mint int64,
	partition, total uint64,
	input <-chan []RefSample, output chan<- []RefSample,
) (unknownRefs uint64) {
	// Closing the output lets the downstream worker's range loop terminate
	// once we have drained our input.
	defer close(output)

	for batch := range input {
		for _, sample := range batch {
			// Skip samples outside the retained time range and samples
			// whose series ID belongs to a different worker's partition.
			if sample.T < mint || sample.Ref%total != partition {
				continue
			}
			series := h.series.getByID(sample.Ref)
			if series == nil {
				// No series registered under this reference; count it for
				// error reporting by the caller.
				unknownRefs++
				continue
			}
			if _, created := series.append(sample.T, sample.V); created {
				h.metrics.chunksCreated.Inc()
				h.metrics.chunks.Inc()
			}
		}
		// Forward the batch so the next worker in the ring can process its
		// own partition (and the slice can eventually be reused).
		output <- batch
	}
	return unknownRefs
}
// ReadWAL initializes the head by consuming the write ahead log.
func (h *Head) ReadWAL() error {
defer h.postings.ensureOrder()
@ -195,8 +227,32 @@ func (h *Head) ReadWAL() error {
// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs int
var unknownRefs uint64
// Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
n = runtime.GOMAXPROCS(0)
firstInput = make(chan []RefSample, 300)
input = firstInput
)
for i := 0; i < n; i++ {
output := make(chan []RefSample, 300)
go func(i int, input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output)
atomic.AddUint64(&unknownRefs, unknown)
}(i, input, output)
// The output feeds the next worker goroutine. For the last worker,
// it feeds the initial input again to reuse the RefSample slices.
input = output
}
// TODO(fabxc): series entries spread between samples can starve the sample workers.
// Even with buffered channels, this can impact startup time with lots of series churn.
// We must not parallelize series creation itself but could make the indexing asynchronous.
seriesFunc := func(series []RefSeries) {
for _, s := range series {
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
@ -207,21 +263,13 @@ func (h *Head) ReadWAL() error {
}
}
samplesFunc := func(samples []RefSample) {
for _, s := range samples {
if s.T < mint {
continue
}
ms := h.series.getByID(s.Ref)
if ms == nil {
unknownRefs++
continue
}
_, chunkCreated := ms.append(s.T, s.V)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
var buf []RefSample
select {
case buf = <-input:
default:
buf = make([]RefSample, 0, len(samples)*11/10)
}
firstInput <- append(buf[:0], samples...)
}
deletesFunc := func(stones []Stone) {
for _, s := range stones {
@ -234,13 +282,18 @@ func (h *Head) ReadWAL() error {
}
}
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
}
err := r.Read(seriesFunc, samplesFunc, deletesFunc)
if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil {
// Signal termination to first worker and wait for last one to close its output channel.
close(firstInput)
for range input {
}
if err != nil {
return errors.Wrap(err, "consume WAL")
}
if unknownRefs > 0 {
level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs)
}
return nil
}
@ -1168,10 +1221,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c = s.cut(t)
chunkCreated = true
}
numSamples := c.chunk.NumSamples()
if c.maxTime >= t {
return false, chunkCreated
}
if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt {
if numSamples > samplesPerChunk/4 && t >= s.nextAt {
c = s.cut(t)
chunkCreated = true
}
@ -1179,7 +1234,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
c.maxTime = t
if c.chunk.NumSamples() == samplesPerChunk/4 {
if numSamples == samplesPerChunk/4 {
_, maxt := rangeForTimestamp(c.minTime, s.chunkRange)
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt)
}

@ -846,7 +846,7 @@ func (r *walReader) Read(
deletePool sync.Pool
)
donec := make(chan struct{})
datac := make(chan interface{}, 50)
datac := make(chan interface{}, 100)
go func() {
defer close(donec)

Loading…
Cancel
Save