Merge pull request #10992 from prometheus/beorn7/sparsehistogram

tsdb: Fix chunk handling during appendHistogram
pull/11011/head
Björn Rabenstein 2 years ago committed by GitHub
commit 1338e98fb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1309,6 +1309,60 @@ func TestMemSeries_append(t *testing.T) {
}
}
func TestMemSeries_appendHistogram(t *testing.T) {
dir := t.TempDir()
// This is usually taken from the Head, but passing manually here.
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
require.NoError(t, err)
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled)
histograms := GenerateTestHistograms(4)
histogramWithOneMoreBucket := histograms[3].Copy()
histogramWithOneMoreBucket.Count++
histogramWithOneMoreBucket.Sum += 1.23
histogramWithOneMoreBucket.PositiveSpans[1].Length = 3
histogramWithOneMoreBucket.PositiveBuckets = append(histogramWithOneMoreBucket.PositiveBuckets, 1)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk")
require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range")
require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range")
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk")
require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range")
require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range")
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
require.Equal(t, int64(1002), s.headChunk.maxTime, "wrong chunk range")
}
func TestMemSeries_append_atVariableRate(t *testing.T) {
const samplesPerChunk = 120
dir := t.TempDir()

@ -735,12 +735,33 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
var h *histogram.Histogram
t, h = p.currDelIter.AtHistogram()
p.curr.MinTime = t
app.AppendHistogram(t, h)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValHistogram {
panic(fmt.Errorf("found value type %v in histogram chunk", vt))
err = fmt.Errorf("found value type %v in histogram chunk", vt)
break
}
t, h = p.currDelIter.AtHistogram()
// Defend against corrupted chunks.
pI, nI, okToAppend, counterReset := app.(*chunkenc.HistogramAppender).Appendable(h)
if len(pI)+len(nI) > 0 {
err = fmt.Errorf(
"bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required",
len(pI), len(nI),
)
break
}
if counterReset {
err = errors.New("detected unexpected counter reset in histogram")
break
}
if !okToAppend {
err = errors.New("unable to append histogram due to unexpected schema change")
break
}
app.AppendHistogram(t, h)
}
case chunkenc.ValFloat:
@ -754,7 +775,8 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool {
app.Append(t, v)
for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() {
if vt != chunkenc.ValFloat {
panic(fmt.Errorf("found value type %v in float chunk", vt))
err = fmt.Errorf("found value type %v in float chunk", vt)
break
}
t, v = p.currDelIter.At()
app.Append(t, v)

Loading…
Cancel
Save