diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 8d66d1e81..0d4628eaf 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1091,7 +1091,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk chunkCreated = true } - ok := c.chunk.Insert(t, v) + ok := c.chunk.Insert(t, v, nil, nil) if ok { if chunkCreated || t < c.minTime { c.minTime = t diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index b2556d62e..59477e5a5 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -17,6 +17,7 @@ import ( "fmt" "sort" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/oklog/ulid" @@ -39,13 +40,13 @@ func NewOOOChunk() *OOOChunk { // Insert inserts the sample such that order is maintained. // Returns false if insert was not possible due to the same timestamp already existing. -func (o *OOOChunk) Insert(t int64, v float64) bool { +func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { // Although out-of-order samples can be out-of-order amongst themselves, we // are opinionated and expect them to be usually in-order meaning we could // try to append at the end first if the new timestamp is higher than the // last known timestamp. if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t { - o.samples = append(o.samples, sample{t, v, nil, nil}) + o.samples = append(o.samples, sample{t, v, h, fh}) return true } @@ -54,7 +55,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool { if i >= len(o.samples) { // none found. append it at the end - o.samples = append(o.samples, sample{t, v, nil, nil}) + o.samples = append(o.samples, sample{t, v, h, fh}) return true } @@ -66,7 +67,7 @@ func (o *OOOChunk) Insert(t int64, v float64) bool { // Expand length by 1 to make room. use a zero sample, we will overwrite it anyway. o.samples = append(o.samples, sample{}) copy(o.samples[i+1:], o.samples[i:]) - o.samples[i] = sample{t, v, nil, nil} + o.samples[i] = sample{t, v, h, fh} return true } @@ -142,9 +143,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + cmint = s.t } chunk = newChunk - cmint = s.t } case chunkenc.EncFloatHistogram: // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. @@ -157,9 +158,9 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) + cmint = s.t } chunk = newChunk - cmint = s.t } } cmaxt = s.t diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index 27ff4048b..d3cd5f601 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -14,8 +14,14 @@ package tsdb import ( + "math" "testing" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/stretchr/testify/require" ) @@ -52,7 +58,7 @@ func TestOOOInsert(t *testing.T) { chunk := NewOOOChunk() chunk.samples = makeEvenSampleSlice(numPreExisting) newSample := samplify(valOdd(insertPos)) - chunk.Insert(newSample.t, newSample.f) + chunk.Insert(newSample.t, newSample.f, nil, nil) var expSamples []sample // Our expected new samples slice, will be first the original samples. @@ -83,7 +89,7 @@ func TestOOOInsertDuplicate(t *testing.T) { dupSample := chunk.samples[dupPos] dupSample.f = 0.123 - ok := chunk.Insert(dupSample.t, dupSample.f) + ok := chunk.Insert(dupSample.t, dupSample.f, nil, nil) expSamples := makeEvenSampleSlice(num) // We expect no change. require.False(t, ok) @@ -91,3 +97,136 @@ func TestOOOInsertDuplicate(t *testing.T) { } } } + +type chunkVerify struct { + encoding chunkenc.Encoding + minTime int64 + maxTime int64 +} + +func TestOOOChunks_ToEncodedChunks(t *testing.T) { + h1 := tsdbutil.GenerateTestHistogram(1) + // Make h2 appendable but with more buckets, to trigger recoding. + h2 := h1.Copy() + h2.PositiveSpans = append(h2.PositiveSpans, histogram.Span{Offset: 1, Length: 1}) + h2.PositiveBuckets = append(h2.PositiveBuckets, 12) + + testCases := map[string]struct { + samples []sample + expectedCounterResets []histogram.CounterResetHint + expectedChunks []chunkVerify + }{ + "empty": { + samples: []sample{}, + }, + "has floats": { + samples: []sample{ + {t: 1000, f: 43.0}, + {t: 1100, f: 42.0}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncXOR, minTime: 1000, maxTime: 1100}, + }, + }, + "mix of floats and histograms": { + samples: []sample{ + {t: 1000, f: 43.0}, + {t: 1100, h: h1}, + {t: 1200, f: 42.0}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset, histogram.UnknownCounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncXOR, minTime: 1000, maxTime: 1000}, + {encoding: chunkenc.EncHistogram, minTime: 1100, maxTime: 1100}, + {encoding: chunkenc.EncXOR, minTime: 1200, maxTime: 1200}, + }, + }, + "has a counter reset": { + samples: []sample{ + {t: 1000, h: h2}, + {t: 1100, h: h1}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.CounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1000}, + {encoding: chunkenc.EncHistogram, minTime: 1100, maxTime: 1100}, + }, + }, + "has a recoded histogram": { // Regression test for wrong minT, maxT in histogram recoding. + samples: []sample{ + {t: 0, h: h1}, + {t: 1, h: h2}, + }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.NotCounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncHistogram, minTime: 0, maxTime: 1}, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + // Sanity check. + require.Equal(t, len(tc.samples), len(tc.expectedCounterResets), "number of samples and counter resets") + + oooChunk := OOOChunk{} + for _, s := range tc.samples { + switch s.Type() { + case chunkenc.ValFloat: + oooChunk.Insert(s.t, s.f, nil, nil) + case chunkenc.ValHistogram: + oooChunk.Insert(s.t, 0, s.h.Copy(), nil) + case chunkenc.ValFloatHistogram: + oooChunk.Insert(s.t, 0, nil, s.fh.Copy()) + default: + t.Fatalf("unexpected sample type %d", s.Type()) + } + } + + chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + require.Equal(t, len(tc.expectedChunks), len(chunks), "number of chunks") + sampleIndex := 0 + for i, c := range chunks { + require.Equal(t, tc.expectedChunks[i].encoding, c.chunk.Encoding(), "chunk %d encoding", i) + require.Equal(t, tc.expectedChunks[i].minTime, c.minTime, "chunk %d minTime", i) + require.Equal(t, tc.expectedChunks[i].maxTime, c.maxTime, "chunk %d maxTime", i) + samples, err := storage.ExpandSamples(c.chunk.Iterator(nil), newSample) + require.GreaterOrEqual(t, len(tc.samples)-sampleIndex, len(samples), "too many samples in chunk %d expected less than %d", i, len(tc.samples)-sampleIndex) + require.NoError(t, err) + if len(samples) == 0 { + // Ignore empty chunks. + continue + } + switch c.chunk.Encoding() { + case chunkenc.EncXOR: + for j, s := range samples { + require.Equal(t, chunkenc.ValFloat, s.Type()) + // XOR chunks don't have counter reset hints, so we shouldn't expect anything else than UnknownCounterReset. + require.Equal(t, histogram.UnknownCounterReset, tc.expectedCounterResets[sampleIndex+j], "sample reset hint %d", sampleIndex+j) + require.Equal(t, tc.samples[sampleIndex+j].f, s.F(), "sample %d", sampleIndex+j) + } + case chunkenc.EncHistogram: + for j, s := range samples { + require.Equal(t, chunkenc.ValHistogram, s.Type()) + require.Equal(t, tc.expectedCounterResets[sampleIndex+j], s.H().CounterResetHint, "sample reset hint %d", sampleIndex+j) + compareTo := tc.samples[sampleIndex+j].h.Copy() + compareTo.CounterResetHint = tc.expectedCounterResets[sampleIndex+j] + require.Equal(t, compareTo, s.H().Compact(0), "sample %d", sampleIndex+j) + } + case chunkenc.EncFloatHistogram: + for j, s := range samples { + require.Equal(t, chunkenc.ValFloatHistogram, s.Type()) + require.Equal(t, tc.expectedCounterResets[sampleIndex+j], s.FH().CounterResetHint, "sample reset hint %d", sampleIndex+j) + compareTo := tc.samples[sampleIndex+j].fh.Copy() + compareTo.CounterResetHint = tc.expectedCounterResets[sampleIndex+j] + require.Equal(t, compareTo, s.FH().Compact(0), "sample %d", sampleIndex+j) + } + } + sampleIndex += len(samples) + } + require.Equal(t, len(tc.samples), sampleIndex, "number of samples") + }) + } +}