Merge pull request #98 from prometheus/balance

Properly balance k-way operations
pull/5805/head
Fabian Reinartz 2017-06-13 10:09:55 +02:00 committed by GitHub
commit 3a36551b90
2 changed files with 32 additions and 30 deletions

View File

@ -78,12 +78,11 @@ func Intersect(its ...Postings) Postings {
if len(its) == 0 {
return emptyPostings
}
a := its[0]
for _, b := range its[1:] {
a = newIntersectPostings(a, b)
if len(its) == 1 {
return its[0]
}
return a
l := len(its) / 2
return newIntersectPostings(Intersect(its[:l]...), Intersect(its[l:]...))
}
type intersectPostings struct {
@ -145,12 +144,11 @@ func Merge(its ...Postings) Postings {
if len(its) == 0 {
return nil
}
a := its[0]
for _, b := range its[1:] {
a = newMergedPostings(a, b)
if len(its) == 1 {
return its[0]
}
return a
l := len(its) / 2
return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...))
}
type mergedPostings struct {

View File

@ -75,22 +75,26 @@ func (s *DB) Querier(mint, maxt int64) Querier {
}
func (q *querier) LabelValues(n string) ([]string, error) {
if len(q.blocks) == 0 {
return q.lvals(q.blocks, n)
}
func (q *querier) lvals(qs []Querier, n string) ([]string, error) {
if len(qs) == 0 {
return nil, nil
}
res, err := q.blocks[0].LabelValues(n)
if len(qs) == 1 {
return qs[0].LabelValues(n)
}
l := len(qs) / 2
s1, err := q.lvals(qs[:l], n)
if err != nil {
return nil, err
}
for _, bq := range q.blocks[1:] {
pr, err := bq.LabelValues(n)
if err != nil {
return nil, err
}
// Merge new values into deduplicated result.
res = mergeStrings(res, pr)
s2, err := q.lvals(qs[l:], n)
if err != nil {
return nil, err
}
return res, nil
return mergeStrings(s1, s2), nil
}
func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
@ -98,19 +102,19 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
}
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
// Sets from different blocks have no time overlap. The reference numbers
// they emit point to series sorted in lexicographic order.
// We can fully connect partial series by simply comparing with the previous
// label set.
if len(q.blocks) == 0 {
return q.sel(q.blocks, ms)
}
func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
if len(qs) == 0 {
return nopSeriesSet{}
}
r := q.blocks[0].Select(ms...)
for _, s := range q.blocks[1:] {
r = newMergedSeriesSet(r, s.Select(ms...))
if len(qs) == 1 {
return qs[0].Select(ms...)
}
return r
l := len(qs) / 2
return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
}
func (q *querier) Close() error {