storage: enhance Querier interface usage

This extracts Querier as an instantiateable and closeable object
rather than just defining extending methods of the storage interface.
This improves composability and allows abstracting query transactions,
which can be useful for transaction-level caches, consistent data views,
and encapsulating teardown.
pull/2079/head
Fabian Reinartz 2016-10-12 19:34:22 +02:00
parent 4f96d28e60
commit 8fa18d564a
6 changed files with 84 additions and 26 deletions

View File

@ -218,20 +218,25 @@ func contextDone(ctx context.Context, env string) error {
// Engine handles the lifetime of queries from beginning to end.
// It is connected to a querier.
type Engine struct {
// The querier on which the engine operates.
querier local.Querier
// A Querier constructor against an underlying storage.
queryable Queryable
// The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate
options *EngineOptions
}
// Queryable allows opening a storage querier.
type Queryable interface {
Querier() (local.Querier, error)
}
// NewEngine returns a new engine.
func NewEngine(querier local.Querier, o *EngineOptions) *Engine {
func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
if o == nil {
o = DefaultEngineOptions
}
return &Engine{
querier: querier,
queryable: queryable,
gate: newQueryGate(o.MaxConcurrentQueries),
options: o,
}
@ -351,13 +356,18 @@ func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) {
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) {
querier, err := ng.queryable.Querier()
if err != nil {
return nil, err
}
defer querier.Close()
prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
err := ng.populateIterators(ctx, s)
err = ng.populateIterators(ctx, querier, s)
prepareTimer.Stop()
if err != nil {
return nil, err
}
defer ng.closeIterators(s)
evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start()
@ -463,20 +473,20 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
return resMatrix, nil
}
func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error {
func (ng *Engine) populateIterators(ctx context.Context, querier local.Querier, s *EvalStmt) error {
var queryErr error
Inspect(s.Expr, func(node Node) bool {
switch n := node.(type) {
case *VectorSelector:
if s.Start.Equal(s.End) {
n.iterators, queryErr = ng.querier.QueryInstant(
n.iterators, queryErr = querier.QueryInstant(
ctx,
s.Start.Add(-n.Offset),
StalenessDelta,
n.LabelMatchers...,
)
} else {
n.iterators, queryErr = ng.querier.QueryRange(
n.iterators, queryErr = querier.QueryRange(
ctx,
s.Start.Add(-n.Offset-StalenessDelta),
s.End.Add(-n.Offset),
@ -487,7 +497,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error {
return false
}
case *MatrixSelector:
n.iterators, queryErr = ng.querier.QueryRange(
n.iterators, queryErr = querier.QueryRange(
ctx,
s.Start.Add(-n.Offset-n.Range),
s.End.Add(-n.Offset),

View File

@ -26,7 +26,8 @@ import (
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
Querier
// Querier returns a new Querier on the storage.
Querier() (Querier, error)
// This SampleAppender needs multiple samples for the same fingerprint to be
// submitted in chronological order, from oldest to newest. When Append has
@ -57,6 +58,9 @@ type Storage interface {
// Querier allows querying a time series storage.
type Querier interface {
// Close closes the querier. Behavior for subsequent calls to Querier methods
// is undefined.
Close() error
// QueryRange returns a list of series iterators for the selected
// time range and label matchers. The iterators need to be closed
// after usage.

View File

@ -40,23 +40,35 @@ func (s *NoopStorage) Stop() error {
func (s *NoopStorage) WaitForIndexing() {
}
// LastSampleForLabelMatchers implements Storage.
func (s *NoopStorage) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
// Querier implements Storage.
func (s *NoopStorage) Querier() (Querier, error) {
return &NoopQuerier{}, nil
}
type NoopQuerier struct{}
// Close implements Querier.
func (s *NoopQuerier) Close() error {
return nil
}
// LastSampleForLabelMatchers implements Querier.
func (s *NoopQuerier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
return nil, nil
}
// QueryRange implements Storage.
func (s *NoopStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
// QueryRange implements Querier
func (s *NoopQuerier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
return nil, nil
}
// QueryInstant implements Storage.
func (s *NoopStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
// QueryInstant implements Querier.
func (s *NoopQuerier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
return nil, nil
}
// MetricsForLabelMatchers implements Storage.
func (s *NoopStorage) MetricsForLabelMatchers(
// MetricsForLabelMatchers implements Querier.
func (s *NoopQuerier) MetricsForLabelMatchers(
ctx context.Context,
from, through model.Time,
matcherSets ...metric.LabelMatchers,
@ -64,8 +76,8 @@ func (s *NoopStorage) MetricsForLabelMatchers(
return nil, nil
}
// LabelValuesForLabelName implements Storage.
func (s *NoopStorage) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
// LabelValuesForLabelName implements Querier.
func (s *NoopQuerier) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
return nil, nil
}

View File

@ -403,6 +403,19 @@ func (s *MemorySeriesStorage) Stop() error {
return nil
}
type memorySeriesStorageQuerier struct {
*MemorySeriesStorage
}
func (memorySeriesStorageQuerier) Close() error {
return nil
}
// Querier implements the storage interface.
func (s *MemorySeriesStorage) Querier() (Querier, error) {
return memorySeriesStorageQuerier{s}, nil
}
// WaitForIndexing implements Storage.
func (s *MemorySeriesStorage) WaitForIndexing() {
s.persistence.waitForIndexing()

View File

@ -226,7 +226,13 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
if !model.LabelNameRE.MatchString(name) {
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
}
vals, err := api.Storage.LabelValuesForLabelName(api.context(r), model.LabelName(name))
q, err := api.Storage.Querier()
if err != nil {
return nil, &apiError{errorExec, err}
}
defer q.Close()
vals, err := q.LabelValuesForLabelName(api.context(r), model.LabelName(name))
if err != nil {
return nil, &apiError{errorExec, err}
}
@ -272,7 +278,13 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
matcherSets = append(matcherSets, matchers)
}
res, err := api.Storage.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...)
q, err := api.Storage.Querier()
if err != nil {
return nil, &apiError{errorExec, err}
}
defer q.Close()
res, err := q.MetricsForLabelMatchers(api.context(r), start, end, matcherSets...)
if err != nil {
return nil, &apiError{errorExec, err}
}

View File

@ -50,7 +50,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
)
w.Header().Set("Content-Type", string(format))
vector, err := h.storage.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...)
q, err := h.storage.Querier()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer q.Close()
vector, err := q.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return