tsdb: prepare inserting native histograms into OOO head

Rename a variable.
Add parameters to memSeries.insert function.

No effect on how float samples are handled.

Related to #14546

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
pull/14671/head
György Krajcsovits 3 months ago
parent c2bc6cfe97
commit 41656162fc

@ -840,7 +840,7 @@ func (a *headAppender) Commit() (err error) {
floatsAppended = len(a.samples) floatsAppended = len(a.samples)
histogramsAppended = len(a.histograms) + len(a.floatHistograms) histogramsAppended = len(a.histograms) + len(a.floatHistograms)
// number of samples out of order but accepted: with ooo enabled and within time window // number of samples out of order but accepted: with ooo enabled and within time window
floatOOOAccepted int oooFloatsAccepted int
// number of samples rejected due to: out of order but OOO support disabled. // number of samples rejected due to: out of order but OOO support disabled.
floatOOORejected int floatOOORejected int
histoOOORejected int histoOOORejected int
@ -936,7 +936,7 @@ func (a *headAppender) Commit() (err error) {
// Sample is OOO and OOO handling is enabled // Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance. // and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef var mmapRefs []chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax) ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated { if chunkCreated {
r, ok := oooMmapMarkers[series.ref] r, ok := oooMmapMarkers[series.ref]
if !ok || r != nil { if !ok || r != nil {
@ -969,7 +969,7 @@ func (a *headAppender) Commit() (err error) {
if s.T > oooMaxT { if s.T > oooMaxT {
oooMaxT = s.T oooMaxT = s.T
} }
floatOOOAccepted++ oooFloatsAccepted++
} else { } else {
// Sample is an exact duplicate of the last sample. // Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
@ -1065,7 +1065,7 @@ func (a *headAppender) Commit() (err error) {
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatTooOldRejected)) a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatTooOldRejected))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended)) a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatsAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended)) a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOOAccepted)) a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooFloatsAccepted))
a.head.updateMinMaxTime(inOrderMint, inOrderMaxt) a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT) a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT)
@ -1083,7 +1083,7 @@ func (a *headAppender) Commit() (err error) {
} }
// insert is like append, except it inserts. Used for OOO samples. // insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
if s.ooo == nil { if s.ooo == nil {
s.ooo = &memSeriesOOOFields{} s.ooo = &memSeriesOOOFields{}
} }
@ -1094,7 +1094,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
chunkCreated = true chunkCreated = true
} }
ok := c.chunk.Insert(t, v, nil, nil) ok := c.chunk.Insert(t, v, h, fh)
if ok { if ok {
if chunkCreated || t < c.minTime { if chunkCreated || t < c.minTime {
c.minTime = t c.minTime = t

@ -890,7 +890,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
unknownRefs++ unknownRefs++
continue continue
} }
ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax) ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax)
if chunkCreated { if chunkCreated {
h.metrics.chunksCreated.Inc() h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc() h.metrics.chunks.Inc()

Loading…
Cancel
Save