Browse Source

Simplify BufferedSeriesIterator usage (#4294)

* Allow for BufferedSeriesIterator instances to be created without an underlying iterator, to simplify their usage.

Signed-off-by: Alin Sinpalean <alin.sinpalean@gmail.com>
pull/4347/merge
Alin Sinpalean 6 years ago committed by Brian Brazil
parent
commit
e3b775b78b
  1. 32
      promql/engine.go
  2. 11
      storage/buffer.go
  3. 6
      storage/buffer_test.go
  4. 27
      storage/noop.go
  5. 3
      web/federate.go

32
promql/engine.go

@ -837,13 +837,9 @@ func (ev *evaluator) eval(expr Expr) Value {
inArgs[matrixArgIndex] = inMatrix inArgs[matrixArgIndex] = inMatrix
enh := &EvalNodeHelper{out: make(Vector, 0, 1)} enh := &EvalNodeHelper{out: make(Vector, 0, 1)}
// Process all the calls for one time series at a time. // Process all the calls for one time series at a time.
var it *storage.BufferedSeriesIterator it := storage.NewBuffer(selRange)
for i, s := range sel.series { for i, s := range sel.series {
if it == nil { it.Reset(s.Iterator())
it = storage.NewBuffer(s.Iterator(), selRange)
} else {
it.Reset(s.Iterator())
}
ss := Series{ ss := Series{
// For all range vector functions, the only change to the // For all range vector functions, the only change to the
// output labels is dropping the metric name so just do // output labels is dropping the metric name so just do
@ -945,13 +941,9 @@ func (ev *evaluator) eval(expr Expr) Value {
case *VectorSelector: case *VectorSelector:
mat := make(Matrix, 0, len(e.series)) mat := make(Matrix, 0, len(e.series))
var it *storage.BufferedSeriesIterator it := storage.NewBuffer(durationMilliseconds(LookbackDelta))
for i, s := range e.series { for i, s := range e.series {
if it == nil { it.Reset(s.Iterator())
it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta))
} else {
it.Reset(s.Iterator())
}
ss := Series{ ss := Series{
Metric: e.series[i].Labels(), Metric: e.series[i].Labels(),
Points: getPointSlice(numSteps), Points: getPointSlice(numSteps),
@ -986,13 +978,9 @@ func (ev *evaluator) vectorSelector(node *VectorSelector, ts int64) Vector {
vec = make(Vector, 0, len(node.series)) vec = make(Vector, 0, len(node.series))
) )
var it *storage.BufferedSeriesIterator it := storage.NewBuffer(durationMilliseconds(LookbackDelta))
for i, s := range node.series { for i, s := range node.series {
if it == nil { it.Reset(s.Iterator())
it = storage.NewBuffer(s.Iterator(), durationMilliseconds(LookbackDelta))
} else {
it.Reset(s.Iterator())
}
t, v, ok := ev.vectorSelectorSingle(it, node, ts) t, v, ok := ev.vectorSelectorSingle(it, node, ts)
if ok { if ok {
@ -1058,16 +1046,12 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
matrix = make(Matrix, 0, len(node.series)) matrix = make(Matrix, 0, len(node.series))
) )
var it *storage.BufferedSeriesIterator it := storage.NewBuffer(durationMilliseconds(node.Range))
for i, s := range node.series { for i, s := range node.series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
if it == nil { it.Reset(s.Iterator())
it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range))
} else {
it.Reset(s.Iterator())
}
ss := Series{ ss := Series{
Metric: node.series[i].Labels(), Metric: node.series[i].Labels(),
} }

11
storage/buffer.go

@ -27,8 +27,15 @@ type BufferedSeriesIterator struct {
} }
// NewBuffer returns a new iterator that buffers the values within the time range // NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before. // of the current element and the duration of delta before, initialized with an
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator { // empty iterator. Use Reset() to set an actual iterator to be buffered.
func NewBuffer(delta int64) *BufferedSeriesIterator {
return NewBufferIterator(&NoopSeriesIt, delta)
}
// NewBufferIterator returns a new iterator that buffers the values within the
// time range of the current element and the duration of delta before.
func NewBufferIterator(it SeriesIterator, delta int64) *BufferedSeriesIterator {
bit := &BufferedSeriesIterator{ bit := &BufferedSeriesIterator{
buf: newSampleRing(delta, 16), buf: newSampleRing(delta, 16),
} }

6
storage/buffer_test.go

@ -106,7 +106,7 @@ func TestBufferedSeriesIterator(t *testing.T) {
require.Equal(t, ev, v, "value mismatch") require.Equal(t, ev, v, "value mismatch")
} }
it = NewBuffer(newListSeriesIterator([]sample{ it = NewBufferIterator(newListSeriesIterator([]sample{
{t: 1, v: 2}, {t: 1, v: 2},
{t: 2, v: 3}, {t: 2, v: 3},
{t: 3, v: 4}, {t: 3, v: 4},
@ -157,7 +157,7 @@ func TestBufferedSeriesIteratorNoBadAt(t *testing.T) {
err: func() error { return nil }, err: func() error { return nil },
} }
it := NewBuffer(m, 60) it := NewBufferIterator(m, 60)
it.Next() it.Next()
it.Next() it.Next()
} }
@ -177,7 +177,7 @@ func BenchmarkBufferedSeriesIterator(b *testing.B) {
} }
// Simulate a 5 minute rate. // Simulate a 5 minute rate.
it := NewBuffer(newListSeriesIterator(samples), 5*60) it := NewBufferIterator(newListSeriesIterator(samples), 5*60)
b.SetBytes(int64(b.N * 16)) b.SetBytes(int64(b.N * 16))
b.ReportAllocs() b.ReportAllocs()

27
storage/noop.go

@ -13,7 +13,11 @@
package storage package storage
import "github.com/prometheus/prometheus/pkg/labels" import (
"math"
"github.com/prometheus/prometheus/pkg/labels"
)
type noopQuerier struct{} type noopQuerier struct{}
@ -52,3 +56,24 @@ func (noopSeriesSet) At() Series {
func (noopSeriesSet) Err() error { func (noopSeriesSet) Err() error {
return nil return nil
} }
type noopSeriesIterator struct{}
// NoopSeriesIt is a SeriesIterator that does nothing.
var NoopSeriesIt = noopSeriesIterator{}
func (noopSeriesIterator) At() (int64, float64) {
return math.MinInt64, 0
}
func (noopSeriesIterator) Seek(t int64) bool {
return false
}
func (noopSeriesIterator) Next() bool {
return false
}
func (noopSeriesIterator) Err() error {
return nil
}

3
web/federate.go

@ -84,12 +84,13 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
} }
set := storage.NewMergeSeriesSet(sets) set := storage.NewMergeSeriesSet(sets)
it := storage.NewBuffer(int64(promql.LookbackDelta / 1e6))
for set.Next() { for set.Next() {
s := set.At() s := set.At()
// TODO(fabxc): allow fast path for most recent sample either // TODO(fabxc): allow fast path for most recent sample either
// in the storage itself or caching layer in Prometheus. // in the storage itself or caching layer in Prometheus.
it := storage.NewBuffer(s.Iterator(), int64(promql.LookbackDelta/1e6)) it.Reset(s.Iterator())
var t int64 var t int64
var v float64 var v float64

Loading…
Cancel
Save