mirror of https://github.com/prometheus/prometheus
Test and fixes for buffered iterator
parent
0a94f58f1a
commit
869cccf080
28
querier.go
28
querier.go
|
@ -674,27 +674,25 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool {
|
|||
|
||||
// If the delta would cause us to seek backwards, preserve the buffer
|
||||
// and just continue regular advancment while filling the buffer on the way.
|
||||
if t0 <= b.lastTime {
|
||||
for b.Next() {
|
||||
if tcur, _ := b.it.Values(); tcur >= t {
|
||||
return true
|
||||
}
|
||||
if t0 > b.lastTime {
|
||||
b.buf.reset()
|
||||
|
||||
ok := b.it.Seek(t0)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
b.lastTime, _ = b.Values()
|
||||
}
|
||||
|
||||
b.buf.reset()
|
||||
|
||||
ok := b.it.Seek(t0)
|
||||
if !ok {
|
||||
return false
|
||||
if b.lastTime >= t {
|
||||
return true
|
||||
}
|
||||
|
||||
for b.Next() {
|
||||
if ts, _ := b.Values(); ts >= t {
|
||||
if b.lastTime >= t {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -704,7 +702,9 @@ func (b *BufferedSeriesIterator) Next() bool {
|
|||
b.buf.add(b.it.Values())
|
||||
|
||||
ok := b.it.Next()
|
||||
b.lastTime, _ = b.Values()
|
||||
if ok {
|
||||
b.lastTime, _ = b.Values()
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
|
|
|
@ -49,11 +49,15 @@ func (it *listSeriesIterator) Next() bool {
|
|||
}
|
||||
|
||||
func (it *listSeriesIterator) Seek(t int64) bool {
|
||||
if it.idx == -1 {
|
||||
it.idx = 0
|
||||
}
|
||||
// Do binary search between current position and end.
|
||||
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
|
||||
s := it.list[i+it.idx]
|
||||
return s.t >= t
|
||||
})
|
||||
|
||||
return it.idx < len(it.list)
|
||||
}
|
||||
|
||||
|
@ -283,3 +287,57 @@ func TestSampleRing(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBufferedSeriesIterator(t *testing.T) {
|
||||
var it *BufferedSeriesIterator
|
||||
|
||||
bufferEq := func(exp []sample) {
|
||||
var b []sample
|
||||
bit := it.Buffer()
|
||||
for bit.Next() {
|
||||
t, v := bit.Values()
|
||||
b = append(b, sample{t: t, v: v})
|
||||
}
|
||||
require.Equal(t, exp, b, "buffer mismatch")
|
||||
}
|
||||
sampleEq := func(ets int64, ev float64) {
|
||||
ts, v := it.Values()
|
||||
require.Equal(t, ets, ts, "timestamp mismatch")
|
||||
require.Equal(t, ev, v, "value mismatch")
|
||||
}
|
||||
|
||||
it = NewBuffer(newListSeriesIterator([]sample{
|
||||
{t: 1, v: 2},
|
||||
{t: 2, v: 3},
|
||||
{t: 3, v: 4},
|
||||
{t: 4, v: 5},
|
||||
{t: 5, v: 6},
|
||||
{t: 99, v: 8},
|
||||
{t: 100, v: 9},
|
||||
{t: 101, v: 10},
|
||||
}), 2)
|
||||
|
||||
require.True(t, it.Seek(-123), "seek failed")
|
||||
sampleEq(1, 2)
|
||||
bufferEq(nil)
|
||||
|
||||
require.True(t, it.Next(), "next failed")
|
||||
sampleEq(2, 3)
|
||||
bufferEq([]sample{{t: 1, v: 2}})
|
||||
|
||||
require.True(t, it.Next(), "next failed")
|
||||
require.True(t, it.Next(), "next failed")
|
||||
require.True(t, it.Next(), "next failed")
|
||||
sampleEq(5, 6)
|
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
|
||||
|
||||
require.True(t, it.Seek(5), "seek failed")
|
||||
sampleEq(5, 6)
|
||||
bufferEq([]sample{{t: 2, v: 3}, {t: 3, v: 4}, {t: 4, v: 5}})
|
||||
|
||||
require.True(t, it.Seek(101), "seek failed")
|
||||
sampleEq(101, 10)
|
||||
bufferEq([]sample{{t: 99, v: 8}, {t: 100, v: 9}})
|
||||
|
||||
require.False(t, it.Next(), "next succeeded unexpectedly")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue