Merge pull request #3569 from EdSchouten/faster-federation

Deprecate DeduplicateSeriesSet() in favor of NewMergeSeriesSet().
pull/3578/head
Tom Wilkie 2017-12-11 09:37:15 -06:00 committed by GitHub
commit 73fa721dd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 11 additions and 83 deletions

View File

@ -225,7 +225,7 @@ func (q *mergeQuerier) Select(matchers ...*labels.Matcher) (SeriesSet, error) {
} }
seriesSets = append(seriesSets, set) seriesSets = append(seriesSets, set)
} }
return newMergeSeriesSet(seriesSets), nil return NewMergeSeriesSet(seriesSets), nil
} }
// LabelValues returns all potential values for a label name. // LabelValues returns all potential values for a label name.
@ -300,7 +300,9 @@ type mergeSeriesSet struct {
sets []SeriesSet sets []SeriesSet
} }
func newMergeSeriesSet(sets []SeriesSet) SeriesSet { // NewMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating.
func NewMergeSeriesSet(sets []SeriesSet) SeriesSet {
// Sets need to be pre-advanced, so we can introspect the label of the // Sets need to be pre-advanced, so we can introspect the label of the
// series under the cursor. // series under the cursor.
var h seriesSetHeap var h seriesSetHeap

View File

@ -97,7 +97,7 @@ func TestMergeSeriesSet(t *testing.T) {
), ),
}, },
} { } {
merged := newMergeSeriesSet(tc.input) merged := NewMergeSeriesSet(tc.input)
for merged.Next() { for merged.Next() {
require.True(t, tc.expected.Next()) require.True(t, tc.expected.Next())
actualSeries := merged.At() actualSeries := merged.At()

View File

@ -110,73 +110,3 @@ type SeriesIterator interface {
// Err returns the current error. // Err returns the current error.
Err() error Err() error
} }
// dedupedSeriesSet takes two series sets and returns them deduplicated.
// The input sets must be sorted and identical if two series exist in both, i.e.
// if their label sets are equal, the datapoints must be equal as well.
type dedupedSeriesSet struct {
a, b SeriesSet
cur Series
adone, bdone bool
}
// DeduplicateSeriesSet merges two SeriesSet and removes duplicates.
// If two series exist in both sets, their datapoints must be equal.
func DeduplicateSeriesSet(a, b SeriesSet) SeriesSet {
if a == nil {
return b
}
if b == nil {
return a
}
s := &dedupedSeriesSet{a: a, b: b}
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return s
}
func (s *dedupedSeriesSet) At() Series {
return s.cur
}
func (s *dedupedSeriesSet) Err() error {
if s.a.Err() != nil {
return s.a.Err()
}
return s.b.Err()
}
func (s *dedupedSeriesSet) compare() int {
if s.adone {
return 1
}
if s.bdone {
return -1
}
return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
}
func (s *dedupedSeriesSet) Next() bool {
if s.adone && s.bdone || s.Err() != nil {
return false
}
d := s.compare()
// Both sets contain the current series. Chain them into a single one.
if d > 0 {
s.cur = s.b.At()
s.bdone = !s.b.Next()
} else if d < 0 {
s.cur = s.a.At()
s.adone = !s.a.Next()
} else {
s.cur = s.a.At()
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
}
return true
}

View File

@ -394,18 +394,17 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
} }
defer q.Close() defer q.Close()
var set storage.SeriesSet var sets []storage.SeriesSet
for _, mset := range matcherSets { for _, mset := range matcherSets {
s, err := q.Select(mset...) s, err := q.Select(mset...)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err} return nil, &apiError{errorExec, err}
} }
set = storage.DeduplicateSeriesSet(set, s) sets = append(sets, s)
} }
set := storage.NewMergeSeriesSet(sets)
metrics := []labels.Labels{} metrics := []labels.Labels{}
for set.Next() { for set.Next() {
metrics = append(metrics, set.At().Labels()) metrics = append(metrics, set.At().Labels())
} }

View File

@ -72,8 +72,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
vec := make(promql.Vector, 0, 8000) vec := make(promql.Vector, 0, 8000)
var set storage.SeriesSet var sets []storage.SeriesSet
for _, mset := range matcherSets { for _, mset := range matcherSets {
s, err := q.Select(mset...) s, err := q.Select(mset...)
if err != nil { if err != nil {
@ -81,12 +80,10 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
set = storage.DeduplicateSeriesSet(set, s) sets = append(sets, s)
}
if set == nil {
return
} }
set := storage.NewMergeSeriesSet(sets)
for set.Next() { for set.Next() {
s := set.At() s := set.At()