Optimized vector selector

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pull/8585/head
Marco Pracucci 4 years ago
parent f690b811c5
commit b92c03023d
No known key found for this signature in database
GPG Key ID: 74C1BD403D2DF9B5

@ -1411,7 +1411,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws})
}
mat := make(Matrix, 0, len(e.Series))
it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta))
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
for i, s := range e.Series {
it.Reset(s.Iterator())
ss := Series{
@ -1542,7 +1542,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
ev.error(errWithWarnings{errors.Wrap(err, "expanding series"), ws})
}
vec := make(Vector, 0, len(node.Series))
it := storage.NewBuffer(durationMilliseconds(ev.lookbackDelta))
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
for i, s := range node.Series {
it.Reset(s.Iterator())
@ -1564,7 +1564,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
}
// vectorSelectorSingle evaluates a instant vector for the iterator of one time series.
func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) {
func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) (int64, float64, bool) {
refTime := ts - durationMilliseconds(node.Offset)
var t int64
var v float64
@ -1581,7 +1581,7 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.BufferedSeriesIterator, no
}
if !ok || t > refTime {
t, v, ok = it.PeekBack(1)
t, v, ok = it.PeekPrev()
if !ok || t < refTime-durationMilliseconds(ev.lookbackDelta) {
return 0, 0, false
}

@ -103,6 +103,12 @@ func TestBufferedSeriesIterator(t *testing.T) {
require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch")
}
prevSampleEq := func(ets int64, ev float64, eok bool) {
ts, v, ok := it.PeekBack(1)
require.Equal(t, eok, ok, "exist mismatch")
require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch")
}
it = NewBufferIterator(NewListSeriesIterator(samples{
sample{t: 1, v: 2},
@ -117,24 +123,29 @@ func TestBufferedSeriesIterator(t *testing.T) {
require.True(t, it.Seek(-123), "seek failed")
sampleEq(1, 2)
prevSampleEq(0, 0, false)
bufferEq(nil)
require.True(t, it.Next(), "next failed")
sampleEq(2, 3)
prevSampleEq(1, 2, true)
bufferEq([]sample{{t: 1, v: 2}})
require.True(t, it.Next(), "next failed")
require.True(t, it.Next(), "next failed")
require.True(t, it.Next(), "next failed")
sampleEq(5, 6)
prevSampleEq(4, 5, true)
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
require.True(t, it.Seek(5), "seek failed")
sampleEq(5, 6)
prevSampleEq(4, 5, true)
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
require.True(t, it.Seek(101), "seek failed")
sampleEq(101, 10)
prevSampleEq(100, 9, true)
bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}})
require.False(t, it.Next(), "next succeeded unexpectedly")

@ -0,0 +1,123 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"math"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
// MemoizedSeriesIterator wraps an iterator with a buffer to look-back the previous element.
type MemoizedSeriesIterator struct {
it chunkenc.Iterator
delta int64
lastTime int64
ok bool
// Keep track of the previously returned value.
prevTime int64
prevValue float64
}
// NewMemoizedEmptyIterator is like NewMemoizedIterator but it's initialised with an empty iterator.
func NewMemoizedEmptyIterator(delta int64) *MemoizedSeriesIterator {
return NewMemoizedIterator(chunkenc.NewNopIterator(), delta)
}
// NewMemoizedIterator returns a new iterator that buffers the values within the
// time range of the current element and the duration of delta before.
func NewMemoizedIterator(it chunkenc.Iterator, delta int64) *MemoizedSeriesIterator {
bit := &MemoizedSeriesIterator{
delta: delta,
prevTime: math.MinInt64,
}
bit.Reset(it)
return bit
}
// Reset the internal state to reuse the wrapper with the provided iterator.
func (b *MemoizedSeriesIterator) Reset(it chunkenc.Iterator) {
b.it = it
b.lastTime = math.MinInt64
b.ok = true
b.prevTime = math.MinInt64
it.Next()
}
// PeekPrev returns the previous element of the iterator. If there is none buffered,
// ok is false.
func (b *MemoizedSeriesIterator) PeekPrev() (t int64, v float64, ok bool) {
if b.prevTime == math.MinInt64 {
return 0, 0, false
}
return b.prevTime, b.prevValue, true
}
// Seek advances the iterator to the element at time t or greater.
func (b *MemoizedSeriesIterator) Seek(t int64) bool {
t0 := t - b.delta
if t0 > b.lastTime {
// Reset the previously stored element because the seek advanced
// more than the delta.
b.prevTime = math.MinInt64
b.ok = b.it.Seek(t0)
if !b.ok {
return false
}
b.lastTime, _ = b.it.At()
}
if b.lastTime >= t {
return true
}
for b.Next() {
if b.lastTime >= t {
return true
}
}
return false
}
// Next advances the iterator to the next element.
func (b *MemoizedSeriesIterator) Next() bool {
if !b.ok {
return false
}
// Keep track of the previous element.
b.prevTime, b.prevValue = b.it.At()
b.ok = b.it.Next()
if b.ok {
b.lastTime, _ = b.it.At()
}
return b.ok
}
// Values returns the current element of the iterator.
func (b *MemoizedSeriesIterator) Values() (int64, float64) {
return b.it.At()
}
// Err returns the last encountered error.
func (b *MemoizedSeriesIterator) Err() error {
return b.it.Err()
}

@ -0,0 +1,85 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMemoizedSeriesIterator(t *testing.T) {
var it *MemoizedSeriesIterator
sampleEq := func(ets int64, ev float64) {
ts, v := it.Values()
require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch")
}
prevSampleEq := func(ets int64, ev float64, eok bool) {
ts, v, ok := it.PeekPrev()
require.Equal(t, eok, ok, "exist mismatch")
require.Equal(t, ets, ts, "timestamp mismatch")
require.Equal(t, ev, v, "value mismatch")
}
it = NewMemoizedIterator(NewListSeriesIterator(samples{
sample{t: 1, v: 2},
sample{t: 2, v: 3},
sample{t: 3, v: 4},
sample{t: 4, v: 5},
sample{t: 5, v: 6},
sample{t: 99, v: 8},
sample{t: 100, v: 9},
sample{t: 101, v: 10},
}), 2)
require.True(t, it.Seek(-123), "seek failed")
sampleEq(1, 2)
prevSampleEq(0, 0, false)
require.True(t, it.Next(), "next failed")
sampleEq(2, 3)
prevSampleEq(1, 2, true)
require.True(t, it.Next(), "next failed")
require.True(t, it.Next(), "next failed")
require.True(t, it.Next(), "next failed")
sampleEq(5, 6)
prevSampleEq(4, 5, true)
require.True(t, it.Seek(5), "seek failed")
sampleEq(5, 6)
prevSampleEq(4, 5, true)
require.True(t, it.Seek(101), "seek failed")
sampleEq(101, 10)
prevSampleEq(100, 9, true)
require.False(t, it.Next(), "next succeeded unexpectedly")
}
func BenchmarkMemoizedSeriesIterator(b *testing.B) {
// Simulate a 5 minute rate.
it := NewMemoizedIterator(newFakeSeriesIterator(int64(b.N), 30), 5*60)
b.SetBytes(int64(b.N * 16))
b.ReportAllocs()
b.ResetTimer()
for it.Next() {
// scan everything
}
require.NoError(b, it.Err())
}
Loading…
Cancel
Save