diff --git a/storage/interface.go b/storage/interface.go index b96e70dc3..2cbf86126 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -90,3 +90,66 @@ type SeriesIterator interface { // Err returns the current error. Err() error } + +// dedupedSeriesSet takes two series sets and returns them deduplicated. +// The input sets must be sorted and identical if two series exist in both, i.e. +// if their label sets are equal, the datapoints must be equal as well. +type dedupedSeriesSet struct { + a, b SeriesSet + + cur Series + adone, bdone bool +} + +// DeduplicateSeriesSet merges two SeriesSet and removes duplicates. +// If two series exist in both sets, their datapoints must be equal. +func DeduplicateSeriesSet(a, b SeriesSet) SeriesSet { + s := &dedupedSeriesSet{a: a, b: b} + s.adone = !s.a.Next() + s.bdone = !s.b.Next() + + return s +} + +func (s *dedupedSeriesSet) At() Series { + return s.cur +} + +func (s *dedupedSeriesSet) Err() error { + if s.a.Err() != nil { + return s.a.Err() + } + return s.b.Err() +} + +func (s *dedupedSeriesSet) compare() int { + if s.adone { + return 1 + } + if s.bdone { + return -1 + } + return labels.Compare(s.a.At().Labels(), s.b.At().Labels()) +} + +func (s *dedupedSeriesSet) Next() bool { + if s.adone && s.bdone || s.Err() != nil { + return false + } + + d := s.compare() + + // Both sets contain the current series. Chain them into a single one. + if d > 0 { + s.cur = s.b.At() + s.bdone = !s.b.Next() + } else if d < 0 { + s.cur = s.a.At() + s.adone = !s.a.Next() + } else { + s.cur = s.a.At() + s.adone = !s.a.Next() + s.bdone = !s.b.Next() + } + return true +}