diff --git a/storage/fanout.go b/storage/fanout.go index 061d993af..ae74ad2ca 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -225,7 +225,7 @@ func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) { } seriesSets = append(seriesSets, set) } - return newMergeSeriesSet(seriesSets), nil + return NewMergeSeriesSet(seriesSets), nil } // LabelValues returns all potential values for a label name. @@ -300,7 +300,9 @@ type mergeSeriesSet struct { sets []SeriesSet } -func newMergeSeriesSet(sets []SeriesSet) SeriesSet { +// NewMergeSeriesSet returns a new series set that merges (deduplicates) +// series returned by the input series sets when iterating. +func NewMergeSeriesSet(sets []SeriesSet) SeriesSet { // Sets need to be pre-advanced, so we can introspect the label of the // series under the cursor. var h seriesSetHeap diff --git a/storage/fanout_test.go b/storage/fanout_test.go index d8354b442..c8094ea33 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -97,7 +97,7 @@ func TestMergeSeriesSet(t *testing.T) { ), }, } { - merged := newMergeSeriesSet(tc.input) + merged := NewMergeSeriesSet(tc.input) for merged.Next() { require.True(t, tc.expected.Next()) actualSeries := merged.At() diff --git a/storage/interface.go b/storage/interface.go index 71261b2c9..e04126cc9 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -110,73 +110,3 @@ type SeriesIterator interface { // Err returns the current error. Err() error } - -// dedupedSeriesSet takes two series sets and returns them deduplicated. -// The input sets must be sorted and identical if two series exist in both, i.e. -// if their label sets are equal, the datapoints must be equal as well. -type dedupedSeriesSet struct { - a, b SeriesSet - - cur Series - adone, bdone bool -} - -// DeduplicateSeriesSet merges two SeriesSet and removes duplicates. -// If two series exist in both sets, their datapoints must be equal. -func DeduplicateSeriesSet(a, b SeriesSet) SeriesSet { - if a == nil { - return b - } - if b == nil { - return a - } - - s := &dedupedSeriesSet{a: a, b: b} - s.adone = !s.a.Next() - s.bdone = !s.b.Next() - - return s -} - -func (s *dedupedSeriesSet) At() Series { - return s.cur -} - -func (s *dedupedSeriesSet) Err() error { - if s.a.Err() != nil { - return s.a.Err() - } - return s.b.Err() -} - -func (s *dedupedSeriesSet) compare() int { - if s.adone { - return 1 - } - if s.bdone { - return -1 - } - return labels.Compare(s.a.At().Labels(), s.b.At().Labels()) -} - -func (s *dedupedSeriesSet) Next() bool { - if s.adone && s.bdone || s.Err() != nil { - return false - } - - d := s.compare() - - // Both sets contain the current series. Chain them into a single one. - if d > 0 { - s.cur = s.b.At() - s.bdone = !s.b.Next() - } else if d < 0 { - s.cur = s.a.At() - s.adone = !s.a.Next() - } else { - s.cur = s.a.At() - s.adone = !s.a.Next() - s.bdone = !s.b.Next() - } - return true -} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 56d0d37ed..257ebcd29 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -394,18 +394,17 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { } defer q.Close() - var set storage.SeriesSet - + var sets []storage.SeriesSet for _, mset := range matcherSets { s, err := q.Select(mset...) if err != nil { return nil, &apiError{errorExec, err} } - set = storage.DeduplicateSeriesSet(set, s) + sets = append(sets, s) } + set := storage.NewMergeSeriesSet(sets) metrics := []labels.Labels{} - for set.Next() { metrics = append(metrics, set.At().Labels()) } diff --git a/web/federate.go b/web/federate.go index 43028a3ac..8640a886d 100644 --- a/web/federate.go +++ b/web/federate.go @@ -72,8 +72,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { vec := make(promql.Vector, 0, 8000) - var set storage.SeriesSet - + var sets []storage.SeriesSet for _, mset := range matcherSets { s, err := q.Select(mset...) if err != nil { @@ -81,12 +80,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - set = storage.DeduplicateSeriesSet(set, s) - } - if set == nil { - return + sets = append(sets, s) } + set := storage.NewMergeSeriesSet(sets) for set.Next() { s := set.At()