From 512c67ec26e764e7adb4d2746ecf71d2222701f5 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 12 Aug 2024 18:49:00 +0100 Subject: [PATCH] TSDB: Never go over maximum number of OOO chunks In `mmapCurrentOOOHeadChunk`, check if the number is at the maximum and drop the data with an error log. This is not expected to happen as the maximum is over 8 million; that's 8 years of 1 sample every second. Signed-off-by: Bryan Boreham --- tsdb/head_append.go | 20 +++++++++++++------- tsdb/head_wal.go | 2 +- tsdb/ooo_head_read.go | 2 +- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 59681b8da..b66ac7278 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -19,6 +19,7 @@ import ( "fmt" "math" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/exemplar" @@ -936,7 +937,7 @@ func (a *headAppender) Commit() (err error) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax) + ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax, a.head.logger) if chunkCreated { r, ok := oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1083,14 +1084,14 @@ func (a *headAppender) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger log.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } c := s.ooo.oooHeadChunk if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. - c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper) + c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger) chunkCreated = true } @@ -1444,9 +1445,9 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange } // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. -// The caller must ensure that s.ooo is not nil. -func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { - ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper) +// The caller must ensure that s is locked and s.ooo is not nil. +func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { + ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger) s.ooo.oooHeadChunk = &oooHeadChunk{ chunk: NewOOOChunk(), @@ -1457,7 +1458,8 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk return s.ooo.oooHeadChunk, ref } -func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef { +// s must be locked when calling. +func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger log.Logger) []chunks.ChunkDiskMapperRef { if s.ooo == nil || s.ooo.oooHeadChunk == nil { // OOO is not enabled or there is no head chunk, so nothing to m-map here. return nil @@ -1469,6 +1471,10 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMap } chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1) for _, memchunk := range chks { + if len(s.ooo.oooMmappedChunks) >= (oooChunkIDMask - 1) { + level.Error(logger).Log("msg", "Too many OOO chunks, dropping data", "series", s.lset.String()) + break + } chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError) chunkRefs = append(chunkRefs, chunkRef) s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{ diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 85b0c656d..7397bbf41 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -890,7 +890,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { unknownRefs++ continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax) + ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 47e2efb86..55e241fd9 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -340,7 +340,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, } var lastMmapRef chunks.ChunkDiskMapperRef - mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper) + mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger) if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 { // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}