Merge pull request #2079 from prometheus/fabxc-storage

storage: enhance Querier interface usage
pull/1952/merge
Fabian Reinartz 8 years ago committed by GitHub
commit 630b96c5f3

@ -218,22 +218,27 @@ func contextDone(ctx context.Context, env string) error {
// Engine handles the lifetime of queries from beginning to end. // Engine handles the lifetime of queries from beginning to end.
// It is connected to a querier. // It is connected to a querier.
type Engine struct { type Engine struct {
// The querier on which the engine operates. // A Querier constructor against an underlying storage.
querier local.Querier queryable Queryable
// The gate limiting the maximum number of concurrent and waiting queries. // The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate gate *queryGate
options *EngineOptions options *EngineOptions
} }
// Queryable allows opening a storage querier.
type Queryable interface {
Querier() (local.Querier, error)
}
// NewEngine returns a new engine. // NewEngine returns a new engine.
func NewEngine(querier local.Querier, o *EngineOptions) *Engine { func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
if o == nil { if o == nil {
o = DefaultEngineOptions o = DefaultEngineOptions
} }
return &Engine{ return &Engine{
querier: querier, queryable: queryable,
gate: newQueryGate(o.MaxConcurrentQueries), gate: newQueryGate(o.MaxConcurrentQueries),
options: o, options: o,
} }
} }
@ -351,13 +356,18 @@ func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) {
// execEvalStmt evaluates the expression of an evaluation statement for the given time range. // execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) { func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) {
querier, err := ng.queryable.Querier()
if err != nil {
return nil, err
}
defer querier.Close()
prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
err := ng.populateIterators(ctx, s) err = ng.populateIterators(ctx, querier, s)
prepareTimer.Stop() prepareTimer.Stop()
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer ng.closeIterators(s) defer ng.closeIterators(s)
evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start()
@ -463,20 +473,20 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return resMatrix, nil return resMatrix, nil
} }
func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error { func (ng *Engine) populateIterators(ctx context.Context, querier local.Querier, s *EvalStmt) error {
var queryErr error var queryErr error
Inspect(s.Expr, func(node Node) bool { Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
if s.Start.Equal(s.End) { if s.Start.Equal(s.End) {
n.iterators, queryErr = ng.querier.QueryInstant( n.iterators, queryErr = querier.QueryInstant(
ctx, ctx,
s.Start.Add(-n.Offset), s.Start.Add(-n.Offset),
StalenessDelta, StalenessDelta,
n.LabelMatchers..., n.LabelMatchers...,
) )
} else { } else {
n.iterators, queryErr = ng.querier.QueryRange( n.iterators, queryErr = querier.QueryRange(
ctx, ctx,
s.Start.Add(-n.Offset-StalenessDelta), s.Start.Add(-n.Offset-StalenessDelta),
s.End.Add(-n.Offset), s.End.Add(-n.Offset),
@ -487,7 +497,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error {
return false return false
} }
case *MatrixSelector: case *MatrixSelector:
n.iterators, queryErr = ng.querier.QueryRange( n.iterators, queryErr = querier.QueryRange(
ctx, ctx,
s.Start.Add(-n.Offset-n.Range), s.Start.Add(-n.Offset-n.Range),
s.End.Add(-n.Offset), s.End.Add(-n.Offset),

@ -26,7 +26,8 @@ import (
// Storage ingests and manages samples, along with various indexes. All methods // Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender. // are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface { type Storage interface {
Querier // Querier returns a new Querier on the storage.
Querier() (Querier, error)
// This SampleAppender needs multiple samples for the same fingerprint to be // This SampleAppender needs multiple samples for the same fingerprint to be
// submitted in chronological order, from oldest to newest. When Append has // submitted in chronological order, from oldest to newest. When Append has
@ -57,6 +58,9 @@ type Storage interface {
// Querier allows querying a time series storage. // Querier allows querying a time series storage.
type Querier interface { type Querier interface {
// Close closes the querier. Behavior for subsequent calls to Querier methods
// is undefined.
Close() error
// QueryRange returns a list of series iterators for the selected // QueryRange returns a list of series iterators for the selected
// time range and label matchers. The iterators need to be closed // time range and label matchers. The iterators need to be closed
// after usage. // after usage.

@ -40,23 +40,35 @@ func (s *NoopStorage) Stop() error {
func (s *NoopStorage) WaitForIndexing() { func (s *NoopStorage) WaitForIndexing() {
} }
// LastSampleForLabelMatchers implements Storage. // Querier implements Storage.
func (s *NoopStorage) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { func (s *NoopStorage) Querier() (Querier, error) {
return &NoopQuerier{}, nil
}
type NoopQuerier struct{}
// Close implements Querier.
func (s *NoopQuerier) Close() error {
return nil
}
// LastSampleForLabelMatchers implements Querier.
func (s *NoopQuerier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
return nil, nil return nil, nil
} }
// QueryRange implements Storage. // QueryRange implements Querier
func (s *NoopStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { func (s *NoopQuerier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
return nil, nil return nil, nil
} }
// QueryInstant implements Storage. // QueryInstant implements Querier.
func (s *NoopStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { func (s *NoopQuerier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
return nil, nil return nil, nil
} }
// MetricsForLabelMatchers implements Storage. // MetricsForLabelMatchers implements Querier.
func (s *NoopStorage) MetricsForLabelMatchers( func (s *NoopQuerier) MetricsForLabelMatchers(
ctx context.Context, ctx context.Context,
from, through model.Time, from, through model.Time,
matcherSets ...metric.LabelMatchers, matcherSets ...metric.LabelMatchers,
@ -64,8 +76,8 @@ func (s *NoopStorage) MetricsForLabelMatchers(
return nil, nil return nil, nil
} }
// LabelValuesForLabelName implements Storage. // LabelValuesForLabelName implements Querier.
func (s *NoopStorage) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) { func (s *NoopQuerier) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
return nil, nil return nil, nil
} }

@ -403,6 +403,19 @@ func (s *MemorySeriesStorage) Stop() error {
return nil return nil
} }
type memorySeriesStorageQuerier struct {
*MemorySeriesStorage
}
func (memorySeriesStorageQuerier) Close() error {
return nil
}
// Querier implements the storage interface.
func (s *MemorySeriesStorage) Querier() (Querier, error) {
return memorySeriesStorageQuerier{s}, nil
}
// WaitForIndexing implements Storage. // WaitForIndexing implements Storage.
func (s *MemorySeriesStorage) WaitForIndexing() { func (s *MemorySeriesStorage) WaitForIndexing() {
s.persistence.waitForIndexing() s.persistence.waitForIndexing()

@ -226,7 +226,13 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
if !model.LabelNameRE.MatchString(name) { if !model.LabelNameRE.MatchString(name) {
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
} }
vals, err := api.Storage.LabelValuesForLabelName(api.context(r), model.LabelName(name)) q, err := api.Storage.Querier()
if err != nil {
return nil, &apiError{errorExec, err}
}
defer q.Close()
vals, err := q.LabelValuesForLabelName(api.context(r), model.LabelName(name))
if err != nil { if err != nil {
return nil, &apiError{errorExec, err} return nil, &apiError{errorExec, err}
} }
@ -272,7 +278,13 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
matcherSets = append(matcherSets, matchers) matcherSets = append(matcherSets, matchers)
} }
res, err := api.Storage.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...) q, err := api.Storage.Querier()
if err != nil {
return nil, &apiError{errorExec, err}
}
defer q.Close()
res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...)
if err != nil { if err != nil {
return nil, &apiError{errorExec, err} return nil, &apiError{errorExec, err}
} }

@ -50,7 +50,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
) )
w.Header().Set("Content-Type", string(format)) w.Header().Set("Content-Type", string(format))
vector, err := h.storage.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) q, err := h.storage.Querier()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer q.Close()
vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

Loading…
Cancel
Save