@ -19,6 +19,7 @@ import (
"fmt"
"math"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/exemplar"
@ -936,7 +937,7 @@ func (a *headAppender) Commit() (err error) {
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs [ ] chunks . ChunkDiskMapperRef
ok , chunkCreated , mmapRefs = series . insert ( s . T , s . V , nil , nil , a . head . chunkDiskMapper , oooCapMax )
ok , chunkCreated , mmapRefs = series . insert ( s . T , s . V , nil , nil , a . head . chunkDiskMapper , oooCapMax , a . head . logger )
if chunkCreated {
r , ok := oooMmapMarkers [ series . ref ]
if ! ok || r != nil {
@ -1083,14 +1084,14 @@ func (a *headAppender) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func ( s * memSeries ) insert ( t int64 , v float64 , h * histogram . Histogram , fh * histogram . FloatHistogram , chunkDiskMapper * chunks . ChunkDiskMapper , oooCapMax int64 ) ( inserted , chunkCreated bool , mmapRefs [ ] chunks . ChunkDiskMapperRef ) {
func ( s * memSeries ) insert ( t int64 , v float64 , h * histogram . Histogram , fh * histogram . FloatHistogram , chunkDiskMapper * chunks . ChunkDiskMapper , oooCapMax int64 , logger log . Logger ) ( inserted , chunkCreated bool , mmapRefs [ ] chunks . ChunkDiskMapperRef ) {
if s . ooo == nil {
s . ooo = & memSeriesOOOFields { }
}
c := s . ooo . oooHeadChunk
if c == nil || c . chunk . NumSamples ( ) == int ( oooCapMax ) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c , mmapRefs = s . cutNewOOOHeadChunk ( t , chunkDiskMapper )
c , mmapRefs = s . cutNewOOOHeadChunk ( t , chunkDiskMapper , logger )
chunkCreated = true
}
@ -1444,9 +1445,9 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
}
// cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
// The caller must ensure that s.ooo is not nil.
func ( s * memSeries ) cutNewOOOHeadChunk ( mint int64 , chunkDiskMapper * chunks . ChunkDiskMapper ) ( * oooHeadChunk , [ ] chunks . ChunkDiskMapperRef ) {
ref := s . mmapCurrentOOOHeadChunk ( chunkDiskMapper )
// The caller must ensure that s is locked and s .ooo is not nil.
func ( s * memSeries ) cutNewOOOHeadChunk ( mint int64 , chunkDiskMapper * chunks . ChunkDiskMapper , logger log . Logger ) ( * oooHeadChunk , [ ] chunks . ChunkDiskMapperRef ) {
ref := s . mmapCurrentOOOHeadChunk ( chunkDiskMapper , logger )
s . ooo . oooHeadChunk = & oooHeadChunk {
chunk : NewOOOChunk ( ) ,
@ -1457,7 +1458,8 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
return s . ooo . oooHeadChunk , ref
}
func ( s * memSeries ) mmapCurrentOOOHeadChunk ( chunkDiskMapper * chunks . ChunkDiskMapper ) [ ] chunks . ChunkDiskMapperRef {
// s must be locked when calling.
func ( s * memSeries ) mmapCurrentOOOHeadChunk ( chunkDiskMapper * chunks . ChunkDiskMapper , logger log . Logger ) [ ] chunks . ChunkDiskMapperRef {
if s . ooo == nil || s . ooo . oooHeadChunk == nil {
// OOO is not enabled or there is no head chunk, so nothing to m-map here.
return nil
@ -1469,6 +1471,10 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap
}
chunkRefs := make ( [ ] chunks . ChunkDiskMapperRef , 0 , 1 )
for _ , memchunk := range chks {
if len ( s . ooo . oooMmappedChunks ) >= ( oooChunkIDMask - 1 ) {
level . Error ( logger ) . Log ( "msg" , "Too many OOO chunks, dropping data" , "series" , s . lset . String ( ) )
break
}
chunkRef := chunkDiskMapper . WriteChunk ( s . ref , s . ooo . oooHeadChunk . minTime , s . ooo . oooHeadChunk . maxTime , memchunk . chunk , true , handleChunkWriteError )
chunkRefs = append ( chunkRefs , chunkRef )
s . ooo . oooMmappedChunks = append ( s . ooo . oooMmappedChunks , & mmappedChunk {