diff --git a/head.go b/head.go index b15ba3461..7d19011dd 100644 --- a/head.go +++ b/head.go @@ -112,7 +112,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { h := &headBlock{ dir: dir, wal: wal, - series: []*memSeries{}, + series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. hashes: map[uint64][]*memSeries{}, values: map[string]stringset{}, postings: &memPostings{m: make(map[term][]uint32)}, diff --git a/postings.go b/postings.go index 950cd2e3c..f2f452ac1 100644 --- a/postings.go +++ b/postings.go @@ -207,19 +207,15 @@ func (it *mergedPostings) Next() bool { } func (it *mergedPostings) Seek(id uint32) bool { + if it.cur >= id { + return true + } + it.aok = it.a.Seek(id) it.bok = it.b.Seek(id) it.initialized = true - acur, bcur := it.a.At(), it.b.At() - if acur < bcur { - it.cur = acur - } else if acur > bcur { - it.cur = bcur - } else { - it.cur = acur - it.bok = it.b.Next() - } - return it.aok && it.bok + + return it.Next() } func (it *mergedPostings) Err() error { @@ -249,10 +245,16 @@ func (it *listPostings) Next() bool { it.list = it.list[1:] return true } + it.cur = 0 return false } func (it *listPostings) Seek(x uint32) bool { + // If the current value satisfies, then return. + if it.cur >= x { + return true + } + // Do binary search between current position and end. i := sort.Search(len(it.list), func(i int) bool { return it.list[i] >= x @@ -295,6 +297,10 @@ func (it *bigEndianPostings) Next() bool { } func (it *bigEndianPostings) Seek(x uint32) bool { + if it.cur >= x { + return true + } + num := len(it.list) / 4 // Do binary search between current position and end. i := sort.Search(num, func(i int) bool { diff --git a/postings_test.go b/postings_test.go index 61db9ad44..efffe3d98 100644 --- a/postings_test.go +++ b/postings_test.go @@ -78,23 +78,39 @@ func TestIntersect(t *testing.T) { func TestMultiIntersect(t *testing.T) { var cases = []struct { - a, b, c []uint32 - res []uint32 + p [][]uint32 + res []uint32 }{ { - a: []uint32{1, 2, 3, 4, 5, 6, 1000, 1001}, - b: []uint32{2, 4, 5, 6, 7, 8, 999, 1001}, - c: []uint32{1, 2, 5, 6, 7, 8, 1001, 1200}, + p: [][]uint32{ + {1, 2, 3, 4, 5, 6, 1000, 1001}, + {2, 4, 5, 6, 7, 8, 999, 1001}, + {1, 2, 5, 6, 7, 8, 1001, 1200}, + }, res: []uint32{2, 5, 6, 1001}, }, + // One of the reproduceable cases for: + // https://github.com/prometheus/prometheus/issues/2616 + // The initialisation of intersectPostings was moving the iterator forward + // prematurely making us miss some postings. + { + p: [][]uint32{ + {1, 2}, + {1, 2}, + {1, 2}, + {2}, + }, + res: []uint32{2}, + }, } for _, c := range cases { - pa := newListPostings(c.a) - pb := newListPostings(c.b) - pc := newListPostings(c.c) + ps := make([]Postings, 0, len(c.p)) + for _, postings := range c.p { + ps = append(ps, newListPostings(postings)) + } - res, err := expandPostings(Intersect(pa, pb, pc)) + res, err := expandPostings(Intersect(ps...)) require.NoError(t, err) require.Equal(t, c.res, res) @@ -200,12 +216,12 @@ func TestMergedPostingsSeek(t *testing.T) { res []uint32 }{ { - a: []uint32{1, 2, 3, 4, 5}, + a: []uint32{2, 3, 4, 5}, b: []uint32{6, 7, 8, 9, 10}, - seek: 0, + seek: 1, success: true, - res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + res: []uint32{2, 3, 4, 5, 6, 7, 8, 9, 10}, }, { a: []uint32{1, 2, 3, 4, 5}, @@ -240,9 +256,16 @@ func TestMergedPostingsSeek(t *testing.T) { p := newMergedPostings(a, b) require.Equal(t, c.success, p.Seek(c.seek)) - lst, err := expandPostings(p) - require.NoError(t, err) - require.Equal(t, c.res, lst) + + // After Seek(), At() should be called. + if c.success { + start := p.At() + lst, err := expandPostings(p) + require.NoError(t, err) + + lst = append([]uint32{start}, lst...) + require.Equal(t, c.res, lst) + } } return @@ -293,16 +316,16 @@ func TestBigEndian(t *testing.T) { ls[600] + 1, ls[601], true, }, { - ls[600] + 1, ls[602], true, + ls[600] + 1, ls[601], true, }, { - ls[600] + 1, ls[603], true, + ls[600] + 1, ls[601], true, }, { - ls[0], ls[604], true, + ls[0], ls[601], true, }, { - ls[600], ls[605], true, + ls[600], ls[601], true, }, { ls[999], ls[999], true, @@ -321,3 +344,20 @@ func TestBigEndian(t *testing.T) { } }) } + +func TestIntersectWithMerge(t *testing.T) { + // One of the reproduceable cases for: + // https://github.com/prometheus/prometheus/issues/2616 + a := newListPostings([]uint32{21, 22, 23, 24, 25, 30}) + + b := newMergedPostings( + newListPostings([]uint32{10, 20, 30}), + newListPostings([]uint32{15, 26, 30}), + ) + + p := Intersect(a, b) + res, err := expandPostings(p) + + require.NoError(t, err) + require.Equal(t, []uint32{30}, res) +}