diff --git a/promql/engine.go b/promql/engine.go index 8d0b1b380..9fba6c68b 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1411,7 +1411,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } mat := make(Matrix, 0, len(e.Series)) - it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta)) + it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) for i, s := range e.Series { it.Reset(s.Iterator()) ss := Series{ @@ -1542,7 +1542,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws}) } vec := make(Vector, 0, len(node.Series)) - it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta)) + it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) for i, s := range node.Series { it.Reset(s.Iterator()) @@ -1564,7 +1564,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect } // vectorSelectorSingle evaluates a instant vector for the iterator of one time series. 
-func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { +func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) { refTime := ts - durationMilliseconds(node.Offset) var t int64 var v float64 @@ -1581,7 +1581,7 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, no } if !ok || t > refTime { - t, v, ok = it.PeekBack(1) + t, v, ok = it.PeekPrev() if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) { return 0, 0, false } diff --git a/storage/buffer_test.go b/storage/buffer_test.go index a6fb2d2c1..b67af6de9 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -103,6 +103,12 @@ func TestBufferedSeriesIterator(t *testing.T) { require.Equal(t, ets, ts, "timestamp mismatch") require.Equal(t, ev, v, "value mismatch") } + prevSampleEq := func(ets int64, ev float64, eok bool) { + ts, v, ok := it.PeekBack(1) + require.Equal(t, eok, ok, "exist mismatch") + require.Equal(t, ets, ts, "timestamp mismatch") + require.Equal(t, ev, v, "value mismatch") + } it = NewBufferIterator(NewListSeriesIterator(samples{ sample{t: 1, v: 2}, @@ -117,24 +123,29 @@ func TestBufferedSeriesIterator(t *testing.T) { require.True(t, it.Seek(-123), "seek failed") sampleEq(1, 2) + prevSampleEq(0, 0, false) bufferEq(nil) require.True(t, it.Next(), "next failed") sampleEq(2, 3) + prevSampleEq(1, 2, true) bufferEq([]sample{{t: 1, v: 2}}) require.True(t, it.Next(), "next failed") require.True(t, it.Next(), "next failed") require.True(t, it.Next(), "next failed") sampleEq(5, 6) + prevSampleEq(4, 5, true) bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) require.True(t, it.Seek(5), "seek failed") sampleEq(5, 6) + prevSampleEq(4, 5, true) bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}}) require.True(t, it.Seek(101), "seek failed") sampleEq(101, 10) + prevSampleEq(100, 
9, true) bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}}) require.False(t, it.Next(), "next succeeded unexpectedly") diff --git a/storage/memoized_iterator.go b/storage/memoized_iterator.go new file mode 100644 index 000000000..5ed0f7065 --- /dev/null +++ b/storage/memoized_iterator.go @@ -0,0 +1,123 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "math" + + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +// MemoizedSeriesIterator wraps an iterator and memoizes the previously returned element so that it can be peeked at. +type MemoizedSeriesIterator struct { + it chunkenc.Iterator + delta int64 + + lastTime int64 + ok bool + + // Keep track of the previously returned value. + prevTime int64 + prevValue float64 +} + +// NewMemoizedEmptyIterator is like NewMemoizedIterator but it's initialised with an empty iterator. +func NewMemoizedEmptyIterator(delta int64) *MemoizedSeriesIterator { + return NewMemoizedIterator(chunkenc.NewNopIterator(), delta) +} + +// NewMemoizedIterator returns a new iterator that memoizes only the element returned +// before the current one; Seek(t) may position the underlying iterator as early as t-delta. +func NewMemoizedIterator(it chunkenc.Iterator, delta int64) *MemoizedSeriesIterator { + bit := &MemoizedSeriesIterator{ + delta: delta, + prevTime: math.MinInt64, + } + bit.Reset(it) + + return bit +} + +// Reset the internal state to reuse the wrapper with the provided iterator. 
+func (b *MemoizedSeriesIterator) Reset(it chunkenc.Iterator) { + b.it = it + b.lastTime = math.MinInt64 + b.ok = true + b.prevTime = math.MinInt64 + it.Next() +} + +// PeekPrev returns the previous element of the iterator. If there is none buffered, +// ok is false. +func (b *MemoizedSeriesIterator) PeekPrev() (t int64, v float64, ok bool) { + if b.prevTime == math.MinInt64 { + return 0, 0, false + } + return b.prevTime, b.prevValue, true +} + +// Seek advances the iterator to the element at time t or greater. +func (b *MemoizedSeriesIterator) Seek(t int64) bool { + t0 := t - b.delta + + if t0 > b.lastTime { + // Reset the previously stored element because the seek advanced + // more than the delta. + b.prevTime = math.MinInt64 + + b.ok = b.it.Seek(t0) + if !b.ok { + return false + } + b.lastTime, _ = b.it.At() + } + + if b.lastTime >= t { + return true + } + for b.Next() { + if b.lastTime >= t { + return true + } + } + + return false +} + +// Next advances the iterator to the next element. +func (b *MemoizedSeriesIterator) Next() bool { + if !b.ok { + return false + } + + // Keep track of the previous element. + b.prevTime, b.prevValue = b.it.At() + + b.ok = b.it.Next() + if b.ok { + b.lastTime, _ = b.it.At() + } + + return b.ok +} + +// Values returns the current element of the iterator. +func (b *MemoizedSeriesIterator) Values() (int64, float64) { + return b.it.At() +} + +// Err returns the last encountered error. +func (b *MemoizedSeriesIterator) Err() error { + return b.it.Err() +} diff --git a/storage/memoized_iterator_test.go b/storage/memoized_iterator_test.go new file mode 100644 index 000000000..849256587 --- /dev/null +++ b/storage/memoized_iterator_test.go @@ -0,0 +1,85 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMemoizedSeriesIterator(t *testing.T) { + var it *MemoizedSeriesIterator + + sampleEq := func(ets int64, ev float64) { + ts, v := it.Values() + require.Equal(t, ets, ts, "timestamp mismatch") + require.Equal(t, ev, v, "value mismatch") + } + prevSampleEq := func(ets int64, ev float64, eok bool) { + ts, v, ok := it.PeekPrev() + require.Equal(t, eok, ok, "exist mismatch") + require.Equal(t, ets, ts, "timestamp mismatch") + require.Equal(t, ev, v, "value mismatch") + } + + it = NewMemoizedIterator(NewListSeriesIterator(samples{ + sample{t: 1, v: 2}, + sample{t: 2, v: 3}, + sample{t: 3, v: 4}, + sample{t: 4, v: 5}, + sample{t: 5, v: 6}, + sample{t: 99, v: 8}, + sample{t: 100, v: 9}, + sample{t: 101, v: 10}, + }), 2) + + require.True(t, it.Seek(-123), "seek failed") + sampleEq(1, 2) + prevSampleEq(0, 0, false) + + require.True(t, it.Next(), "next failed") + sampleEq(2, 3) + prevSampleEq(1, 2, true) + + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + require.True(t, it.Next(), "next failed") + sampleEq(5, 6) + prevSampleEq(4, 5, true) + + require.True(t, it.Seek(5), "seek failed") + sampleEq(5, 6) + prevSampleEq(4, 5, true) + + require.True(t, it.Seek(101), "seek failed") + sampleEq(101, 10) + prevSampleEq(100, 9, true) + + require.False(t, it.Next(), "next succeeded unexpectedly") +} + +func BenchmarkMemoizedSeriesIterator(b *testing.B) { + // Simulate a 5 minute rate. 
+ it := NewMemoizedIterator(newFakeSeriesIterator(int64(b.N), 30), 5*60) + + b.SetBytes(int64(b.N * 16)) + b.ReportAllocs() + b.ResetTimer() + + for it.Next() { + // scan everything + } + require.NoError(b, it.Err()) +}