Browse Source

Fixed iterator regression: Avoid using heap for each sample when iterating.

Fixes: https://github.com/prometheus/prometheus/issues/7873

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
pull/7902/head
Julien Pivotto 4 years ago committed by Bartlomiej Plotka
parent
commit
2c8b2c5915
  1. 51
      storage/merge.go

51
storage/merge.go

@ -16,6 +16,7 @@ package storage
import (
"bytes"
"container/heap"
"fmt"
"sort"
"strings"
"sync"
@ -418,8 +419,7 @@ func (h *genericSeriesSetHeap) Pop() interface{} {
// with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective
// this never happens.
//
// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead
// to handle overlaps between series.
// It's optimized for non-overlap cases as well.
func ChainedSeriesMerge(series ...Series) Series {
if len(series) == 0 {
return nil
@ -438,10 +438,12 @@ func ChainedSeriesMerge(series ...Series) Series {
// chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps
// order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same
// timestamp are dropped.
// timestamp are dropped. It's optimized for non-overlap cases as well.
type chainSampleIterator struct {
iterators []chunkenc.Iterator
h samplesIteratorHeap
curr chunkenc.Iterator
}
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
@ -462,43 +464,42 @@ func (c *chainSampleIterator) Seek(t int64) bool {
}
func (c *chainSampleIterator) At() (t int64, v float64) {
if len(c.h) == 0 {
if c.h == nil {
panic("chainSampleIterator.At() called after .Next() returned false.")
}
return c.h[0].At()
return c.curr.At()
}
func (c *chainSampleIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
c.curr = c.iterators[0]
for _, iter := range c.iterators[1:] {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
if len(c.h) == 0 {
return false
} else if len(c.h) == 0 {
return c.curr != nil && c.curr.Next()
}
currt, _ := c.At()
for len(c.h) > 0 {
nextt, _ := c.h[0].At()
// All but one of the overlapping samples will be dropped.
if nextt != currt {
break
for {
if c.curr.Next() {
currt, _ := c.curr.At()
nextt, _ := c.h[0].At()
if currt < nextt {
return true
}
if currt == nextt {
fmt.Println("same ts", currt, nextt)
// Ignoring sample.
continue
}
heap.Push(&c.h, c.curr)
}
iter := heap.Pop(&c.h).(chunkenc.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
return true
}
return len(c.h) > 0
}
func (c *chainSampleIterator) Err() error {

Loading…
Cancel
Save