Get histograms from TSDB to the rate() function implementation

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/9759/head
Ganesh Vernekar 2021-11-02 20:31:32 +05:30
parent a9008f5423
commit c8b267efd6
No known key found for this signature in database
GPG Key ID: 0F8729A5EB59B965
13 changed files with 229 additions and 91 deletions

View File

@ -40,6 +40,7 @@ import (
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/stats"
)
@ -1735,29 +1736,57 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
}
buf := it.Buffer()
for buf.Next() {
t, v := buf.At()
if value.IsStaleNaN(v) {
continue
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
if it.ChunkEncoding() == chunkenc.EncHistogram {
for buf.Next() {
t, h := buf.AtHistogram()
if value.IsStaleNaN(h.Sum) {
continue
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.currentSamples++
out = append(out, Point{T: t, H: &h})
}
}
} else {
for buf.Next() {
t, v := buf.At()
if value.IsStaleNaN(v) {
continue
}
// Values in the buffer are guaranteed to be smaller than maxt.
if t >= mint {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.currentSamples++
out = append(out, Point{T: t, V: v})
}
ev.currentSamples++
out = append(out, Point{T: t, V: v})
}
}
// The seeked sample might also be in the range.
if ok {
t, v := it.Values()
if t == maxt && !value.IsStaleNaN(v) {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
if it.ChunkEncoding() == chunkenc.EncHistogram {
t, h := it.HistogramValues()
if t == maxt && !value.IsStaleNaN(h.Sum) {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, H: &h})
ev.currentSamples++
}
} else {
t, v := it.Values()
if t == maxt && !value.IsStaleNaN(v) {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
}
out = append(out, Point{T: t, V: v})
ev.currentSamples++
}
}
return out

View File

@ -16,6 +16,7 @@ package promql
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"sort"
@ -30,6 +31,7 @@ import (
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
)
func TestMain(m *testing.M) {
@ -2429,3 +2431,33 @@ func TestRangeQuery(t *testing.T) {
})
}
}
func TestSparseHistogramRate(t *testing.T) {
// Currently, this test it to only find panics or errors in the engine execution path.
// The panic stack trace will mostly tell you what code path is breaking and needs fixing for
// fetching the raw histograms and passing it rightly upto the rate() function implementation.
// TODO: Check the result for correctness once implementation is ready.
test, err := NewTest(t, "")
require.NoError(t, err)
defer test.Close()
seriesName := "sparse_histogram_series"
lbls := labels.FromStrings("__name__", seriesName)
app := test.Storage().Appender(context.TODO())
for i, h := range tsdb.GenerateTestHistograms(100) {
_, err := app.AppendHistogram(0, lbls, int64(i)*int64(15*time.Second/time.Millisecond), h)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
require.NoError(t, test.Run())
engine := test.QueryEngine()
queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
qry, err := engine.NewInstantQuery(test.Queryable(), queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
require.NoError(t, err)
res := qry.Exec(test.Context())
require.NoError(t, res.Err)
}

View File

@ -47,7 +47,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
{
Metric: labels.FromStrings("__name__", "metric1"),
Points: []Point{
{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5},
{0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil},
},
},
},
@ -58,7 +58,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
{
Metric: labels.FromStrings("__name__", "metric1"),
Points: []Point{
{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5},
{0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil},
},
},
},
@ -69,7 +69,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
{
Metric: labels.FromStrings("__name__", "metric1"),
Points: []Point{
{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, {50000, 6}, {60000, 7},
{0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil}, {50000, 6, nil}, {60000, 7, nil},
},
},
},
@ -89,13 +89,13 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
{
Metric: labels.FromStrings("__name__", "metric1"),
Points: []Point{
{0, 1}, {10000, 1}, {20000, 1}, {30000, 1}, {40000, 1}, {50000, 1},
{0, 1, nil}, {10000, 1, nil}, {20000, 1, nil}, {30000, 1, nil}, {40000, 1, nil}, {50000, 1, nil},
},
},
{
Metric: labels.FromStrings("__name__", "metric2"),
Points: []Point{
{0, 1}, {10000, 2}, {20000, 3}, {30000, 4}, {40000, 5}, {50000, 6}, {60000, 7}, {70000, 8},
{0, 1, nil}, {10000, 2, nil}, {20000, 3, nil}, {30000, 4, nil}, {40000, 5, nil}, {50000, 6, nil}, {60000, 7, nil}, {70000, 8, nil},
},
},
},

