diff --git a/storage/remote/codec.go b/storage/remote/codec.go index ba802bccb..78534ed93 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "sort" "strings" @@ -173,7 +174,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet return errSeriesSet{err: err} } lbls := labelProtosToLabels(ts.Labels) - series = append(series, &concreteSeries{labels: lbls, samples: ts.Samples}) + series = append(series, &concreteSeries{labels: lbls, floats: ts.Samples, histograms: ts.Histograms}) } if sortSeries { @@ -359,8 +360,9 @@ func (c *concreteSeriesSet) Warnings() storage.Warnings { return nil } // concreteSeries implements storage.Series. type concreteSeries struct { - labels labels.Labels - samples []prompb.Sample + labels labels.Labels + floats []prompb.Sample + histograms []prompb.Histogram } func (c *concreteSeries) Labels() labels.Labels { @@ -372,84 +374,168 @@ func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { csi.reset(c) return csi } - return newConcreteSeriersIterator(c) + return newConcreteSeriesIterator(c) } // concreteSeriesIterator implements storage.SeriesIterator. type concreteSeriesIterator struct { - cur int - series *concreteSeries + floatsCur int + histogramsCur int + curValType chunkenc.ValueType + series *concreteSeries } -func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator { +func newConcreteSeriesIterator(series *concreteSeries) chunkenc.Iterator { return &concreteSeriesIterator{ - cur: -1, - series: series, + floatsCur: -1, + histogramsCur: -1, + curValType: chunkenc.ValNone, + series: series, } } func (c *concreteSeriesIterator) reset(series *concreteSeries) { - c.cur = -1 + c.floatsCur = -1 + c.histogramsCur = -1 + c.curValType = chunkenc.ValNone c.series = series } // Seek implements storage.SeriesIterator. func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { - if c.cur == -1 { - c.cur = 0 + if c.floatsCur == -1 { + c.floatsCur = 0 + } + if c.histogramsCur == -1 { + c.histogramsCur = 0 } - if c.cur >= len(c.series.samples) { + if c.floatsCur >= len(c.series.floats) && c.histogramsCur >= len(c.series.histograms) { return chunkenc.ValNone } + // No-op check. - if s := c.series.samples[c.cur]; s.Timestamp >= t { - return chunkenc.ValFloat + if (c.curValType == chunkenc.ValFloat && c.series.floats[c.floatsCur].Timestamp >= t) || + ((c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram) && c.series.histograms[c.histogramsCur].Timestamp >= t) { + return c.curValType } - // Do binary search between current position and end. - c.cur += sort.Search(len(c.series.samples)-c.cur, func(n int) bool { - return c.series.samples[n+c.cur].Timestamp >= t + + c.curValType = chunkenc.ValNone + + // Binary search between current position and end for both float and histograms samples. + c.floatsCur += sort.Search(len(c.series.floats)-c.floatsCur, func(n int) bool { + return c.series.floats[n+c.floatsCur].Timestamp >= t + }) + c.histogramsCur += sort.Search(len(c.series.histograms)-c.histogramsCur, func(n int) bool { + return c.series.histograms[n+c.histogramsCur].Timestamp >= t }) - if c.cur < len(c.series.samples) { - return chunkenc.ValFloat + + if c.floatsCur < len(c.series.floats) && c.histogramsCur < len(c.series.histograms) { + // If float samples and histogram samples have overlapping timestamps prefer the float samples. + if c.series.floats[c.floatsCur].Timestamp <= c.series.histograms[c.histogramsCur].Timestamp { + c.curValType = chunkenc.ValFloat + } else { + c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur]) + } + // When the timestamps do not overlap the cursor for the non-selected sample type has advanced too + // far; we decrement it back down here. + if c.series.floats[c.floatsCur].Timestamp != c.series.histograms[c.histogramsCur].Timestamp { + if c.curValType == chunkenc.ValFloat { + c.histogramsCur-- + } else { + c.floatsCur-- + } + } + } else if c.floatsCur < len(c.series.floats) { + c.curValType = chunkenc.ValFloat + } else if c.histogramsCur < len(c.series.histograms) { + c.curValType = getHistogramValType(&c.series.histograms[c.histogramsCur]) } - return chunkenc.ValNone - // TODO(beorn7): Add histogram support. + + return c.curValType +} + +func getHistogramValType(h *prompb.Histogram) chunkenc.ValueType { + _, isInt := h.GetCount().(*prompb.Histogram_CountInt) + if isInt { + return chunkenc.ValHistogram + } + return chunkenc.ValFloatHistogram } // At implements chunkenc.Iterator. func (c *concreteSeriesIterator) At() (t int64, v float64) { - s := c.series.samples[c.cur] + if c.curValType != chunkenc.ValFloat { + panic("iterator is not on a float sample") + } + s := c.series.floats[c.floatsCur] return s.Timestamp, s.Value } -// AtHistogram always returns (0, nil) because there is no support for histogram -// values yet. -// TODO(beorn7): Fix that for histogram support in remote storage. +// AtHistogram implements chunkenc.Iterator func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { - return 0, nil + if c.curValType != chunkenc.ValHistogram { + panic("iterator is not on an integer histogram sample") + } + h := c.series.histograms[c.histogramsCur] + return h.Timestamp, HistogramProtoToHistogram(h) } -// AtFloatHistogram always returns (0, nil) because there is no support for histogram -// values yet. -// TODO(beorn7): Fix that for histogram support in remote storage. +// AtFloatHistogram implements chunkenc.Iterator func (c *concreteSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - return 0, nil + switch c.curValType { + case chunkenc.ValHistogram: + fh := c.series.histograms[c.histogramsCur] + return fh.Timestamp, HistogramProtoToFloatHistogram(fh) + case chunkenc.ValFloatHistogram: + fh := c.series.histograms[c.histogramsCur] + return fh.Timestamp, FloatHistogramProtoToFloatHistogram(fh) + default: + panic("iterator is not on a histogram sample") + } } // AtT implements chunkenc.Iterator. func (c *concreteSeriesIterator) AtT() int64 { - s := c.series.samples[c.cur] - return s.Timestamp + if c.curValType == chunkenc.ValHistogram || c.curValType == chunkenc.ValFloatHistogram { + return c.series.histograms[c.histogramsCur].Timestamp + } + return c.series.floats[c.floatsCur].Timestamp } +const noTS = int64(math.MaxInt64) + // Next implements chunkenc.Iterator. func (c *concreteSeriesIterator) Next() chunkenc.ValueType { - c.cur++ - if c.cur < len(c.series.samples) { - return chunkenc.ValFloat - } - return chunkenc.ValNone - // TODO(beorn7): Add histogram support. + peekFloatTS := noTS + if c.floatsCur+1 < len(c.series.floats) { + peekFloatTS = c.series.floats[c.floatsCur+1].Timestamp + } + peekHistTS := noTS + if c.histogramsCur+1 < len(c.series.histograms) { + peekHistTS = c.series.histograms[c.histogramsCur+1].Timestamp + } + c.curValType = chunkenc.ValNone + + if peekFloatTS < peekHistTS { + c.floatsCur++ + c.curValType = chunkenc.ValFloat + } else if peekHistTS < peekFloatTS { + c.histogramsCur++ + c.curValType = chunkenc.ValHistogram + } else if peekFloatTS == noTS && peekHistTS == noTS { + // This only happens when the iterator is exhausted; we set the cursors off the end to prevent + // Seek() from returning anything afterwards. + c.floatsCur = len(c.series.floats) + c.histogramsCur = len(c.series.histograms) + } else { + // Prefer float samples to histogram samples if there's a conflict. We advance the cursor for histograms + // anyway otherwise the histogram sample will get selected on the next call to Next(). + c.floatsCur++ + c.histogramsCur++ + c.curValType = chunkenc.ValFloat + } + + return c.curValType } // Err implements chunkenc.Iterator. @@ -557,10 +643,10 @@ func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram { } } -// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the +// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the // provided proto message to a Float Histogram. The caller has to make sure that -// the proto message represents an float histogram and not a integer histogram. -func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram { +// the proto message represents a float histogram and not an integer histogram. +func FloatHistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram { return &histogram.FloatHistogram{ CounterResetHint: histogram.CounterResetHint(hp.ResetHint), Schema: hp.Schema, @@ -575,6 +661,24 @@ func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogr } } +// HistogramProtoToFloatHistogram extracts and converts a (normal integer) histogram from the provided proto message +// to a float histogram. The caller has to make sure that the proto message represents an integer histogram and not a +// float histogram. +func HistogramProtoToFloatHistogram(hp prompb.Histogram) *histogram.FloatHistogram { + return &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(hp.ResetHint), + Schema: hp.Schema, + ZeroThreshold: hp.ZeroThreshold, + ZeroCount: float64(hp.GetZeroCountInt()), + Count: float64(hp.GetCountInt()), + Sum: hp.Sum, + PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), + PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()), + NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), + NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()), + } +} + func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span { spans := make([]histogram.Span, len(s)) for i := 0; i < len(s); i++ { @@ -584,6 +688,16 @@ func spansProtoToSpans(s []prompb.BucketSpan) []histogram.Span { return spans } +func deltasToCounts(deltas []int64) []float64 { + counts := make([]float64, len(deltas)) + var cur float64 + for i, d := range deltas { + cur += float64(d) + counts[i] = cur + } + return counts +} + func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram { return prompb.Histogram{ Count: &prompb.Histogram_CountInt{CountInt: h.Count}, diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 4137f91aa..36d0d7c31 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/tsdbutil" ) var testHistogram = histogram.Histogram{ @@ -174,12 +175,12 @@ func TestValidateLabelsAndMetricName(t *testing.T) { func TestConcreteSeriesSet(t *testing.T) { series1 := &concreteSeries{ - labels: labels.FromStrings("foo", "bar"), - samples: []prompb.Sample{{Value: 1, Timestamp: 2}}, + labels: labels.FromStrings("foo", "bar"), + floats: []prompb.Sample{{Value: 1, Timestamp: 2}}, } series2 := &concreteSeries{ - labels: labels.FromStrings("foo", "baz"), - samples: []prompb.Sample{{Value: 3, Timestamp: 4}}, + labels: labels.FromStrings("foo", "baz"), + floats: []prompb.Sample{{Value: 3, Timestamp: 4}}, } c := &concreteSeriesSet{ series: []storage.Series{series1, series2}, @@ -206,10 +207,10 @@ func TestConcreteSeriesClonesLabels(t *testing.T) { require.Equal(t, lbls, gotLabels) } -func TestConcreteSeriesIterator(t *testing.T) { +func TestConcreteSeriesIterator_FloatSamples(t *testing.T) { series := &concreteSeries{ labels: labels.FromStrings("foo", "bar"), - samples: []prompb.Sample{ + floats: []prompb.Sample{ {Value: 1, Timestamp: 1}, {Value: 1.5, Timestamp: 1}, {Value: 2, Timestamp: 2}, @@ -255,6 +256,165 @@ func TestConcreteSeriesIterator(t *testing.T) { require.Equal(t, chunkenc.ValNone, it.Seek(2)) } +func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) { + histograms := tsdbutil.GenerateTestHistograms(5) + histProtos := make([]prompb.Histogram, len(histograms)) + for i, h := range histograms { + // Results in ts sequence of 1, 1, 2, 3, 4. + var ts int64 + if i == 0 { + ts = 1 + } else { + ts = int64(i) + } + histProtos[i] = HistogramToHistogramProto(ts, h) + } + series := &concreteSeries{ + labels: labels.FromStrings("foo", "bar"), + histograms: histProtos, + } + it := series.Iterator(nil) + + // Seek to the first sample with ts=1. + require.Equal(t, chunkenc.ValHistogram, it.Seek(1)) + ts, v := it.AtHistogram() + require.Equal(t, int64(1), ts) + require.Equal(t, histograms[0], v) + + // Seek one further, next sample still has ts=1. + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, v = it.AtHistogram() + require.Equal(t, int64(1), ts) + require.Equal(t, histograms[1], v) + + // Seek again to 1 and make sure we stay where we are. + require.Equal(t, chunkenc.ValHistogram, it.Seek(1)) + ts, v = it.AtHistogram() + require.Equal(t, int64(1), ts) + require.Equal(t, histograms[1], v) + + // Another seek. + require.Equal(t, chunkenc.ValHistogram, it.Seek(3)) + ts, v = it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, histograms[3], v) + + // And we don't go back. + require.Equal(t, chunkenc.ValHistogram, it.Seek(2)) + ts, v = it.AtHistogram() + require.Equal(t, int64(3), ts) + require.Equal(t, histograms[3], v) + + // Seek beyond the end. + require.Equal(t, chunkenc.ValNone, it.Seek(5)) + // And we don't go back. (This exposes issue #10027.) + require.Equal(t, chunkenc.ValNone, it.Seek(2)) +} + +func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { + // Series starts as histograms, then transitions to floats at ts=8 (with an overlap from ts=8 to ts=10), then + // transitions back to histograms at ts=16. + histograms := tsdbutil.GenerateTestHistograms(15) + histProtos := make([]prompb.Histogram, len(histograms)) + for i, h := range histograms { + if i < 10 { + histProtos[i] = HistogramToHistogramProto(int64(i+1), h) + } else { + histProtos[i] = HistogramToHistogramProto(int64(i+6), h) + } + } + series := &concreteSeries{ + labels: labels.FromStrings("foo", "bar"), + floats: []prompb.Sample{ + {Value: 1, Timestamp: 8}, + {Value: 2, Timestamp: 9}, + {Value: 3, Timestamp: 10}, + {Value: 4, Timestamp: 11}, + {Value: 5, Timestamp: 12}, + {Value: 6, Timestamp: 13}, + {Value: 7, Timestamp: 14}, + {Value: 8, Timestamp: 15}, + }, + histograms: histProtos, + } + it := series.Iterator(nil) + + var ( + ts int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram + ) + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, h = it.AtHistogram() + require.Equal(t, int64(1), ts) + require.Equal(t, histograms[0], h) + + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, h = it.AtHistogram() + require.Equal(t, int64(2), ts) + require.Equal(t, histograms[1], h) + + // Seek to the first float/histogram sample overlap at ts=8 (should prefer the float sample). + require.Equal(t, chunkenc.ValFloat, it.Seek(8)) + ts, v = it.At() + require.Equal(t, int64(8), ts) + require.Equal(t, 1., v) + + // Attempting to seek backwards should do nothing. + require.Equal(t, chunkenc.ValFloat, it.Seek(1)) + ts, v = it.At() + require.Equal(t, int64(8), ts) + require.Equal(t, 1., v) + + // Seeking to 8 again should also do nothing. + require.Equal(t, chunkenc.ValFloat, it.Seek(8)) + ts, v = it.At() + require.Equal(t, int64(8), ts) + require.Equal(t, 1., v) + + // Again, should prefer the float sample. + require.Equal(t, chunkenc.ValFloat, it.Next()) + ts, v = it.At() + require.Equal(t, int64(9), ts) + require.Equal(t, 2., v) + + // Seek to ts=11 where there are only float samples. + require.Equal(t, chunkenc.ValFloat, it.Seek(11)) + ts, v = it.At() + require.Equal(t, int64(11), ts) + require.Equal(t, 4., v) + + // Seek to ts=15 right before the transition back to histogram samples. + require.Equal(t, chunkenc.ValFloat, it.Seek(15)) + ts, v = it.At() + require.Equal(t, int64(15), ts) + require.Equal(t, 8., v) + + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, h = it.AtHistogram() + require.Equal(t, int64(16), ts) + require.Equal(t, histograms[10], h) + + // Getting a float histogram from an int histogram works. + require.Equal(t, chunkenc.ValHistogram, it.Next()) + ts, fh = it.AtFloatHistogram() + require.Equal(t, int64(17), ts) + expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11])) + require.Equal(t, expected, fh) + + // Keep calling Next() until the end. + for i := 0; i < 3; i++ { + require.Equal(t, chunkenc.ValHistogram, it.Next()) + } + + // The iterator is exhausted. + require.Equal(t, chunkenc.ValNone, it.Next()) + require.Equal(t, chunkenc.ValNone, it.Next()) + // Should also not be able to seek backwards again. + require.Equal(t, chunkenc.ValNone, it.Seek(1)) +} + func TestFromQueryResultWithDuplicates(t *testing.T) { ts1 := prompb.TimeSeries{ Labels: []prompb.Label{ @@ -368,7 +528,7 @@ func TestNilHistogramProto(t *testing.T) { // This function will panic if it impromperly handles nil // values, causing the test to fail. HistogramProtoToHistogram(prompb.Histogram{}) - HistogramProtoToFloatHistogram(prompb.Histogram{}) + FloatHistogramProtoToFloatHistogram(prompb.Histogram{}) } func exampleHistogram() histogram.Histogram { @@ -563,7 +723,7 @@ func TestFloatHistogramToProtoConvert(t *testing.T) { require.Equal(t, p, FloatHistogramToHistogramProto(1337, &h)) - require.Equal(t, h, *HistogramProtoToFloatHistogram(p)) + require.Equal(t, h, *FloatHistogramProtoToFloatHistogram(p)) } } diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 45304c43c..5aa113088 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -126,7 +126,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err for _, hp := range ts.Histograms { if hp.GetCountFloat() > 0 || hp.GetZeroCountFloat() > 0 { // It is a float histogram. - fhs := HistogramProtoToFloatHistogram(hp) + fhs := FloatHistogramProtoToFloatHistogram(hp) _, err = app.AppendHistogram(0, labels, hp.Timestamp, nil, fhs) } else { hs := HistogramProtoToHistogram(hp) diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 58c4439fa..9c787f17e 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -69,7 +69,7 @@ func TestRemoteWriteHandler(t *testing.T) { for _, hp := range ts.Histograms { if hp.GetCountFloat() > 0 || hp.GetZeroCountFloat() > 0 { // It is a float histogram. - fh := HistogramProtoToFloatHistogram(hp) + fh := FloatHistogramProtoToFloatHistogram(hp) require.Equal(t, mockHistogram{labels, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { h := HistogramProtoToHistogram(hp) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index c550cbc78..1ebef3eb1 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -96,7 +96,7 @@ type Iterator interface { // timestamp equal or greater than t. If the current sample found by a // previous `Next` or `Seek` operation already has this property, Seek // has no effect. If a sample has been found, Seek returns the type of - // its value. Otherwise, it returns ValNone, after with the iterator is + // its value. Otherwise, it returns ValNone, after which the iterator is // exhausted. Seek(t int64) ValueType // At returns the current timestamp/value pair if the value is a float.