From e3b775b78bd444c1611a5ffbbdb433f38acff86d Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Wed, 18 Jul 2018 06:10:28 +0200 Subject: [PATCH] Simplify BufferedSeriesIterator usage (#4294) * Allow for BufferedSeriesIterator instances to be created without an underlying iterator, to simplify their usage. Signed-off-by: Alin Sinpalean --- promql/engine.go | 32 ++++++++------------------------ storage/buffer.go | 11 +++++++++-- storage/buffer_test.go | 6 +++--- storage/noop.go | 27 ++++++++++++++++++++++++++- web/federate.go | 3 ++- 5 files changed, 48 insertions(+), 31 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 29b9618cc..ca879ca51 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -837,13 +837,9 @@ func (ev *evaluator) eval(expr Expr) Value { inArgs[matrixArgIndex] = inMatrix enh := &EvalNodeHelper{out: make(Vector, 0, 1)} // Process all the calls for one time series at a time. - var it *storage.BufferedSeriesIterator + it := storage.NewBuffer(selRange) for i, s := range sel.series { - if it == nil { - it = storage.NewBuffer(s.Iterator(), selRange) - } else { - it.Reset(s.Iterator()) - } + it.Reset(s.Iterator()) ss := Series{ // For all range vector functions, the only change to the // output labels is dropping the metric name so just do @@ -945,13 +941,9 @@ func (ev *evaluator) eval(expr Expr) Value { case *VectorSelector: mat := make(Matrix, 0, len(e.series)) - var it *storage.BufferedSeriesIterator + it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) for i, s := range e.series { - if it == nil { - it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) - } else { - it.Reset(s.Iterator()) - } + it.Reset(s.Iterator()) ss := Series{ Metric: e.series[i].Labels(), Points: getPointSlice(numSteps), @@ -986,13 +978,9 @@ func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector { vec = make(Vector, 0, len(node.series)) ) - var it *storage.BufferedSeriesIterator + it := storage.NewBuffer(durationMilliseconds(LookbackDelta)) for i, s := range node.series { - if it == nil { - it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta)) - } else { - it.Reset(s.Iterator()) - } + it.Reset(s.Iterator()) t, v, ok := ev.vectorSelectorSingle(it, node, ts) if ok { @@ -1058,16 +1046,12 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { matrix = make(Matrix, 0, len(node.series)) ) - var it *storage.BufferedSeriesIterator + it := storage.NewBuffer(durationMilliseconds(node.Range)) for i, s := range node.series { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) } - if it == nil { - it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range)) - } else { - it.Reset(s.Iterator()) - } + it.Reset(s.Iterator()) ss := Series{ Metric: node.series[i].Labels(), } diff --git a/storage/buffer.go b/storage/buffer.go index 7df4027c5..b45b55265 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -27,8 +27,15 @@ type BufferedSeriesIterator struct { } // NewBuffer returns a new iterator that buffers the values within the time range -// of the current element and the duration of delta before. -func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { +// of the current element and the duration of delta before, initialized with an +// empty iterator. Use Reset() to set an actual iterator to be buffered. +func NewBuffer(delta int64) *BufferedSeriesIterator { + return NewBufferIterator(&NoopSeriesIt, delta) +} + +// NewBufferIterator returns a new iterator that buffers the values within the +// time range of the current element and the duration of delta before. +func NewBufferIterator(it SeriesIterator, delta int64) *BufferedSeriesIterator { bit := &BufferedSeriesIterator{ buf: newSampleRing(delta, 16), } diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 5b752dec2..237756f41 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -106,7 +106,7 @@ func TestBufferedSeriesIterator(t *testing.T) { require.Equal(t, ev, v, "value mismatch") } - it = NewBuffer(newListSeriesIterator([]sample{ + it = NewBufferIterator(newListSeriesIterator([]sample{ {t: 1, v: 2}, {t: 2, v: 3}, {t: 3, v: 4}, @@ -157,7 +157,7 @@ func TestBufferedSeriesIteratorNoBadAt(t *testing.T) { err: func() error { return nil }, } - it := NewBuffer(m, 60) + it := NewBufferIterator(m, 60) it.Next() it.Next() } @@ -177,7 +177,7 @@ func BenchmarkBufferedSeriesIterator(b *testing.B) { } // Simulate a 5 minute rate. - it := NewBuffer(newListSeriesIterator(samples), 5*60) + it := NewBufferIterator(newListSeriesIterator(samples), 5*60) b.SetBytes(int64(b.N * 16)) b.ReportAllocs() diff --git a/storage/noop.go b/storage/noop.go index fa024430f..7bf92dbd7 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -13,7 +13,11 @@ package storage -import "github.com/prometheus/prometheus/pkg/labels" +import ( + "math" + + "github.com/prometheus/prometheus/pkg/labels" +) type noopQuerier struct{} @@ -52,3 +56,24 @@ func (noopSeriesSet) At() Series { func (noopSeriesSet) Err() error { return nil } + +type noopSeriesIterator struct{} + +// NoopSeriesIt is a SeriesIterator that does nothing. +var NoopSeriesIt = noopSeriesIterator{} + +func (noopSeriesIterator) At() (int64, float64) { + return math.MinInt64, 0 +} + +func (noopSeriesIterator) Seek(t int64) bool { + return false +} + +func (noopSeriesIterator) Next() bool { + return false +} + +func (noopSeriesIterator) Err() error { + return nil +} diff --git a/web/federate.go b/web/federate.go index e5a176517..ac500fb61 100644 --- a/web/federate.go +++ b/web/federate.go @@ -84,12 +84,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { } set := storage.NewMergeSeriesSet(sets) + it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6)) for set.Next() { s := set.At() // TODO(fabxc): allow fast path for most recent sample either // in the storage itself or caching layer in Prometheus. - it := storage.NewBuffer(s.Iterator(), int64(promql.LookbackDelta/1e6)) + it.Reset(s.Iterator()) var t int64 var v float64