Merge pull request #12396 from leizor/leizor/chunk-opts

Group args to append to memSeries in chunkOpts
pull/12495/head
Bartlomiej Plotka 1 year ago committed by GitHub
commit 4062f12573
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -881,9 +881,13 @@ func (a *headAppender) Commit() (err error) {
oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef
oooRecords [][]byte
oooCapMax = a.head.opts.OutOfOrderCapMax.Load()
chunkRange = a.head.chunkRange.Load()
series *memSeries
enc record.Encoder
appendChunkOpts = chunkOpts{
chunkDiskMapper: a.head.chunkDiskMapper,
chunkRange: a.head.chunkRange.Load(),
samplesPerChunk: a.head.opts.SamplesPerChunk,
}
enc record.Encoder
)
defer func() {
for i := range oooRecords {
@ -987,7 +991,7 @@ func (a *headAppender) Commit() (err error) {
samplesAppended--
}
default:
ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
@ -1016,7 +1020,7 @@ func (a *headAppender) Commit() (err error) {
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
@ -1042,7 +1046,7 @@ func (a *headAppender) Commit() (err error) {
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
@ -1118,12 +1122,19 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
return ok, chunkCreated, mmapRef
}
// chunkOpts are chunk-level options that are passed when appending to a memSeries.
type chunkOpts struct {
chunkDiskMapper *chunks.ChunkDiskMapper
chunkRange int64
samplesPerChunk int
}
// append adds the sample (t, v) to the series. The caller also has to provide
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange, samplesPerChunk)
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1144,7 +1155,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk,
@ -1157,7 +1168,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange, samplesPerChunk)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1193,7 +1204,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// - okToAppend and no inserts → Chunk is ready to support our histogram.
switch {
case !okToAppend || counterReset:
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
case len(pForwardInserts) > 0 || len(nForwardInserts) > 0:
// New buckets have appeared. We need to recode all
@ -1238,7 +1249,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// appendFloatHistogram adds the float histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk,
@ -1251,7 +1262,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange, samplesPerChunk)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1287,7 +1298,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
// - okToAppend and no inserts → Chunk is ready to support our histogram.
switch {
case !okToAppend || counterReset:
c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
case len(pForwardInserts) > 0 || len(nForwardInserts) > 0:
// New buckets have appeared. We need to recode all
@ -1333,9 +1344,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(
t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int,
) (c *memChunk, sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
c = s.head()
if c == nil {
@ -1344,7 +1353,7 @@ func (s *memSeries) appendPreprocessor(
return c, false, false
}
// There is no head chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
}
@ -1356,7 +1365,7 @@ func (s *memSeries) appendPreprocessor(
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
}
@ -1365,14 +1374,14 @@ func (s *memSeries) appendPreprocessor(
// It could be the new chunk created after reading the chunk snapshot,
// hence we fix the minTime of the chunk here.
c.minTime = t
s.nextAt = rangeForTimestamp(c.minTime, chunkRange)
s.nextAt = rangeForTimestamp(c.minTime, o.chunkRange)
}
// If we reach 25% of a chunk's desired sample count, predict an end time
// for this chunk that will try to make samples equally distributed within
// the remaining chunks in the current chunk range.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
if numSamples == o.samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
}
// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
@ -1380,8 +1389,8 @@ func (s *memSeries) appendPreprocessor(
// Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk
// as we expect more chunks to come.
// Note that next chunk will have its nextAt recalculated for the new rate.
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 {
c = s.cutNewHeadChunk(t, e, o.chunkDiskMapper, o.chunkRange)
chunkCreated = true
}

@ -283,10 +283,15 @@ func BenchmarkLoadWAL(b *testing.B) {
if c.mmappedChunkT != 0 {
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(b, err)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: c.mmappedChunkT,
samplesPerChunk: DefaultSamplesPerChunk,
}
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT, DefaultSamplesPerChunk)
s.append(c.mmappedChunkT, 42, 0, cOpts)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
require.NoError(b, chunkDiskMapper.Close())
@ -799,7 +804,11 @@ func TestMemSeries_truncateChunks(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
const chunkRange = 2000
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: 2000,
samplesPerChunk: DefaultSamplesPerChunk,
}
memChunkPool := sync.Pool{
New: func() interface{} {
@ -810,7 +819,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}
@ -1336,26 +1345,30 @@ func TestMemSeries_append(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
const chunkRange = 500
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: 500,
samplesPerChunk: DefaultSamplesPerChunk,
}
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated := s.append(998, 1, 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(999, 2, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(1000, 3, 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(1001, 4, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -1368,7 +1381,7 @@ func TestMemSeries_append(t *testing.T) {
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts)
require.True(t, ok, "append failed")
}
@ -1390,7 +1403,11 @@ func TestMemSeries_appendHistogram(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
chunkRange := int64(1000)
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: int64(1000),
samplesPerChunk: DefaultSamplesPerChunk,
}
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
@ -1404,19 +1421,19 @@ func TestMemSeries_appendHistogram(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -1426,7 +1443,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
@ -1446,7 +1463,11 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, chunkDiskMapper.Close())
})
chunkRange := DefaultBlockDuration
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: DefaultBlockDuration,
samplesPerChunk: samplesPerChunk,
}
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
@ -1456,7 +1477,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
var nextTs int64
var totalAppendedSamples int
for i := 0; i < samplesPerChunk/4; i++ {
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
require.Truef(t, ok, "slow sample %d was not appended", i)
nextTs += slowRate
totalAppendedSamples++
@ -1465,12 +1486,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
// Suddenly, the rate increases and we receive a sample every millisecond.
for i := 0; i < math.MaxUint16; i++ {
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
require.Truef(t, ok, "quick sample %d was not appended", i)
nextTs++
totalAppendedSamples++
}
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts)
require.True(t, ok, "new chunk sample was not appended")
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
@ -1490,23 +1511,29 @@ func TestGCChunkAccess(t *testing.T) {
require.NoError(t, h.Close())
}()
cOpts := chunkOpts{
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: chunkRange,
samplesPerChunk: DefaultSamplesPerChunk,
}
h.initTime(0)
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated := s.append(0, 0, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(999, 999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -1543,23 +1570,29 @@ func TestGCSeriesAccess(t *testing.T) {
require.NoError(t, h.Close())
}()
cOpts := chunkOpts{
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: chunkRange,
samplesPerChunk: DefaultSamplesPerChunk,
}
h.initTime(0)
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated := s.append(0, 0, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(999, 999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -1791,14 +1824,20 @@ func TestHeadReadWriterRepair(t *testing.T) {
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal))
require.NoError(t, h.Init(math.MinInt64))
cOpts := chunkOpts{
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: chunkRange,
samplesPerChunk: DefaultSamplesPerChunk,
}
s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
require.True(t, created, "series was not created")
for i := 0; i < 7; i++ {
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunk was not created")
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created")
h.chunkDiskMapper.CutNewFile()
@ -2144,9 +2183,15 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
h.initTime(0)
cOpts := chunkOpts{
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: h.chunkRange.Load(),
samplesPerChunk: DefaultSamplesPerChunk,
}
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load(), DefaultSamplesPerChunk)
ok, _ := s.append(0, 0, 0, cOpts)
require.True(t, ok, "Series append failed.")
require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
}
@ -2608,12 +2653,16 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
const chunkRange = 500
cOpts := chunkOpts{
chunkDiskMapper: chunkDiskMapper,
chunkRange: 500,
samplesPerChunk: DefaultSamplesPerChunk,
}
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
require.True(t, ok, "sample append failed")
}

@ -564,7 +564,11 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
minValidTime := h.minValidTime.Load()
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
chunkRange := h.chunkRange.Load()
appendChunkOpts := chunkOpts{
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: h.chunkRange.Load(),
samplesPerChunk: h.opts.SamplesPerChunk,
}
for in := range wp.input {
if in.existingSeries != nil {
@ -588,7 +592,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime {
continue
}
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk); chunkCreated {
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
@ -618,9 +622,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
}
var chunkCreated bool
if s.h != nil {
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk)
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
} else {
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk)
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
}
if chunkCreated {
h.metrics.chunksCreated.Inc()

Loading…
Cancel
Save