diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index df0695c57..ad0e82f04 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -49,7 +49,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H var unknownExemplarRefs atomic.Uint64 var unknownMetadataRefs atomic.Uint64 // Track number of series records that had overlapping m-map chunks. - var mmapOverlappingChunks uint64 + var mmapOverlappingChunks atomic.Uint64 // Start workers that each process samples for a partition of the series ID space. var ( @@ -107,8 +107,9 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H processors[i].setup() go func(wp *walSubsetProcessor) { - unknown := wp.processWALSamples(h) + unknown, overlapping := wp.processWALSamples(h, mmappedChunks) unknownRefs.Add(unknown) + mmapOverlappingChunks.Add(overlapping) wg.Done() }(&processors[i]) } @@ -224,56 +225,12 @@ Outer: if chunks.HeadSeriesRef(h.lastSeriesID.Load()) < walSeries.Ref { h.lastSeriesID.Store(uint64(walSeries.Ref)) } - - idx := uint64(mSeries.ref) % uint64(n) - // It is possible that some old sample is being processed in processWALSamples that - // could cause race below. So we wait for the goroutine to empty input the buffer and finish - // processing all old samples after emptying the buffer. - processors[idx].waitUntilIdle() - // Lock the subset so we can modify the series object - processors[idx].mx.Lock() - - mmc := mmappedChunks[walSeries.Ref] - - if created { - // This is the first WAL series record for this series. - h.resetSeriesWithMMappedChunks(mSeries, mmc) - processors[idx].mx.Unlock() - continue - } - - // There's already a different ref for this series. - // A duplicate series record is only possible when the old samples were already compacted into a block. - // Hence we can discard all the samples and m-mapped chunks replayed till now for this series. - - multiRef[walSeries.Ref] = mSeries.ref - - // Checking if the new m-mapped chunks overlap with the already existing ones. - if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { - if overlapsClosedInterval( - mSeries.mmappedChunks[0].minTime, - mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, - mmc[0].minTime, - mmc[len(mmc)-1].maxTime, - ) { - mmapOverlappingChunks++ - level.Debug(h.logger).Log( - "msg", "M-mapped chunks overlap on a duplicate series record", - "series", mSeries.lset.String(), - "oldref", mSeries.ref, - "oldmint", mSeries.mmappedChunks[0].minTime, - "oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, - "newref", walSeries.Ref, - "newmint", mmc[0].minTime, - "newmaxt", mmc[len(mmc)-1].maxTime, - ) - } + if !created { + multiRef[walSeries.Ref] = mSeries.ref } - // Replacing m-mapped chunks with the new ones (could be empty). - h.resetSeriesWithMMappedChunks(mSeries, mmc) - - processors[idx].mx.Unlock() + idx := uint64(mSeries.ref) % uint64(n) + processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries} } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. seriesPool.Put(v) @@ -299,7 +256,7 @@ Outer: shards[mod] = append(shards[mod], sam) } for i := 0; i < n; i++ { - processors[i].input <- shards[i] + processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]} } samples = samples[m:] } @@ -370,14 +327,38 @@ Outer: if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 || unknownMetadataRefs.Load() > 0 { level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "metadata", unknownMetadataRefs.Load()) } - if mmapOverlappingChunks > 0 { - level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", mmapOverlappingChunks) + if count := mmapOverlappingChunks.Load(); count > 0 { + level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", count) } return nil } // resetSeriesWithMMappedChunks is only used during the WAL replay. -func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { +func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) { + if mSeries.ref != walSeriesRef { + // Checking if the new m-mapped chunks overlap with the already existing ones. + if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 { + if overlapsClosedInterval( + mSeries.mmappedChunks[0].minTime, + mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, + mmc[0].minTime, + mmc[len(mmc)-1].maxTime, + ) { + level.Debug(h.logger).Log( + "msg", "M-mapped chunks overlap on a duplicate series record", + "series", mSeries.lset.String(), + "oldref", mSeries.ref, + "oldmint", mSeries.mmappedChunks[0].minTime, + "oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime, + "newref", walSeriesRef, + "newmint", mmc[0].minTime, + "newmaxt", mmc[len(mmc)-1].maxTime, + ) + overlapped = true + } + } + } + h.metrics.chunksCreated.Add(float64(len(mmc))) h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) @@ -394,17 +375,23 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedCh mSeries.nextAt = 0 mSeries.headChunk = nil mSeries.app = nil + return } type walSubsetProcessor struct { - mx sync.Mutex // Take this lock while modifying series in the subset. - input chan []record.RefSample + input chan walSubsetProcessorInputItem output chan []record.RefSample } +type walSubsetProcessorInputItem struct { + samples []record.RefSample + existingSeries *memSeries + walSeriesRef chunks.HeadSeriesRef +} + func (wp *walSubsetProcessor) setup() { wp.output = make(chan []record.RefSample, 300) - wp.input = make(chan []record.RefSample, 300) + wp.input = make(chan walSubsetProcessorInputItem, 300) } func (wp *walSubsetProcessor) closeAndDrain() { @@ -426,15 +413,22 @@ func (wp *walSubsetProcessor) reuseBuf() []record.RefSample { // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. -func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) { +func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, mmapOverlappingChunks uint64) { defer close(wp.output) minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) - for samples := range wp.input { - wp.mx.Lock() - for _, s := range samples { + for in := range wp.input { + if in.existingSeries != nil { + mmc := mmappedChunks[in.walSeriesRef] + if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, in.walSeriesRef) { + mmapOverlappingChunks++ + } + continue + } + + for _, s := range in.samples { if s.T < minValidTime { continue } @@ -457,26 +451,14 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) { mint = s.T } } - wp.mx.Unlock() - wp.output <- samples - } - h.updateMinMaxTime(mint, maxt) - - return unknownRefs -} - -func (wp *walSubsetProcessor) waitUntilIdle() { - select { - case <-wp.output: // Allow output side to drain to avoid deadlock. - default: - } - wp.input <- []record.RefSample{} - for len(wp.input) != 0 { select { - case <-wp.output: // Allow output side to drain to avoid deadlock. - case <-time.After(10 * time.Microsecond): + case wp.output <- in.samples: + default: } } + h.updateMinMaxTime(mint, maxt) + + return unknownRefs, mmapOverlappingChunks } const (