Add support for native histograms to concreteSeriesIterator

Signed-off-by: Justin Lei <justin.lei@grafana.com>
pull/12192/head
Justin Lei 2 years ago
parent 211ae4f1f0
commit 83f43982c9

@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"sort"
"strings"
@ -173,7 +174,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
return errSeriesSet{err: err}
}
lbls := labelProtosToLabels(ts.Labels)
series = append(series, &concreteSeries{labels: lbls, samples: ts.Samples})
series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms})
}
if sortSeries {
@ -359,8 +360,9 @@ func (c *concreteSeriesSet) Warnings() storage.Warnings { return nil }
// concreteSeries implements storage.Series.
type concreteSeries struct {
labels labels.Labels
samples []prompb.Sample
labels labels.Labels
floats []prompb.Sample
histograms []prompb.Histogram
}
func (c *concreteSeries) Labels() labels.Labels {
@ -372,84 +374,168 @@ func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
csi.reset(c)
return csi
}
return newConcreteSeriersIterator(c)
return newConcreteSeriesIterator(c)
}
// concreteSeriesIterator implements storage.SeriesIterator.
type concreteSeriesIterator struct {
cur int
series *concreteSeries
floatsCur int
histogramsCur int
curValType chunkenc.ValueType
series *concreteSeries
}
func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator {
func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator {
return &concreteSeriesIterator{
cur: -1,
series: series,
floatsCur: -1,
histogramsCur: -1,
curValType: chunkenc.ValNone,
series: series,
}
}
func (c *concreteSeriesIterator) reset(series *concreteSeries) {
c.cur = -1
c.floatsCur = -1
c.histogramsCur = -1
c.curValType = chunkenc.ValNone
c.series = series
}
// Seek implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
if c.cur == -1 {
c.cur = 0
if c.floatsCur == -1 {
c.floatsCur = 0
}
if c.histogramsCur == -1 {
c.histogramsCur = 0
}
if c.cur >= len(c.series.samples) {
if c.floatsCur >= len(c.series.floats) && c.histogramsCur >= len(c.series.histograms) {
return chunkenc.ValNone
}
// No-op check.
if s := c.series.samples[c.cur]; s.Timestamp >= t {
return chunkenc.ValFloat
if (c.curValType == chunkenc.ValFloat && c.series.floats[c.floatsCur].Timestamp >= t) ||
((c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram) && c.series.histograms[c.histogramsCur].Timestamp >= t) {
return c.curValType
}
// Do binary search between current position and end.
c.cur += sort.Search(len(c.series.samples)-c.cur, func(n int) bool {
return c.series.samples[n+c.cur].Timestamp >= t
c.curValType = chunkenc.ValNone
// Binary search between current position and end for both float and histograms samples.
c.floatsCur += sort.Search(len(c.series.floats)-c.floatsCur, func(n int) bool {
return c.series.floats[n+c.floatsCur].Timestamp >= t
})
c.histogramsCur += sort.Search(len(c.series.histograms)-c.histogramsCur, func(n int) bool {
return c.series.histograms[n+c.histogramsCur].Timestamp >= t
})
if c.cur < len(c.series.samples) {
return chunkenc.ValFloat
if c.floatsCur < len(c.series.floats) && c.histogramsCur < len(c.series.histograms) {
// If float samples and histogram samples have overlapping timestamps prefer the float samples.
if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp {
c.curValType = chunkenc.ValFloat
} else {
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur])
}
// When the timestamps do not overlap the cursor for the non-selected sample type has advanced too
// far; we decrement it back down here.
if c.series.floats[c.floatsCur].Timestamp != c.series.histograms[c.histogramsCur].Timestamp {
if c.curValType == chunkenc.ValFloat {
c.histogramsCur--
} else {
c.floatsCur--
}
}
} else if c.floatsCur < len(c.series.floats) {
c.curValType = chunkenc.ValFloat
} else if c.histogramsCur < len(c.series.histograms) {
c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur])
}
return chunkenc.ValNone
// TODO(beorn7): Add histogram support.
return c.curValType
}
func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType {
_, isInt := h.GetCount().(*prompb.Histogram_CountInt)
if isInt {
return chunkenc.ValHistogram
}
return chunkenc.ValFloatHistogram
}
// At implements chunkenc.Iterator.
func (c *concreteSeriesIterator) At() (t int64, v float64) {
s := c.series.samples[c.cur]
if c.curValType != chunkenc.ValFloat {
panic("iterator is not on a float sample")
}
s := c.series.floats[c.floatsCur]
return s.Timestamp, s.Value
}
// AtHistogram always returns (0, nil) because there is no support for histogram
// values yet.
// TODO(beorn7): Fix that for histogram support in remote storage.
// AtHistogram implements chunkenc.Iterator
func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
return 0, nil
if c.curValType != chunkenc.ValHistogram {
panic("iterator is not on an integer histogram sample")
}
h := c.series.histograms[c.histogramsCur]
return h.Timestamp, HistogramProtoToHistogram(h)
}
// AtFloatHistogram always returns (0, nil) because there is no support for histogram
// values yet.
// TODO(beorn7): Fix that for histogram support in remote storage.
// AtFloatHistogram implements chunkenc.Iterator
func (c *concreteSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
return 0, nil
switch c.curValType {
case chunkenc.ValHistogram:
fh := c.series.histograms[c.histogramsCur]
return fh.Timestamp, HistogramProtoToFloatHistogram(fh)
case chunkenc.ValFloatHistogram:
fh := c.series.histograms[c.histogramsCur]
return fh.Timestamp, FloatHistogramProtoToFloatHistogram(fh)
default:
panic("iterator is not on a histogram sample")
}
}
// AtT implements chunkenc.Iterator.
func (c *concreteSeriesIterator) AtT() int64 {
s := c.series.samples[c.cur]
return s.Timestamp
if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram {
return c.series.histograms[c.histogramsCur].Timestamp
}
return c.series.floats[c.floatsCur].Timestamp
}
const noTS = int64(math.MaxInt64)
// Next implements chunkenc.Iterator.
func (c *concreteSeriesIterator) Next() chunkenc.ValueType {
c.cur++
if c.cur < len(c.series.samples) {
return chunkenc.ValFloat
}
return chunkenc.ValNone
// TODO(beorn7): Add histogram support.
peekFloatTS := noTS
if c.floatsCur+1 < len(c.series.floats) {
peekFloatTS = c.series.floats[c.floatsCur+1].Timestamp
}
peekHistTS := noTS
if c.histogramsCur+1 < len(c.series.histograms) {
peekHistTS = c.series.histograms[c.histogramsCur+1].Timestamp
}
c.curValType = chunkenc.ValNone
if peekFloatTS < peekHistTS {
c.floatsCur++
c.curValType = chunkenc.ValFloat
} else if peekHistTS < peekFloatTS {
c.histogramsCur++
c.curValType = chunkenc.ValHistogram
} else if peekFloatTS == noTS && peekHistTS == noTS {
// This only happens when the iterator is exhausted; we set the cursors off the end to prevent
// Seek() from returning anything afterwards.
c.floatsCur = len(c.series.floats)
c.histogramsCur = len(c.series.histograms)
} else {
// Prefer float samples to histogram samples if there's a conflict. We advance the cursor for histograms
// anyway otherwise the histogram sample will get selected on the next call to Next().
c.floatsCur++
c.histogramsCur++
c.curValType = chunkenc.ValFloat
}
return c.curValType
}
// Err implements chunkenc.Iterator.
@ -557,10 +643,10 @@ func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
}
}
// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents an float histogram and not a integer histogram.
func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
// the proto message represents a float histogram and not an integer histogram.
func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
@ -575,6 +661,24 @@ func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogr
}
}
// HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message
// to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a
// float histogram.
func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: float64(hp.GetZeroCountInt()),
Count: float64(hp.GetCountInt()),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()),
}
}
func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
@ -584,6 +688,16 @@ func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span {
return spans
}
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: h.Count},

