Dedupe samples in the mergeIterator.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/4185/head
Tom Wilkie 2018-05-23 12:15:47 +01:00
parent b5f94667f7
commit ba418780be
2 changed files with 20 additions and 4 deletions

View File

@ -432,7 +432,6 @@ func (c *mergeIterator) At() (t int64, v float64) {
panic("mergeIterator.At() called after .Next() returned false.")
}
// TODO do I need to dedupe or just merge?
return c.h[0].At()
}
@ -443,6 +442,7 @@ func (c *mergeIterator) Next() bool {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
@ -450,10 +450,18 @@ func (c *mergeIterator) Next() bool {
return false
}
currt, currv := c.At()
for len(c.h) > 0 {
nextt, nextv := c.h[0].At()
if nextt != currt || nextv != currv {
break
}
iter := heap.Pop(&c.h).(SeriesIterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}

View File

@ -136,6 +136,14 @@ func TestMergeIterator(t *testing.T) {
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
},
{
input: []SeriesIterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{0, 0}, {2, 2}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}},
},
} {
merged := newMergeIterator(tc.input)
actual := drainSamples(merged)