Browse Source

Also pass chunkOpts into appendPreprocessor

Signed-off-by: Justin Lei <justin.lei@grafana.com>
pull/12396/head
Justin Lei 2 years ago
parent
commit
e73d8b2084
  1. 22
      tsdb/head_append.go

22
tsdb/head_append.go

@ -1134,7 +1134,7 @@ type chunkOpts struct {
// isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1168,7 +1168,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1262,7 +1262,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1344,9 +1344,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(
t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int,
) (c *memChunk, sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
c = s.head()
if c == nil {
@ -1355,7 +1353,7 @@ func (s *memSeries) appendPreprocessor(
return c, false, false
}
// There is no head chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
}
@ -1367,7 +1365,7 @@ func (s *memSeries) appendPreprocessor(
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
}
@ -1376,14 +1374,14 @@ func (s *memSeries) appendPreprocessor(
// It could be the new chunk created after reading the chunk snapshot,
// hence we fix the minTime of the chunk here.
c.minTime = t
s.nextAt = rangeForTimestamp(c.minTime, chunkRange)
s.nextAt = rangeForTimestamp(c.minTime, o.chunkRange)
}
// If we reach 25% of a chunk's desired sample count, predict an end time
// for this chunk that will try to make samples equally distributed within
// the remaining chunks in the current chunk range.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
if numSamples == o.samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
}
// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
@ -1391,8 +1389,8 @@ func (s *memSeries) appendPreprocessor(
// Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk
// as we expect more chunks to come.
// Note that next chunk will have its nextAt recalculated for the new rate.
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 {
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
}

Loading…
Cancel
Save