From 0f4e5196c42ea012d11127ce2db146045caeb349 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Mon, 22 Aug 2022 19:04:39 +0530 Subject: [PATCH] Implement vertical compaction for native histograms (#11184) * Implement vertical compaction for native histograms Signed-off-by: Ganesh Vernekar * Fix typo Signed-off-by: Ganesh Vernekar Signed-off-by: Ganesh Vernekar --- storage/merge_test.go | 43 ++++++++++++++++++++++++++ storage/series.go | 67 +++++++++++++++++++++++++++-------------- tsdb/chunkenc/chunk.go | 11 +++++++ tsdb/tsdbutil/chunks.go | 26 ++++++++++++++-- 4 files changed, 122 insertions(+), 25 deletions(-) diff --git a/storage/merge_test.go b/storage/merge_test.go index 71ef9a17f..2b4091d9f 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/tsdbutil" @@ -384,6 +385,22 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { func TestCompactingChunkSeriesMerger(t *testing.T) { m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge) + // histogramSample returns a histogram that is unique to the ts. 
+ histogramSample := func(ts int64) sample { + idx := ts + 1 + return sample{t: ts, h: &histogram.Histogram{ + Schema: 2, + ZeroThreshold: 0.001, + ZeroCount: 2 * uint64(idx), + Count: 5 * uint64(idx), + Sum: 12.34 * float64(idx), + PositiveSpans: []histogram.Span{{Offset: 1, Length: 2}, {Offset: 2, Length: 1}}, + NegativeSpans: []histogram.Span{{Offset: 2, Length: 1}, {Offset: 1, Length: 2}}, + PositiveBuckets: []int64{1 * idx, -1 * idx, 3 * idx}, + NegativeBuckets: []int64{1 * idx, 2 * idx, 3 * idx}, + }} + } + for _, tc := range []struct { name string input []ChunkSeries @@ -486,6 +503,32 @@ func TestCompactingChunkSeriesMerger(t *testing.T) { tsdbutil.GenerateSamples(120, 30), ), }, + { + name: "histogram chunks overlapping", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(0), histogramSample(5)}, []tsdbutil.Sample{histogramSample(10), histogramSample(15)}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(2), histogramSample(20)}, []tsdbutil.Sample{histogramSample(25), histogramSample(30)}), + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(18), histogramSample(26)}, []tsdbutil.Sample{histogramSample(31), histogramSample(35)}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{histogramSample(0), histogramSample(2), histogramSample(5), histogramSample(10), histogramSample(15), histogramSample(18), histogramSample(20), histogramSample(25), histogramSample(26), histogramSample(30)}, + []tsdbutil.Sample{histogramSample(31), histogramSample(35)}, + ), + }, + { + name: "histogram chunks overlapping with float chunks", + input: []ChunkSeries{ + NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(0), histogramSample(5)}, []tsdbutil.Sample{histogramSample(10), histogramSample(15)}), + 
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{12, 12, nil, nil}}, []tsdbutil.Sample{sample{14, 14, nil, nil}}), + }, + expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), + []tsdbutil.Sample{histogramSample(0)}, + []tsdbutil.Sample{sample{1, 1, nil, nil}}, + []tsdbutil.Sample{histogramSample(5), histogramSample(10)}, + []tsdbutil.Sample{sample{12, 12, nil, nil}, sample{14, 14, nil, nil}}, + []tsdbutil.Sample{histogramSample(15)}, + ), + }, } { t.Run(tc.name, func(t *testing.T) { merged := m(tc.input...) diff --git a/storage/series.go b/storage/series.go index eacd1fa7a..3259dd4d0 100644 --- a/storage/series.go +++ b/storage/series.go @@ -14,6 +14,7 @@ package storage import ( + "fmt" "math" "sort" @@ -252,28 +253,32 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries { } func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { - // TODO(beorn7): Add Histogram support. - chk := chunkenc.NewXORChunk() - app, err := chk.Appender() - if err != nil { - return errChunksIterator{err: err} - } + var ( + chk chunkenc.Chunk + app chunkenc.Appender + err error + ) mint := int64(math.MaxInt64) maxt := int64(math.MinInt64) chks := []chunks.Meta{} - i := 0 seriesIter := s.Series.Iterator() - for seriesIter.Next() == chunkenc.ValFloat { - // Create a new chunk if too many samples in the current one. - if i >= seriesToChunkEncoderSplit { - chks = append(chks, chunks.Meta{ - MinTime: mint, - MaxTime: maxt, - Chunk: chk, - }) - chk = chunkenc.NewXORChunk() + lastType := chunkenc.ValNone + for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { + if typ != lastType || i >= seriesToChunkEncoderSplit { + // Create a new chunk if the sample type changed or too many samples in the current one. 
+ if chk != nil { + chks = append(chks, chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: chk, + }) + } + chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding()) + if err != nil { + return errChunksIterator{err: err} + } app, err = chk.Appender() if err != nil { return errChunksIterator{err: err} @@ -282,9 +287,23 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { // maxt is immediately overwritten below which is why setting it here won't make a difference. i = 0 } + lastType = typ - t, v := seriesIter.At() - app.Append(t, v) + var ( + t int64 + v float64 + h *histogram.Histogram + ) + switch typ { + case chunkenc.ValFloat: + t, v = seriesIter.At() + app.Append(t, v) + case chunkenc.ValHistogram: + t, h = seriesIter.AtHistogram() + app.AppendHistogram(t, h) + default: + return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())} + } maxt = t if mint == math.MaxInt64 { @@ -296,11 +315,13 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator { return errChunksIterator{err: err} } - chks = append(chks, chunks.Meta{ - MinTime: mint, - MaxTime: maxt, - Chunk: chk, - }) + if chk != nil { + chks = append(chks, chunks.Meta{ + MinTime: mint, + MaxTime: maxt, + Chunk: chk, + }) + } return NewListChunkSeriesIterator(chks...) } diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index d11c6d2db..7db5f0cbd 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -146,6 +146,17 @@ func (v ValueType) String() string { } } +func (v ValueType) ChunkEncoding() Encoding { + switch v { + case ValFloat: + return EncXOR + case ValHistogram: + return EncHistogram + default: + return EncNone + } +} + // MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values. 
func MockSeriesIterator(timestamps []int64, values []float64) Iterator { return &mockSeriesIterator{ diff --git a/tsdb/tsdbutil/chunks.go b/tsdb/tsdbutil/chunks.go index af0f80772..1be6a8331 100644 --- a/tsdb/tsdbutil/chunks.go +++ b/tsdb/tsdbutil/chunks.go @@ -14,6 +14,8 @@ package tsdbutil import ( + "fmt" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -37,10 +39,12 @@ type SampleSlice []Sample func (s SampleSlice) Get(i int) Sample { return s[i] } func (s SampleSlice) Len() int { return len(s) } +// ChunkFromSamples requires all samples to have the same type. func ChunkFromSamples(s []Sample) chunks.Meta { return ChunkFromSamplesGeneric(SampleSlice(s)) } +// ChunkFromSamplesGeneric requires all samples to have the same type. func ChunkFromSamplesGeneric(s Samples) chunks.Meta { mint, maxt := int64(0), int64(0) @@ -48,11 +52,29 @@ func ChunkFromSamplesGeneric(s Samples) chunks.Meta { mint, maxt = s.Get(0).T(), s.Get(s.Len()-1).T() } - c := chunkenc.NewXORChunk() + if s.Len() == 0 { + return chunks.Meta{ + Chunk: chunkenc.NewXORChunk(), + } + } + + sampleType := s.Get(0).Type() + c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding()) + if err != nil { + panic(err) // TODO(codesome): don't panic. + } + ca, _ := c.Appender() for i := 0; i < s.Len(); i++ { - ca.Append(s.Get(i).T(), s.Get(i).V()) + switch sampleType { + case chunkenc.ValFloat: + ca.Append(s.Get(i).T(), s.Get(i).V()) + case chunkenc.ValHistogram: + ca.AppendHistogram(s.Get(i).T(), s.Get(i).H()) + default: + panic(fmt.Sprintf("unknown sample type %s", sampleType.String())) + } } return chunks.Meta{ MinTime: mint,