Browse Source

promql: propagate storage errors

pull/3963/head
Anton Tereshchenkov 7 years ago
parent
commit
18bbec050c
  1. 5
      promql/engine.go
  2. 55
      promql/engine_test.go

5
promql/engine.go

@ -497,6 +497,7 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s
}
Inspect(s.Expr, func(node Node, path []Node) bool {
var set storage.SeriesSet
params := &storage.SelectParams{
Step: int64(s.Interval / time.Millisecond),
}
@ -505,7 +506,7 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s
case *VectorSelector:
params.Func = extractFuncFromPath(path)
set, err := querier.Select(params, n.LabelMatchers...)
set, err = querier.Select(params, n.LabelMatchers...)
if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
return false
@ -524,7 +525,7 @@ func (ng *Engine) populateIterators(ctx context.Context, q storage.Queryable, s
case *MatrixSelector:
params.Func = extractFuncFromPath(path)
set, err := querier.Select(params, n.LabelMatchers...)
set, err = querier.Select(params, n.LabelMatchers...)
if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
return false

55
promql/engine_test.go

@ -22,6 +22,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)
func TestQueryConcurrency(t *testing.T) {
@ -142,6 +143,60 @@ func TestQueryCancel(t *testing.T) {
}
}
// errQuerier implements storage.Querier which always returns error.
type errQuerier struct {
err error
}
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, error) {
return errSeriesSet{err: q.err}, q.err
}
func (*errQuerier) LabelValues(name string) ([]string, error) { return nil, nil }
func (*errQuerier) Close() error { return nil }
// errSeriesSet implements storage.SeriesSet which always returns error.
type errSeriesSet struct {
err error
}
func (errSeriesSet) Next() bool { return false }
func (errSeriesSet) At() storage.Series { return nil }
func (e errSeriesSet) Err() error { return e.err }
func TestQueryError(t *testing.T) {
engine := NewEngine(nil, nil, 10, 10*time.Second)
errStorage := ErrStorage(fmt.Errorf("storage error"))
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &errQuerier{err: errStorage}, nil
})
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
vectorQuery, err := engine.NewInstantQuery(queryable, "foo", time.Unix(1, 0))
if err != nil {
t.Fatalf("unexpected error creating query: %q", err)
}
res := vectorQuery.Exec(ctx)
if res.Err == nil {
t.Fatalf("expected error on failed select but got none")
}
if res.Err != errStorage {
t.Fatalf("expected error %q, got %q", errStorage, res.Err)
}
matrixQuery, err := engine.NewInstantQuery(queryable, "foo[1m]", time.Unix(1, 0))
if err != nil {
t.Fatalf("unexpected error creating query: %q", err)
}
res = matrixQuery.Exec(ctx)
if res.Err == nil {
t.Fatalf("expected error on failed select but got none")
}
if res.Err != errStorage {
t.Fatalf("expected error %q, got %q", errStorage, res.Err)
}
}
func TestEngineShutdown(t *testing.T) {
engine := NewEngine(nil, nil, 10, 10*time.Second)
ctx, cancelCtx := context.WithCancel(context.Background())

Loading…
Cancel
Save