diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 8103926dc..6744d582a 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -656,32 +656,15 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch concurrency = h.opts.WALReplayConcurrency processors = make([]wblSubsetProcessor, concurrency) - dec record.Decoder shards = make([][]record.RefSample, concurrency) histogramShards = make([][]histogramRecord, concurrency) - decodedCh = make(chan interface{}, 10) - decodeErr error - samplesPool = sync.Pool{ - New: func() interface{} { - return []record.RefSample{} - }, - } - markersPool = sync.Pool{ - New: func() interface{} { - return []record.RefMmapMarker{} - }, - } - histogramSamplesPool = sync.Pool{ - New: func() interface{} { - return []record.RefHistogramSample{} - }, - } - floatHistogramSamplesPool = sync.Pool{ - New: func() interface{} { - return []record.RefFloatHistogramSample{} - }, - } + decodedCh = make(chan interface{}, 10) + decodeErr error + samplesPool zeropool.Pool[[]record.RefSample] + markersPool zeropool.Pool[[]record.RefMmapMarker] + histogramSamplesPool zeropool.Pool[[]record.RefHistogramSample] + floatHistogramSamplesPool zeropool.Pool[[]record.RefFloatHistogramSample] ) defer func() { @@ -711,11 +694,13 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch go func() { defer close(decodedCh) + var err error + dec := record.NewDecoder(syms) for r.Next() { rec := r.Record() switch dec.Type(rec) { case record.Samples: - samples := samplesPool.Get().([]record.RefSample)[:0] + samples := samplesPool.Get()[:0] samples, err = dec.Samples(rec, samples) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -727,7 +712,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decodedCh <- samples case record.MmapMarkers: - markers := markersPool.Get().([]record.RefMmapMarker)[:0] + markers := markersPool.Get()[:0] markers, err = dec.MmapMarkers(rec, markers) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -739,7 +724,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decodedCh <- markers case record.HistogramSamples: - hists := histogramSamplesPool.Get().([]record.RefHistogramSample)[:0] + hists := histogramSamplesPool.Get()[:0] hists, err = dec.HistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -751,7 +736,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } decodedCh <- hists case record.FloatHistogramSamples: - hists := floatHistogramSamplesPool.Get().([]record.RefFloatHistogramSample)[:0] + hists := floatHistogramSamplesPool.Get()[:0] hists, err = dec.FloatHistogramSamples(rec, hists) if err != nil { decodeErr = &wlog.CorruptionErr{ @@ -802,7 +787,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } samples = samples[m:] } - samplesPool.Put(d) + samplesPool.Put(v) case []record.RefMmapMarker: markers := v for _, rm := range markers { @@ -857,7 +842,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } samples = samples[m:] } - histogramSamplesPool.Put(v) //nolint:staticcheck + histogramSamplesPool.Put(v) case []record.RefFloatHistogramSample: samples := v // We split up the samples into chunks of 5000 samples or less. @@ -889,7 +874,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch } samples = samples[m:] } - floatHistogramSamplesPool.Put(v) //nolint:staticcheck + floatHistogramSamplesPool.Put(v) default: panic(fmt.Errorf("unexpected decodedCh type: %T", d)) }