@ -29,6 +29,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
var testHistogram = histogram.Histogram{
@ -174,12 +175,12 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
func TestConcreteSeriesSet(t *testing.T) {
series1 := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
samples: []prompb.Sample{{Value: 1, Timestamp: 2}},
labels: labels.FromStrings("foo", "bar"),
floats: []prompb.Sample{{Value: 1, Timestamp: 2}},
}
series2 := &concreteSeries{
labels: labels.FromStrings("foo", "baz"),
samples: []prompb.Sample{{Value: 3, Timestamp: 4}},
labels: labels.FromStrings("foo", "baz"),
floats: []prompb.Sample{{Value: 3, Timestamp: 4}},
}
c := &concreteSeriesSet{
series: []storage.Series{series1, series2},
@ -206,10 +207,10 @@ func TestConcreteSeriesClonesLabels(t *testing.T) {
require.Equal(t, lbls, gotLabels)
}
func TestConcreteSeriesIterator(t *testing.T) {
func TestConcreteSeriesIterator_FloatSamples(t *testing.T) {
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
samples: []prompb.Sample{
floats: []prompb.Sample{
{Value: 1, Timestamp: 1},
{Value: 1.5, Timestamp: 1},
{Value: 2, Timestamp: 2},
@ -255,6 +256,165 @@ func TestConcreteSeriesIterator(t *testing.T) {
require.Equal(t, chunkenc.ValNone, it.Seek(2))
}
func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) {
histograms := tsdbutil.GenerateTestHistograms(5)
histProtos := make([]prompb.Histogram, len(histograms))
for i, h := range histograms {
// Results in ts sequence of 1, 1, 2, 3, 4.
var ts int64
if i == 0 {
ts = 1
} else {
ts = int64(i)
}
histProtos[i] = HistogramToHistogramProto(ts, h)
}
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
histograms: histProtos,
}
it := series.Iterator(nil)
// Seek to the first sample with ts=1.
require.Equal(t, chunkenc.ValHistogram, it.Seek(1))
ts, v := it.AtHistogram()
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[0], v)
// Seek one further, next sample still has ts=1.
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, v = it.AtHistogram()
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[1], v)
// Seek again to 1 and make sure we stay where we are.
require.Equal(t, chunkenc.ValHistogram, it.Seek(1))
ts, v = it.AtHistogram()
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[1], v)
// Another seek.
require.Equal(t, chunkenc.ValHistogram, it.Seek(3))
ts, v = it.AtHistogram()
require.Equal(t, int64(3), ts)
require.Equal(t, histograms[3], v)
// And we don't go back.
require.Equal(t, chunkenc.ValHistogram, it.Seek(2))
ts, v = it.AtHistogram()
require.Equal(t, int64(3), ts)
require.Equal(t, histograms[3], v)
// Seek beyond the end.
require.Equal(t, chunkenc.ValNone, it.Seek(5))
// And we don't go back. (This exposes issue #10027.)
require.Equal(t, chunkenc.ValNone, it.Seek(2))
}
func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) {
// Series starts as histograms, then transitions to floats at ts=8 (with an overlap from ts=8 to ts=10), then
// transitions back to histograms at ts=16.
histograms := tsdbutil.GenerateTestHistograms(15)
histProtos := make([]prompb.Histogram, len(histograms))
for i, h := range histograms {
if i < 10 {
histProtos[i] = HistogramToHistogramProto(int64(i+1), h)
} else {
histProtos[i] = HistogramToHistogramProto(int64(i+6), h)
}
}
series := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
floats: []prompb.Sample{
{Value: 1, Timestamp: 8},
{Value: 2, Timestamp: 9},
{Value: 3, Timestamp: 10},
{Value: 4, Timestamp: 11},
{Value: 5, Timestamp: 12},
{Value: 6, Timestamp: 13},
{Value: 7, Timestamp: 14},
{Value: 8, Timestamp: 15},
},
histograms: histProtos,
}
it := series.Iterator(nil)
var (
ts int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
)
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
require.Equal(t, int64(1), ts)
require.Equal(t, histograms[0], h)
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
require.Equal(t, int64(2), ts)
require.Equal(t, histograms[1], h)
// Seek to the first float/histogram sample overlap at ts=8 (should prefer the float sample).
require.Equal(t, chunkenc.ValFloat, it.Seek(8))
ts, v = it.At()
require.Equal(t, int64(8), ts)
require.Equal(t, 1., v)
// Attempting to seek backwards should do nothing.
require.Equal(t, chunkenc.ValFloat, it.Seek(1))
ts, v = it.At()
require.Equal(t, int64(8), ts)
require.Equal(t, 1., v)
// Seeking to 8 again should also do nothing.
require.Equal(t, chunkenc.ValFloat, it.Seek(8))
ts, v = it.At()
require.Equal(t, int64(8), ts)
require.Equal(t, 1., v)
// Again, should prefer the float sample.
require.Equal(t, chunkenc.ValFloat, it.Next())
ts, v = it.At()
require.Equal(t, int64(9), ts)
require.Equal(t, 2., v)
// Seek to ts=11 where there are only float samples.
require.Equal(t, chunkenc.ValFloat, it.Seek(11))
ts, v = it.At()
require.Equal(t, int64(11), ts)
require.Equal(t, 4., v)
// Seek to ts=15 right before the transition back to histogram samples.
require.Equal(t, chunkenc.ValFloat, it.Seek(15))
ts, v = it.At()
require.Equal(t, int64(15), ts)
require.Equal(t, 8., v)
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, h = it.AtHistogram()
require.Equal(t, int64(16), ts)
require.Equal(t, histograms[10], h)
// Getting a float histogram from an int histogram works.
require.Equal(t, chunkenc.ValHistogram, it.Next())
ts, fh = it.AtFloatHistogram()
require.Equal(t, int64(17), ts)
expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11]))
require.Equal(t, expected, fh)
// Keep calling Next() until the end.
for i := 0; i < 3; i++ {
require.Equal(t, chunkenc.ValHistogram, it.Next())
}
// The iterator is exhausted.
require.Equal(t, chunkenc.ValNone, it.Next())
require.Equal(t, chunkenc.ValNone, it.Next())
// Should also not be able to seek backwards again.
require.Equal(t, chunkenc.ValNone, it.Seek(1))
}
func TestFromQueryResultWithDuplicates(t *testing.T) {
ts1 := prompb.TimeSeries{
Labels: []prompb.Label{
@ -368,7 +528,7 @@ func TestNilHistogramProto(t *testing.T) {
// This function will panic if it impromperly handles nil
// values, causing the test to fail.
HistogramProtoToHistogram(prompb.Histogram{})
HistogramProtoToFloatHistogram(prompb.Histogram{})
FloatHistogramProtoToFloatHistogram(prompb.Histogram{})
}
func exampleHistogram() histogram.Histogram {
@ -563,7 +723,7 @@ func TestFloatHistogramToProtoConvert(t *testing.T) {
require.Equal(t, p, FloatHistogramToHistogramProto(1337, &h))
require.Equal(t, h, *HistogramProtoToFloatHistogram(p))
require.Equal(t, h, *FloatHistogramProtoToFloatHistogram(p))
}
}

@ -126,7 +126,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
for _, hp := range ts.Histograms {
if hp.GetCountFloat() > 0 || hp.GetZeroCountFloat() > 0 { // It is a float histogram.
fhs := HistogramProtoToFloatHistogram(hp)
fhs := FloatHistogramProtoToFloatHistogram(hp)
_, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs)
} else {
hs := HistogramProtoToHistogram(hp)

@ -69,7 +69,7 @@ func TestRemoteWriteHandler(t *testing.T) {
for _, hp := range ts.Histograms {
if hp.GetCountFloat() > 0 || hp.GetZeroCountFloat() > 0 { // It is a float histogram.
fh := HistogramProtoToFloatHistogram(hp)
fh := FloatHistogramProtoToFloatHistogram(hp)
require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k])
} else {
h := HistogramProtoToHistogram(hp)

@ -96,7 +96,7 @@ type Iterator interface {
// timestamp equal or greater than t. If the current sample found by a
// previous `Next` or `Seek` operation already has this property, Seek
// has no effect. If a sample has been found, Seek returns the type of
// its value. Otherwise, it returns ValNone, after with the iterator is
// its value. Otherwise, it returns ValNone, after which the iterator is
// exhausted.
Seek(t int64) ValueType
// At returns the current timestamp/value pair if the value is a float.

Loading…
Cancel
Save