mirror of https://github.com/prometheus/prometheus
Merge pull request #12874 from krajorama/outof-order-chunks
Fix duplicate sample detection at chunk size limitpull/12875/head
commit
864da019cd
|
@ -1282,9 +1282,6 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
||||||
// There is no head chunk in this series yet, create the first chunk for the sample.
|
// There is no head chunk in this series yet, create the first chunk for the sample.
|
||||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||||
chunkCreated = true
|
chunkCreated = true
|
||||||
} else if len(c.chunk.Bytes()) > maxBytesPerXORChunk {
|
|
||||||
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
|
||||||
chunkCreated = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Out of order sample.
|
// Out of order sample.
|
||||||
|
@ -1292,6 +1289,12 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
|
||||||
return c, false, chunkCreated
|
return c, false, chunkCreated
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check the chunk size, unless we just created it and if the chunk is too large, cut a new one.
|
||||||
|
if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk {
|
||||||
|
c = s.cutNewHeadChunk(t, e, o.chunkRange)
|
||||||
|
chunkCreated = true
|
||||||
|
}
|
||||||
|
|
||||||
if c.chunk.Encoding() != e {
|
if c.chunk.Encoding() != e {
|
||||||
// The chunk encoding expected by this append is different than the head chunk's
|
// The chunk encoding expected by this append is different than the head chunk's
|
||||||
// encoding. So we cut a new chunk with the expected encoding.
|
// encoding. So we cut a new chunk with the expected encoding.
|
||||||
|
|
|
@ -5499,3 +5499,49 @@ func TestCuttingNewHeadChunks(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestHeadDetectsDuplcateSampleAtSizeLimit tests a regression where a duplicate sample
|
||||||
|
// is appended to the head, right when the head chunk is at the size limit.
|
||||||
|
// The test adds all samples as duplicate, thus expecting that the result has
|
||||||
|
// exactly half of the samples.
|
||||||
|
func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
|
||||||
|
numSamples := 1000
|
||||||
|
baseTS := int64(1695209650)
|
||||||
|
|
||||||
|
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, h.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
a := h.Appender(context.Background())
|
||||||
|
var err error
|
||||||
|
vals := []float64{math.MaxFloat64, 0x00} // Use the worst case scenario for the XOR encoding. Otherwise we hit the sample limit before the size limit.
|
||||||
|
for i := 0; i < numSamples; i++ {
|
||||||
|
ts := baseTS + int64(i/2)*10000
|
||||||
|
a.Append(0, labels.FromStrings("foo", "bar"), ts, vals[(i/2)%len(vals)])
|
||||||
|
err = a.Commit()
|
||||||
|
require.NoError(t, err)
|
||||||
|
a = h.Appender(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
indexReader, err := h.Index()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var (
|
||||||
|
chunks []chunks.Meta
|
||||||
|
builder labels.ScratchBuilder
|
||||||
|
)
|
||||||
|
require.NoError(t, indexReader.Series(1, &builder, &chunks))
|
||||||
|
|
||||||
|
chunkReader, err := h.Chunks()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
storedSampleCount := 0
|
||||||
|
for _, chunkMeta := range chunks {
|
||||||
|
chunk, err := chunkReader.Chunk(chunkMeta)
|
||||||
|
require.NoError(t, err)
|
||||||
|
storedSampleCount += chunk.NumSamples()
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, numSamples/2, storedSampleCount)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue