Browse Source

Separate query interface out of local.Storage.

PromQL only requires a much narrower interface than local.Storage in
order to run queries. Narrower interfaces are easier to replace and
test, too.

We could also change the web interface to use local.Querier, except that
we'll probably use appending functions from there in the future.
pull/1759/head
Julius Volz 9 years ago
parent
commit
b7b6717438
  1. 2
      promql/analyzer.go
  2. 4
      promql/engine.go
  3. 55
      storage/local/interface.go
  4. 2
      storage/local/storage.go
  5. 8
      storage/local/storage_test.go
  6. 4
      storage/local/test_helpers.go

2
promql/analyzer.go

@ -27,7 +27,7 @@ import (
// from the storage. It is bound to a context that allows cancellation and timing out.
type Analyzer struct {
// The storage from which to query data.
Storage local.Storage
Storage local.Querier
// The expression being analyzed.
Expr Expr
// The time range for evaluation of Expr.

4
promql/engine.go

@ -218,7 +218,7 @@ func contextDone(ctx context.Context, env string) error {
// It is connected to a storage.
type Engine struct {
// The storage on which the engine operates.
storage local.Storage
storage local.Querier
// The base context for all queries and its cancellation function.
baseCtx context.Context
@ -230,7 +230,7 @@ type Engine struct {
}
// NewEngine returns a new engine.
func NewEngine(storage local.Storage, o *EngineOptions) *Engine {
func NewEngine(storage local.Querier, o *EngineOptions) *Engine {
if o == nil {
o = DefaultEngineOptions
}

55
storage/local/interface.go

@ -16,27 +16,45 @@ package local
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
)
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
prometheus.Collector
// Append stores a sample in the Storage. Multiple samples for the same
// fingerprint need to be submitted in chronological order, from oldest
// to newest. When Append has returned, the appended sample might not be
// queryable immediately. (Use WaitForIndexing to wait for complete
// processing.) The implementation might remove labels with empty value
// from the provided Sample as those labels are considered equivalent to
// a label not present at all.
Append(*model.Sample) error
// NeedsThrottling returns true if the Storage has too many chunks in memory
Querier
// This SampleAppender needs multiple samples for the same fingerprint to be
// submitted in chronological order, from oldest to newest. When Append has
// returned, the appended sample might not be queryable immediately. (Use
// WaitForIndexing to wait for complete processing.) The implementation might
// remove labels with empty value from the provided Sample as those labels
// are considered equivalent to a label not present at all.
//
// Appending is throttled if the Storage has too many chunks in memory
// already or has too many chunks waiting for persistence.
NeedsThrottling() bool
storage.SampleAppender
// Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Start() error
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
// LabelValuesForLabelName and may lag behind.
WaitForIndexing()
}
// Querier allows querying a time series storage.
type Querier interface {
// NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query.
NewPreloader() Preloader
@ -56,19 +74,6 @@ type Storage interface {
LastSampleForFingerprint(model.Fingerprint) model.Sample
// Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) model.LabelValues
// Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Start() error
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
// LabelValuesForLabelName and may lag behind.
WaitForIndexing()
}
// SeriesIterator enables efficient access of sample values in a series. Its

2
storage/local/storage.go

@ -189,7 +189,7 @@ type MemorySeriesStorageOptions struct {
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage {
s := &memorySeriesStorage{
fpLocker: newFingerprintLocker(o.NumMutexes),

8
storage/local/storage_test.go

@ -718,7 +718,7 @@ func TestLoop(t *testing.T) {
storage.Append(s)
}
storage.WaitForIndexing()
series, _ := storage.(*memorySeriesStorage).fpToSeries.get(model.Metric{}.FastFingerprint())
series, _ := storage.fpToSeries.get(model.Metric{}.FastFingerprint())
cdsBefore := len(series.chunkDescs)
time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
cdsAfter := len(series.chunkDescs)
@ -1497,12 +1497,12 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
for _, sample := range samples[start:middle] {
s.Append(sample)
}
verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:middle])
verifyStorageRandom(b, s, samples[:middle])
for _, sample := range samples[middle:end] {
s.Append(sample)
}
verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:end])
verifyStorageSequential(b, s.(*memorySeriesStorage), samples)
verifyStorageRandom(b, s, samples[:end])
verifyStorageSequential(b, s, samples)
}
}

4
storage/local/test_helpers.go

@ -52,7 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
SyncStrategy: Adaptive,
}
storage := NewMemorySeriesStorage(o)
storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
storage.archiveHighWatermark = model.Latest
if err := storage.Start(); err != nil {
directory.Close()
t.Fatalf("Error creating storage: %s", err)
@ -63,5 +63,5 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
directory: directory,
}
return storage.(*memorySeriesStorage), closer
return storage, closer
}

Loading…
Cancel
Save