Browse Source

Tidy postingsWithIndexHeap (#10123)

Unexported postingsWithIndexHeap's methods that don't need to be
exported, and added detailed comments.

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
pull/10128/head
Oleg Zaytsev 3 years ago committed by GitHub
parent
commit
a83d46ee9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      tsdb/index/postings.go
  2. 14
      tsdb/index/postings_test.go

36
tsdb/index/postings.go

@ -849,15 +849,15 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
if h.empty() {
return nil, nil
}
h.Init()
heap.Init(&h)
for !h.empty() {
if !p.Seek(h.At()) {
if !p.Seek(h.at()) {
return indexes, p.Err()
}
if p.At() == h.At() {
if p.At() == h.at() {
indexes = append(indexes, h.popIndex())
} else if err := h.Next(); err != nil {
} else if err := h.next(); err != nil {
return nil, err
}
}
@ -865,14 +865,20 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
return indexes, nil
}
// postingsWithIndex is used as postingsWithIndexHeap elements by FindIntersectingPostings,
// keeping track of the original index of each postings while they move inside the heap.
type postingsWithIndex struct {
index int
p Postings
// popped means that this postings shouldn't be considered anymore.
// popped means that these postings shouldn't be considered anymore.
// See popIndex() comment to understand why we need this.
popped bool
}
// postingsWithIndexHeap implements heap.Interface,
// with root always pointing to the postings with minimum Postings.At() value.
// It also implements a special way of removing elements that marks them as popped and moves them to the bottom of the
// heap instead of actually removing them, see popIndex() for more details.
type postingsWithIndexHeap []postingsWithIndex
// empty checks whether the heap is empty, which is true if it has no elements, of if the smallest element is popped.
@ -891,9 +897,14 @@ func (h *postingsWithIndexHeap) popIndex() int {
return index
}
func (h *postingsWithIndexHeap) Init() { heap.Init(h) }
func (h postingsWithIndexHeap) At() storage.SeriesRef { return h[0].p.At() }
func (h *postingsWithIndexHeap) Next() error {
// at provides the storage.SeriesRef where root Postings is pointing at this moment.
func (h postingsWithIndexHeap) at() storage.SeriesRef { return h[0].p.At() }
// next performs the Postings.Next() operation on the root of the heap, performing the related operation on the heap
// and conveniently returning the result of calling Postings.Err() if the result of calling Next() was false.
// If Next() succeeds, heap is fixed to move the root to its new position, according to its Postings.At() value.
// If Next() returns fails and there's no error reported by Postings.Err(), then root is marked as removed and heap is fixed.
func (h *postingsWithIndexHeap) next() error {
pi := (*h)[0]
next := pi.p.Next()
if next {
@ -908,15 +919,24 @@ func (h *postingsWithIndexHeap) Next() error {
return nil
}
// Len implements heap.Interface.
// Notice that Len() > 0 does not imply that heap is not empty as elements are not removed from this heap.
// Use empty() to check whether heap is empty or not.
func (h postingsWithIndexHeap) Len() int { return len(h) }
// Less implements heap.Interface, it puts all the popped elements at the bottom,
// and then sorts by Postings.At() property of each node.
func (h postingsWithIndexHeap) Less(i, j int) bool {
if h[i].popped != h[j].popped {
return h[j].popped
}
return h[i].p.At() < h[j].p.At()
}
// Swap implements heap.Interface.
func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
// Push implements heap.Interface.
func (h *postingsWithIndexHeap) Push(x interface{}) {
*h = append(*h, x.(postingsWithIndex))
}

14
tsdb/index/postings_test.go

@ -1041,11 +1041,11 @@ func TestPostingsWithIndexHeap(t *testing.T) {
for _, node := range h {
node.p.Next()
}
h.Init()
heap.Init(&h)
for _, expected := range []storage.SeriesRef{1, 5, 10, 20, 25, 30, 50} {
require.Equal(t, expected, h.At())
require.NoError(t, h.Next())
require.Equal(t, expected, h.at())
require.NoError(t, h.next())
}
require.True(t, h.empty())
})
@ -1059,13 +1059,13 @@ func TestPostingsWithIndexHeap(t *testing.T) {
for _, node := range h {
node.p.Next()
}
h.Init()
heap.Init(&h)
for _, expected := range []storage.SeriesRef{1, 5, 10, 20} {
require.Equal(t, expected, h.At())
require.NoError(t, h.Next())
require.Equal(t, expected, h.at())
require.NoError(t, h.next())
}
require.Equal(t, storage.SeriesRef(25), h.At())
require.Equal(t, storage.SeriesRef(25), h.at())
node := heap.Pop(&h).(postingsWithIndex)
require.Equal(t, 2, node.index)
require.Equal(t, storage.SeriesRef(25), node.p.At())

Loading…
Cancel
Save