diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 3f7b63a60..22aaf7f20 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -849,15 +849,15 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int, if h.empty() { return nil, nil } - h.Init() + heap.Init(&h) for !h.empty() { - if !p.Seek(h.At()) { + if !p.Seek(h.at()) { return indexes, p.Err() } - if p.At() == h.At() { + if p.At() == h.at() { indexes = append(indexes, h.popIndex()) - } else if err := h.Next(); err != nil { + } else if err := h.next(); err != nil { return nil, err } } @@ -865,14 +865,20 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int, return indexes, nil } +// postingsWithIndex is used as postingsWithIndexHeap elements by FindIntersectingPostings, +// keeping track of the original index of each postings while they move inside the heap. type postingsWithIndex struct { index int p Postings - // popped means that this postings shouldn't be considered anymore. + // popped means that these postings shouldn't be considered anymore. // See popIndex() comment to understand why we need this. popped bool } +// postingsWithIndexHeap implements heap.Interface, +// with root always pointing to the postings with minimum Postings.At() value. +// It also implements a special way of removing elements that marks them as popped and moves them to the bottom of the +// heap instead of actually removing them, see popIndex() for more details. type postingsWithIndexHeap []postingsWithIndex // empty checks whether the heap is empty, which is true if it has no elements, of if the smallest element is popped. @@ -891,9 +897,14 @@ func (h *postingsWithIndexHeap) popIndex() int { return index } -func (h *postingsWithIndexHeap) Init() { heap.Init(h) } -func (h postingsWithIndexHeap) At() storage.SeriesRef { return h[0].p.At() } -func (h *postingsWithIndexHeap) Next() error { +// at provides the storage.SeriesRef where root Postings is pointing at this moment. +func (h postingsWithIndexHeap) at() storage.SeriesRef { return h[0].p.At() } + +// next performs the Postings.Next() operation on the root of the heap, performing the related operation on the heap +// and conveniently returning the result of calling Postings.Err() if the result of calling Next() was false. +// If Next() succeeds, heap is fixed to move the root to its new position, according to its Postings.At() value. +// If Next() returns fails and there's no error reported by Postings.Err(), then root is marked as removed and heap is fixed. +func (h *postingsWithIndexHeap) next() error { pi := (*h)[0] next := pi.p.Next() if next { @@ -908,15 +919,24 @@ func (h *postingsWithIndexHeap) Next() error { return nil } +// Len implements heap.Interface. +// Notice that Len() > 0 does not imply that heap is not empty as elements are not removed from this heap. +// Use empty() to check whether heap is empty or not. func (h postingsWithIndexHeap) Len() int { return len(h) } + +// Less implements heap.Interface, it puts all the popped elements at the bottom, +// and then sorts by Postings.At() property of each node. func (h postingsWithIndexHeap) Less(i, j int) bool { if h[i].popped != h[j].popped { return h[j].popped } return h[i].p.At() < h[j].p.At() } + +// Swap implements heap.Interface. func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } +// Push implements heap.Interface. func (h *postingsWithIndexHeap) Push(x interface{}) { *h = append(*h, x.(postingsWithIndex)) } diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 888725f8d..75e1677b2 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1041,11 +1041,11 @@ func TestPostingsWithIndexHeap(t *testing.T) { for _, node := range h { node.p.Next() } - h.Init() + heap.Init(&h) for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} { - require.Equal(t, expected, h.At()) - require.NoError(t, h.Next()) + require.Equal(t, expected, h.at()) + require.NoError(t, h.next()) } require.True(t, h.empty()) }) @@ -1059,13 +1059,13 @@ func TestPostingsWithIndexHeap(t *testing.T) { for _, node := range h { node.p.Next() } - h.Init() + heap.Init(&h) for _, expected := range []storage.SeriesRef{1, 5, 10, 20} { - require.Equal(t, expected, h.At()) - require.NoError(t, h.Next()) + require.Equal(t, expected, h.at()) + require.NoError(t, h.next()) } - require.Equal(t, storage.SeriesRef(25), h.At()) + require.Equal(t, storage.SeriesRef(25), h.at()) node := heap.Pop(&h).(postingsWithIndex) require.Equal(t, 2, node.index) require.Equal(t, storage.SeriesRef(25), node.p.At())