Replay WAL concurrently without blocking (#10973)

* Replay WAL concurrently without blocking

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

* Resolve review comments

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
pull/11183/head
Xiaochao Dong 2 years ago committed by GitHub
parent 3196c98bc2
commit 09187fb0cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -49,7 +49,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
var unknownExemplarRefs atomic.Uint64
var unknownMetadataRefs atomic.Uint64
// Track number of series records that had overlapping m-map chunks.
var mmapOverlappingChunks uint64
var mmapOverlappingChunks atomic.Uint64
// Start workers that each process samples for a partition of the series ID space.
var (
@ -107,8 +107,9 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
processors[i].setup()
go func(wp *walSubsetProcessor) {
unknown := wp.processWALSamples(h)
unknown, overlapping := wp.processWALSamples(h, mmappedChunks)
unknownRefs.Add(unknown)
mmapOverlappingChunks.Add(overlapping)
wg.Done()
}(&processors[i])
}
@ -224,56 +225,12 @@ Outer:
if chunks.HeadSeriesRef(h.lastSeriesID.Load()) < walSeries.Ref {
h.lastSeriesID.Store(uint64(walSeries.Ref))
}
idx := uint64(mSeries.ref) % uint64(n)
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
processors[idx].waitUntilIdle()
// Lock the subset so we can modify the series object
processors[idx].mx.Lock()
mmc := mmappedChunks[walSeries.Ref]
if created {
// This is the first WAL series record for this series.
h.resetSeriesWithMMappedChunks(mSeries, mmc)
processors[idx].mx.Unlock()
continue
}
// There's already a different ref for this series.
// A duplicate series record is only possible when the old samples were already compacted into a block.
// Hence we can discard all the samples and m-mapped chunks replayed till now for this series.
multiRef[walSeries.Ref] = mSeries.ref
// Checking if the new m-mapped chunks overlap with the already existing ones.
if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
if overlapsClosedInterval(
mSeries.mmappedChunks[0].minTime,
mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
mmc[0].minTime,
mmc[len(mmc)-1].maxTime,
) {
mmapOverlappingChunks++
level.Debug(h.logger).Log(
"msg", "M-mapped chunks overlap on a duplicate series record",
"series", mSeries.lset.String(),
"oldref", mSeries.ref,
"oldmint", mSeries.mmappedChunks[0].minTime,
"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
"newref", walSeries.Ref,
"newmint", mmc[0].minTime,
"newmaxt", mmc[len(mmc)-1].maxTime,
)
}
if !created {
multiRef[walSeries.Ref] = mSeries.ref
}
// Replacing m-mapped chunks with the new ones (could be empty).
h.resetSeriesWithMMappedChunks(mSeries, mmc)
processors[idx].mx.Unlock()
idx := uint64(mSeries.ref) % uint64(n)
processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool.Put(v)
@ -299,7 +256,7 @@ Outer:
shards[mod] = append(shards[mod], sam)
}
for i := 0; i < n; i++ {
processors[i].input <- shards[i]
processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]}
}
samples = samples[m:]
}
@ -370,14 +327,38 @@ Outer:
if unknownRefs.Load() > 0 || unknownExemplarRefs.Load() > 0 || unknownMetadataRefs.Load() > 0 {
level.Warn(h.logger).Log("msg", "Unknown series references", "samples", unknownRefs.Load(), "exemplars", unknownExemplarRefs.Load(), "metadata", unknownMetadataRefs.Load())
}
if mmapOverlappingChunks > 0 {
level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", mmapOverlappingChunks)
if count := mmapOverlappingChunks.Load(); count > 0 {
level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", count)
}
return nil
}
// resetSeriesWithMMappedChunks is only used during the WAL replay.
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
if mSeries.ref != walSeriesRef {
// Checking if the new m-mapped chunks overlap with the already existing ones.
if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
if overlapsClosedInterval(
mSeries.mmappedChunks[0].minTime,
mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
mmc[0].minTime,
mmc[len(mmc)-1].maxTime,
) {
level.Debug(h.logger).Log(
"msg", "M-mapped chunks overlap on a duplicate series record",
"series", mSeries.lset.String(),
"oldref", mSeries.ref,
"oldmint", mSeries.mmappedChunks[0].minTime,
"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
"newref", walSeriesRef,
"newmint", mmc[0].minTime,
"newmaxt", mmc[len(mmc)-1].maxTime,
)
overlapped = true
}
}
}
h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
@ -394,17 +375,23 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc []*mmappedCh
mSeries.nextAt = 0
mSeries.headChunk = nil
mSeries.app = nil
return
}
type walSubsetProcessor struct {
mx sync.Mutex // Take this lock while modifying series in the subset.
input chan []record.RefSample
input chan walSubsetProcessorInputItem
output chan []record.RefSample
}
type walSubsetProcessorInputItem struct {
samples []record.RefSample
existingSeries *memSeries
walSeriesRef chunks.HeadSeriesRef
}
func (wp *walSubsetProcessor) setup() {
wp.output = make(chan []record.RefSample, 300)
wp.input = make(chan []record.RefSample, 300)
wp.input = make(chan walSubsetProcessorInputItem, 300)
}
func (wp *walSubsetProcessor) closeAndDrain() {
@ -426,15 +413,22 @@ func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
// processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded.
func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) {
func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, mmapOverlappingChunks uint64) {
defer close(wp.output)
minValidTime := h.minValidTime.Load()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range wp.input {
wp.mx.Lock()
for _, s := range samples {
for in := range wp.input {
if in.existingSeries != nil {
mmc := mmappedChunks[in.walSeriesRef]
if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, in.walSeriesRef) {
mmapOverlappingChunks++
}
continue
}
for _, s := range in.samples {
if s.T < minValidTime {
continue
}
@ -457,26 +451,14 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) {
mint = s.T
}
}
wp.mx.Unlock()
wp.output <- samples
}
h.updateMinMaxTime(mint, maxt)
return unknownRefs
}
func (wp *walSubsetProcessor) waitUntilIdle() {
select {
case <-wp.output: // Allow output side to drain to avoid deadlock.
default:
}
wp.input <- []record.RefSample{}
for len(wp.input) != 0 {
select {
case <-wp.output: // Allow output side to drain to avoid deadlock.
case <-time.After(10 * time.Microsecond):
case wp.output <- in.samples:
default:
}
}
h.updateMinMaxTime(mint, maxt)
return unknownRefs, mmapOverlappingChunks
}
const (

Loading…
Cancel
Save