diff --git a/promql/analyzer.go b/promql/analyzer.go deleted file mode 100644 index a3319b211..000000000 --- a/promql/analyzer.go +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2013 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package promql - -import ( - "errors" - "time" - - "github.com/prometheus/common/model" - "golang.org/x/net/context" - - "github.com/prometheus/prometheus/storage/local" -) - -// An Analyzer traverses an expression and determines which data has to be requested -// from the storage. It is bound to a context that allows cancellation and timing out. -type Analyzer struct { - // The storage from which to query data. - Storage local.Querier - // The expression being analyzed. - Expr Expr - // The time range for evaluation of Expr. - Start, End model.Time - - // The preload times for different query time offsets. - offsetPreloadTimes map[time.Duration]preloadTimes -} - -// preloadTimes tracks which instants or ranges to preload for a set of -// fingerprints. One of these structs is collected for each offset by the query -// analyzer. -type preloadTimes struct { - // Ranges require loading a range of samples. They can be triggered by - // two type of expressions: First a range expression AKA matrix - // selector, where the Duration in the ranges map is the length of the - // range in the range expression. Second an instant expression AKA - // vector selector, where the Duration in the ranges map is the - // StalenessDelta. In preloading, both types of expressions result in - // the same effect: Preload everything between the specified start time - // minus the Duration in the ranges map up to the specified end time. - ranges map[model.Fingerprint]time.Duration - // Instants require a single sample to be loaded. This only happens for - // instant expressions AKA vector selectors iff the specified start ond - // end time are the same, Thus, instants is only populated if start and - // end time are the same. - instants map[model.Fingerprint]struct{} -} - -// Analyze the provided expression and attach metrics and fingerprints to data-selecting -// AST nodes that are later used to preload the data from the storage. -func (a *Analyzer) Analyze(ctx context.Context) error { - a.offsetPreloadTimes = map[time.Duration]preloadTimes{} - - getPreloadTimes := func(offset time.Duration) preloadTimes { - if pt, ok := a.offsetPreloadTimes[offset]; ok { - return pt - } - pt := preloadTimes{ - instants: map[model.Fingerprint]struct{}{}, - ranges: map[model.Fingerprint]time.Duration{}, - } - a.offsetPreloadTimes[offset] = pt - return pt - } - - // Retrieve fingerprints and metrics for the required time range for - // each metric or matrix selector node. - Inspect(a.Expr, func(node Node) bool { - switch n := node.(type) { - case *VectorSelector: - n.metrics = a.Storage.MetricsForLabelMatchers( - a.Start.Add(-n.Offset-StalenessDelta), a.End.Add(-n.Offset), - n.LabelMatchers..., - ) - n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) - - pt := getPreloadTimes(n.Offset) - for fp := range n.metrics { - r, alreadyInRanges := pt.ranges[fp] - if a.Start.Equal(a.End) && !alreadyInRanges { - // A true instant, we only need one value. - pt.instants[fp] = struct{}{} - continue - } - if r < StalenessDelta { - pt.ranges[fp] = StalenessDelta - } - } - case *MatrixSelector: - n.metrics = a.Storage.MetricsForLabelMatchers( - a.Start.Add(-n.Offset-n.Range), a.End.Add(-n.Offset), - n.LabelMatchers..., - ) - n.iterators = make(map[model.Fingerprint]local.SeriesIterator, len(n.metrics)) - - pt := getPreloadTimes(n.Offset) - for fp := range n.metrics { - if pt.ranges[fp] < n.Range { - pt.ranges[fp] = n.Range - // Delete the fingerprint from the instants. Ranges always contain more - // points and span more time than instants, so we don't need to track - // an instant for the same fingerprint, should we have one. - delete(pt.instants, fp) - } - } - } - return true - }) - - // Currently we do not return an error but we might place a context check in here - // or extend the stage in some other way. - return nil -} - -// Prepare the expression evaluation by preloading all required chunks from the storage -// and setting the respective storage iterators in the AST nodes. -func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { - const env = "query preparation" - - if a.offsetPreloadTimes == nil { - return nil, errors.New("analysis must be performed before preparing query") - } - var err error - // The preloader must not be closed unless an error occured as closing - // unpins the preloaded chunks. - p := a.Storage.NewPreloader() - defer func() { - if err != nil { - p.Close() - } - }() - - // Preload all analyzed ranges. - iters := map[time.Duration]map[model.Fingerprint]local.SeriesIterator{} - for offset, pt := range a.offsetPreloadTimes { - itersForDuration := map[model.Fingerprint]local.SeriesIterator{} - iters[offset] = itersForDuration - start := a.Start.Add(-offset) - end := a.End.Add(-offset) - for fp, rangeDuration := range pt.ranges { - if err = contextDone(ctx, env); err != nil { - return nil, err - } - itersForDuration[fp] = p.PreloadRange(fp, start.Add(-rangeDuration), end) - } - for fp := range pt.instants { - if err = contextDone(ctx, env); err != nil { - return nil, err - } - itersForDuration[fp] = p.PreloadInstant(fp, start, StalenessDelta) - } - } - - // Attach storage iterators to AST nodes. - Inspect(a.Expr, func(node Node) bool { - switch n := node.(type) { - case *VectorSelector: - for fp := range n.metrics { - n.iterators[fp] = iters[n.Offset][fp] - } - case *MatrixSelector: - for fp := range n.metrics { - n.iterators[fp] = iters[n.Offset][fp] - } - } - return true - }) - - return p, nil -} diff --git a/promql/ast.go b/promql/ast.go index e1d62f8af..b3ccd2570 100644 --- a/promql/ast.go +++ b/promql/ast.go @@ -135,9 +135,8 @@ type MatrixSelector struct { Offset time.Duration LabelMatchers metric.LabelMatchers - // The series iterators are populated at query analysis time. - iterators map[model.Fingerprint]local.SeriesIterator - metrics map[model.Fingerprint]metric.Metric + // The series iterators are populated at query preparation time. + iterators []local.SeriesIterator } // NumberLiteral represents a number. @@ -169,9 +168,8 @@ type VectorSelector struct { Offset time.Duration LabelMatchers metric.LabelMatchers - // The series iterators are populated at query analysis time. - iterators map[model.Fingerprint]local.SeriesIterator - metrics map[model.Fingerprint]metric.Metric + // The series iterators are populated at query preparation time. + iterators []local.SeriesIterator } func (e *AggregateExpr) Type() model.ValueType { return model.ValVector } diff --git a/promql/engine.go b/promql/engine.go index 603776d9d..bb566850e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -216,10 +216,10 @@ func contextDone(ctx context.Context, env string) error { } // Engine handles the lifetime of queries from beginning to end. -// It is connected to a storage. +// It is connected to a querier. type Engine struct { - // The storage on which the engine operates. - storage local.Querier + // The querier on which the engine operates. + querier local.Querier // The base context for all queries and its cancellation function. baseCtx context.Context @@ -231,13 +231,13 @@ type Engine struct { } // NewEngine returns a new engine. -func NewEngine(storage local.Querier, o *EngineOptions) *Engine { +func NewEngine(querier local.Querier, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } ctx, cancel := context.WithCancel(context.Background()) return &Engine{ - storage: storage, + querier: querier, baseCtx: ctx, cancelQueries: cancel, gate: newQueryGate(o.MaxConcurrentQueries), @@ -309,9 +309,8 @@ func (ng *Engine) newQuery(expr Expr, start, end model.Time, interval time.Durat // of an arbitrary function during handling. It is used to test the Engine. type testStmt func(context.Context) error -func (testStmt) String() string { return "test statement" } -func (testStmt) DotGraph() string { return "test statement" } -func (testStmt) stmt() {} +func (testStmt) String() string { return "test statement" } +func (testStmt) stmt() {} func (ng *Engine) newTestQuery(f func(context.Context) error) Query { qry := &query{ @@ -365,35 +364,14 @@ func (ng *Engine) exec(q *query) (model.Value, error) { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) { - prepareTimer := query.stats.GetTimer(stats.TotalQueryPreparationTime).Start() - analyzeTimer := query.stats.GetTimer(stats.QueryAnalysisTime).Start() - - // Only one execution statement per query is allowed. - analyzer := &Analyzer{ - Storage: ng.storage, - Expr: s.Expr, - Start: s.Start, - End: s.End, - } - err := analyzer.Analyze(ctx) - if err != nil { - analyzeTimer.Stop() - prepareTimer.Stop() - return nil, err - } - analyzeTimer.Stop() - - preloadTimer := query.stats.GetTimer(stats.PreloadTime).Start() - closer, err := analyzer.Prepare(ctx) - if err != nil { - preloadTimer.Stop() - prepareTimer.Stop() - return nil, err - } - defer closer.Close() - - preloadTimer.Stop() + prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() + err := ng.populateIterators(s) prepareTimer.Stop() + if err != nil { + return nil, err + } + + defer ng.closeIterators(s) evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() // Instant evaluation. @@ -498,8 +476,70 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return resMatrix, nil } +func (ng *Engine) populateIterators(s *EvalStmt) error { + var queryErr error + Inspect(s.Expr, func(node Node) bool { + switch n := node.(type) { + case *VectorSelector: + var iterators []local.SeriesIterator + var err error + if s.Start.Equal(s.End) { + iterators, err = ng.querier.QueryInstant( + s.Start.Add(-n.Offset), + StalenessDelta, + n.LabelMatchers..., + ) + } else { + iterators, err = ng.querier.QueryRange( + s.Start.Add(-n.Offset-StalenessDelta), + s.End.Add(-n.Offset), + n.LabelMatchers..., + ) + } + if err != nil { + queryErr = err + return false + } + for _, it := range iterators { + n.iterators = append(n.iterators, it) + } + case *MatrixSelector: + iterators, err := ng.querier.QueryRange( + s.Start.Add(-n.Offset-n.Range), + s.End.Add(-n.Offset), + n.LabelMatchers..., + ) + if err != nil { + queryErr = err + return false + } + for _, it := range iterators { + n.iterators = append(n.iterators, it) + } + } + return true + }) + return queryErr +} + +func (ng *Engine) closeIterators(s *EvalStmt) { + Inspect(s.Expr, func(node Node) bool { + switch n := node.(type) { + case *VectorSelector: + for _, it := range n.iterators { + it.Close() + } + case *MatrixSelector: + for _, it := range n.iterators { + it.Close() + } + } + return true + }) +} + // An evaluator evaluates given expressions at a fixed timestamp. It is attached to an -// engine through which it connects to a storage and reports errors. On timeout or +// engine through which it connects to a querier and reports errors. On timeout or // cancellation of its context it terminates. type evaluator struct { ctx context.Context @@ -681,14 +721,14 @@ func (ev *evaluator) eval(expr Expr) model.Value { // vectorSelector evaluates a *VectorSelector expression. func (ev *evaluator) vectorSelector(node *VectorSelector) vector { vec := vector{} - for fp, it := range node.iterators { + for _, it := range node.iterators { refTime := ev.Timestamp.Add(-node.Offset) samplePair := it.ValueAtOrBeforeTime(refTime) if samplePair.Timestamp.Before(refTime.Add(-StalenessDelta)) { continue // Sample outside of staleness policy window. } vec = append(vec, &sample{ - Metric: node.metrics[fp], + Metric: it.Metric(), Value: samplePair.Value, Timestamp: ev.Timestamp, }) @@ -704,7 +744,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix { } sampleStreams := make([]*sampleStream, 0, len(node.iterators)) - for fp, it := range node.iterators { + for _, it := range node.iterators { samplePairs := it.RangeValues(interval) if len(samplePairs) == 0 { continue @@ -717,7 +757,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) matrix { } sampleStream := &sampleStream{ - Metric: node.metrics[fp], + Metric: it.Metric(), Values: samplePairs, } sampleStreams = append(sampleStreams, sampleStream) diff --git a/promql/engine_test.go b/promql/engine_test.go index bf74e12eb..0064c180f 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -50,7 +50,7 @@ func TestQueryConcurrency(t *testing.T) { select { case <-processing: - t.Fatalf("Query above concurrency threhosld being executed") + t.Fatalf("Query above concurrency threshold being executed") case <-time.After(20 * time.Millisecond): // Expected. } diff --git a/storage/local/interface.go b/storage/local/interface.go index 7ecc8ba9e..a02606cf7 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -38,8 +38,9 @@ type Storage interface { // already or has too many chunks waiting for persistence. storage.SampleAppender - // Drop all time series associated with the given fingerprints. - DropMetricsForFingerprints(...model.Fingerprint) + // Drop all time series associated with the given label matchers. Returns + // the number series that were dropped. + DropMetricsForLabelMatchers(...*metric.LabelMatcher) (int, error) // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background // until Stop is called. @@ -55,25 +56,32 @@ type Storage interface { // Querier allows querying a time series storage. type Querier interface { - // NewPreloader returns a new Preloader which allows preloading and pinning - // series data into memory for use within a query. - NewPreloader() Preloader + // QueryRange returns a list of series iterators for the selected + // time range and label matchers. The iterators need to be closed + // after usage. + QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) + // QueryInstant returns a list of series iterators for the selected + // instant and label matchers. The iterators need to be closed after usage. + QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) // MetricsForLabelMatchers returns the metrics from storage that satisfy - // the given label matchers. At least one label matcher must be - // specified that does not match the empty string, otherwise an empty - // map is returned. The times from and through are hints for the storage - // to optimize the search. The storage MAY exclude metrics that have no - // samples in the specified interval from the returned map. In doubt, - // specify model.Earliest for from and model.Latest for through. - MetricsForLabelMatchers(from, through model.Time, matchers ...*metric.LabelMatcher) map[model.Fingerprint]metric.Metric + // the given sets of label matchers. Each set of matchers must contain at + // least one label matcher that does not match the empty string. Otherwise, + // an empty list is returned. Within one set of matchers, the intersection + // of matching series is computed. The final return value will be the union + // of the per-set results. The times from and through are hints for the + // storage to optimize the search. The storage MAY exclude metrics that + // have no samples in the specified interval from the returned map. In + // doubt, specify model.Earliest for from and model.Latest for through. + MetricsForLabelMatchers(from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) // LastSampleForFingerprint returns the last sample that has been - // ingested for the provided fingerprint. If this instance of the + // ingested for the given sets of label matchers. If this instance of the // Storage has never ingested a sample for the provided fingerprint (or // the last ingestion is so long ago that the series has been archived), - // ZeroSample is returned. - LastSampleForFingerprint(model.Fingerprint) model.Sample + // ZeroSample is returned. The label matching behavior is the same as in + // MetricsForLabelMatchers. + LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) // Get all of the label values that are associated with a given label name. - LabelValuesForLabelName(model.LabelName) model.LabelValues + LabelValuesForLabelName(model.LabelName) (model.LabelValues, error) } // SeriesIterator enables efficient access of sample values in a series. Its @@ -88,21 +96,9 @@ type SeriesIterator interface { ValueAtOrBeforeTime(model.Time) model.SamplePair // Gets all values contained within a given interval. RangeValues(metric.Interval) []model.SamplePair -} - -// A Preloader preloads series data necessary for a query into memory, pins it -// until released via Close(), and returns an iterator for the pinned data. Its -// methods are generally not goroutine-safe. -type Preloader interface { - PreloadRange( - fp model.Fingerprint, - from, through model.Time, - ) SeriesIterator - PreloadInstant( - fp model.Fingerprint, - timestamp model.Time, stalenessDelta time.Duration, - ) SeriesIterator - // Close unpins any previously requested series data from memory. + // Returns the metric of the series that the iterator corresponds to. + Metric() metric.Metric + // Closes the iterator and releases the underlying data. Close() } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index ba15e7a2f..3618a9c60 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -343,13 +343,13 @@ func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerp // name. This method is goroutine-safe but take into account that metrics queued // for indexing with IndexMetric might not have made it into the index // yet. (Same applies correspondingly to UnindexMetric.) -func (p *persistence) labelValuesForLabelName(ln model.LabelName) model.LabelValues { +func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) { lvs, _, err := p.labelNameToLabelValues.Lookup(ln) if err != nil { p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err)) - return nil + return nil, err } - return lvs + return lvs, nil } // persistChunks persists a number of consecutive chunks of a series. It is the diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index f3f4dcea8..a111c6109 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -1045,7 +1045,10 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet // Compare label name -> label values mappings. for ln, lvs := range b.expectedLnToLvs { - outLvs := p.labelValuesForLabelName(ln) + outLvs, err := p.labelValuesForLabelName(ln) + if err != nil { + t.Fatal(err) + } outSet := codable.LabelValueSet{} for _, lv := range outLvs { diff --git a/storage/local/preload.go b/storage/local/preload.go deleted file mode 100644 index 8e11de3ea..000000000 --- a/storage/local/preload.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2014 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package local - -import ( - "time" - - "github.com/prometheus/common/model" -) - -// memorySeriesPreloader is a Preloader for the MemorySeriesStorage. -type memorySeriesPreloader struct { - storage *MemorySeriesStorage - pinnedChunkDescs []*chunkDesc -} - -// PreloadRange implements Preloader. -func (p *memorySeriesPreloader) PreloadRange( - fp model.Fingerprint, - from model.Time, through model.Time, -) SeriesIterator { - cds, iter := p.storage.preloadChunksForRange(fp, from, through) - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return iter -} - -// PreloadInstant implements Preloader -func (p *memorySeriesPreloader) PreloadInstant( - fp model.Fingerprint, - timestamp model.Time, stalenessDelta time.Duration, -) SeriesIterator { - cds, iter := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp) - p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) - return iter -} - -// Close implements Preloader. -func (p *memorySeriesPreloader) Close() { - for _, cd := range p.pinnedChunkDescs { - cd.unpin(p.storage.evictRequests) - } - chunkOps.WithLabelValues(unpin).Add(float64(len(p.pinnedChunkDescs))) - -} diff --git a/storage/local/series.go b/storage/local/series.go index 2993cfe08..b24aa122f 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -340,7 +340,7 @@ func (s *memorySeries) dropChunks(t model.Time) error { // preloadChunks is an internal helper method. func (s *memorySeries) preloadChunks( indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage, -) ([]*chunkDesc, SeriesIterator, error) { +) (SeriesIterator, error) { loadIndexes := []int{} pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) for _, idx := range indexes { @@ -364,7 +364,7 @@ func (s *memorySeries) preloadChunks( cd.unpin(mss.evictRequests) } chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs))) - return nil, nopIter, err + return nopIter, err } for i, c := range chunks { s.chunkDescs[loadIndexes[i]].setChunk(c) @@ -380,18 +380,22 @@ func (s *memorySeries) preloadChunks( } iter := &boundedIterator{ - it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries), + it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries, mss.evictRequests), start: model.Now().Add(-mss.dropAfter), } - return pinnedChunkDescs, iter, nil + return iter, nil } // newIterator returns a new SeriesIterator for the provided chunkDescs (which // must be pinned). // // The caller must have locked the fingerprint of the memorySeries. -func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine func(error)) SeriesIterator { +func (s *memorySeries) newIterator( + pinnedChunkDescs []*chunkDesc, + quarantine func(error), + evictRequests chan<- evictRequest, +) SeriesIterator { chunks := make([]chunk, 0, len(pinnedChunkDescs)) for _, cd := range pinnedChunkDescs { // It's OK to directly access cd.c here (without locking) as the @@ -399,9 +403,12 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine fun chunks = append(chunks, cd.c) } return &memorySeriesIterator{ - chunks: chunks, - chunkIts: make([]chunkIterator, len(chunks)), - quarantine: quarantine, + chunks: chunks, + chunkIts: make([]chunkIterator, len(chunks)), + quarantine: quarantine, + metric: s.metric, + pinnedChunkDescs: pinnedChunkDescs, + evictRequests: evictRequests, } } @@ -413,7 +420,7 @@ func (s *memorySeries) preloadChunksForInstant( fp model.Fingerprint, from model.Time, through model.Time, mss *MemorySeriesStorage, -) ([]*chunkDesc, SeriesIterator, error) { +) (SeriesIterator, error) { // If we have a lastSamplePair in the series, and thas last samplePair // is in the interval, just take it in a singleSampleSeriesIterator. No // need to pin or load anything. @@ -422,10 +429,13 @@ func (s *memorySeries) preloadChunksForInstant( !from.After(lastSample.Timestamp) && lastSample != ZeroSamplePair { iter := &boundedIterator{ - it: &singleSampleSeriesIterator{samplePair: lastSample}, + it: &singleSampleSeriesIterator{ + samplePair: lastSample, + metric: s.metric, + }, start: model.Now().Add(-mss.dropAfter), } - return nil, iter, nil + return iter, nil } // If we are here, we are out of luck and have to delegate to the more // expensive method. @@ -438,7 +448,7 @@ func (s *memorySeries) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, mss *MemorySeriesStorage, -) ([]*chunkDesc, SeriesIterator, error) { +) (SeriesIterator, error) { firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { firstChunkDescTime = s.chunkDescs[0].firstTime() @@ -446,7 +456,7 @@ func (s *memorySeries) preloadChunksForRange( if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) { cds, err := mss.loadChunkDescs(fp, s.persistWatermark) if err != nil { - return nil, nopIter, err + return nopIter, err } s.chunkDescs = append(cds, s.chunkDescs...) s.chunkDescsOffset = 0 @@ -455,7 +465,7 @@ func (s *memorySeries) preloadChunksForRange( } if len(s.chunkDescs) == 0 || through.Before(firstChunkDescTime) { - return nil, nopIter, nil + return nopIter, nil } // Find first chunk with start time after "from". @@ -471,10 +481,10 @@ func (s *memorySeries) preloadChunksForRange( // series ends before "from" and we don't need to do anything. lt, err := s.chunkDescs[len(s.chunkDescs)-1].lastTime() if err != nil { - return nil, nopIter, err + return nopIter, err } if lt.Before(from) { - return nil, nopIter, nil + return nopIter, nil } } if fromIdx > 0 { @@ -547,10 +557,20 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { - chunkIt chunkIterator // Last chunkIterator used by ValueAtOrBeforeTime. - chunkIts []chunkIterator // Caches chunkIterators. - chunks []chunk - quarantine func(error) // Call to quarantine the series this iterator belongs to. + // Last chunkIterator used by ValueAtOrBeforeTime. + chunkIt chunkIterator + // Caches chunkIterators. + chunkIts []chunkIterator + // The actual sample chunks. + chunks []chunk + // Call to quarantine the series this iterator belongs to. + quarantine func(error) + // The metric corresponding to the iterator. + metric model.Metric + // Chunks that were pinned for this iterator. + pinnedChunkDescs []*chunkDesc + // Where to send evict requests when unpinning pinned chunks. + evictRequests chan<- evictRequest } // ValueAtOrBeforeTime implements SeriesIterator. @@ -630,6 +650,10 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa return values } +func (it *memorySeriesIterator) Metric() metric.Metric { + return metric.Metric{Metric: it.metric} +} + // chunkIterator returns the chunkIterator for the chunk at position i (and // creates it if needed). func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { @@ -641,11 +665,19 @@ func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { return chunkIt } +func (it *memorySeriesIterator) Close() { + for _, cd := range it.pinnedChunkDescs { + cd.unpin(it.evictRequests) + } + chunkOps.WithLabelValues(unpin).Add(float64(len(it.pinnedChunkDescs))) +} + // singleSampleSeriesIterator implements Series Iterator. It is a "shortcut -// iterator" that returns a single samplee only. The sample is saved in the +// iterator" that returns a single sample only. The sample is saved in the // iterator itself, so no chunks need to be pinned. type singleSampleSeriesIterator struct { samplePair model.SamplePair + metric model.Metric } // ValueAtTime implements SeriesIterator. @@ -665,6 +697,13 @@ func (it *singleSampleSeriesIterator) RangeValues(in metric.Interval) []model.Sa return []model.SamplePair{it.samplePair} } +func (it *singleSampleSeriesIterator) Metric() metric.Metric { + return metric.Metric{Metric: it.metric} +} + +// Close implements SeriesIterator. +func (it *singleSampleSeriesIterator) Close() {} + // nopSeriesIterator implements Series Iterator. It never returns any values. type nopSeriesIterator struct{} @@ -678,4 +717,12 @@ func (i nopSeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { return []model.SamplePair{} } +// Metric implements SeriesIterator. +func (i nopSeriesIterator) Metric() metric.Metric { + return metric.Metric{} +} + +// Close implements SeriesIterator. +func (i nopSeriesIterator) Close() {} + var nopIter nopSeriesIterator // A nopSeriesIterator for convenience. Can be shared. diff --git a/storage/local/storage.go b/storage/local/storage.go index e4308f96e..ef4ff89ed 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -142,6 +142,9 @@ const ( // synced or not. It does not need to be goroutine safe. type syncStrategy func() bool +// A MemorySeriesStorage manages series in memory over time, while also +// interfacing with a persistence layer to make time series data persistent +// across restarts and evictable from memory. type MemorySeriesStorage struct { // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations. archiveHighWatermark model.Time // No archived series has samples after this time. @@ -409,21 +412,38 @@ func (s *MemorySeriesStorage) WaitForIndexing() { s.persistence.waitForIndexing() } -// LastSampleForFingerprint implements Storage. -func (s *MemorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample { - s.fpLocker.Lock(fp) - defer s.fpLocker.Unlock(fp) +// LastSampleForLabelMatchers implements Storage. +func (s *MemorySeriesStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { + fps := map[model.Fingerprint]struct{}{} + for _, matchers := range matcherSets { + fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) + if err != nil { + return nil, err + } + for fp := range fpToMetric { + fps[fp] = struct{}{} + } + } - series, ok := s.fpToSeries.get(fp) - if !ok { - return ZeroSample - } - sp := series.lastSamplePair() - return model.Sample{ - Metric: series.metric, - Value: sp.Value, - Timestamp: sp.Timestamp, + res := make(model.Vector, 0, len(fps)) + for fp := range fps { + s.fpLocker.Lock(fp) + + series, ok := s.fpToSeries.get(fp) + if !ok { + // A series could have disappeared between resolving label matchers and here. + s.fpLocker.Unlock(fp) + continue + } + sp := series.lastSamplePair() + res = append(res, &model.Sample{ + Metric: series.metric, + Value: sp.Value, + Timestamp: sp.Timestamp, + }) + s.fpLocker.Unlock(fp) } + return res, nil } // boundedIterator wraps a SeriesIterator and does not allow fetching @@ -452,11 +472,45 @@ func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.Sample return bit.it.RangeValues(interval) } -// NewPreloader implements Storage. -func (s *MemorySeriesStorage) NewPreloader() Preloader { - return &memorySeriesPreloader{ - storage: s, +// Metric implements SeriesIterator. +func (bit *boundedIterator) Metric() metric.Metric { + return bit.it.Metric() +} + +// Close implements SeriesIterator. +func (bit *boundedIterator) Close() { + bit.it.Close() +} + +// QueryRange implements Storage. +func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { + fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + if err != nil { + return nil, err } + iterators := make([]SeriesIterator, 0, len(fpToMetric)) + for fp := range fpToMetric { + it := s.preloadChunksForRange(fp, from, through) + iterators = append(iterators, it) + } + return iterators, nil +} + +// QueryInstant implements Storage. +func (s *MemorySeriesStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { + from := ts.Add(-stalenessDelta) + through := ts + + fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) + if err != nil { + return nil, err + } + iterators := make([]SeriesIterator, 0, len(fpToMetric)) + for fp := range fpToMetric { + it := s.preloadChunksForInstant(fp, from, through) + iterators = append(iterators, it) + } + return iterators, nil } // fingerprintsForLabelPair returns the fingerprints with the given @@ -486,14 +540,36 @@ func (s *MemorySeriesStorage) fingerprintsForLabelPair( // MetricsForLabelMatchers implements Storage. func (s *MemorySeriesStorage) MetricsForLabelMatchers( + from, through model.Time, + matcherSets ...metric.LabelMatchers, +) ([]metric.Metric, error) { + fpToMetric := map[model.Fingerprint]metric.Metric{} + for _, matchers := range matcherSets { + metrics, err := s.metricsForLabelMatchers(from, through, matchers...) + if err != nil { + return nil, err + } + for fp, m := range metrics { + fpToMetric[fp] = m + } + } + + metrics := make([]metric.Metric, 0, len(fpToMetric)) + for _, m := range fpToMetric { + metrics = append(metrics, m) + } + return metrics, nil +} + +func (s *MemorySeriesStorage) metricsForLabelMatchers( from, through model.Time, matchers ...*metric.LabelMatcher, -) map[model.Fingerprint]metric.Metric { +) (map[model.Fingerprint]metric.Metric, error) { sort.Sort(metric.LabelMatchers(matchers)) if len(matchers) == 0 || matchers[0].MatchesEmptyString() { // No matchers at all or even the best matcher matches the empty string. - return nil + return nil, nil } var ( @@ -516,7 +592,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( remainingFPs, ) if len(remainingFPs) == 0 { - return nil + return nil, nil } } @@ -526,9 +602,14 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( if m.MatchesEmptyString() { break } - lvs := m.Filter(s.LabelValuesForLabelName(m.Name)) + + lvs, err := s.LabelValuesForLabelName(m.Name) + if err != nil { + return nil, err + } + lvs = m.Filter(lvs) if len(lvs) == 0 { - return nil + return nil, nil } fps := map[model.Fingerprint]struct{}{} for _, lv := range lvs { @@ -543,7 +624,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( } remainingFPs = fps if len(remainingFPs) == 0 { - return nil + return nil, nil } } @@ -562,7 +643,7 @@ func (s *MemorySeriesStorage) MetricsForLabelMatchers( } } } - return result + return result, nil } // metricForRange returns the metric for the given fingerprint if the @@ -612,15 +693,20 @@ func (s *MemorySeriesStorage) metricForRange( } // LabelValuesForLabelName implements Storage. -func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { +func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) { return s.persistence.labelValuesForLabelName(labelName) } -// DropMetric implements Storage. -func (s *MemorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { - for _, fp := range fps { +// DropMetricsForLabelMatchers implements Storage. +func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) { + fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) + if err != nil { + return 0, err + } + for fp := range fpToMetric { s.purgeSeries(fp, nil, nil) } + return len(fpToMetric), nil } var ( @@ -802,39 +888,39 @@ func (s *MemorySeriesStorage) seriesForRange( func (s *MemorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, -) ([]*chunkDesc, SeriesIterator) { +) SeriesIterator { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) series := s.seriesForRange(fp, from, through) if series == nil { - return nil, nopIter + return nopIter } - cds, iter, err := series.preloadChunksForRange(fp, from, through, s) + iter, err := series.preloadChunksForRange(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) - return nil, nopIter + return nopIter } - return cds, iter + return iter } func (s *MemorySeriesStorage) preloadChunksForInstant( fp model.Fingerprint, from model.Time, through model.Time, -) ([]*chunkDesc, SeriesIterator) { +) SeriesIterator { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) series := s.seriesForRange(fp, from, through) if series == nil { - return nil, nopIter + return nopIter } - cds, iter, err := series.preloadChunksForInstant(fp, from, through, s) + iter, err := series.preloadChunksForInstant(fp, from, through, s) if err != nil { s.quarantineSeries(fp, series.metric, err) - return nil, nopIter + return nopIter } - return cds, iter + return iter } func (s *MemorySeriesStorage) handleEvictList() { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 74694023d..b5025827f 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -193,14 +193,18 @@ func TestMatches(t *testing.T) { } for _, mt := range matcherTests { - res := storage.MetricsForLabelMatchers( + metrics, err := storage.MetricsForLabelMatchers( model.Earliest, model.Latest, - mt.matchers..., + mt.matchers, ) - if len(mt.expected) != len(res) { - t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(res)) + if err != nil { + t.Fatal(err) } - for fp1 := range res { + if len(mt.expected) != len(metrics) { + t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(metrics)) + } + for _, m := range metrics { + fp1 := m.Metric.FastFingerprint() found := false for _, fp2 := range mt.expected { if fp1 == fp2 { @@ -213,16 +217,24 @@ func TestMatches(t *testing.T) { } } // Smoketest for from/through. - if len(storage.MetricsForLabelMatchers( + metrics, err = storage.MetricsForLabelMatchers( model.Earliest, -10000, - mt.matchers..., - )) > 0 { + mt.matchers, + ) + if err != nil { + t.Fatal(err) + } + if len(metrics) > 0 { t.Error("expected no matches with 'through' older than any sample") } - if len(storage.MetricsForLabelMatchers( + metrics, err = storage.MetricsForLabelMatchers( 10000, model.Latest, - mt.matchers..., - )) > 0 { + mt.matchers, + ) + if err != nil { + t.Fatal(err) + } + if len(metrics) > 0 { t.Error("expected no matches with 'from' newer than any sample") } // Now the tricky one, cut out something from the middle. @@ -230,10 +242,13 @@ func TestMatches(t *testing.T) { from model.Time = 25 through model.Time = 75 ) - res = storage.MetricsForLabelMatchers( + metrics, err = storage.MetricsForLabelMatchers( from, through, - mt.matchers..., + mt.matchers, ) + if err != nil { + t.Fatal(err) + } expected := model.Fingerprints{} for _, fp := range mt.expected { i := 0 @@ -246,10 +261,11 @@ func TestMatches(t *testing.T) { expected = append(expected, fp) } } - if len(expected) != len(res) { - t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res)) + if len(expected) != len(metrics) { + t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(metrics)) } - for fp1 := range res { + for _, m := range metrics { + fp1 := m.Metric.FastFingerprint() found := false for _, fp2 := range expected { if fp1 == fp2 { @@ -348,7 +364,7 @@ func TestFingerprintsForLabels(t *testing.T) { } } -var benchLabelMatchingRes map[model.Fingerprint]metric.Metric +var benchLabelMatchingRes []metric.Metric func BenchmarkLabelMatching(b *testing.B) { s, closer := NewTestStorage(b, 2) @@ -430,13 +446,17 @@ func BenchmarkLabelMatching(b *testing.B) { b.ReportAllocs() b.ResetTimer() + var err error for i := 0; i < b.N; i++ { - benchLabelMatchingRes = map[model.Fingerprint]metric.Metric{} + benchLabelMatchingRes = []metric.Metric{} for _, mt := range matcherTests { - benchLabelMatchingRes = s.MetricsForLabelMatchers( + benchLabelMatchingRes, err = s.MetricsForLabelMatchers( model.Earliest, model.Latest, - mt..., + mt, ) + if err != nil { + b.Fatal(err) + } } } // Stop timer to not count the storage closing. @@ -469,26 +489,25 @@ func TestRetentionCutoff(t *testing.T) { } s.WaitForIndexing() - var fp model.Fingerprint - for f := range s.fingerprintsForLabelPair(model.LabelPair{ - Name: "job", Value: "test", - }, nil, nil) { - fp = f - break + lm, err := metric.NewLabelMatcher(metric.Equal, "job", "test") + if err != nil { + t.Fatalf("error creating label matcher: %s", err) + } + its, err := s.QueryRange(insertStart, now, lm) + if err != nil { + t.Fatal(err) } - pl := s.NewPreloader() - defer pl.Close() + if len(its) != 1 { + t.Fatalf("expected one iterator but got %d", len(its)) + } - // Preload everything. - it := pl.PreloadRange(fp, insertStart, now) - - val := it.ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) + val := its[0].ValueAtOrBeforeTime(now.Add(-61 * time.Minute)) if val.Timestamp != model.Earliest { t.Errorf("unexpected result for timestamp before retention period") } - vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) + vals := its[0].RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}) // We get 59 values here because the model.Now() is slightly later // than our now. if len(vals) != 59 { @@ -522,6 +541,15 @@ func TestDropMetrics(t *testing.T) { m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"} + lm1, err := metric.NewLabelMatcher(metric.Equal, "n1", "v1") + if err != nil { + t.Fatal(err) + } + lmAll, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "test") + if err != nil { + t.Fatal(err) + } + N := 120000 for j, m := range []model.Metric{m1, m2, m3} { @@ -553,7 +581,13 @@ func TestDropMetrics(t *testing.T) { fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived} - s.DropMetricsForFingerprints(fpList[0]) + n, err := s.DropMetricsForLabelMatchers(lm1) + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Fatalf("expected 1 series to be dropped, got %d", n) + } s.WaitForIndexing() fps2 := s.fingerprintsForLabelPair(model.LabelPair{ @@ -563,12 +597,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps2)) } - _, it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -580,7 +614,13 @@ func TestDropMetrics(t *testing.T) { t.Errorf("chunk file does not exist for fp=%v", fpList[2]) } - s.DropMetricsForFingerprints(fpList...) + n, err = s.DropMetricsForLabelMatchers(lmAll) + if err != nil { + t.Fatal(err) + } + if n != 2 { + t.Fatalf("expected 2 series to be dropped, got %d", n) + } s.WaitForIndexing() fps3 := s.fingerprintsForLabelPair(model.LabelPair{ @@ -590,12 +630,12 @@ func TestDropMetrics(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps3)) } - _, it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) + it = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } - _, it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) + it = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest) if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { t.Errorf("unexpected number of samples: %d", len(vals)) } @@ -672,10 +712,9 @@ func TestQuarantineMetric(t *testing.T) { t.Errorf("unexpected number of fingerprints: %d", len(fps)) } - pl := s.NewPreloader() // This will access the corrupt file and lead to quarantining. - pl.PreloadInstant(fpToBeArchived, now.Add(-2*time.Hour), time.Minute) - pl.Close() + iter := s.preloadChunksForInstant(fpToBeArchived, now.Add(-2*time.Hour-1*time.Minute), now.Add(-2*time.Hour)) + iter.Close() time.Sleep(time.Second) // Give time to quarantine. TODO(beorn7): Find a better way to wait. s.WaitForIndexing() @@ -816,7 +855,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Exactly on a sample. for i, expected := range samples { @@ -894,7 +933,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -976,7 +1015,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) // #1 Zero length interval at sample. for i, expected := range samples { @@ -1132,7 +1171,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { fp := model.Metric{}.FastFingerprint() - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) b.ResetTimer() @@ -1182,7 +1221,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop ~half of the chunks. s.maintainMemorySeries(fp, 10000) - _, it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it := s.preloadChunksForRange(fp, model.Earliest, model.Latest) actual := it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1200,7 +1239,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // Drop everything. s.maintainMemorySeries(fp, 100000) - _, it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) + it = s.preloadChunksForRange(fp, model.Earliest, model.Latest) actual = it.RangeValues(metric.Interval{ OldestInclusive: 0, NewestInclusive: 100000, @@ -1364,14 +1403,13 @@ func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { } // Load everything back. - p := s.NewPreloader() - p.PreloadRange(fp, 0, 100000) + it := s.preloadChunksForRange(fp, 0, 100000) if oldLen != len(series.chunkDescs) { t.Errorf("Expected number of chunkDescs to have reached old value again, old number %d, current number %d.", oldLen, len(series.chunkDescs)) } - p.Close() + it.Close() // Now maintain series with drops to make sure nothing crazy happens. s.maintainMemorySeries(fp, 100000) @@ -1693,8 +1731,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam for _, i := range rand.Perm(len(samples)) { sample := samples[i] fp := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) - p := s.NewPreloader() - it := p.PreloadInstant(fp, sample.Timestamp, 0) + it := s.preloadChunksForInstant(fp, sample.Timestamp, sample.Timestamp) found := it.ValueAtOrBeforeTime(sample.Timestamp) startTime := it.(*boundedIterator).start switch { @@ -1713,7 +1750,7 @@ func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Sam ) result = false } - p.Close() + it.Close() } return result } @@ -1723,21 +1760,21 @@ func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model var ( result = true fp model.Fingerprint - p = s.NewPreloader() it SeriesIterator r []model.SamplePair j int ) defer func() { - p.Close() + it.Close() }() for i, sample := range samples { newFP := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) if it == nil || newFP != fp { fp = newFP - p.Close() - p = s.NewPreloader() - it = p.PreloadRange(fp, sample.Timestamp, model.Latest) + if it != nil { + it.Close() + } + it = s.preloadChunksForRange(fp, sample.Timestamp, model.Latest) r = it.RangeValues(metric.Interval{ OldestInclusive: sample.Timestamp, NewestInclusive: model.Latest, @@ -1858,10 +1895,8 @@ func TestAppendOutOfOrder(t *testing.T) { fp := s.mapper.mapFP(m.FastFingerprint(), m) - pl := s.NewPreloader() - defer pl.Close() - - it := pl.PreloadRange(fp, 0, 2) + it := s.preloadChunksForRange(fp, 0, 2) + defer it.Close() want := []model.SamplePair{ { diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index 6d92361a0..3d7ad0e83 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -21,19 +21,10 @@ type QueryTiming int const ( TotalEvalTime QueryTiming = iota ResultSortTime - JSONEncodeTime - PreloadTime - TotalQueryPreparationTime - InnerViewBuildingTime + QueryPreparationTime InnerEvalTime ResultAppendTime - QueryAnalysisTime - GetValueAtTimeTime - GetRangeValuesTime ExecQueueTime - ViewDiskPreparationTime - ViewDataExtractionTime - ViewDiskExtractionTime ) // Return a string representation of a QueryTiming identifier. @@ -43,32 +34,14 @@ func (s QueryTiming) String() string { return "Total eval time" case ResultSortTime: return "Result sorting time" - case JSONEncodeTime: - return "JSON encoding time" - case PreloadTime: - return "Query preloading time" - case TotalQueryPreparationTime: - return "Total query preparation time" - case InnerViewBuildingTime: - return "Inner view building time" + case QueryPreparationTime: + return "Query preparation time" case InnerEvalTime: return "Inner eval time" case ResultAppendTime: return "Result append time" - case QueryAnalysisTime: - return "Query analysis time" - case GetValueAtTimeTime: - return "GetValueAtTime() time" - case GetRangeValuesTime: - return "GetRangeValues() time" case ExecQueueTime: return "Exec queue wait time" - case ViewDiskPreparationTime: - return "View building disk preparation time" - case ViewDataExtractionTime: - return "Total view data extraction time" - case ViewDiskExtractionTime: - return "View disk data extraction time" default: return "Unknown query timing" } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index bce83745e..72b19893b 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -221,7 +221,10 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { if !model.LabelNameRE.MatchString(name) { return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} } - vals := api.Storage.LabelValuesForLabelName(model.LabelName(name)) + vals, err := api.Storage.LabelValuesForLabelName(model.LabelName(name)) + if err != nil { + return nil, &apiError{errorExec, err} + } sort.Sort(vals) return vals, nil @@ -255,19 +258,18 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { end = model.Latest } - res := map[model.Fingerprint]metric.Metric{} - - for _, lm := range r.Form["match[]"] { - matchers, err := promql.ParseMetricSelector(lm) + var matcherSets []metric.LabelMatchers + for _, s := range r.Form["match[]"] { + matchers, err := promql.ParseMetricSelector(s) if err != nil { return nil, &apiError{errorBadData, err} } - for fp, met := range api.Storage.MetricsForLabelMatchers( - start, end, - matchers..., - ) { - res[fp] = met - } + matcherSets = append(matcherSets, matchers) + } + + res, err := api.Storage.MetricsForLabelMatchers(start, end, matcherSets...) + if err != nil { + return nil, &apiError{errorExec, err} } metrics := make([]model.Metric, 0, len(res)) @@ -282,28 +284,24 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { if len(r.Form["match[]"]) == 0 { return nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} } - fps := map[model.Fingerprint]struct{}{} - for _, lm := range r.Form["match[]"] { - matchers, err := promql.ParseMetricSelector(lm) + numDeleted := 0 + for _, s := range r.Form["match[]"] { + matchers, err := promql.ParseMetricSelector(s) if err != nil { return nil, &apiError{errorBadData, err} } - for fp := range api.Storage.MetricsForLabelMatchers( - model.Earliest, model.Latest, // Get every series. - matchers..., - ) { - fps[fp] = struct{}{} + n, err := api.Storage.DropMetricsForLabelMatchers(matchers...) + if err != nil { + return nil, &apiError{errorExec, err} } - } - for fp := range fps { - api.Storage.DropMetricsForFingerprints(fp) + numDeleted += n } res := struct { NumDeleted int `json:"numDeleted"` }{ - NumDeleted: len(fps), + NumDeleted: numDeleted, } return res, nil } diff --git a/web/federate.go b/web/federate.go index 26f4710eb..593fe39cf 100644 --- a/web/federate.go +++ b/web/federate.go @@ -17,13 +17,12 @@ import ( "net/http" "github.com/golang/protobuf/proto" - - "github.com/prometheus/prometheus/promql" - + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" - dto "github.com/prometheus/client_model/go" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage/metric" ) func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { @@ -32,20 +31,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { req.ParseForm() - fps := map[model.Fingerprint]struct{}{} - + var matcherSets []metric.LabelMatchers for _, s := range req.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - for fp := range h.storage.MetricsForLabelMatchers( - model.Now().Add(-promql.StalenessDelta), model.Latest, - matchers..., - ) { - fps[fp] = struct{}{} - } + matcherSets = append(matcherSets, matchers) } var ( @@ -64,15 +57,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { Type: dto.MetricType_UNTYPED.Enum(), } - for fp := range fps { + vector, err := h.storage.LastSampleForLabelMatchers(minTimestamp, matcherSets...) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + for _, s := range vector { globalUsed := map[model.LabelName]struct{}{} - s := h.storage.LastSampleForFingerprint(fp) - // Discard if sample does not exist or lays before the staleness interval. - if s.Timestamp.Before(minTimestamp) { - continue - } - // Reset label slice. protMetric.Label = protMetric.Label[:0]