mirror of https://github.com/prometheus/prometheus
Implement vertical compaction for native histograms (#11184)
* Implement vertical compaction for native histograms Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Fix typo Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>pull/11208/head
parent
9325caa41c
commit
0f4e5196c4
|
@ -23,6 +23,7 @@ import (
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
|
@ -384,6 +385,22 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
|
||||||
func TestCompactingChunkSeriesMerger(t *testing.T) {
|
func TestCompactingChunkSeriesMerger(t *testing.T) {
|
||||||
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
|
m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
|
||||||
|
|
||||||
|
// histogramSample returns a histogram that is unique to the ts.
|
||||||
|
histogramSample := func(ts int64) sample {
|
||||||
|
idx := ts + 1
|
||||||
|
return sample{t: ts, h: &histogram.Histogram{
|
||||||
|
Schema: 2,
|
||||||
|
ZeroThreshold: 0.001,
|
||||||
|
ZeroCount: 2 * uint64(idx),
|
||||||
|
Count: 5 * uint64(idx),
|
||||||
|
Sum: 12.34 * float64(idx),
|
||||||
|
PositiveSpans: []histogram.Span{{Offset: 1, Length: 2}, {Offset: 2, Length: 1}},
|
||||||
|
NegativeSpans: []histogram.Span{{Offset: 2, Length: 1}, {Offset: 1, Length: 2}},
|
||||||
|
PositiveBuckets: []int64{1 * idx, -1 * idx, 3 * idx},
|
||||||
|
NegativeBuckets: []int64{1 * idx, 2 * idx, 3 * idx},
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
name string
|
name string
|
||||||
input []ChunkSeries
|
input []ChunkSeries
|
||||||
|
@ -486,6 +503,32 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
|
||||||
tsdbutil.GenerateSamples(120, 30),
|
tsdbutil.GenerateSamples(120, 30),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "histogram chunks overlapping",
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(0), histogramSample(5)}, []tsdbutil.Sample{histogramSample(10), histogramSample(15)}),
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(2), histogramSample(20)}, []tsdbutil.Sample{histogramSample(25), histogramSample(30)}),
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(18), histogramSample(26)}, []tsdbutil.Sample{histogramSample(31), histogramSample(35)}),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]tsdbutil.Sample{histogramSample(0), histogramSample(2), histogramSample(5), histogramSample(10), histogramSample(15), histogramSample(18), histogramSample(20), histogramSample(25), histogramSample(26), histogramSample(30)},
|
||||||
|
[]tsdbutil.Sample{histogramSample(31), histogramSample(35)},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "histogram chunks overlapping with float chunks",
|
||||||
|
input: []ChunkSeries{
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{histogramSample(0), histogramSample(5)}, []tsdbutil.Sample{histogramSample(10), histogramSample(15)}),
|
||||||
|
NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1, nil, nil}, sample{12, 12, nil, nil}}, []tsdbutil.Sample{sample{14, 14, nil, nil}}),
|
||||||
|
},
|
||||||
|
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
|
||||||
|
[]tsdbutil.Sample{histogramSample(0)},
|
||||||
|
[]tsdbutil.Sample{sample{1, 1, nil, nil}},
|
||||||
|
[]tsdbutil.Sample{histogramSample(5), histogramSample(10)},
|
||||||
|
[]tsdbutil.Sample{sample{12, 12, nil, nil}, sample{14, 14, nil, nil}},
|
||||||
|
[]tsdbutil.Sample{histogramSample(15)},
|
||||||
|
),
|
||||||
|
},
|
||||||
} {
|
} {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
merged := m(tc.input...)
|
merged := m(tc.input...)
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
|
@ -252,28 +253,32 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
|
func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
|
||||||
// TODO(beorn7): Add Histogram support.
|
var (
|
||||||
chk := chunkenc.NewXORChunk()
|
chk chunkenc.Chunk
|
||||||
app, err := chk.Appender()
|
app chunkenc.Appender
|
||||||
if err != nil {
|
err error
|
||||||
return errChunksIterator{err: err}
|
)
|
||||||
}
|
|
||||||
mint := int64(math.MaxInt64)
|
mint := int64(math.MaxInt64)
|
||||||
maxt := int64(math.MinInt64)
|
maxt := int64(math.MinInt64)
|
||||||
|
|
||||||
chks := []chunks.Meta{}
|
chks := []chunks.Meta{}
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
seriesIter := s.Series.Iterator()
|
seriesIter := s.Series.Iterator()
|
||||||
for seriesIter.Next() == chunkenc.ValFloat {
|
lastType := chunkenc.ValNone
|
||||||
// Create a new chunk if too many samples in the current one.
|
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
|
||||||
if i >= seriesToChunkEncoderSplit {
|
if typ != lastType || i >= seriesToChunkEncoderSplit {
|
||||||
chks = append(chks, chunks.Meta{
|
// Create a new chunk if the sample type changed or too many samples in the current one.
|
||||||
MinTime: mint,
|
if chk != nil {
|
||||||
MaxTime: maxt,
|
chks = append(chks, chunks.Meta{
|
||||||
Chunk: chk,
|
MinTime: mint,
|
||||||
})
|
MaxTime: maxt,
|
||||||
chk = chunkenc.NewXORChunk()
|
Chunk: chk,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
|
||||||
|
if err != nil {
|
||||||
|
return errChunksIterator{err: err}
|
||||||
|
}
|
||||||
app, err = chk.Appender()
|
app, err = chk.Appender()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errChunksIterator{err: err}
|
return errChunksIterator{err: err}
|
||||||
|
@ -282,9 +287,23 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
|
||||||
// maxt is immediately overwritten below which is why setting it here won't make a difference.
|
// maxt is immediately overwritten below which is why setting it here won't make a difference.
|
||||||
i = 0
|
i = 0
|
||||||
}
|
}
|
||||||
|
lastType = typ
|
||||||
|
|
||||||
t, v := seriesIter.At()
|
var (
|
||||||
app.Append(t, v)
|
t int64
|
||||||
|
v float64
|
||||||
|
h *histogram.Histogram
|
||||||
|
)
|
||||||
|
switch typ {
|
||||||
|
case chunkenc.ValFloat:
|
||||||
|
t, v = seriesIter.At()
|
||||||
|
app.Append(t, v)
|
||||||
|
case chunkenc.ValHistogram:
|
||||||
|
t, h = seriesIter.AtHistogram()
|
||||||
|
app.AppendHistogram(t, h)
|
||||||
|
default:
|
||||||
|
return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
|
||||||
|
}
|
||||||
|
|
||||||
maxt = t
|
maxt = t
|
||||||
if mint == math.MaxInt64 {
|
if mint == math.MaxInt64 {
|
||||||
|
@ -296,11 +315,13 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
|
||||||
return errChunksIterator{err: err}
|
return errChunksIterator{err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
chks = append(chks, chunks.Meta{
|
if chk != nil {
|
||||||
MinTime: mint,
|
chks = append(chks, chunks.Meta{
|
||||||
MaxTime: maxt,
|
MinTime: mint,
|
||||||
Chunk: chk,
|
MaxTime: maxt,
|
||||||
})
|
Chunk: chk,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return NewListChunkSeriesIterator(chks...)
|
return NewListChunkSeriesIterator(chks...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,6 +146,17 @@ func (v ValueType) String() string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v ValueType) ChunkEncoding() Encoding {
|
||||||
|
switch v {
|
||||||
|
case ValFloat:
|
||||||
|
return EncXOR
|
||||||
|
case ValHistogram:
|
||||||
|
return EncHistogram
|
||||||
|
default:
|
||||||
|
return EncNone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values.
|
// MockSeriesIterator returns an iterator for a mock series with custom timeStamps and values.
|
||||||
func MockSeriesIterator(timestamps []int64, values []float64) Iterator {
|
func MockSeriesIterator(timestamps []int64, values []float64) Iterator {
|
||||||
return &mockSeriesIterator{
|
return &mockSeriesIterator{
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
package tsdbutil
|
package tsdbutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/histogram"
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||||
|
@ -37,10 +39,12 @@ type SampleSlice []Sample
|
||||||
func (s SampleSlice) Get(i int) Sample { return s[i] }
|
func (s SampleSlice) Get(i int) Sample { return s[i] }
|
||||||
func (s SampleSlice) Len() int { return len(s) }
|
func (s SampleSlice) Len() int { return len(s) }
|
||||||
|
|
||||||
|
// ChunkFromSamples requires all samples to have the same type.
|
||||||
func ChunkFromSamples(s []Sample) chunks.Meta {
|
func ChunkFromSamples(s []Sample) chunks.Meta {
|
||||||
return ChunkFromSamplesGeneric(SampleSlice(s))
|
return ChunkFromSamplesGeneric(SampleSlice(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ChunkFromSamplesGeneric requires all samples to have the same type.
|
||||||
func ChunkFromSamplesGeneric(s Samples) chunks.Meta {
|
func ChunkFromSamplesGeneric(s Samples) chunks.Meta {
|
||||||
mint, maxt := int64(0), int64(0)
|
mint, maxt := int64(0), int64(0)
|
||||||
|
|
||||||
|
@ -48,11 +52,29 @@ func ChunkFromSamplesGeneric(s Samples) chunks.Meta {
|
||||||
mint, maxt = s.Get(0).T(), s.Get(s.Len()-1).T()
|
mint, maxt = s.Get(0).T(), s.Get(s.Len()-1).T()
|
||||||
}
|
}
|
||||||
|
|
||||||
c := chunkenc.NewXORChunk()
|
if s.Len() == 0 {
|
||||||
|
return chunks.Meta{
|
||||||
|
Chunk: chunkenc.NewXORChunk(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sampleType := s.Get(0).Type()
|
||||||
|
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding())
|
||||||
|
if err != nil {
|
||||||
|
panic(err) // TODO(codesome): dont panic.
|
||||||
|
}
|
||||||
|
|
||||||
ca, _ := c.Appender()
|
ca, _ := c.Appender()
|
||||||
|
|
||||||
for i := 0; i < s.Len(); i++ {
|
for i := 0; i < s.Len(); i++ {
|
||||||
ca.Append(s.Get(i).T(), s.Get(i).V())
|
switch sampleType {
|
||||||
|
case chunkenc.ValFloat:
|
||||||
|
ca.Append(s.Get(i).T(), s.Get(i).V())
|
||||||
|
case chunkenc.ValHistogram:
|
||||||
|
ca.AppendHistogram(s.Get(i).T(), s.Get(i).H())
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("unknown sample type %s", sampleType.String()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return chunks.Meta{
|
return chunks.Meta{
|
||||||
MinTime: mint,
|
MinTime: mint,
|
||||||
|
|
Loading…
Reference in New Issue