Browse Source

Pop intersected postings heap without popping (#10092)

See this comment for detailed explanation:

https://github.com/prometheus/prometheus/pull/9907#issuecomment-1002189932

TL;DR: if we don't call Pop() on the heap implementation, we don't need
to return our param as an `interface{}` so we save an allocation.
This would be popped for every label value, so it can be thousands of
saved allocations here (see benchmarks).

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
pull/10119/head
Oleg Zaytsev 3 years ago committed by GitHub
parent
commit
701545286d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      tsdb/index/postings.go
  2. 2
      tsdb/index/postings_test.go

41
tsdb/index/postings.go

@ -846,18 +846,17 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
return nil, it.Err()
}
}
if len(h) == 0 {
if h.empty() {
return nil, nil
}
h.Init()
for len(h) > 0 {
for !h.empty() {
if !p.Seek(h.At()) {
return indexes, p.Err()
}
if p.At() == h.At() {
found := heap.Pop(&h).(postingsWithIndex)
indexes = append(indexes, found.index)
indexes = append(indexes, h.popIndex())
} else if err := h.Next(); err != nil {
return nil, err
}
@ -869,10 +868,29 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
type postingsWithIndex struct {
index int
p Postings
// popped means that this postings shouldn't be considered anymore.
// See popIndex() comment to understand why we need this.
popped bool
}
type postingsWithIndexHeap []postingsWithIndex
// empty checks whether the heap is empty, which is true if it has no elements, of if the smallest element is popped.
func (h *postingsWithIndexHeap) empty() bool {
return len(*h) == 0 || (*h)[0].popped
}
// popIndex pops the smallest heap element and returns its index.
// In our implementation we don't actually do heap.Pop(), instead we mark the element as `popped` and fix its position, which
// should be after all the non-popped elements according to our sorting strategy.
// By skipping the `heap.Pop()` call we avoid an extra allocation in this heap's Pop() implementation which returns an interface{}.
func (h *postingsWithIndexHeap) popIndex() int {
index := (*h)[0].index
(*h)[0].popped = true
heap.Fix(h, 0)
return index
}
func (h *postingsWithIndexHeap) Init() { heap.Init(h) }
func (h postingsWithIndexHeap) At() storage.SeriesRef { return h[0].p.At() }
func (h *postingsWithIndexHeap) Next() error {
@ -886,14 +904,18 @@ func (h *postingsWithIndexHeap) Next() error {
if err := pi.p.Err(); err != nil {
return errors.Wrapf(err, "postings %d", pi.index)
}
heap.Pop(h)
h.popIndex()
return nil
}
func (h postingsWithIndexHeap) Len() int { return len(h) }
func (h postingsWithIndexHeap) Less(i, j int) bool { return h[i].p.At() < h[j].p.At() }
func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
func (h postingsWithIndexHeap) Len() int { return len(h) }
func (h postingsWithIndexHeap) Less(i, j int) bool {
if h[i].popped != h[j].popped {
return h[j].popped
}
return h[i].p.At() < h[j].p.At()
}
func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
func (h *postingsWithIndexHeap) Push(x interface{}) {
*h = append(*h, x.(postingsWithIndex))
@ -901,6 +923,7 @@ func (h *postingsWithIndexHeap) Push(x interface{}) {
// Pop implements heap.Interface and pops the last element, which is NOT the min element,
// so this doesn't return the same heap.Pop()
// Although this method is implemented for correctness, we don't expect it to be used, see popIndex() method for details.
func (h *postingsWithIndexHeap) Pop() interface{} {
old := *h
n := len(old)

2
tsdb/index/postings_test.go

@ -1047,7 +1047,7 @@ func TestPostingsWithIndexHeap(t *testing.T) {
require.Equal(t, expected, h.At())
require.NoError(t, h.Next())
}
require.Empty(t, h)
require.True(t, h.empty())
})
t.Run("pop", func(t *testing.T) {

Loading…
Cancel
Save