Browse Source

Add explicit error to Querier.Select

This has been a frequent source of debugging pain since errors are
potentially delayed to a much later point. They bubble up in an
unrelated execution path.
pull/5805/head
Fabian Reinartz 7 years ago
parent
commit
e5ce2bef43
  1. 5
      block.go
  2. 35
      db_test.go
  3. 5
      head.go
  4. 3
      head_test.go
  5. 5
      postings.go
  6. 65
      querier.go
  7. 8
      querier_test.go

5
block.go

@ -285,7 +285,10 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
}
pr := newPostingsReader(pb.indexr)
p, absent := pr.Select(ms...)
p, absent, err := pr.Select(ms...)
if err != nil {
return errors.Wrap(err, "select series")
}
ir := pb.indexr

35
db_test.go

@ -36,8 +36,11 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
return db, func() { os.RemoveAll(tmpdir) }
}
// Convert a SeriesSet into a form useable with reflect.DeepEqual.
func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample {
// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sample {
ss, err := q.Select(matchers...)
Ok(t, err)
result := map[string][]sample{}
for ss.Next() {
@ -49,12 +52,12 @@ func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample {
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
}
require.NoError(t, it.Err())
Ok(t, it.Err())
name := series.Labels().String()
result[name] = samples
}
require.NoError(t, ss.Err())
Ok(t, ss.Err())
return result
}
@ -70,7 +73,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
querier, err := db.Querier(0, 1)
require.NoError(t, err)
seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
require.Equal(t, seriesSet, map[string][]sample{})
require.NoError(t, querier.Close())
@ -82,7 +85,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
require.NoError(t, err)
defer querier.Close()
seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar"))
require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}})
}
@ -102,7 +105,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
require.NoError(t, err)
defer querier.Close()
seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
require.Equal(t, seriesSet, map[string][]sample{})
}
@ -146,7 +149,7 @@ func TestDBAppenderAddRef(t *testing.T) {
q, err := db.Querier(0, 200)
require.NoError(t, err)
res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b")))
res := query(t, q, labels.NewEqualMatcher("a", "b"))
require.Equal(t, map[string][]sample{
labels.FromStrings("a", "b").String(): []sample{
@ -198,7 +201,8 @@ Outer:
q, err := db.Querier(0, numSamples)
require.NoError(t, err)
res := q.Select(labels.NewEqualMatcher("a", "b"))
res, err := q.Select(labels.NewEqualMatcher("a", "b"))
require.NoError(t, err)
expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint {
@ -294,8 +298,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
q, err := db.Querier(0, 10)
require.NoError(t, err)
ss := q.Select(labels.NewEqualMatcher("a", "b"))
ssMap := readSeriesSet(t, ss)
ssMap := query(t, q, labels.NewEqualMatcher("a", "b"))
require.Equal(t, map[string][]sample{
labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}},
@ -314,8 +317,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
q, err = db.Querier(0, 10)
require.NoError(t, err)
ss = q.Select(labels.NewEqualMatcher("a", "b"))
ssMap = readSeriesSet(t, ss)
ssMap = query(t, q, labels.NewEqualMatcher("a", "b"))
require.Equal(t, map[string][]sample{
labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}},
@ -352,7 +354,9 @@ func TestDB_Snapshot(t *testing.T) {
defer querier.Close()
// sum values
seriesSet := querier.Select(labels.NewEqualMatcher("foo", "bar"))
seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
require.NoError(t, err)
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
@ -500,7 +504,8 @@ func TestDB_e2e(t *testing.T) {
q, err := db.Querier(mint, maxt)
require.NoError(t, err)
ss := q.Select(qry.ms...)
ss, err := q.Select(qry.ms...)
require.NoError(t, err)
result := map[string][]sample{}

5
head.go

@ -575,7 +575,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := h.indexRange(mint, maxt)
pr := newPostingsReader(ir)
p, absent := pr.Select(ms...)
p, absent, err := pr.Select(ms...)
if err != nil {
return errors.Wrap(err, "select series")
}
var stones []Stone

3
head_test.go

@ -328,7 +328,8 @@ Outer:
// Compare the result.
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
require.NoError(t, err)
res := q.Select(labels.NewEqualMatcher("a", "b"))
res, err := q.Select(labels.NewEqualMatcher("a", "b"))
require.NoError(t, err)
expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint {

5
postings.go

@ -165,6 +165,11 @@ func (e errPostings) Err() error { return e.err }
var emptyPostings = errPostings{}
// EmptyPostings returns a postings list that's always empty.
func EmptyPostings() Postings {
return emptyPostings
}
// Intersect returns a new postings list over the intersection of the
// input postings.
func Intersect(its ...Postings) Postings {

65
querier.go

@ -27,7 +27,7 @@ import (
// time range.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(...labels.Matcher) SeriesSet
Select(...labels.Matcher) (SeriesSet, error)
// LabelValues returns all potential values for a label name.
LabelValues(string) ([]string, error)
@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
return nil, fmt.Errorf("not implemented")
}
func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
return q.sel(q.blocks, ms)
}
func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
if len(qs) == 0 {
return nopSeriesSet{}
return EmptySeriesSet(), nil
}
if len(qs) == 1 {
return qs[0].Select(ms...)
}
l := len(qs) / 2
return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
a, err := q.sel(qs[:l], ms)
if err != nil {
return nil, err
}
b, err := q.sel(qs[l:], ms)
if err != nil {
return nil, err
}
return newMergedSeriesSet(a, b), nil
}
func (q *querier) Close() error {
@ -141,10 +150,13 @@ type blockQuerier struct {
mint, maxt int64
}
func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
pr := newPostingsReader(q.index)
p, absent := pr.Select(ms...)
p, absent, err := pr.Select(ms...)
if err != nil {
return nil, err
}
return &blockSeriesSet{
set: &populatedChunkSeries{
@ -162,7 +174,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
mint: q.mint,
maxt: q.maxt,
}
}, nil
}
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
@ -205,7 +217,7 @@ func newPostingsReader(i IndexReader) *postingsReader {
return &postingsReader{index: i}
}
func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string, error) {
var (
its []Postings
absent []string
@ -217,12 +229,16 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
absent = append(absent, m.Name())
continue
}
its = append(its, r.selectSingle(m))
it, err := r.selectSingle(m)
if err != nil {
return nil, nil, err
}
its = append(its, it)
}
p := Intersect(its...)
return r.index.SortedPostings(p), absent
return r.index.SortedPostings(p), absent, nil
}
// tuplesByPrefix uses binary search to find prefix matches within ts.
@ -256,33 +272,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
return matches, nil
}
func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
func (r *postingsReader) selectSingle(m labels.Matcher) (Postings, error) {
// Fast-path for equal matching.
if em, ok := m.(*labels.EqualMatcher); ok {
it, err := r.index.Postings(em.Name(), em.Value())
if err != nil {
return errPostings{err: err}
return nil, err
}
return it
return it, nil
}
tpls, err := r.index.LabelValues(m.Name())
if err != nil {
return errPostings{err: err}
return nil, err
}
var res []string
if pm, ok := m.(*labels.PrefixMatcher); ok {
res, err = tuplesByPrefix(pm, tpls)
if err != nil {
return errPostings{err: err}
return nil, err
}
} else {
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return errPostings{err: err}
return nil, err
}
if m.Matches(vals[0]) {
res = append(res, vals[0])
@ -291,7 +307,7 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
}
if len(res) == 0 {
return emptyPostings
return EmptyPostings(), nil
}
var rit []Postings
@ -299,12 +315,12 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
for _, v := range res {
it, err := r.index.Postings(m.Name(), v)
if err != nil {
return errPostings{err: err}
return nil, err
}
rit = append(rit, it)
}
return Merge(rit...)
return Merge(rit...), nil
}
func mergeStrings(a, b []string) []string {
@ -342,11 +358,12 @@ type SeriesSet interface {
Err() error
}
type nopSeriesSet struct{}
var emptySeriesSet = errSeriesSet{}
func (nopSeriesSet) Next() bool { return false }
func (nopSeriesSet) At() Series { return nil }
func (nopSeriesSet) Err() error { return nil }
// EmptySeriesSet returns a series set that's always empty.
func EmptySeriesSet() SeriesSet {
return emptySeriesSet
}
// mergedSeriesSet takes two series sets as a single series set. The input series sets
// must be sorted and sequential in time, i.e. if they have the same label set,

8
querier_test.go

@ -460,7 +460,8 @@ Outer:
maxt: c.maxt,
}
res := querier.Select(c.ms...)
res, err := querier.Select(c.ms...)
require.NoError(t, err)
for {
eok, rok := c.exp.Next(), res.Next()
@ -632,7 +633,8 @@ Outer:
maxt: c.maxt,
}
res := querier.Select(c.ms...)
res, err := querier.Select(c.ms...)
require.NoError(t, err)
for {
eok, rok := c.exp.Next(), res.Next()
@ -1228,7 +1230,7 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
sel = func(sets []SeriesSet) SeriesSet {
if len(sets) == 0 {
return nopSeriesSet{}
return EmptySeriesSet()
}
if len(sets) == 1 {
return sets[0]

Loading…
Cancel
Save