Merge pull request #15380 from bboreham/improve-loadwbl

[BUGFIX] TSDB: Apply fixes from loadWAL to loadWBL
pull/15240/merge
Bryan Boreham 2024-11-25 17:31:49 +00:00 committed by GitHub
commit dd0252a774
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 15 additions and 30 deletions

View File

@ -656,32 +656,15 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
concurrency = h.opts.WALReplayConcurrency
processors = make([]wblSubsetProcessor, concurrency)
dec record.Decoder
shards = make([][]record.RefSample, concurrency)
histogramShards = make([][]histogramRecord, concurrency)
decodedCh = make(chan interface{}, 10)
decodeErr error
samplesPool = sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
markersPool = sync.Pool{
New: func() interface{} {
return []record.RefMmapMarker{}
},
}
histogramSamplesPool = sync.Pool{
New: func() interface{} {
return []record.RefHistogramSample{}
},
}
floatHistogramSamplesPool = sync.Pool{
New: func() interface{} {
return []record.RefFloatHistogramSample{}
},
}
decodedCh = make(chan interface{}, 10)
decodeErr error
samplesPool zeropool.Pool[[]record.RefSample]
markersPool zeropool.Pool[[]record.RefMmapMarker]
histogramSamplesPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramSamplesPool zeropool.Pool[[]record.RefFloatHistogramSample]
)
defer func() {
@ -711,11 +694,13 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
go func() {
defer close(decodedCh)
var err error
dec := record.NewDecoder(syms)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0]
samples := samplesPool.Get()[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -727,7 +712,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- samples
case record.MmapMarkers:
markers := markersPool.Get().([]record.RefMmapMarker)[:0]
markers := markersPool.Get()[:0]
markers, err = dec.MmapMarkers(rec, markers)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -739,7 +724,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- markers
case record.HistogramSamples:
hists := histogramSamplesPool.Get().([]record.RefHistogramSample)[:0]
hists := histogramSamplesPool.Get()[:0]
hists, err = dec.HistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -751,7 +736,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- hists
case record.FloatHistogramSamples:
hists := floatHistogramSamplesPool.Get().([]record.RefFloatHistogramSample)[:0]
hists := floatHistogramSamplesPool.Get()[:0]
hists, err = dec.FloatHistogramSamples(rec, hists)
if err != nil {
decodeErr = &wlog.CorruptionErr{
@ -802,7 +787,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
samplesPool.Put(d)
samplesPool.Put(v)
case []record.RefMmapMarker:
markers := v
for _, rm := range markers {
@ -857,7 +842,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
histogramSamplesPool.Put(v) //nolint:staticcheck
histogramSamplesPool.Put(v)
case []record.RefFloatHistogramSample:
samples := v
// We split up the samples into chunks of 5000 samples or less.
@ -889,7 +874,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples[m:]
}
floatHistogramSamplesPool.Put(v) //nolint:staticcheck
floatHistogramSamplesPool.Put(v)
default:
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
}