@ -52,18 +52,15 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
var mmapOverlappingChunks uint64
// Start workers that each process samples for a partition of the series ID space.
// They are connected through a ring of channels which ensures that all sample batches
// read from the WAL are processed in order.
var (
wg sync . WaitGroup
n = runtime . GOMAXPROCS ( 0 )
inputs = make ( [ ] chan [ ] record . RefSample , n )
outputs = make ( [ ] chan [ ] record . RefSample , n )
exemplarsInput chan record . RefExemplar
histogramsInput chan record . RefHistogram
wg sync . WaitGroup
n = runtime . GOMAXPROCS ( 0 )
processors = make ( [ ] walSubsetProcessor , n )
exemplarsInput chan record . RefExemplar
dec record . Decoder
shards = make ( [ ] [ ] record . RefSample , n )
dec record . Decoder
shards = make ( [ ] [ ] record . RefSample , n )
histogramShards = make ( [ ] [ ] record . RefHistogram , n )
decoded = make ( chan interface { } , 10 )
decodeErr , seriesCreationErr error
@ -99,26 +96,23 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
_ , ok := err . ( * wal . CorruptionErr )
if ok || seriesCreationErr != nil {
for i := 0 ; i < n ; i ++ {
close ( inputs [ i ] )
for range outputs [ i ] {
}
processors [ i ] . closeAndDrain ( )
}
close ( exemplarsInput )
close ( histogramsInput )
wg . Wait ( )
}
} ( )
wg . Add ( n )
for i := 0 ; i < n ; i ++ {
outputs [ i ] = make ( chan [ ] record . RefSample , 300 )
inputs [ i ] = make ( chan [ ] record . RefSample , 300 )
processors [ i ] . setup ( )
go func ( input <- chan [ ] record . RefSample , output chan <- [ ] record . RefSample ) {
unknown := h . processWALSamples ( h . minValidTime . Load ( ) , input , output )
go func ( wp * walSubsetProcessor ) {
unknown , unknownHistograms := wp . processWALSamples ( h )
unknownRefs . Add ( unknown )
unknownHistogramRefs . Add ( unknownHistograms )
wg . Done ( )
} ( inputs [ i ] , outputs [ i ] )
} ( & processors [ i ] )
}
wg . Add ( 1 )
@ -145,47 +139,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.H
}
} ( exemplarsInput )
wg . Add ( 1 )
histogramsInput = make ( chan record . RefHistogram , 300 )
go func ( input <- chan record . RefHistogram ) {
defer wg . Done ( )
mint , maxt := int64 ( math . MaxInt64 ) , int64 ( math . MinInt64 )
for rh := range input {
ms := h . series . getByID ( rh . Ref )
if ms == nil {
unknownHistogramRefs . Inc ( )
continue
}
if ms . head ( ) == nil {
// First histogram for the series. Count this in metrics.
ms . isHistogramSeries = true
}
if rh . T < h . minValidTime . Load ( ) {
continue
}
// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
// replaying the WAL, so lets just log the error if it's not that type.
_ , chunkCreated := ms . appendHistogram ( rh . T , rh . H , 0 , h . chunkDiskMapper )
if chunkCreated {
h . metrics . chunksCreated . Inc ( )
h . metrics . chunks . Inc ( )
}
if rh . T > maxt {
maxt = rh . T
}
if rh . T < mint {
mint = rh . T
}
}
h . updateMinMaxTime ( mint , maxt )
} ( histogramsInput )
go func ( ) {
defer close ( decoded )
for r . Next ( ) {
@ -273,11 +226,20 @@ Outer:
h . lastSeriesID . Store ( uint64 ( walSeries . Ref ) )
}
idx := uint64 ( mSeries . ref ) % uint64 ( n )
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
processors [ idx ] . waitUntilIdle ( )
// Lock the subset so we can modify the series object
processors [ idx ] . mx . Lock ( )
mmc := mmappedChunks [ walSeries . Ref ]
if created {
// This is the first WAL series record for this series.
h . setMMappedChunks ( mSeries , mmc )
h . resetSeriesWithMMappedChunks ( mSeries , mmc )
processors [ idx ] . mx . Unlock ( )
continue
}
@ -287,23 +249,6 @@ Outer:
multiRef [ walSeries . Ref ] = mSeries . ref
idx := uint64 ( mSeries . ref ) % uint64 ( n )
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
select {
case <- outputs [ idx ] : // allow output side to drain to avoid deadlock
default :
}
inputs [ idx ] <- [ ] record . RefSample { }
for len ( inputs [ idx ] ) != 0 {
time . Sleep ( 1 * time . Millisecond )
select {
case <- outputs [ idx ] : // allow output side to drain to avoid deadlock
default :
}
}
// Checking if the new m-mapped chunks overlap with the already existing ones.
if len ( mSeries . mmappedChunks ) > 0 && len ( mmc ) > 0 {
if overlapsClosedInterval (
@ -327,12 +272,9 @@ Outer:
}
// Replacing m-mapped chunks with the new ones (could be empty).
h . setMMappedChunks ( mSeries , mmc )
h . resetSeriesWithMMappedChunks ( mSeries , mmc )
// Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries . nextAt = 0
mSeries . headChunk = nil
mSeries . app = nil
processors [ idx ] . mx . Unlock ( )
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool . Put ( v )
@ -348,12 +290,7 @@ Outer:
m = len ( samples )
}
for i := 0 ; i < n ; i ++ {
var buf [ ] record . RefSample
select {
case buf = <- outputs [ i ] :
default :
}
shards [ i ] = buf [ : 0 ]
shards [ i ] = processors [ i ] . reuseBuf ( )
}
for _ , sam := range samples [ : m ] {
if r , ok := multiRef [ sam . Ref ] ; ok {
@ -363,7 +300,7 @@ Outer:
shards [ mod ] = append ( shards [ mod ] , sam )
}
for i := 0 ; i < n ; i ++ {
inputs [ i ] <- shards [ i ]
processors [ i ] . input <- shards [ i ]
}
samples = samples [ m : ]
}
@ -391,9 +328,30 @@ Outer:
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
exemplarsPool . Put ( v )
case [ ] record . RefHistogram :
// TODO: split into multiple slices and have multiple workers processing the histograms like we do for samples.
for _ , rh := range v {
histogramsInput <- rh
samples := v
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
// of unused memory.
for len ( samples ) > 0 {
m := 5000
if len ( samples ) < m {
m = len ( samples )
}
for i := 0 ; i < n ; i ++ {
histogramShards [ i ] = processors [ i ] . reuseHistogramBuf ( )
}
for _ , sam := range samples [ : m ] {
if r , ok := multiRef [ sam . Ref ] ; ok {
sam . Ref = r
}
mod := uint64 ( sam . Ref ) % uint64 ( n )
histogramShards [ mod ] = append ( histogramShards [ mod ] , sam )
}
for i := 0 ; i < n ; i ++ {
processors [ i ] . input <- histogramShards [ i ]
}
samples = samples [ m : ]
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
histogramsPool . Put ( v )
@ -414,12 +372,9 @@ Outer:
// Signal termination to each worker and wait for it to close its output channel.
for i := 0 ; i < n ; i ++ {
close ( inputs [ i ] )
for range outputs [ i ] {
}
processors [ i ] . closeAndDrain ( )
}
close ( exemplarsInput )
close ( histogramsInput )
wg . Wait ( )
if r . Err ( ) != nil {
@ -435,7 +390,8 @@ Outer:
return nil
}
func ( h * Head ) setMMappedChunks ( mSeries * memSeries , mmc [ ] * mmappedChunk ) {
// resetSeriesWithMMappedChunks is only used during the WAL replay.
func ( h * Head ) resetSeriesWithMMappedChunks ( mSeries * memSeries , mmc [ ] * mmappedChunk ) {
h . metrics . chunksCreated . Add ( float64 ( len ( mmc ) ) )
h . metrics . chunksRemoved . Add ( float64 ( len ( mSeries . mmappedChunks ) ) )
h . metrics . chunks . Add ( float64 ( len ( mmc ) - len ( mSeries . mmappedChunks ) ) )
@ -447,48 +403,150 @@ func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
mSeries . mmMaxTime = mmc [ len ( mmc ) - 1 ] . maxTime
h . updateMinMaxTime ( mmc [ 0 ] . minTime , mSeries . mmMaxTime )
}
// Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries . nextAt = 0
mSeries . headChunk = nil
mSeries . app = nil
}
// walSubsetProcessor owns one shard ("subset") of the series ID space during
// WAL replay. The replay loop sends sample/histogram batches on input; the
// worker returns processed buffers on the output channels so the sender can
// reuse them (see reuseBuf/reuseHistogramBuf).
type walSubsetProcessor struct {
mx sync . Mutex // Take this lock while modifying series in the subset.
input chan interface { } // Either []record.RefSample or []record.RefHistogram.
output chan [ ] record . RefSample // Processed sample buffers handed back for reuse.
histogramsOutput chan [ ] record . RefHistogram // Processed histogram buffers handed back for reuse.
}
// setup allocates the processor's channels. The buffer size of 300 bounds
// the number of in-flight batches per shard.
func (wp *walSubsetProcessor) setup() {
	wp.input = make(chan interface{}, 300)
	wp.output = make(chan []record.RefSample, 300)
	wp.histogramsOutput = make(chan []record.RefHistogram, 300)
}
// closeAndDrain signals the worker to stop by closing its input channel, then
// drains both output channels until the worker closes them (it does so via
// defers once its input loop ends).
//
// The two outputs are drained concurrently: the worker may be blocked sending
// on either channel, so draining them one after the other could deadlock —
// e.g. the worker blocked on a full histogramsOutput while we wait for output
// to be closed, which never happens because the worker cannot proceed.
func (wp *walSubsetProcessor) closeAndDrain() {
	close(wp.input)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for range wp.output {
		}
	}()
	go func() {
		defer wg.Done()
		for range wp.histogramsOutput {
		}
	}()
	wg.Wait()
}
// reuseBuf returns a previously processed sample buffer for reuse, truncated
// to zero length, or nil if no buffer is currently available. It never blocks.
func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
	select {
	case buf := <-wp.output:
		return buf[:0]
	default:
		return nil
	}
}
// reuseHistogramBuf returns a previously processed histogram buffer for reuse,
// truncated to zero length, or nil if no buffer is currently available.
// It never blocks.
func (wp *walSubsetProcessor) reuseHistogramBuf() []record.RefHistogram {
	select {
	case buf := <-wp.histogramsOutput:
		return buf[:0]
	default:
		return nil
	}
}
// processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded.
func ( h * Head ) processWALSamples (
minValidTime int64 ,
input <- chan [ ] record . RefSample , output chan <- [ ] record . RefSample ,
) ( unknownRefs uint64 ) {
defer close ( output )
func ( wp * walSubsetProcessor ) processWALSamples ( h * Head ) ( unknownRefs , unknownHistogramRefs uint64 ) {
defer close ( wp . output )
defer close ( wp . histogramsOutput )
minValidTime := h . minValidTime . Load ( )
mint , maxt := int64 ( math . MaxInt64 ) , int64 ( math . MinInt64 )
for samples := range input {
for _ , s := range samples {
if s . T < minValidTime {
continue
}
ms := h . series . getByID ( s . Ref )
if ms == nil {
unknownRefs ++
continue
}
if s . T <= ms . mmMaxTime {
continue
}
if _ , chunkCreated := ms . append ( s . T , s . V , 0 , h . chunkDiskMapper ) ; chunkCreated {
h . metrics . chunksCreated . Inc ( )
h . metrics . chunks . Inc ( )
}
if s . T > maxt {
maxt = s . T
for v := range wp . input {
wp . mx . Lock ( )
switch samples := v . ( type ) {
case [ ] record . RefSample :
for _ , s := range samples {
if s . T < minValidTime {
continue
}
ms := h . series . getByID ( s . Ref )
if ms == nil {
unknownRefs ++
continue
}
ms . isHistogramSeries = false
if s . T <= ms . mmMaxTime {
continue
}
if _ , chunkCreated := ms . append ( s . T , s . V , 0 , h . chunkDiskMapper ) ; chunkCreated {
h . metrics . chunksCreated . Inc ( )
h . metrics . chunks . Inc ( )
}
if s . T > maxt {
maxt = s . T
}
if s . T < mint {
mint = s . T
}
}
if s . T < mint {
mint = s . T
wp . output <- samples
case [ ] record . RefHistogram :
for _ , s := range samples {
if s . T < minValidTime {
continue
}
ms := h . series . getByID ( s . Ref )
if ms == nil {
unknownHistogramRefs ++
continue
}
ms . isHistogramSeries = true
if s . T <= ms . mmMaxTime {
continue
}
if _ , chunkCreated := ms . appendHistogram ( s . T , s . H , 0 , h . chunkDiskMapper ) ; chunkCreated {
h . metrics . chunksCreated . Inc ( )
h . metrics . chunks . Inc ( )
}
if s . T > maxt {
maxt = s . T
}
if s . T < mint {
mint = s . T
}
}
wp . histogramsOutput <- samples
}
output <- samples
wp . mx . Unlock ( )
}
h . updateMinMaxTime ( mint , maxt )
return unknownRefs
return unknownRefs , unknownHistogramRefs
}
// drainPending performs one non-blocking receive on each output channel so a
// worker blocked on a full output buffer can make progress.
func (wp *walSubsetProcessor) drainPending() {
	select {
	case <-wp.output: // Allow output side to drain to avoid deadlock.
	default:
	}
	select {
	case <-wp.histogramsOutput: // Allow output side to drain to avoid deadlock.
	default:
	}
}

// waitUntilIdle blocks until the worker has consumed every batch queued on
// the input channel. It enqueues an empty sentinel batch and polls until the
// channel is empty, draining the outputs on each pass so the worker cannot
// stall on a full output buffer.
func (wp *walSubsetProcessor) waitUntilIdle() {
	wp.drainPending()
	wp.input <- []record.RefSample{}
	for len(wp.input) != 0 {
		time.Sleep(1 * time.Millisecond)
		wp.drainPending()
	}
}
const (