mirror of https://github.com/prometheus/prometheus
Fix error where we look into the future. (#2829)
* Fix error where we look into the future. So currently we are adding values that are in the future for an older timestamp. For example, if we have [(1, 1), (150, 2)] we will end up showing [(1, 1), (2,2)]. Further it is not advisable to call .At() after Next() returns false. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Retuen early if done Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Handle Seek() where we reach the end of iterator Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify code Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>pull/2777/merge
parent
669075c6b9
commit
baf5b0f0fc
|
@ -253,13 +253,23 @@ load 10s
|
||||||
{
|
{
|
||||||
Query: "metric",
|
Query: "metric",
|
||||||
Result: Matrix{Series{
|
Result: Matrix{Series{
|
||||||
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 2, T: 2000}},
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}},
|
||||||
Metric: labels.FromStrings("__name__", "metric")},
|
Metric: labels.FromStrings("__name__", "metric")},
|
||||||
},
|
},
|
||||||
Start: time.Unix(0, 0),
|
Start: time.Unix(0, 0),
|
||||||
End: time.Unix(2, 0),
|
End: time.Unix(2, 0),
|
||||||
Interval: time.Second,
|
Interval: time.Second,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Query: "metric",
|
||||||
|
Result: Matrix{Series{
|
||||||
|
Points: []Point{{V: 1, T: 0}, {V: 1, T: 5000}, {V: 2, T: 10000}},
|
||||||
|
Metric: labels.FromStrings("__name__", "metric")},
|
||||||
|
},
|
||||||
|
Start: time.Unix(0, 0),
|
||||||
|
End: time.Unix(10, 0),
|
||||||
|
Interval: 5 * time.Second,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
|
|
|
@ -13,7 +13,9 @@
|
||||||
|
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import "math"
|
import (
|
||||||
|
"math"
|
||||||
|
)
|
||||||
|
|
||||||
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
|
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
|
||||||
type BufferedSeriesIterator struct {
|
type BufferedSeriesIterator struct {
|
||||||
|
@ -21,6 +23,7 @@ type BufferedSeriesIterator struct {
|
||||||
buf *sampleRing
|
buf *sampleRing
|
||||||
|
|
||||||
lastTime int64
|
lastTime int64
|
||||||
|
ok bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBuffer returns a new iterator that buffers the values within the time range
|
// NewBuffer returns a new iterator that buffers the values within the time range
|
||||||
|
@ -30,6 +33,7 @@ func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
|
||||||
it: it,
|
it: it,
|
||||||
buf: newSampleRing(delta, 16),
|
buf: newSampleRing(delta, 16),
|
||||||
lastTime: math.MinInt64,
|
lastTime: math.MinInt64,
|
||||||
|
ok: true,
|
||||||
}
|
}
|
||||||
it.Next()
|
it.Next()
|
||||||
|
|
||||||
|
@ -56,8 +60,8 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool {
|
||||||
if t0 > b.lastTime {
|
if t0 > b.lastTime {
|
||||||
b.buf.reset()
|
b.buf.reset()
|
||||||
|
|
||||||
ok := b.it.Seek(t0)
|
b.ok = b.it.Seek(t0)
|
||||||
if !ok {
|
if !b.ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
b.lastTime, _ = b.Values()
|
b.lastTime, _ = b.Values()
|
||||||
|
@ -77,14 +81,19 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool {
|
||||||
|
|
||||||
// Next advances the iterator to the next element.
|
// Next advances the iterator to the next element.
|
||||||
func (b *BufferedSeriesIterator) Next() bool {
|
func (b *BufferedSeriesIterator) Next() bool {
|
||||||
|
if !b.ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Add current element to buffer before advancing.
|
// Add current element to buffer before advancing.
|
||||||
b.buf.add(b.it.At())
|
b.buf.add(b.it.At())
|
||||||
|
|
||||||
ok := b.it.Next()
|
b.ok = b.it.Next()
|
||||||
if ok {
|
if b.ok {
|
||||||
b.lastTime, _ = b.Values()
|
b.lastTime, _ = b.Values()
|
||||||
}
|
}
|
||||||
return ok
|
|
||||||
|
return b.ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// Values returns the current element of the iterator.
|
// Values returns the current element of the iterator.
|
||||||
|
|
|
@ -137,6 +137,26 @@ func TestBufferedSeriesIterator(t *testing.T) {
|
||||||
require.False(t, it.Next(), "next succeeded unexpectedly")
|
require.False(t, it.Next(), "next succeeded unexpectedly")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// At() should not be called once Next() returns false.
|
||||||
|
func TestBufferedSeriesIteratorNoBadAt(t *testing.T) {
|
||||||
|
done := false
|
||||||
|
|
||||||
|
m := &mockSeriesIterator{
|
||||||
|
seek: func(int64) bool { return false },
|
||||||
|
at: func() (int64, float64) {
|
||||||
|
require.False(t, done)
|
||||||
|
done = true
|
||||||
|
return 0, 0
|
||||||
|
},
|
||||||
|
next: func() bool { return !done },
|
||||||
|
err: func() error { return nil },
|
||||||
|
}
|
||||||
|
|
||||||
|
it := NewBuffer(m, 60)
|
||||||
|
it.Next()
|
||||||
|
it.Next()
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkBufferedSeriesIterator(b *testing.B) {
|
func BenchmarkBufferedSeriesIterator(b *testing.B) {
|
||||||
var (
|
var (
|
||||||
samples []sample
|
samples []sample
|
||||||
|
@ -166,13 +186,13 @@ func BenchmarkBufferedSeriesIterator(b *testing.B) {
|
||||||
|
|
||||||
type mockSeriesIterator struct {
|
type mockSeriesIterator struct {
|
||||||
seek func(int64) bool
|
seek func(int64) bool
|
||||||
values func() (int64, float64)
|
at func() (int64, float64)
|
||||||
next func() bool
|
next func() bool
|
||||||
err func() error
|
err func() error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) }
|
func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) }
|
||||||
func (m *mockSeriesIterator) Values() (int64, float64) { return m.values() }
|
func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
|
||||||
func (m *mockSeriesIterator) Next() bool { return m.next() }
|
func (m *mockSeriesIterator) Next() bool { return m.next() }
|
||||||
func (m *mockSeriesIterator) Err() error { return m.err() }
|
func (m *mockSeriesIterator) Err() error { return m.err() }
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue