diff --git a/storage/merge.go b/storage/merge.go index 9ef8690a7..3581f61d0 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -16,7 +16,6 @@ package storage import ( "bytes" "container/heap" - "fmt" "sort" "strings" "sync" @@ -419,7 +418,8 @@ func (h *genericSeriesSetHeap) Pop() interface{} { // with "almost" the same data, e.g. from 2 Prometheus HA replicas. This is fine, since from the Prometheus perspective // this never happens. // -// It's optimized for non-overlap cases as well. +// NOTE: Use this merge function only when you see potentially overlapping series, as this introduces a small overhead +// to handle overlaps between series. func ChainedSeriesMerge(series ...Series) Series { if len(series) == 0 { return nil @@ -438,12 +438,10 @@ func ChainedSeriesMerge(series ...Series) Series { // chainSampleIterator is responsible to iterate over samples from different iterators of the same time series in timestamps // order. If one or more samples overlap, one sample from random overlapped ones is kept and all others with the same -// timestamp are dropped. It's optimized for non-overlap cases as well. +// timestamp are dropped. type chainSampleIterator struct { iterators []chunkenc.Iterator h samplesIteratorHeap - - curr chunkenc.Iterator } func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { @@ -464,42 +462,43 @@ func (c *chainSampleIterator) Seek(t int64) bool { } func (c *chainSampleIterator) At() (t int64, v float64) { - if c.h == nil { + if len(c.h) == 0 { panic("chainSampleIterator.At() called after .Next() returned false.") } - return c.curr.At() + + return c.h[0].At() } func (c *chainSampleIterator) Next() bool { if c.h == nil { - c.curr = c.iterators[0] - for _, iter := range c.iterators[1:] { + for _, iter := range c.iterators { if iter.Next() { heap.Push(&c.h, iter) } } - } else if len(c.h) == 0 { - return c.curr != nil && c.curr.Next() + + return len(c.h) > 0 } - for { - if c.curr.Next() { - currt, _ := c.curr.At() - nextt, _ := c.h[0].At() - if currt < nextt { - return true - } - if currt == nextt { - fmt.Println("same ts", currt, nextt) - // Ignoring sample. - continue - } - heap.Push(&c.h, c.curr) + if len(c.h) == 0 { + return false + } + + currt, _ := c.At() + for len(c.h) > 0 { + nextt, _ := c.h[0].At() + // All but one of the overlapping samples will be dropped. + if nextt != currt { + break } - c.curr = heap.Pop(&c.h).(chunkenc.Iterator) - return true + iter := heap.Pop(&c.h).(chunkenc.Iterator) + if iter.Next() { + heap.Push(&c.h, iter) + } } + + return len(c.h) > 0 } func (c *chainSampleIterator) Err() error {