Browse Source

Merge pull request #1759 from prometheus/separate-querier

Separate query interface out of local.Storage.
pull/1762/head
Fabian Reinartz 9 years ago committed by GitHub
parent
commit
6f6dddb230
  1. 2
      promql/analyzer.go
  2. 4
      promql/engine.go
  3. 55
      storage/local/interface.go
  4. 2
      storage/local/storage.go
  5. 8
      storage/local/storage_test.go
  6. 4
      storage/local/test_helpers.go

2
promql/analyzer.go

@ -27,7 +27,7 @@ import (
// from the storage. It is bound to a context that allows cancellation and timing out.
type Analyzer struct {
// The storage from which to query data.
Storage local.Storage
Storage local.Querier
// The expression being analyzed.
Expr Expr
// The time range for evaluation of Expr.

4
promql/engine.go

@ -218,7 +218,7 @@ func contextDone(ctx context.Context, env string) error {
// It is connected to a storage.
type Engine struct {
// The storage on which the engine operates.
storage local.Storage
storage local.Querier
// The base context for all queries and its cancellation function.
baseCtx context.Context
@ -230,7 +230,7 @@ type Engine struct {
}
// NewEngine returns a new engine.
func NewEngine(storage local.Storage, o *EngineOptions) *Engine {
func NewEngine(storage local.Querier, o *EngineOptions) *Engine {
if o == nil {
o = DefaultEngineOptions
}

55
storage/local/interface.go

@ -16,27 +16,45 @@ package local
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/metric"
)
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
type Storage interface {
prometheus.Collector
// Append stores a sample in the Storage. Multiple samples for the same
// fingerprint need to be submitted in chronological order, from oldest
// to newest. When Append has returned, the appended sample might not be
// queryable immediately. (Use WaitForIndexing to wait for complete
// processing.) The implementation might remove labels with empty value
// from the provided Sample as those labels are considered equivalent to
// a label not present at all.
Append(*model.Sample) error
// NeedsThrottling returns true if the Storage has too many chunks in memory
Querier
// This SampleAppender needs multiple samples for the same fingerprint to be
// submitted in chronological order, from oldest to newest. When Append has
// returned, the appended sample might not be queryable immediately. (Use
// WaitForIndexing to wait for complete processing.) The implementation might
// remove labels with empty value from the provided Sample as those labels
// are considered equivalent to a label not present at all.
//
// Appending is throttled if the Storage has too many chunks in memory
// already or has too many chunks waiting for persistence.
NeedsThrottling() bool
storage.SampleAppender
// Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Start() error
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
// LabelValuesForLabelName and may lag behind.
WaitForIndexing()
}
// Querier allows querying a time series storage.
type Querier interface {
// NewPreloader returns a new Preloader which allows preloading and pinning
// series data into memory for use within a query.
NewPreloader() Preloader
@ -56,19 +74,6 @@ type Storage interface {
LastSampleForFingerprint(model.Fingerprint) model.Sample
// Get all of the label values that are associated with a given label name.
LabelValuesForLabelName(model.LabelName) model.LabelValues
// Drop all time series associated with the given fingerprints.
DropMetricsForFingerprints(...model.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
Start() error
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
// LabelValuesForLabelName and may lag behind.
WaitForIndexing()
}
// SeriesIterator enables efficient access of sample values in a series. Its

2
storage/local/storage.go

@ -189,7 +189,7 @@ type MemorySeriesStorageOptions struct {
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage {
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage {
s := &memorySeriesStorage{
fpLocker: newFingerprintLocker(o.NumMutexes),

8
storage/local/storage_test.go

@ -718,7 +718,7 @@ func TestLoop(t *testing.T) {
storage.Append(s)
}
storage.WaitForIndexing()
series, _ := storage.(*memorySeriesStorage).fpToSeries.get(model.Metric{}.FastFingerprint())
series, _ := storage.fpToSeries.get(model.Metric{}.FastFingerprint())
cdsBefore := len(series.chunkDescs)
time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
cdsAfter := len(series.chunkDescs)
@ -1497,12 +1497,12 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
for _, sample := range samples[start:middle] {
s.Append(sample)
}
verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:middle])
verifyStorageRandom(b, s, samples[:middle])
for _, sample := range samples[middle:end] {
s.Append(sample)
}
verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:end])
verifyStorageSequential(b, s.(*memorySeriesStorage), samples)
verifyStorageRandom(b, s, samples[:end])
verifyStorageSequential(b, s, samples)
}
}

4
storage/local/test_helpers.go

@ -52,7 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
SyncStrategy: Adaptive,
}
storage := NewMemorySeriesStorage(o)
storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest
storage.archiveHighWatermark = model.Latest
if err := storage.Start(); err != nil {
directory.Close()
t.Fatalf("Error creating storage: %s", err)
@ -63,5 +63,5 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage,
directory: directory,
}
return storage.(*memorySeriesStorage), closer
return storage, closer
}

Loading…
Cancel
Save