Browse Source

index: reduce empty postings trees (#509)

Improved Merge when all is empty.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
pull/5805/head
Bartek Płotka 6 years ago committed by Krasi Georgiev
parent
commit
3ab5f4e579
  1. 6
      index/index_test.go
  2. 49
      index/postings.go
  3. 365
      index/postings_test.go
  4. 1
      querier.go

6
index/index_test.go

@ -212,7 +212,7 @@ func TestIndexRW_Postings(t *testing.T) {
testutil.Ok(t, iw.AddSeries(3, series[2]))
testutil.Ok(t, iw.AddSeries(4, series[3]))
err = iw.WritePostings("a", "1", newListPostings([]uint64{1, 2, 3, 4}))
err = iw.WritePostings("a", "1", newListPostings(1, 2, 3, 4))
testutil.Ok(t, err)
testutil.Ok(t, iw.Close())
@ -323,9 +323,9 @@ func TestPersistence_index_e2e(t *testing.T) {
for i := range all {
all[i] = uint64(i)
}
err = iw.WritePostings("", "", newListPostings(all))
err = iw.WritePostings("", "", newListPostings(all...))
testutil.Ok(t, err)
mi.WritePostings("", "", newListPostings(all))
mi.WritePostings("", "", newListPostings(all...))
for n, e := range postings.m {
for v := range e {

49
index/postings.go

@ -92,7 +92,7 @@ func (p *MemPostings) Get(name, value string) Postings {
if lp == nil {
return EmptyPostings()
}
return newListPostings(lp)
return newListPostings(lp...)
}
// All returns a postings list over all documents ever added.
@ -202,7 +202,7 @@ func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
for n, e := range p.m {
for v, p := range e {
if err := f(labels.Label{Name: n, Value: v}, newListPostings(p)); err != nil {
if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil {
return err
}
}
@ -283,6 +283,8 @@ func (e errPostings) Err() error { return e.err }
var emptyPostings = errPostings{}
// EmptyPostings returns a postings list that's always empty.
// NOTE: Returning EmptyPostings sentinel when index.Postings struct has no postings is recommended.
// It triggers optimized flow in other functions like Intersect, Without etc.
func EmptyPostings() Postings {
return emptyPostings
}
@ -296,13 +298,20 @@ func ErrPostings(err error) Postings {
// input postings.
func Intersect(its ...Postings) Postings {
if len(its) == 0 {
return emptyPostings
return EmptyPostings()
}
if len(its) == 1 {
return its[0]
}
l := len(its) / 2
return newIntersectPostings(Intersect(its[:l]...), Intersect(its[l:]...))
a := Intersect(its[:l]...)
b := Intersect(its[l:]...)
if a == EmptyPostings() || b == EmptyPostings() {
return EmptyPostings()
}
return newIntersectPostings(a, b)
}
type intersectPostings struct {
@ -366,7 +375,12 @@ func Merge(its ...Postings) Postings {
if len(its) == 1 {
return its[0]
}
return newMergedPostings(its)
p, ok := newMergedPostings(its)
if !ok {
return EmptyPostings()
}
return p
}
type postingsHeap []Postings
@ -395,18 +409,24 @@ type mergedPostings struct {
err error
}
func newMergedPostings(p []Postings) *mergedPostings {
func newMergedPostings(p []Postings) (m *mergedPostings, nonEmpty bool) {
ph := make(postingsHeap, 0, len(p))
for _, it := range p {
// NOTE: mergedPostings struct requires the user to issue an initial Next.
if it.Next() {
ph = append(ph, it)
} else {
if it.Err() != nil {
return &mergedPostings{err: it.Err()}
return &mergedPostings{err: it.Err()}, true
}
}
}
return &mergedPostings{h: ph}
if len(ph) == 0 {
return nil, false
}
return &mergedPostings{h: ph}, true
}
func (it *mergedPostings) Next() bool {
@ -495,8 +515,15 @@ func (it mergedPostings) Err() error {
}
// Without returns a new postings list that contains all elements from the full list that
// are not in the drop list
// are not in the drop list.
func Without(full, drop Postings) Postings {
if full == EmptyPostings() {
return EmptyPostings()
}
if drop == EmptyPostings() {
return full
}
return newRemovedPostings(full, drop)
}
@ -580,10 +607,10 @@ type listPostings struct {
}
func NewListPostings(list []uint64) Postings {
return newListPostings(list)
return newListPostings(list...)
}
func newListPostings(list []uint64) *listPostings {
func newListPostings(list ...uint64) *listPostings {
return &listPostings{list: list}
}

365
index/postings_test.go

@ -62,38 +62,120 @@ func TestMemPostings_ensureOrder(t *testing.T) {
}
func TestIntersect(t *testing.T) {
a := newListPostings(1, 2, 3)
b := newListPostings(2, 3, 4)
var cases = []struct {
a, b []uint64
res []uint64
in []Postings
res Postings
}{
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{6, 7, 8, 9, 10},
res: nil,
in: []Postings{},
res: EmptyPostings(),
},
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{4, 5, 6, 7, 8},
res: []uint64{4, 5},
in: []Postings{a, b, EmptyPostings()},
res: EmptyPostings(),
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
res: []uint64{1, 4, 10},
}, {
a: []uint64{1},
b: []uint64{0, 1},
res: []uint64{1},
in: []Postings{b, a, EmptyPostings()},
res: EmptyPostings(),
},
{
in: []Postings{EmptyPostings(), b, a},
res: EmptyPostings(),
},
{
in: []Postings{EmptyPostings(), a, b},
res: EmptyPostings(),
},
{
in: []Postings{a, EmptyPostings(), b},
res: EmptyPostings(),
},
{
in: []Postings{b, EmptyPostings(), a},
res: EmptyPostings(),
},
{
in: []Postings{b, EmptyPostings(), a, a, b, a, a, a},
res: EmptyPostings(),
},
{
in: []Postings{
newListPostings(1, 2, 3, 4, 5),
newListPostings(6, 7, 8, 9, 10),
},
res: newListPostings(),
},
{
in: []Postings{
newListPostings(1, 2, 3, 4, 5),
newListPostings(4, 5, 6, 7, 8),
},
res: newListPostings(4, 5),
},
{
in: []Postings{
newListPostings(1, 2, 3, 4, 9, 10),
newListPostings(1, 4, 5, 6, 7, 8, 10, 11),
},
res: newListPostings(1, 4, 10),
},
{
in: []Postings{
newListPostings(1),
newListPostings(0, 1),
},
res: newListPostings(1),
},
{
in: []Postings{
newListPostings(1),
},
res: newListPostings(1),
},
{
in: []Postings{
newListPostings(1),
newListPostings(),
},
res: newListPostings(),
},
{
in: []Postings{
newListPostings(),
newListPostings(),
},
res: newListPostings(),
},
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
t.Run("", func(t *testing.T) {
if c.res == nil {
t.Fatal("intersect result expectancy cannot be nil")
}
res, err := ExpandPostings(Intersect(a, b))
testutil.Ok(t, err)
testutil.Equals(t, c.res, res)
expected, err := ExpandPostings(c.res)
testutil.Ok(t, err)
i := Intersect(c.in...)
if c.res == EmptyPostings() {
testutil.Equals(t, EmptyPostings(), i)
return
}
if i == EmptyPostings() {
t.Fatal("intersect unexpected result: EmptyPostings sentinel")
}
res, err := ExpandPostings(i)
testutil.Ok(t, err)
testutil.Equals(t, expected, res)
})
}
}
@ -128,7 +210,7 @@ func TestMultiIntersect(t *testing.T) {
for _, c := range cases {
ps := make([]Postings, 0, len(c.p))
for _, postings := range c.p {
ps = append(ps, newListPostings(postings))
ps = append(ps, newListPostings(postings...))
}
res, err := ExpandPostings(Intersect(ps...))
@ -157,10 +239,10 @@ func BenchmarkIntersect(t *testing.B) {
d = append(d, uint64(i))
}
i1 := newListPostings(a)
i2 := newListPostings(b)
i3 := newListPostings(c)
i4 := newListPostings(d)
i1 := newListPostings(a...)
i2 := newListPostings(b...)
i3 := newListPostings(c...)
i4 := newListPostings(d...)
t.ResetTimer()
@ -172,60 +254,117 @@ func BenchmarkIntersect(t *testing.B) {
}
func TestMultiMerge(t *testing.T) {
var cases = []struct {
a, b, c []uint64
res []uint64
}{
{
a: []uint64{1, 2, 3, 4, 5, 6, 1000, 1001},
b: []uint64{2, 4, 5, 6, 7, 8, 999, 1001},
c: []uint64{1, 2, 5, 6, 7, 8, 1001, 1200},
res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200},
},
}
i1 := newListPostings(1, 2, 3, 4, 5, 6, 1000, 1001)
i2 := newListPostings(2, 4, 5, 6, 7, 8, 999, 1001)
i3 := newListPostings(1, 2, 5, 6, 7, 8, 1001, 1200)
for _, c := range cases {
i1 := newListPostings(c.a)
i2 := newListPostings(c.b)
i3 := newListPostings(c.c)
res, err := ExpandPostings(Merge(i1, i2, i3))
testutil.Ok(t, err)
testutil.Equals(t, c.res, res)
}
res, err := ExpandPostings(Merge(i1, i2, i3))
testutil.Ok(t, err)
testutil.Equals(t, []uint64{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, res)
}
func TestMergedPostings(t *testing.T) {
var cases = []struct {
a, b []uint64
res []uint64
in []Postings
res Postings
}{
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{6, 7, 8, 9, 10},
res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
in: []Postings{},
res: EmptyPostings(),
},
{
a: []uint64{1, 2, 3, 4, 5},
b: []uint64{4, 5, 6, 7, 8},
res: []uint64{1, 2, 3, 4, 5, 6, 7, 8},
in: []Postings{
newListPostings(),
newListPostings(),
},
res: EmptyPostings(),
},
{
a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
in: []Postings{
newListPostings(),
},
res: newListPostings(),
},
{
in: []Postings{
EmptyPostings(),
EmptyPostings(),
EmptyPostings(),
EmptyPostings(),
},
res: EmptyPostings(),
},
{
in: []Postings{
newListPostings(1, 2, 3, 4, 5),
newListPostings(6, 7, 8, 9, 10),
},
res: newListPostings(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
},
{
in: []Postings{
newListPostings(1, 2, 3, 4, 5),
newListPostings(4, 5, 6, 7, 8),
},
res: newListPostings(1, 2, 3, 4, 5, 6, 7, 8),
},
{
in: []Postings{
newListPostings(1, 2, 3, 4, 9, 10),
newListPostings(1, 4, 5, 6, 7, 8, 10, 11),
},
res: newListPostings(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
in: []Postings{
newListPostings(1, 2, 3, 4, 9, 10),
EmptyPostings(),
newListPostings(1, 4, 5, 6, 7, 8, 10, 11),
},
res: newListPostings(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
},
{
in: []Postings{
newListPostings(1, 2),
newListPostings(),
},
res: newListPostings(1, 2),
},
{
in: []Postings{
newListPostings(1, 2),
EmptyPostings(),
},
res: newListPostings(1, 2),
},
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
t.Run("", func(t *testing.T) {
if c.res == nil {
t.Fatal("merge result expectancy cannot be nil")
}
res, err := ExpandPostings(Merge(a, b))
testutil.Ok(t, err)
testutil.Equals(t, c.res, res)
}
expected, err := ExpandPostings(c.res)
testutil.Ok(t, err)
m := Merge(c.in...)
if c.res == EmptyPostings() {
testutil.Equals(t, EmptyPostings(), m)
return
}
if m == EmptyPostings() {
t.Fatal("merge unexpected result: EmptyPostings sentinel")
}
res, err := ExpandPostings(m)
testutil.Ok(t, err)
testutil.Equals(t, expected, res)
})
}
}
func TestMergedPostingsSeek(t *testing.T) {
@ -271,8 +410,8 @@ func TestMergedPostingsSeek(t *testing.T) {
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
a := newListPostings(c.a...)
b := newListPostings(c.b...)
p := Merge(a, b)
@ -333,8 +472,8 @@ func TestRemovedPostings(t *testing.T) {
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
a := newListPostings(c.a...)
b := newListPostings(c.b...)
res, err := ExpandPostings(newRemovedPostings(a, b))
testutil.Ok(t, err)
@ -353,8 +492,8 @@ func TestRemovedNextStackoverflow(t *testing.T) {
remove = append(remove, i)
}
flp := newListPostings(full)
rlp := newListPostings(remove)
flp := newListPostings(full...)
rlp := newListPostings(remove...)
rp := newRemovedPostings(flp, rlp)
gotElem := false
for rp.Next() {
@ -432,8 +571,8 @@ func TestRemovedPostingsSeek(t *testing.T) {
}
for _, c := range cases {
a := newListPostings(c.a)
b := newListPostings(c.b)
a := newListPostings(c.a...)
b := newListPostings(c.b...)
p := newRemovedPostings(a, b)
@ -526,13 +665,13 @@ func TestBigEndian(t *testing.T) {
}
func TestIntersectWithMerge(t *testing.T) {
// One of the reproduceable cases for:
// One of the reproducible cases for:
// https://github.com/prometheus/prometheus/issues/2616
a := newListPostings([]uint64{21, 22, 23, 24, 25, 30})
a := newListPostings(21, 22, 23, 24, 25, 30)
b := Merge(
newListPostings([]uint64{10, 20, 30}),
newListPostings([]uint64{15, 26, 30}),
newListPostings(10, 20, 30),
newListPostings(15, 26, 30),
)
p := Intersect(a, b)
@ -541,3 +680,81 @@ func TestIntersectWithMerge(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, []uint64{30}, res)
}
func TestWithoutPostings(t *testing.T) {
var cases = []struct {
base Postings
drop Postings
res Postings
}{
{
base: EmptyPostings(),
drop: EmptyPostings(),
res: EmptyPostings(),
},
{
base: EmptyPostings(),
drop: newListPostings(1, 2),
res: EmptyPostings(),
},
{
base: newListPostings(1, 2),
drop: EmptyPostings(),
res: newListPostings(1, 2),
},
{
base: newListPostings(),
drop: newListPostings(),
res: newListPostings(),
},
{
base: newListPostings(1, 2, 3),
drop: newListPostings(),
res: newListPostings(1, 2, 3),
},
{
base: newListPostings(1, 2, 3),
drop: newListPostings(4, 5, 6),
res: newListPostings(1, 2, 3),
},
{
base: newListPostings(1, 2, 3),
drop: newListPostings(3, 4, 5),
res: newListPostings(1, 2),
},
}
for _, c := range cases {
t.Run("", func(t *testing.T) {
if c.res == nil {
t.Fatal("without result expectancy cannot be nil")
}
expected, err := ExpandPostings(c.res)
testutil.Ok(t, err)
w := Without(c.base, c.drop)
if c.res == EmptyPostings() {
testutil.Equals(t, EmptyPostings(), w)
return
}
if w == EmptyPostings() {
t.Fatal("without unexpected result: EmptyPostings sentinel")
}
res, err := ExpandPostings(w)
testutil.Ok(t, err)
testutil.Equals(t, expected, res)
})
}
}

1
querier.go

@ -264,6 +264,7 @@ func (q *blockQuerier) Close() error {
// PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. It returns a list of label names that must be manually
// checked to not exist in series the postings list points to.
// It returns EmptyPostings() if it can be determined beforehand that no results will be found.
func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, error) {
var its []index.Postings

Loading…
Cancel
Save