View File

@ -78,9 +78,12 @@ func (s Series) String() string {
}
// Point represents a single data point for a given timestamp.
// If H is not nil, then this is a histogram point and only (T, H) is valid.
// If H is nil, then only (T, V) is valid.
type Point struct {
T int64
V float64
H *histogram.Histogram
}
func (p Point) String() string {

View File

@ -197,7 +197,7 @@ func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
Point: promql.Point(v),
Point: promql.Point{T: v.T, V: v.V},
Metric: labels.Labels{},
}}, nil
default:

View File

@ -40,8 +40,9 @@ func NewBuffer(delta int64) *BufferedSeriesIterator {
// NewBufferIterator returns a new iterator that buffers the values within the
// time range of the current element and the duration of delta before.
func NewBufferIterator(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator {
// TODO(codesome): based on encoding, allocate different buffer.
bit := &BufferedSeriesIterator{
buf: newSampleRing(delta, 16),
buf: newSampleRing(delta, 16, it.ChunkEncoding()),
delta: delta,
}
bit.Reset(it)
@ -67,8 +68,9 @@ func (b *BufferedSeriesIterator) ReduceDelta(delta int64) bool {
// PeekBack returns the nth previous element of the iterator. If there is none buffered,
// ok is false.
func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, ok bool) {
return b.buf.nthLast(n)
func (b *BufferedSeriesIterator) PeekBack(n int) (t int64, v float64, h *histogram.Histogram, ok bool) {
s, ok := b.buf.nthLast(n)
return s.t, s.v, s.h, ok
}
// Buffer returns an iterator over the buffered data. Invalidates previously
@ -90,7 +92,11 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool {
if !b.ok {
return false
}
b.lastTime, _ = b.Values()
if b.it.ChunkEncoding() == chunkenc.EncHistogram {
b.lastTime, _ = b.HistogramValues()
} else {
b.lastTime, _ = b.Values()
}
}
if b.lastTime >= t {
@ -112,11 +118,21 @@ func (b *BufferedSeriesIterator) Next() bool {
}
// Add current element to buffer before advancing.
b.buf.add(b.it.At())
if b.it.ChunkEncoding() == chunkenc.EncHistogram {
t, h := b.it.AtHistogram()
b.buf.add(sample{t: t, h: &h})
} else {
t, v := b.it.At()
b.buf.add(sample{t: t, v: v})
}
b.ok = b.it.Next()
if b.ok {
b.lastTime, _ = b.Values()
if b.it.ChunkEncoding() == chunkenc.EncHistogram {
b.lastTime, _ = b.HistogramValues()
} else {
b.lastTime, _ = b.Values()
}
}
return b.ok
@ -127,6 +143,16 @@ func (b *BufferedSeriesIterator) Values() (int64, float64) {
return b.it.At()
}
// HistogramValues returns the current histogram element of the iterator.
func (b *BufferedSeriesIterator) HistogramValues() (int64, histogram.Histogram) {
return b.it.AtHistogram()
}
// ChunkEncoding return the chunk encoding of the underlying iterator.
func (b *BufferedSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return b.it.ChunkEncoding()
}
// Err returns the last encountered error.
func (b *BufferedSeriesIterator) Err() error {
return b.it.Err()
@ -135,6 +161,7 @@ func (b *BufferedSeriesIterator) Err() error {
type sample struct {
t int64
v float64
h *histogram.Histogram
}
func (s sample) T() int64 {
@ -145,9 +172,14 @@ func (s sample) V() float64 {
return s.v
}
func (s sample) H() *histogram.Histogram {
return s.h
}
type sampleRing struct {
delta int64
enc chunkenc.Encoding
buf []sample // lookback buffer
i int // position of most recent element in ring buffer
f int // position of first element in ring buffer
@ -156,8 +188,8 @@ type sampleRing struct {
it sampleRingIterator
}
func newSampleRing(delta int64, sz int) *sampleRing {
r := &sampleRing{delta: delta, buf: make([]sample, sz)}
func newSampleRing(delta int64, sz int, enc chunkenc.Encoding) *sampleRing {
r := &sampleRing{delta: delta, buf: make([]sample, sz), enc: enc}
r.reset()
return r
@ -200,13 +232,12 @@ func (it *sampleRingIterator) At() (int64, float64) {
// AtHistogram always returns (0, histogram.Histogram{}) because there is no
// support for histogram values yet.
// TODO(beorn7): Fix that for histogram support in PromQL.
func (it *sampleRingIterator) AtHistogram() (int64, histogram.Histogram) {
return 0, histogram.Histogram{}
return it.r.atHistogram(it.i)
}
func (it *sampleRingIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
return it.r.enc
}
func (r *sampleRing) at(i int) (int64, float64) {
@ -215,9 +246,20 @@ func (r *sampleRing) at(i int) (int64, float64) {
return s.t, s.v
}
func (r *sampleRing) atHistogram(i int) (int64, histogram.Histogram) {
j := (r.f + i) % len(r.buf)
s := r.buf[j]
return s.t, *s.h
}
func (r *sampleRing) atSample(i int) sample {
j := (r.f + i) % len(r.buf)
return r.buf[j]
}
// add adds a sample to the ring buffer and frees all samples that fall
// out of the delta range.
func (r *sampleRing) add(t int64, v float64) {
func (r *sampleRing) add(s sample) {
l := len(r.buf)
// Grow the ring buffer if it fits no more elements.
if l == r.l {
@ -236,11 +278,11 @@ func (r *sampleRing) add(t int64, v float64) {
}
}
r.buf[r.i] = sample{t: t, v: v}
r.buf[r.i] = s
r.l++
// Free head of the buffer of samples that just fell out of the range.
tmin := t - r.delta
tmin := s.t - r.delta
for r.buf[r.f].t < tmin {
r.f++
if r.f >= l {
@ -276,12 +318,11 @@ func (r *sampleRing) reduceDelta(delta int64) bool {
}
// nthLast returns the nth most recent element added to the ring.
func (r *sampleRing) nthLast(n int) (int64, float64, bool) {
func (r *sampleRing) nthLast(n int) (sample, bool) {
if n > r.l {
return 0, 0, false
return sample{}, false
}
t, v := r.at(r.l - n)
return t, v, true
return r.atSample(r.l - n), true
}
func (r *sampleRing) samples() []sample {

View File

@ -297,19 +297,26 @@ func (e errChunksIterator) Err() error { return e.err }
// ExpandSamples iterates over all samples in the iterator, buffering all in slice.
// Optionally it takes samples constructor, useful when you want to compare sample slices with different
// sample implementations. if nil, sample type from this package will be used.
func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, v float64) tsdbutil.Sample) ([]tsdbutil.Sample, error) {
func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, v float64, h *histogram.Histogram) tsdbutil.Sample) ([]tsdbutil.Sample, error) {
if newSampleFn == nil {
newSampleFn = func(t int64, v float64) tsdbutil.Sample { return sample{t, v} }
newSampleFn = func(t int64, v float64, h *histogram.Histogram) tsdbutil.Sample { return sample{t, v, h} }
}
var result []tsdbutil.Sample
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
if iter.ChunkEncoding() == chunkenc.EncHistogram {
for iter.Next() {
t, h := iter.AtHistogram()
result = append(result, newSampleFn(t, 0, &h))
}
} else {
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
}
result = append(result, newSampleFn(t, v, nil))
}
result = append(result, newSampleFn(t, v))
}
return result, iter.Err()
}

View File

@ -1336,7 +1336,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
timeStep := DefaultBlockDuration / int64(numHistograms)
expHists := make([]timedHist, 0, numHistograms)
l := labels.Labels{{Name: "a", Value: "b"}}
for i, h := range generateHistograms(numHistograms) {
for i, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, int64(i)*timeStep, h)
require.NoError(t, err)
expHists = append(expHists, timedHist{int64(i) * timeStep, h})
@ -1715,8 +1715,8 @@ func TestSparseHistogramCompactionAndQuery(t *testing.T) {
}
expHists := make(map[string][]timedHist)
series1Histograms := generateHistograms(20)
series2Histograms := generateHistograms(20)
series1Histograms := GenerateTestHistograms(20)
series2Histograms := GenerateTestHistograms(20)
idx1, idx2 := -1, -1
addNextHists := func(ts int64, app storage.Appender) {
lbls1 := labels.Labels{{Name: "a", Value: "b"}}

View File

@ -1496,11 +1496,13 @@ type histogramSample struct {
type sample struct {
t int64
v float64
h *histogram.Histogram
}
func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v} }
func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v, nil} }
func (s sample) T() int64 { return s.t }
func (s sample) V() float64 { return s.v }
func (s sample) H() *histogram.Histogram { return s.h }
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
@ -1658,3 +1660,22 @@ func (h *Head) updateWALReplayStatusRead(current int) {
h.stats.WALReplayStatus.Current = current
}
func GenerateTestHistograms(n int) (r []histogram.Histogram) {
for i := 0; i < n; i++ {
r = append(r, histogram.Histogram{
Count: 5 + uint64(i*4),
ZeroCount: 2 + uint64(i),
ZeroThreshold: 0.001,
Sum: 18.4 * float64(i+1),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
})
}
return r
}

View File

@ -444,6 +444,7 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
msIter.stopAfter = stopAfter
msIter.buf = s.sampleBuf
msIter.histogramBuf = s.histogramBuf
msIter.histogramSeries = s.histogramSeries
return msIter
}
return &memSafeIterator{
@ -452,18 +453,20 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
i: -1,
stopAfter: stopAfter,
},
total: numSamples,
buf: s.sampleBuf,
histogramBuf: s.histogramBuf,
total: numSamples,
buf: s.sampleBuf,
histogramBuf: s.histogramBuf,
histogramSeries: s.histogramSeries,
}
}
type memSafeIterator struct {
stopIterator
total int
buf [4]sample
histogramBuf [4]histogramSample
histogramSeries bool
total int
buf [4]sample
histogramBuf [4]histogramSample
}
func (it *memSafeIterator) Seek(t int64) bool {
@ -471,15 +474,29 @@ func (it *memSafeIterator) Seek(t int64) bool {
return false
}
ts, _ := it.At()
for t > ts || it.i == -1 {
if !it.Next() {
return false
}
var ts int64
if it.histogramSeries {
ts, _ = it.AtHistogram()
} else {
ts, _ = it.At()
}
if it.histogramSeries {
for t > ts || it.i == -1 {
if !it.Next() {
return false
}
ts, _ = it.AtHistogram()
}
} else {
for t > ts || it.i == -1 {
if !it.Next() {
return false
}
ts, _ = it.At()
}
}
return true
}

View File

@ -2544,7 +2544,7 @@ func TestAppendHistogram(t *testing.T) {
h histogram.Histogram
}
expHistograms := make([]timedHistogram, 0, numHistograms)
for i, h := range generateHistograms(numHistograms) {
for i, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, int64(i), h)
require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{int64(i), h})
@ -2591,7 +2591,7 @@ func TestHistogramInWAL(t *testing.T) {
h histogram.Histogram
}
expHistograms := make([]timedHistogram, 0, numHistograms)
for i, h := range generateHistograms(numHistograms) {
for i, h := range GenerateTestHistograms(numHistograms) {
h.NegativeSpans = h.PositiveSpans
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, l, int64(i), h)
@ -2630,25 +2630,6 @@ func TestHistogramInWAL(t *testing.T) {
require.Equal(t, expHistograms, actHistograms)
}
func generateHistograms(n int) (r []histogram.Histogram) {
for i := 0; i < n; i++ {
r = append(r, histogram.Histogram{
Count: 5 + uint64(i*4),
ZeroCount: 2 + uint64(i),
ZeroThreshold: 0.001,
Sum: 18.4 * float64(i+1),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
})
}
return r
}
func TestChunkSnapshot(t *testing.T) {
head, _ := newTestHead(t, 120*4, false)
defer func() {
@ -2962,7 +2943,7 @@ func TestHistogramMetrics(t *testing.T) {
for x := 0; x < 5; x++ {
expHSeries++
l := labels.Labels{{Name: "a", Value: fmt.Sprintf("b%d", x)}}
for i, h := range generateHistograms(10) {
for i, h := range GenerateTestHistograms(10) {
app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, l, int64(i), h)
require.NoError(t, err)
@ -3039,7 +3020,7 @@ func TestHistogramStaleSample(t *testing.T) {
// Adding stale in the same appender.
app := head.Appender(context.Background())
for _, h := range generateHistograms(numHistograms) {
for _, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h)
require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h})
@ -3058,7 +3039,7 @@ func TestHistogramStaleSample(t *testing.T) {
// Adding stale in different appender and continuing series after a stale sample.
app = head.Appender(context.Background())
for _, h := range generateHistograms(2 * numHistograms)[numHistograms:] {
for _, h := range GenerateTestHistograms(2 * numHistograms)[numHistograms:] {
_, err := app.AppendHistogram(0, l, 100*int64(len(expHistograms)), h)
require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{100 * int64(len(expHistograms)), h})
@ -3112,7 +3093,7 @@ func TestHistogramCounterResetHeader(t *testing.T) {
require.Equal(t, expHeaders[len(expHeaders)-1], ms.headChunk.chunk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
h := generateHistograms(1)[0]
h := GenerateTestHistograms(1)[0]
if len(h.NegativeBuckets) == 0 {
h.NegativeSpans = append([]histogram.Span{}, h.PositiveSpans...)
h.NegativeBuckets = append([]int64{}, h.PositiveBuckets...)

View File

@ -102,6 +102,7 @@ func (b *BufferedSeriesIterator) Err() error {
type sample struct {
t int64
v float64
h *histogram.Histogram
}
func (s sample) T() int64 {
@ -112,6 +113,10 @@ func (s sample) V() float64 {
return s.v
}
func (s sample) H() *histogram.Histogram {
return s.h
}
type sampleRing struct {
delta int64

View File

@ -14,6 +14,7 @@
package tsdbutil
import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
)
@ -26,6 +27,7 @@ type Samples interface {
type Sample interface {
T() int64
V() float64
H() *histogram.Histogram
}
type SampleSlice []Sample
@ -61,7 +63,7 @@ func ChunkFromSamplesGeneric(s Samples) chunks.Meta {
func PopulatedChunk(numSamples int, minTime int64) chunks.Meta {
samples := make([]Sample, numSamples)
for i := 0; i < numSamples; i++ {
samples[i] = sample{minTime + int64(i*1000), 1.0}
samples[i] = sample{t: minTime + int64(i*1000), v: 1.0}
}
return ChunkFromSamples(samples)
}