@ -656,32 +656,15 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
concurrency = h . opts . WALReplayConcurrency
processors = make ( [ ] wblSubsetProcessor , concurrency )
dec record . Decoder
shards = make ( [ ] [ ] record . RefSample , concurrency )
histogramShards = make ( [ ] [ ] histogramRecord , concurrency )
decodedCh = make ( chan interface { } , 10 )
decodeErr error
samplesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefSample { }
} ,
}
markersPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefMmapMarker { }
} ,
}
histogramSamplesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefHistogramSample { }
} ,
}
floatHistogramSamplesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefFloatHistogramSample { }
} ,
}
decodedCh = make ( chan interface { } , 10 )
decodeErr error
samplesPool zeropool . Pool [ [ ] record . RefSample ]
markersPool zeropool . Pool [ [ ] record . RefMmapMarker ]
histogramSamplesPool zeropool . Pool [ [ ] record . RefHistogramSample ]
floatHistogramSamplesPool zeropool . Pool [ [ ] record . RefFloatHistogramSample ]
)
defer func ( ) {
@ -711,11 +694,13 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
go func ( ) {
defer close ( decodedCh )
var err error
dec := record . NewDecoder ( syms )
for r . Next ( ) {
rec := r . Record ( )
switch dec . Type ( rec ) {
case record . Samples :
samples := samplesPool . Get ( ) .( [ ] record . RefSample ) [: 0 ]
samples := samplesPool . Get ( ) [: 0 ]
samples , err = dec . Samples ( rec , samples )
if err != nil {
decodeErr = & wlog . CorruptionErr {
@ -727,7 +712,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- samples
case record . MmapMarkers :
markers := markersPool . Get ( ) .( [ ] record . RefMmapMarker ) [: 0 ]
markers := markersPool . Get ( ) [: 0 ]
markers , err = dec . MmapMarkers ( rec , markers )
if err != nil {
decodeErr = & wlog . CorruptionErr {
@ -739,7 +724,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- markers
case record . HistogramSamples :
hists := histogramSamplesPool . Get ( ) .( [ ] record . RefHistogramSample ) [: 0 ]
hists := histogramSamplesPool . Get ( ) [: 0 ]
hists , err = dec . HistogramSamples ( rec , hists )
if err != nil {
decodeErr = & wlog . CorruptionErr {
@ -751,7 +736,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
decodedCh <- hists
case record . FloatHistogramSamples :
hists := floatHistogramSamplesPool . Get ( ) .( [ ] record . RefFloatHistogramSample ) [: 0 ]
hists := floatHistogramSamplesPool . Get ( ) [: 0 ]
hists , err = dec . FloatHistogramSamples ( rec , hists )
if err != nil {
decodeErr = & wlog . CorruptionErr {
@ -802,7 +787,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples [ m : ]
}
samplesPool . Put ( d )
samplesPool . Put ( v )
case [ ] record . RefMmapMarker :
markers := v
for _ , rm := range markers {
@ -857,7 +842,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples [ m : ]
}
histogramSamplesPool . Put ( v ) //nolint:staticcheck
histogramSamplesPool . Put ( v )
case [ ] record . RefFloatHistogramSample :
samples := v
// We split up the samples into chunks of 5000 samples or less.
@ -889,7 +874,7 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch
}
samples = samples [ m : ]
}
floatHistogramSamplesPool . Put ( v ) //nolint:staticcheck
floatHistogramSamplesPool . Put ( v )
default :
panic ( fmt . Errorf ( "unexpected decodedCh type: %T" , d ) )
}