diff --git a/head.go b/head.go index 1347b75d9..22221ac73 100644 --- a/head.go +++ b/head.go @@ -15,6 +15,7 @@ package tsdb import ( "math" + "runtime" "sort" "sync" "sync/atomic" @@ -186,6 +187,37 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( return h, nil } +// processWALSamples adds a partition of samples it receives to the head and passes +// them on to other workers. +// Samples before the mint timestamp are discarded. +func (h *Head) processWALSamples( + mint int64, + partition, total uint64, + input <-chan []RefSample, output chan<- []RefSample, +) (unknownRefs uint64) { + defer close(output) + + for samples := range input { + for _, s := range samples { + if s.T < mint || s.Ref%total != partition { + continue + } + ms := h.series.getByID(s.Ref) + if ms == nil { + unknownRefs++ + continue + } + _, chunkCreated := ms.append(s.T, s.V) + if chunkCreated { + h.metrics.chunksCreated.Inc() + h.metrics.chunks.Inc() + } + } + output <- samples + } + return unknownRefs +} + // ReadWAL initializes the head by consuming the write ahead log. func (h *Head) ReadWAL() error { defer h.postings.ensureOrder() @@ -195,8 +227,32 @@ func (h *Head) ReadWAL() error { // Track number of samples that referenced a series we don't know about // for error reporting. - var unknownRefs int + var unknownRefs uint64 + + // Start workers that each process samples for a partition of the series ID space. + // They are connected through a ring of channels which ensures that all sample batches + // read from the WAL are processed in order. 
+ var ( + n = runtime.GOMAXPROCS(0) + firstInput = make(chan []RefSample, 300) + input = firstInput + ) + for i := 0; i < n; i++ { + output := make(chan []RefSample, 300) + + go func(i int, input <-chan []RefSample, output chan<- []RefSample) { + unknown := h.processWALSamples(mint, uint64(i), uint64(n), input, output) + atomic.AddUint64(&unknownRefs, unknown) + }(i, input, output) + + // The output feeds the next worker goroutine. For the last worker, + // it feeds the initial input again to reuse the RefSample slices. + input = output + } + // TODO(fabxc): series entries spread between samples can starve the sample workers. + // Even with buffered channels, this can impact startup time with lots of series churn. + // We must not parallelize series creation itself but could make the indexing asynchronous. seriesFunc := func(series []RefSeries) { for _, s := range series { h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels) @@ -207,21 +263,13 @@ func (h *Head) ReadWAL() error { } } samplesFunc := func(samples []RefSample) { - for _, s := range samples { - if s.T < mint { - continue - } - ms := h.series.getByID(s.Ref) - if ms == nil { - unknownRefs++ - continue - } - _, chunkCreated := ms.append(s.T, s.V) - if chunkCreated { - h.metrics.chunksCreated.Inc() - h.metrics.chunks.Inc() - } + var buf []RefSample + select { + case buf = <-input: + default: + buf = make([]RefSample, 0, len(samples)*11/10) } + firstInput <- append(buf[:0], samples...) } deletesFunc := func(stones []Stone) { for _, s := range stones { @@ -234,13 +282,18 @@ func (h *Head) ReadWAL() error { } } - if unknownRefs > 0 { - level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs) - } + err := r.Read(seriesFunc, samplesFunc, deletesFunc) - if err := r.Read(seriesFunc, samplesFunc, deletesFunc); err != nil { + // Signal termination to first worker and wait for last one to close its output channel. 
+ close(firstInput) + for range input { + } + if err != nil { return errors.Wrap(err, "consume WAL") } + if unknownRefs > 0 { + level.Warn(h.logger).Log("msg", "unknown series references in WAL samples", "count", unknownRefs) + } return nil } @@ -1168,10 +1221,12 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c = s.cut(t) chunkCreated = true } + numSamples := c.chunk.NumSamples() + if c.maxTime >= t { return false, chunkCreated } - if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { + if numSamples > samplesPerChunk/4 && t >= s.nextAt { c = s.cut(t) chunkCreated = true } @@ -1179,7 +1234,7 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { c.maxTime = t - if c.chunk.NumSamples() == samplesPerChunk/4 { + if numSamples == samplesPerChunk/4 { _, maxt := rangeForTimestamp(c.minTime, s.chunkRange) s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxt) } diff --git a/wal.go b/wal.go index 9dc45c608..850ed9c9f 100644 --- a/wal.go +++ b/wal.go @@ -846,7 +846,7 @@ func (r *walReader) Read( deletePool sync.Pool ) donec := make(chan struct{}) - datac := make(chan interface{}, 50) + datac := make(chan interface{}, 100) go func() { defer close(donec)