mirror of https://github.com/prometheus/prometheus
Add context argument to Querier.Select (#12660)
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>pull/9278/head
parent
aa7bf083e9
commit
6daee89e5f
|
@ -1378,17 +1378,17 @@ func (s *readyStorage) StartTime() (int64, error) {
|
|||
}
|
||||
|
||||
// Querier implements the Storage interface.
|
||||
func (s *readyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
func (s *readyStorage) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
if x := s.get(); x != nil {
|
||||
return x.Querier(ctx, mint, maxt)
|
||||
return x.Querier(mint, maxt)
|
||||
}
|
||||
return nil, tsdb.ErrNotReady
|
||||
}
|
||||
|
||||
// ChunkQuerier implements the Storage interface.
|
||||
func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
func (s *readyStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
if x := s.get(); x != nil {
|
||||
return x.ChunkQuerier(ctx, mint, maxt)
|
||||
return x.ChunkQuerier(mint, maxt)
|
||||
}
|
||||
return nil, tsdb.ErrNotReady
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ func sortSamples(samples []backfillSample) {
|
|||
}
|
||||
|
||||
func queryAllSeries(t testing.TB, q storage.Querier, expectedMinTime, expectedMaxTime int64) []backfillSample { // nolint:revive
|
||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
samples := []backfillSample{}
|
||||
for ss.Next() {
|
||||
series := ss.At()
|
||||
|
@ -67,7 +67,7 @@ func testBlocks(t *testing.T, db *tsdb.DB, expectedMinTime, expectedMaxTime, exp
|
|||
require.Equal(t, block.MinTime()/expectedBlockDuration, (block.MaxTime()-1)/expectedBlockDuration, "block %d contains data outside of one aligned block duration", i)
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, q.Close())
|
||||
|
|
|
@ -86,6 +86,8 @@ func main() {
|
|||
httpConfigFilePath string
|
||||
)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
app := kingpin.New(filepath.Base(os.Args[0]), "Tooling for the Prometheus monitoring system.").UsageWriter(os.Stdout)
|
||||
app.Version(version.Print("promtool"))
|
||||
app.HelpFlag.Short('h')
|
||||
|
@ -376,7 +378,7 @@ func main() {
|
|||
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
|
||||
|
||||
case tsdbDumpCmd.FullCommand():
|
||||
os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch)))
|
||||
os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch)))
|
||||
// TODO(aSquare14): Work on adding support for custom block size.
|
||||
case openMetricsImportCmd.FullCommand():
|
||||
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))
|
||||
|
|
|
@ -124,10 +124,10 @@ func TestBackfillRuleIntegration(t *testing.T) {
|
|||
blocks := db.Blocks()
|
||||
require.Equal(t, (i+1)*tt.expectedBlockCount, len(blocks))
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
selectedSeries := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
selectedSeries := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
var seriesCount, samplesCount int
|
||||
for selectedSeries.Next() {
|
||||
seriesCount++
|
||||
|
@ -248,11 +248,11 @@ func TestBackfillLabels(t *testing.T) {
|
|||
db, err := tsdb.Open(tmpDir, nil, nil, opts, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("correct-labels", func(t *testing.T) {
|
||||
selectedSeries := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
selectedSeries := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
for selectedSeries.Next() {
|
||||
series := selectedSeries.At()
|
||||
expectedLabels := labels.FromStrings("__name__", "rulename", "name1", "value-from-rule")
|
||||
|
|
|
@ -619,7 +619,7 @@ func analyzeCompaction(block tsdb.BlockReader, indexr tsdb.IndexReader) (err err
|
|||
return nil
|
||||
}
|
||||
|
||||
func dumpSamples(path string, mint, maxt int64, match string) (err error) {
|
||||
func dumpSamples(ctx context.Context, path string, mint, maxt int64, match string) (err error) {
|
||||
db, err := tsdb.OpenDBReadOnly(path, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -627,7 +627,7 @@ func dumpSamples(path string, mint, maxt int64, match string) (err error) {
|
|||
defer func() {
|
||||
err = tsdb_errors.NewMulti(err, db.Close()).Err()
|
||||
}()
|
||||
q, err := db.Querier(context.TODO(), mint, maxt)
|
||||
q, err := db.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -637,7 +637,7 @@ func dumpSamples(path string, mint, maxt int64, match string) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ss := q.Select(false, nil, matchers...)
|
||||
ss := q.Select(ctx, false, nil, matchers...)
|
||||
|
||||
for ss.Next() {
|
||||
series := ss.At()
|
||||
|
|
|
@ -669,14 +669,14 @@ func durationMilliseconds(d time.Duration) int64 {
|
|||
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) {
|
||||
prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime)
|
||||
mint, maxt := ng.findMinMaxTime(s)
|
||||
querier, err := query.queryable.Querier(ctxPrepare, mint, maxt)
|
||||
querier, err := query.queryable.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
prepareSpanTimer.Finish()
|
||||
return nil, nil, err
|
||||
}
|
||||
defer querier.Close()
|
||||
|
||||
ng.populateSeries(querier, s)
|
||||
ng.populateSeries(ctxPrepare, querier, s)
|
||||
prepareSpanTimer.Finish()
|
||||
|
||||
// Modify the offset of vector and matrix selectors for the @ modifier
|
||||
|
@ -890,7 +890,7 @@ func (ng *Engine) getLastSubqueryInterval(path []parser.Node) time.Duration {
|
|||
return interval
|
||||
}
|
||||
|
||||
func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
|
||||
func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) {
|
||||
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
|
||||
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
|
||||
// the variable.
|
||||
|
@ -913,7 +913,7 @@ func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
|
|||
}
|
||||
evalRange = 0
|
||||
hints.By, hints.Grouping = extractGroupsFromPath(path)
|
||||
n.UnexpandedSeriesSet = querier.Select(false, hints, n.LabelMatchers...)
|
||||
n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...)
|
||||
|
||||
case *parser.MatrixSelector:
|
||||
evalRange = n.Range
|
||||
|
|
|
@ -194,7 +194,7 @@ type errQuerier struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
func (q *errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
return errSeriesSet{err: q.err}
|
||||
}
|
||||
|
||||
|
@ -226,7 +226,7 @@ func TestQueryError(t *testing.T) {
|
|||
}
|
||||
engine := NewEngine(opts)
|
||||
errStorage := ErrStorage{errors.New("storage error")}
|
||||
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
|
||||
return &errQuerier{err: errStorage}, nil
|
||||
})
|
||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
|
@ -251,7 +251,7 @@ type noopHintRecordingQueryable struct {
|
|||
hints []*storage.SelectHints
|
||||
}
|
||||
|
||||
func (h *noopHintRecordingQueryable) Querier(context.Context, int64, int64) (storage.Querier, error) {
|
||||
func (h *noopHintRecordingQueryable) Querier(int64, int64) (storage.Querier, error) {
|
||||
return &hintRecordingQuerier{Querier: &errQuerier{}, h: h}, nil
|
||||
}
|
||||
|
||||
|
@ -261,9 +261,9 @@ type hintRecordingQuerier struct {
|
|||
h *noopHintRecordingQueryable
|
||||
}
|
||||
|
||||
func (h *hintRecordingQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
func (h *hintRecordingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
h.h.hints = append(h.h.hints, hints)
|
||||
return h.Querier.Select(sortSeries, hints, matchers...)
|
||||
return h.Querier.Select(ctx, sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
func TestSelectHintsSetCorrectly(t *testing.T) {
|
||||
|
|
|
@ -123,7 +123,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
|
|||
|
||||
// Check the series.
|
||||
queryable := suite.Queryable()
|
||||
querier, err := queryable.Querier(suite.Context(), math.MinInt64, math.MaxInt64)
|
||||
querier, err := queryable.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
for _, s := range tc.series {
|
||||
var matchers []*labels.Matcher
|
||||
|
@ -134,7 +134,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
|
|||
})
|
||||
|
||||
// Get the series for the matcher.
|
||||
ss := querier.Select(false, nil, matchers...)
|
||||
ss := querier.Select(suite.Context(), false, nil, matchers...)
|
||||
require.True(t, ss.Next())
|
||||
storageSeries := ss.At()
|
||||
require.False(t, ss.Next(), "Expecting only 1 series")
|
||||
|
|
|
@ -261,7 +261,7 @@ func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) pro
|
|||
}
|
||||
|
||||
// QueryforStateSeries returns the series for ALERTS_FOR_STATE.
|
||||
func (r *AlertingRule) QueryforStateSeries(alert *Alert, q storage.Querier) (storage.Series, error) {
|
||||
func (r *AlertingRule) QueryforStateSeries(ctx context.Context, alert *Alert, q storage.Querier) (storage.Series, error) {
|
||||
smpl := r.forStateSample(alert, time.Now(), 0)
|
||||
var matchers []*labels.Matcher
|
||||
smpl.Metric.Range(func(l labels.Label) {
|
||||
|
@ -271,7 +271,7 @@ func (r *AlertingRule) QueryforStateSeries(alert *Alert, q storage.Querier) (sto
|
|||
}
|
||||
matchers = append(matchers, mt)
|
||||
})
|
||||
sset := q.Select(false, nil, matchers...)
|
||||
sset := q.Select(ctx, false, nil, matchers...)
|
||||
|
||||
var s storage.Series
|
||||
for sset.Next() {
|
||||
|
|
|
@ -659,7 +659,7 @@ func TestQueryForStateSeries(t *testing.T) {
|
|||
ValidUntil: time.Time{},
|
||||
}
|
||||
|
||||
series, err := rule.QueryforStateSeries(alert, querier)
|
||||
series, err := rule.QueryforStateSeries(context.Background(), alert, querier)
|
||||
|
||||
require.Equal(t, tst.expectedSeries, series)
|
||||
require.Equal(t, tst.expectedError, err)
|
||||
|
|
|
@ -814,7 +814,7 @@ func (g *Group) RestoreForState(ts time.Time) {
|
|||
// We allow restoration only if alerts were active before after certain time.
|
||||
mint := ts.Add(-g.opts.OutageTolerance)
|
||||
mintMS := int64(model.TimeFromUnixNano(mint.UnixNano()))
|
||||
q, err := g.opts.Queryable.Querier(g.opts.Context, mintMS, maxtMS)
|
||||
q, err := g.opts.Queryable.Querier(mintMS, maxtMS)
|
||||
if err != nil {
|
||||
level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err)
|
||||
return
|
||||
|
@ -843,7 +843,7 @@ func (g *Group) RestoreForState(ts time.Time) {
|
|||
alertRule.ForEachActiveAlert(func(a *Alert) {
|
||||
var s storage.Series
|
||||
|
||||
s, err := alertRule.QueryforStateSeries(a, q)
|
||||
s, err := alertRule.QueryforStateSeries(g.opts.Context, a, q)
|
||||
if err != nil {
|
||||
// Querier Warnings are ignored. We do not care unless we have an error.
|
||||
level.Error(g.logger).Log(
|
||||
|
|
|
@ -558,14 +558,14 @@ func TestStaleness(t *testing.T) {
|
|||
group.Eval(ctx, time.Unix(1, 0))
|
||||
group.Eval(ctx, time.Unix(2, 0))
|
||||
|
||||
querier, err := st.Querier(context.Background(), 0, 2000)
|
||||
querier, err := st.Querier(0, 2000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
|
||||
require.NoError(t, err)
|
||||
|
||||
set := querier.Select(false, nil, matcher)
|
||||
set := querier.Select(ctx, false, nil, matcher)
|
||||
samples, err := readSeriesSet(set)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -681,14 +681,14 @@ func TestDeletedRuleMarkedStale(t *testing.T) {
|
|||
|
||||
newGroup.Eval(context.Background(), time.Unix(0, 0))
|
||||
|
||||
querier, err := st.Querier(context.Background(), 0, 2000)
|
||||
querier, err := st.Querier(0, 2000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1")
|
||||
require.NoError(t, err)
|
||||
|
||||
set := querier.Select(false, nil, matcher)
|
||||
set := querier.Select(context.Background(), false, nil, matcher)
|
||||
samples, err := readSeriesSet(set)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1107,14 +1107,14 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
|
|||
|
||||
func countStaleNaN(t *testing.T, st storage.Storage) int {
|
||||
var c int
|
||||
querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000)
|
||||
querier, err := st.Querier(0, time.Now().Unix()*1000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2")
|
||||
require.NoError(t, err)
|
||||
|
||||
set := querier.Select(false, nil, matcher)
|
||||
set := querier.Select(context.Background(), false, nil, matcher)
|
||||
samples, err := readSeriesSet(set)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1381,9 +1381,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
|
|||
|
||||
group.Eval(context.Background(), ts.Add(10*time.Second))
|
||||
|
||||
q, err := db.Querier(context.Background(), ts.UnixMilli(), ts.Add(20*time.Second).UnixMilli())
|
||||
q, err := db.Querier(ts.UnixMilli(), ts.Add(20*time.Second).UnixMilli())
|
||||
require.NoError(t, err)
|
||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "sum:histogram_metric"))
|
||||
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "sum:histogram_metric"))
|
||||
require.True(t, ss.Next())
|
||||
s := ss.At()
|
||||
require.False(t, ss.Next())
|
||||
|
|
|
@ -2925,9 +2925,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
require.NoError(t, slApp.Rollback())
|
||||
|
||||
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
||||
q, err := s.Querier(time.Time{}.UnixNano(), 0)
|
||||
require.NoError(t, err)
|
||||
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
require.Equal(t, false, series.Next(), "series found in tsdb")
|
||||
require.NoError(t, series.Err())
|
||||
|
||||
|
@ -2937,9 +2937,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.NoError(t, slApp.Commit())
|
||||
|
||||
q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
||||
q, err = s.Querier(time.Time{}.UnixNano(), 0)
|
||||
require.NoError(t, err)
|
||||
series = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
|
||||
series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
|
||||
require.Equal(t, true, series.Next(), "series not found in tsdb")
|
||||
require.NoError(t, series.Err())
|
||||
require.Equal(t, false, series.Next(), "more than one series found in tsdb")
|
||||
|
@ -2984,9 +2984,9 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|||
require.NoError(t, slApp.Rollback())
|
||||
require.Equal(t, errNameLabelMandatory, err)
|
||||
|
||||
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
||||
q, err := s.Querier(time.Time{}.UnixNano(), 0)
|
||||
require.NoError(t, err)
|
||||
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
require.Equal(t, false, series.Next(), "series found in tsdb")
|
||||
require.NoError(t, series.Err())
|
||||
}
|
||||
|
@ -3346,9 +3346,9 @@ func TestScrapeReportSingleAppender(t *testing.T) {
|
|||
|
||||
start := time.Now()
|
||||
for time.Since(start) < 3*time.Second {
|
||||
q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano())
|
||||
q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
|
||||
require.NoError(t, err)
|
||||
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))
|
||||
series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))
|
||||
|
||||
c := 0
|
||||
for series.Next() {
|
||||
|
@ -3418,10 +3418,10 @@ func TestScrapeReportLimit(t *testing.T) {
|
|||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano())
|
||||
q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up"))
|
||||
series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up"))
|
||||
|
||||
var found bool
|
||||
for series.Next() {
|
||||
|
|
|
@ -72,15 +72,15 @@ func (f *fanout) StartTime() (int64, error) {
|
|||
return firstTime, nil
|
||||
}
|
||||
|
||||
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
||||
primary, err := f.primary.Querier(ctx, mint, maxt)
|
||||
func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
|
||||
primary, err := f.primary.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
secondaries := make([]Querier, 0, len(f.secondaries))
|
||||
for _, storage := range f.secondaries {
|
||||
querier, err := storage.Querier(ctx, mint, maxt)
|
||||
querier, err := storage.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
// Close already open Queriers, append potential errors to returned error.
|
||||
errs := tsdb_errors.NewMulti(err, primary.Close())
|
||||
|
@ -94,15 +94,15 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
|||
return NewMergeQuerier([]Querier{primary}, secondaries, ChainedSeriesMerge), nil
|
||||
}
|
||||
|
||||
func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error) {
|
||||
primary, err := f.primary.ChunkQuerier(ctx, mint, maxt)
|
||||
func (f *fanout) ChunkQuerier(mint, maxt int64) (ChunkQuerier, error) {
|
||||
primary, err := f.primary.ChunkQuerier(mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
secondaries := make([]ChunkQuerier, 0, len(f.secondaries))
|
||||
for _, storage := range f.secondaries {
|
||||
querier, err := storage.ChunkQuerier(ctx, mint, maxt)
|
||||
querier, err := storage.ChunkQuerier(mint, maxt)
|
||||
if err != nil {
|
||||
// Close already open Queriers, append potential errors to returned error.
|
||||
errs := tsdb_errors.NewMulti(err, primary.Close())
|
||||
|
|
|
@ -75,14 +75,14 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
|
||||
|
||||
t.Run("querier", func(t *testing.T) {
|
||||
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
|
||||
querier, err := fanoutStorage.Querier(0, 8000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
||||
require.NoError(t, err)
|
||||
|
||||
seriesSet := querier.Select(true, nil, matcher)
|
||||
seriesSet := querier.Select(ctx, true, nil, matcher)
|
||||
|
||||
result := make(map[int64]float64)
|
||||
var labelsResult labels.Labels
|
||||
|
@ -102,14 +102,14 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
require.Equal(t, inputTotalSize, len(result))
|
||||
})
|
||||
t.Run("chunk querier", func(t *testing.T) {
|
||||
querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000)
|
||||
querier, err := fanoutStorage.ChunkQuerier(0, 8000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
||||
require.NoError(t, err)
|
||||
|
||||
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(true, nil, matcher))
|
||||
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(ctx, true, nil, matcher))
|
||||
|
||||
result := make(map[int64]float64)
|
||||
var labelsResult labels.Labels
|
||||
|
@ -159,12 +159,12 @@ func TestFanoutErrors(t *testing.T) {
|
|||
fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary)
|
||||
|
||||
t.Run("samples", func(t *testing.T) {
|
||||
querier, err := fanoutStorage.Querier(context.Background(), 0, 8000)
|
||||
querier, err := fanoutStorage.Querier(0, 8000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
|
||||
ss := querier.Select(true, nil, matcher)
|
||||
ss := querier.Select(context.Background(), true, nil, matcher)
|
||||
|
||||
// Exhaust.
|
||||
for ss.Next() {
|
||||
|
@ -184,12 +184,12 @@ func TestFanoutErrors(t *testing.T) {
|
|||
})
|
||||
t.Run("chunks", func(t *testing.T) {
|
||||
t.Skip("enable once TestStorage and TSDB implements ChunkQuerier")
|
||||
querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000)
|
||||
querier, err := fanoutStorage.ChunkQuerier(0, 8000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b")
|
||||
ss := querier.Select(true, nil, matcher)
|
||||
ss := querier.Select(context.Background(), true, nil, matcher)
|
||||
|
||||
// Exhaust.
|
||||
for ss.Next() {
|
||||
|
@ -216,20 +216,20 @@ type errStorage struct{}
|
|||
|
||||
type errQuerier struct{}
|
||||
|
||||
func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) {
|
||||
func (errStorage) Querier(_, _ int64) (storage.Querier, error) {
|
||||
return errQuerier{}, nil
|
||||
}
|
||||
|
||||
type errChunkQuerier struct{ errQuerier }
|
||||
|
||||
func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) {
|
||||
func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) {
|
||||
return errChunkQuerier{}, nil
|
||||
}
|
||||
func (errStorage) Appender(_ context.Context) storage.Appender { return nil }
|
||||
func (errStorage) StartTime() (int64, error) { return 0, nil }
|
||||
func (errStorage) Close() error { return nil }
|
||||
|
||||
func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.ErrSeriesSet(errSelect)
|
||||
}
|
||||
|
||||
|
@ -243,6 +243,6 @@ func (errQuerier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, er
|
|||
|
||||
func (errQuerier) Close() error { return nil }
|
||||
|
||||
func (errChunkQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
return storage.ErrChunkSeriesSet(errSelect)
|
||||
}
|
||||
|
|
|
@ -17,12 +17,14 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
type genericQuerier interface {
|
||||
LabelQuerier
|
||||
Select(bool, *SelectHints, ...*labels.Matcher) genericSeriesSet
|
||||
Select(context.Context, bool, *SelectHints, ...*labels.Matcher) genericSeriesSet
|
||||
}
|
||||
|
||||
type genericSeriesSet interface {
|
||||
|
@ -58,11 +60,11 @@ type genericQuerierAdapter struct {
|
|||
cq ChunkQuerier
|
||||
}
|
||||
|
||||
func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
func (q *genericQuerierAdapter) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
if q.q != nil {
|
||||
return &genericSeriesSetAdapter{q.q.Select(sortSeries, hints, matchers...)}
|
||||
return &genericSeriesSetAdapter{q.q.Select(ctx, sortSeries, hints, matchers...)}
|
||||
}
|
||||
return &genericChunkSeriesSetAdapter{q.cq.Select(sortSeries, hints, matchers...)}
|
||||
return &genericChunkSeriesSetAdapter{q.cq.Select(ctx, sortSeries, hints, matchers...)}
|
||||
}
|
||||
|
||||
func newGenericQuerierFrom(q Querier) genericQuerier {
|
||||
|
@ -85,8 +87,8 @@ func (a *seriesSetAdapter) At() Series {
|
|||
return a.genericSeriesSet.At().(Series)
|
||||
}
|
||||
|
||||
func (q *querierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet {
|
||||
return &seriesSetAdapter{q.genericQuerier.Select(sortSeries, hints, matchers...)}
|
||||
func (q *querierAdapter) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet {
|
||||
return &seriesSetAdapter{q.genericQuerier.Select(ctx, sortSeries, hints, matchers...)}
|
||||
}
|
||||
|
||||
type chunkQuerierAdapter struct {
|
||||
|
@ -101,8 +103,8 @@ func (a *chunkSeriesSetAdapter) At() ChunkSeries {
|
|||
return a.genericSeriesSet.At().(ChunkSeries)
|
||||
}
|
||||
|
||||
func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet {
|
||||
return &chunkSeriesSetAdapter{q.genericQuerier.Select(sortSeries, hints, matchers...)}
|
||||
func (q *chunkQuerierAdapter) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet {
|
||||
return &chunkSeriesSetAdapter{q.genericQuerier.Select(ctx, sortSeries, hints, matchers...)}
|
||||
}
|
||||
|
||||
type seriesMergerAdapter struct {
|
||||
|
|
|
@ -91,7 +91,7 @@ type ExemplarStorage interface {
|
|||
// Use it when you need to have access to all samples without chunk encoding abstraction e.g promQL.
|
||||
type Queryable interface {
|
||||
// Querier returns a new Querier on the storage.
|
||||
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
|
||||
Querier(mint, maxt int64) (Querier, error)
|
||||
}
|
||||
|
||||
// A MockQueryable is used for testing purposes so that a mock Querier can be used.
|
||||
|
@ -99,7 +99,7 @@ type MockQueryable struct {
|
|||
MockQuerier Querier
|
||||
}
|
||||
|
||||
func (q *MockQueryable) Querier(context.Context, int64, int64) (Querier, error) {
|
||||
func (q *MockQueryable) Querier(int64, int64) (Querier, error) {
|
||||
return q.MockQuerier, nil
|
||||
}
|
||||
|
||||
|
@ -110,7 +110,7 @@ type Querier interface {
|
|||
// Select returns a set of series that matches the given label matchers.
|
||||
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
|
||||
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
|
||||
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
||||
Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
||||
}
|
||||
|
||||
// MockQuerier is used for test purposes to mock the selected series that is returned.
|
||||
|
@ -130,7 +130,7 @@ func (q *MockQuerier) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *MockQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet {
|
||||
func (q *MockQuerier) Select(_ context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet {
|
||||
return q.SelectMockFunction(sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
|
@ -138,7 +138,7 @@ func (q *MockQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*l
|
|||
// Use it when you need to have access to samples in encoded format.
|
||||
type ChunkQueryable interface {
|
||||
// ChunkQuerier returns a new ChunkQuerier on the storage.
|
||||
ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error)
|
||||
ChunkQuerier(mint, maxt int64) (ChunkQuerier, error)
|
||||
}
|
||||
|
||||
// ChunkQuerier provides querying access over time series data of a fixed time range.
|
||||
|
@ -148,7 +148,7 @@ type ChunkQuerier interface {
|
|||
// Select returns a set of series that matches the given label matchers.
|
||||
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
|
||||
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
|
||||
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet
|
||||
Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet
|
||||
}
|
||||
|
||||
// LabelQuerier provides querying access over labels.
|
||||
|
@ -202,11 +202,11 @@ type SelectHints struct {
|
|||
// TODO(bwplotka): Move to promql/engine_test.go?
|
||||
// QueryableFunc is an adapter to allow the use of ordinary functions as
|
||||
// Queryables. It follows the idea of http.HandlerFunc.
|
||||
type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error)
|
||||
type QueryableFunc func(mint, maxt int64) (Querier, error)
|
||||
|
||||
// Querier calls f() with the given parameters.
|
||||
func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
|
||||
return f(ctx, mint, maxt)
|
||||
func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
|
||||
return f(mint, maxt)
|
||||
}
|
||||
|
||||
// Appender provides batched appends against a storage.
|
||||
|
|
|
@ -16,6 +16,7 @@ package storage
|
|||
import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
@ -97,19 +98,19 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
|
|||
}
|
||||
|
||||
// Select returns a set of series that matches the given label matchers.
|
||||
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
if len(q.queriers) == 0 {
|
||||
return noopGenericSeriesSet{}
|
||||
}
|
||||
if len(q.queriers) == 1 {
|
||||
return q.queriers[0].Select(sortSeries, hints, matchers...)
|
||||
return q.queriers[0].Select(ctx, sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
|
||||
if !q.concurrentSelect {
|
||||
for _, querier := range q.queriers {
|
||||
// We need to sort for merge to work.
|
||||
seriesSets = append(seriesSets, querier.Select(true, hints, matchers...))
|
||||
seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
|
||||
}
|
||||
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
||||
s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
|
||||
|
@ -128,7 +129,7 @@ func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matche
|
|||
defer wg.Done()
|
||||
|
||||
// We need to sort for NewMergeSeriesSet to work.
|
||||
seriesSetChan <- qr.Select(true, hints, matchers...)
|
||||
seriesSetChan <- qr.Select(ctx, true, hints, matchers...)
|
||||
}(querier)
|
||||
}
|
||||
go func() {
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
@ -187,7 +188,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
|
|||
}
|
||||
qs = append(qs, tc.extraQueriers...)
|
||||
|
||||
mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(false, nil)
|
||||
mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(context.Background(), false, nil)
|
||||
|
||||
// Get all merged series upfront to make sure there are no incorrectly retained shared
|
||||
// buffers causing bugs.
|
||||
|
@ -363,7 +364,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
|
|||
}
|
||||
qs = append(qs, tc.extraQueriers...)
|
||||
|
||||
merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(false, nil)
|
||||
merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil)
|
||||
for merged.Next() {
|
||||
require.True(t, tc.expected.Next(), "Expected Next() to be true")
|
||||
actualSeries := merged.At()
|
||||
|
@ -721,7 +722,7 @@ func (a seriesByLabel) Len() int { return len(a) }
|
|||
func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
|
||||
|
||||
func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet {
|
||||
func (m *mockQuerier) Select(_ context.Context, sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet {
|
||||
cpy := make([]Series, len(m.toReturn))
|
||||
copy(cpy, m.toReturn)
|
||||
if sortSeries {
|
||||
|
@ -745,7 +746,7 @@ func (a chunkSeriesByLabel) Less(i, j int) bool {
|
|||
return labels.Compare(a[i].Labels(), a[j].Labels()) < 0
|
||||
}
|
||||
|
||||
func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) ChunkSeriesSet {
|
||||
func (m *mockChunkQurier) Select(_ context.Context, sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) ChunkSeriesSet {
|
||||
cpy := make([]ChunkSeries, len(m.toReturn))
|
||||
copy(cpy, m.toReturn)
|
||||
if sortSeries {
|
||||
|
@ -982,7 +983,7 @@ type labelNameRequest struct {
|
|||
matchers []*labels.Matcher
|
||||
}
|
||||
|
||||
func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet {
|
||||
func (m *mockGenericQuerier) Select(_ context.Context, b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet {
|
||||
m.mtx.Lock()
|
||||
m.sortedSeriesRequested = append(m.sortedSeriesRequested, b)
|
||||
m.mtx.Unlock()
|
||||
|
@ -1177,7 +1178,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
}
|
||||
|
||||
t.Run("Select", func(t *testing.T) {
|
||||
res := q.Select(false, nil)
|
||||
res := q.Select(context.Background(), false, nil)
|
||||
var lbls []labels.Labels
|
||||
for res.Next() {
|
||||
lbls = append(lbls, res.At().Labels())
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
)
|
||||
|
||||
|
@ -24,7 +26,7 @@ func NoopQuerier() Querier {
|
|||
return noopQuerier{}
|
||||
}
|
||||
|
||||
func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) SeriesSet {
|
||||
func (noopQuerier) Select(context.Context, bool, *SelectHints, ...*labels.Matcher) SeriesSet {
|
||||
return NoopSeriesSet()
|
||||
}
|
||||
|
||||
|
@ -47,7 +49,7 @@ func NoopChunkedQuerier() ChunkQuerier {
|
|||
return noopChunkQuerier{}
|
||||
}
|
||||
|
||||
func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) ChunkSeriesSet {
|
||||
func (noopChunkQuerier) Select(context.Context, bool, *SelectHints, ...*labels.Matcher) ChunkSeriesSet {
|
||||
return NoopChunkedSeriesSet()
|
||||
}
|
||||
|
||||
|
|
|
@ -48,9 +48,8 @@ func NewSampleAndChunkQueryableClient(
|
|||
}
|
||||
}
|
||||
|
||||
func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
q := &querier{
|
||||
ctx: ctx,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
client: c.client,
|
||||
|
@ -75,10 +74,9 @@ func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt
|
|||
return q, nil
|
||||
}
|
||||
|
||||
func (c *sampleAndChunkQueryableClient) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
cq := &chunkQuerier{
|
||||
querier: querier{
|
||||
ctx: ctx,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
client: c.client,
|
||||
|
@ -125,7 +123,6 @@ func (c *sampleAndChunkQueryableClient) preferLocalStorage(mint, maxt int64) (cm
|
|||
}
|
||||
|
||||
type querier struct {
|
||||
ctx context.Context
|
||||
mint, maxt int64
|
||||
client ReadClient
|
||||
|
||||
|
@ -140,7 +137,7 @@ type querier struct {
|
|||
//
|
||||
// If requiredMatchers are given, select returns a NoopSeriesSet if the given matchers don't match the label set of the
|
||||
// requiredMatchers. Otherwise it'll just call remote endpoint.
|
||||
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
if len(q.requiredMatchers) > 0 {
|
||||
// Copy to not modify slice configured by user.
|
||||
requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...)
|
||||
|
@ -167,7 +164,7 @@ func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers .
|
|||
return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err))
|
||||
}
|
||||
|
||||
res, err := q.client.Read(q.ctx, query)
|
||||
res, err := q.client.Read(ctx, query)
|
||||
if err != nil {
|
||||
return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err))
|
||||
}
|
||||
|
@ -235,9 +232,9 @@ type chunkQuerier struct {
|
|||
|
||||
// Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client.
|
||||
// It uses remote.querier.Select so it supports external labels and required matchers if specified.
|
||||
func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
func (q *chunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
// TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket).
|
||||
return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...))
|
||||
return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...))
|
||||
}
|
||||
|
||||
// Note strings in toFilter must be sorted.
|
||||
|
|
|
@ -131,7 +131,7 @@ func (h *readHandler) remoteReadSamples(
|
|||
return err
|
||||
}
|
||||
|
||||
querier, err := h.queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs)
|
||||
querier, err := h.queryable.Querier(query.StartTimestampMs, query.EndTimestampMs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -155,7 +155,7 @@ func (h *readHandler) remoteReadSamples(
|
|||
}
|
||||
|
||||
var ws storage.Warnings
|
||||
resp.Results[i], ws, err = ToQueryResult(querier.Select(false, hints, filteredMatchers...), h.remoteReadSampleLimit)
|
||||
resp.Results[i], ws, err = ToQueryResult(querier.Select(ctx, false, hints, filteredMatchers...), h.remoteReadSampleLimit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -198,7 +198,7 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
|
|||
return err
|
||||
}
|
||||
|
||||
querier, err := h.queryable.ChunkQuerier(ctx, query.StartTimestampMs, query.EndTimestampMs)
|
||||
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -225,7 +225,7 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re
|
|||
NewChunkedWriter(w, f),
|
||||
int64(i),
|
||||
// The streaming API has to provide the series sorted.
|
||||
querier.Select(true, hints, filteredMatchers...),
|
||||
querier.Select(ctx, true, hints, filteredMatchers...),
|
||||
sortedExternalLabels,
|
||||
h.remoteReadMaxBytesInFrame,
|
||||
h.marshalPool,
|
||||
|
|
|
@ -469,11 +469,11 @@ func TestSampleAndChunkQueryableClient(t *testing.T) {
|
|||
tc.readRecent,
|
||||
tc.callback,
|
||||
)
|
||||
q, err := c.Querier(context.TODO(), tc.mint, tc.maxt)
|
||||
q, err := c.Querier(tc.mint, tc.maxt)
|
||||
require.NoError(t, err)
|
||||
defer require.NoError(t, q.Close())
|
||||
|
||||
ss := q.Select(true, nil, tc.matchers...)
|
||||
ss := q.Select(context.Background(), true, nil, tc.matchers...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, storage.Warnings(nil), ss.Warnings())
|
||||
|
||||
|
|
|
@ -152,14 +152,14 @@ func (s *Storage) StartTime() (int64, error) {
|
|||
// Returned querier will never return error as all queryables are assumed best effort.
|
||||
// Additionally all returned queriers ensure that its Select's SeriesSets have ready data after first `Next` invoke.
|
||||
// This is because Prometheus (fanout and secondary queries) can't handle the stream failing half way through by design.
|
||||
func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
func (s *Storage) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
s.mtx.Lock()
|
||||
queryables := s.queryables
|
||||
s.mtx.Unlock()
|
||||
|
||||
queriers := make([]storage.Querier, 0, len(queryables))
|
||||
for _, queryable := range queryables {
|
||||
q, err := queryable.Querier(ctx, mint, maxt)
|
||||
q, err := queryable.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -170,14 +170,14 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie
|
|||
|
||||
// ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers
|
||||
// of each configured remote read endpoint.
|
||||
func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
s.mtx.Lock()
|
||||
queryables := s.queryables
|
||||
s.mtx.Unlock()
|
||||
|
||||
queriers := make([]storage.ChunkQuerier, 0, len(queryables))
|
||||
for _, queryable := range queryables {
|
||||
q, err := queryable.ChunkQuerier(ctx, mint, maxt)
|
||||
q, err := queryable.ChunkQuerier(mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
|
@ -63,12 +64,12 @@ func (s *secondaryQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Wa
|
|||
return names, w, nil
|
||||
}
|
||||
|
||||
func (s *secondaryQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
|
||||
if s.done {
|
||||
panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done")
|
||||
}
|
||||
|
||||
s.asyncSets = append(s.asyncSets, s.genericQuerier.Select(sortSeries, hints, matchers...))
|
||||
s.asyncSets = append(s.asyncSets, s.genericQuerier.Select(ctx, sortSeries, hints, matchers...))
|
||||
curr := len(s.asyncSets) - 1
|
||||
return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
|
||||
s.once.Do(func() {
|
||||
|
|
|
@ -716,12 +716,12 @@ func (db *DB) StartTime() (int64, error) {
|
|||
}
|
||||
|
||||
// Querier implements the Storage interface.
|
||||
func (db *DB) Querier(context.Context, int64, int64) (storage.Querier, error) {
|
||||
func (db *DB) Querier(int64, int64) (storage.Querier, error) {
|
||||
return nil, ErrUnsupported
|
||||
}
|
||||
|
||||
// ChunkQuerier implements the Storage interface.
|
||||
func (db *DB) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) {
|
||||
func (db *DB) ChunkQuerier(int64, int64) (storage.ChunkQuerier, error) {
|
||||
return nil, ErrUnsupported
|
||||
}
|
||||
|
||||
|
|
|
@ -103,12 +103,12 @@ func TestUnsupportedFunctions(t *testing.T) {
|
|||
defer s.Close()
|
||||
|
||||
t.Run("Querier", func(t *testing.T) {
|
||||
_, err := s.Querier(context.TODO(), 0, 0)
|
||||
_, err := s.Querier(0, 0)
|
||||
require.Equal(t, err, ErrUnsupported)
|
||||
})
|
||||
|
||||
t.Run("ChunkQuerier", func(t *testing.T) {
|
||||
_, err := s.ChunkQuerier(context.TODO(), 0, 0)
|
||||
_, err := s.ChunkQuerier(0, 0)
|
||||
require.Equal(t, err, ErrUnsupported)
|
||||
})
|
||||
|
||||
|
|
|
@ -198,7 +198,7 @@ func TestCorruptedChunk(t *testing.T) {
|
|||
querier, err := NewBlockQuerier(b, 0, 1)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, querier.Close()) }()
|
||||
set := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
set := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
||||
// Check chunk errors during iter time.
|
||||
require.True(t, set.Next())
|
||||
|
|
12
tsdb/db.go
12
tsdb/db.go
|
@ -526,22 +526,22 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue
|
|||
|
||||
// Querier loads the blocks and wal and returns a new querier over the data partition for the given time range.
|
||||
// Current implementation doesn't support multiple Queriers.
|
||||
func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
q, err := db.loadDataAsQueryable(maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return q.Querier(ctx, mint, maxt)
|
||||
return q.Querier(mint, maxt)
|
||||
}
|
||||
|
||||
// ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range.
|
||||
// Current implementation doesn't support multiple ChunkQueriers.
|
||||
func (db *DBReadOnly) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
q, err := db.loadDataAsQueryable(maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return q.ChunkQuerier(ctx, mint, maxt)
|
||||
return q.ChunkQuerier(mint, maxt)
|
||||
}
|
||||
|
||||
// Blocks returns a slice of block readers for persisted blocks.
|
||||
|
@ -1841,7 +1841,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
|
|||
}
|
||||
|
||||
// Querier returns a new querier over the data partition for the given time range.
|
||||
func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
var blocks []BlockReader
|
||||
|
||||
db.mtx.RLock()
|
||||
|
@ -1989,7 +1989,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerie
|
|||
}
|
||||
|
||||
// ChunkQuerier returns a new chunk querier over the data partition for the given time range.
|
||||
func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
blockQueriers, err := db.blockChunkQuerierForRange(mint, maxt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
140
tsdb/db_test.go
140
tsdb/db_test.go
|
@ -88,7 +88,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
|
|||
|
||||
// query runs a matcher query against the querier and fully expands its data.
|
||||
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
|
||||
ss := q.Select(false, nil, matchers...)
|
||||
ss := q.Select(context.Background(), false, nil, matchers...)
|
||||
defer func() {
|
||||
require.NoError(t, q.Close())
|
||||
}()
|
||||
|
@ -150,7 +150,7 @@ func queryAndExpandChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*lab
|
|||
|
||||
// queryChunks runs a matcher query against the querier and expands its data.
|
||||
func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][]chunks.Meta {
|
||||
ss := q.Select(false, nil, matchers...)
|
||||
ss := q.Select(context.Background(), false, nil, matchers...)
|
||||
defer func() {
|
||||
require.NoError(t, q.Close())
|
||||
}()
|
||||
|
@ -219,7 +219,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
|||
_, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
querier, err := db.Querier(context.TODO(), 0, 1)
|
||||
querier, err := db.Querier(0, 1)
|
||||
require.NoError(t, err)
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
require.Equal(t, map[string][]chunks.Sample{}, seriesSet)
|
||||
|
@ -227,7 +227,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
|
|||
err = app.Commit()
|
||||
require.NoError(t, err)
|
||||
|
||||
querier, err = db.Querier(context.TODO(), 0, 1)
|
||||
querier, err = db.Querier(0, 1)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
|
@ -285,7 +285,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) {
|
|||
}()
|
||||
require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal), "WAL corruption count mismatch")
|
||||
|
||||
querier, err := db.Querier(context.TODO(), 0, maxt)
|
||||
querier, err := db.Querier(0, maxt)
|
||||
require.NoError(t, err)
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "", ""))
|
||||
// The last sample should be missing as it was after the WAL segment corruption.
|
||||
|
@ -306,7 +306,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
|
|||
err = app.Rollback()
|
||||
require.NoError(t, err)
|
||||
|
||||
querier, err := db.Querier(context.TODO(), 0, 1)
|
||||
querier, err := db.Querier(0, 1)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
|
@ -357,7 +357,7 @@ func TestDBAppenderAddRef(t *testing.T) {
|
|||
|
||||
require.NoError(t, app2.Commit())
|
||||
|
||||
q, err := db.Querier(context.TODO(), 0, 200)
|
||||
q, err := db.Querier(0, 200)
|
||||
require.NoError(t, err)
|
||||
|
||||
res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
@ -450,10 +450,10 @@ Outer:
|
|||
}
|
||||
|
||||
// Compare the result.
|
||||
q, err := db.Querier(context.TODO(), 0, numSamples)
|
||||
q, err := db.Querier(0, numSamples)
|
||||
require.NoError(t, err)
|
||||
|
||||
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
||||
expSamples := make([]chunks.Sample, 0, len(c.remaint))
|
||||
for _, ts := range c.remaint {
|
||||
|
@ -610,7 +610,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
|
||||
// Make sure the right value is stored.
|
||||
q, err := db.Querier(context.TODO(), 0, 10)
|
||||
q, err := db.Querier(0, 10)
|
||||
require.NoError(t, err)
|
||||
|
||||
ssMap := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
@ -627,7 +627,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
q, err = db.Querier(context.TODO(), 0, 10)
|
||||
q, err = db.Querier(0, 10)
|
||||
require.NoError(t, err)
|
||||
|
||||
ssMap = query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
@ -660,12 +660,12 @@ func TestDB_Snapshot(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
querier, err := db.Querier(context.TODO(), mint, mint+1000)
|
||||
querier, err := db.Querier(mint, mint+1000)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, querier.Close()) }()
|
||||
|
||||
// sum values
|
||||
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
seriesSet := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
var series chunkenc.Iterator
|
||||
sum := 0.0
|
||||
for seriesSet.Next() {
|
||||
|
@ -709,12 +709,12 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
querier, err := db.Querier(context.TODO(), mint, mint+1000)
|
||||
querier, err := db.Querier(mint, mint+1000)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, querier.Close()) }()
|
||||
|
||||
// Sum values.
|
||||
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
seriesSet := querier.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
var series chunkenc.Iterator
|
||||
sum := 0.0
|
||||
for seriesSet.Next() {
|
||||
|
@ -777,11 +777,11 @@ Outer:
|
|||
defer func() { require.NoError(t, newDB.Close()) }()
|
||||
|
||||
// Compare the result.
|
||||
q, err := newDB.Querier(context.TODO(), 0, numSamples)
|
||||
q, err := newDB.Querier(0, numSamples)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, q.Close()) }()
|
||||
|
||||
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
||||
expSamples := make([]chunks.Sample, 0, len(c.remaint))
|
||||
for _, ts := range c.remaint {
|
||||
|
@ -952,10 +952,10 @@ func TestDB_e2e(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.TODO(), mint, maxt)
|
||||
q, err := db.Querier(mint, maxt)
|
||||
require.NoError(t, err)
|
||||
|
||||
ss := q.Select(false, nil, qry.ms...)
|
||||
ss := q.Select(ctx, false, nil, qry.ms...)
|
||||
result := map[string][]chunks.Sample{}
|
||||
|
||||
for ss.Next() {
|
||||
|
@ -997,7 +997,7 @@ func TestWALFlushedOnDBClose(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
q, err := db.Querier(context.TODO(), 0, 1)
|
||||
q, err := db.Querier(0, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
values, ws, err := q.LabelValues("labelname")
|
||||
|
@ -1143,10 +1143,10 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore
|
|||
})
|
||||
|
||||
// Query back chunks for all series.
|
||||
q, err := reopenDB.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := reopenDB.ChunkQuerier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
|
||||
set := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
|
||||
actualSeries := 0
|
||||
var chunksIt chunks.Iterator
|
||||
|
||||
|
@ -1214,11 +1214,11 @@ func TestTombstoneClean(t *testing.T) {
|
|||
require.NoError(t, db.CleanTombstones())
|
||||
|
||||
// Compare the result.
|
||||
q, err := db.Querier(context.TODO(), 0, numSamples)
|
||||
q, err := db.Querier(0, numSamples)
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
|
||||
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
||||
expSamples := make([]chunks.Sample, 0, len(c.remaint))
|
||||
for _, ts := range c.remaint {
|
||||
|
@ -1716,12 +1716,12 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
|||
series: labelpairs[:1],
|
||||
}}
|
||||
|
||||
q, err := db.Querier(context.TODO(), 0, 10)
|
||||
q, err := db.Querier(0, 10)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, q.Close()) }()
|
||||
|
||||
for _, c := range cases {
|
||||
ss := q.Select(false, nil, c.selector...)
|
||||
ss := q.Select(ctx, false, nil, c.selector...)
|
||||
lres, _, ws, err := expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
|
@ -1925,7 +1925,7 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
|
|||
|
||||
require.GreaterOrEqual(t, len(db.blocks), 3, "invalid test, less than three blocks in DB")
|
||||
|
||||
q, err := db.Querier(context.TODO(), blockRange, 2*blockRange)
|
||||
q, err := db.Querier(blockRange, 2*blockRange)
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
|
||||
|
@ -2232,7 +2232,7 @@ func TestDB_LabelNames(t *testing.T) {
|
|||
appendSamples(db, 5, 9, tst.sampleLabels2)
|
||||
|
||||
// Testing DB (union).
|
||||
q, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
var ws storage.Warnings
|
||||
labelNames, ws, err = q.LabelNames()
|
||||
|
@ -2434,10 +2434,10 @@ func TestDBReadOnly(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Greater(t, expDbSize, dbSizeBeforeAppend, "db size didn't increase after an append")
|
||||
|
||||
q, err := dbWritable.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
|
||||
q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
expSeries = query(t, q, matchAll)
|
||||
cq, err := dbWritable.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64)
|
||||
cq, err := dbWritable.ChunkQuerier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
expChunks = queryAndExpandChunks(t, cq, matchAll)
|
||||
|
||||
|
@ -2476,7 +2476,7 @@ func TestDBReadOnly(t *testing.T) {
|
|||
})
|
||||
t.Run("querier", func(t *testing.T) {
|
||||
// Open a read only db and ensure that the API returns the same result as the normal DB.
|
||||
q, err := dbReadOnly.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
|
||||
q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
readOnlySeries := query(t, q, matchAll)
|
||||
readOnlyDBHash := testutil.DirHash(t, dbDir)
|
||||
|
@ -2486,7 +2486,7 @@ func TestDBReadOnly(t *testing.T) {
|
|||
require.Equal(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
|
||||
})
|
||||
t.Run("chunk querier", func(t *testing.T) {
|
||||
cq, err := dbReadOnly.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64)
|
||||
cq, err := dbReadOnly.ChunkQuerier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
readOnlySeries := queryAndExpandChunks(t, cq, matchAll)
|
||||
readOnlyDBHash := testutil.DirHash(t, dbDir)
|
||||
|
@ -2507,7 +2507,7 @@ func TestDBReadOnlyClosing(t *testing.T) {
|
|||
require.Equal(t, db.Close(), ErrClosed)
|
||||
_, err = db.Blocks()
|
||||
require.Equal(t, err, ErrClosed)
|
||||
_, err = db.Querier(context.TODO(), 0, 1)
|
||||
_, err = db.Querier(0, 1)
|
||||
require.Equal(t, err, ErrClosed)
|
||||
}
|
||||
|
||||
|
@ -2554,12 +2554,12 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, len(blocks), 1)
|
||||
|
||||
querier, err := db.Querier(context.TODO(), 0, int64(maxt)-1)
|
||||
querier, err := db.Querier(0, int64(maxt)-1)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, querier.Close()) }()
|
||||
|
||||
// Sum the values.
|
||||
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
|
||||
seriesSet := querier.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
|
||||
var series chunkenc.Iterator
|
||||
|
||||
sum := 0.0
|
||||
|
@ -2624,11 +2624,11 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
|
|||
inconsistencies := 0
|
||||
for i := 0; i < 10; i++ {
|
||||
func() {
|
||||
querier, err := db.Querier(context.Background(), 0, 1000000)
|
||||
querier, err := db.Querier(0, 1000000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss := querier.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
_, seriesSet, ws, err := expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
|
@ -2658,7 +2658,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
querierBeforeAdd, err := db.Querier(context.Background(), 0, 1000000)
|
||||
querierBeforeAdd, err := db.Querier(0, 1000000)
|
||||
require.NoError(t, err)
|
||||
defer querierBeforeAdd.Close()
|
||||
|
||||
|
@ -2667,18 +2667,18 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
querierAfterAddButBeforeCommit, err := db.Querier(context.Background(), 0, 1000000)
|
||||
querierAfterAddButBeforeCommit, err := db.Querier(0, 1000000)
|
||||
require.NoError(t, err)
|
||||
defer querierAfterAddButBeforeCommit.Close()
|
||||
|
||||
// None of the queriers should return anything after the Add but before the commit.
|
||||
ss := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss := querierBeforeAdd.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
_, seriesSet, ws, err := expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
require.Equal(t, map[string][]sample{}, seriesSet)
|
||||
|
||||
ss = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss = querierAfterAddButBeforeCommit.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
_, seriesSet, ws, err = expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
|
@ -2689,25 +2689,25 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// Nothing returned for querier created before the Add.
|
||||
ss = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss = querierBeforeAdd.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
_, seriesSet, ws, err = expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
require.Equal(t, map[string][]sample{}, seriesSet)
|
||||
|
||||
// Series exists but has no samples for querier created after Add.
|
||||
ss = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss = querierAfterAddButBeforeCommit.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
_, seriesSet, ws, err = expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
require.Equal(t, map[string][]sample{`{foo="bar"}`: {}}, seriesSet)
|
||||
|
||||
querierAfterCommit, err := db.Querier(context.Background(), 0, 1000000)
|
||||
querierAfterCommit, err := db.Querier(0, 1000000)
|
||||
require.NoError(t, err)
|
||||
defer querierAfterCommit.Close()
|
||||
|
||||
// Samples are returned for querier created after Commit.
|
||||
ss = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss = querierAfterCommit.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
_, seriesSet, ws, err = expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
|
@ -3008,11 +3008,11 @@ func TestCompactHead(t *testing.T) {
|
|||
require.Equal(t, 1, len(db.Blocks()))
|
||||
require.Equal(t, int64(maxt), db.Head().MinTime())
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
querier, err := db.Querier(context.Background(), 0, int64(maxt)-1)
|
||||
querier, err := db.Querier(0, int64(maxt)-1)
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, querier.Close()) }()
|
||||
|
||||
seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"})
|
||||
seriesSet := querier.Select(ctx, false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"})
|
||||
var series chunkenc.Iterator
|
||||
var actSamples []sample
|
||||
|
||||
|
@ -3403,7 +3403,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t
|
|||
// At this point we expect 2 mmap-ed head chunks.
|
||||
|
||||
// Get a querier and make sure it's closed only once the test is over.
|
||||
querier, err := db.Querier(ctx, 0, math.MaxInt64)
|
||||
querier, err := db.Querier(0, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, querier.Close())
|
||||
|
@ -3411,7 +3411,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t
|
|||
|
||||
// Query back all series.
|
||||
hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval}
|
||||
seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
|
||||
seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
|
||||
|
||||
// Fetch samples iterators from all series.
|
||||
var iterators []chunkenc.Iterator
|
||||
|
@ -3539,7 +3539,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
|
|||
// At this point we expect 2 mmap-ed head chunks.
|
||||
|
||||
// Get a querier and make sure it's closed only once the test is over.
|
||||
querier, err := db.ChunkQuerier(ctx, 0, math.MaxInt64)
|
||||
querier, err := db.ChunkQuerier(0, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
require.NoError(t, querier.Close())
|
||||
|
@ -3547,7 +3547,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
|
|||
|
||||
// Query back all series.
|
||||
hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval}
|
||||
seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
|
||||
seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
|
||||
|
||||
// Iterate all series and get their chunks.
|
||||
var it chunks.Iterator
|
||||
|
@ -4170,7 +4170,7 @@ func TestOOOCompaction(t *testing.T) {
|
|||
series2.String(): series2Samples,
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
|
@ -4561,7 +4561,7 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) {
|
|||
series2.String(): series2Samples,
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), fromMins*time.Minute.Milliseconds(), toMins*time.Minute.Milliseconds())
|
||||
q, err := db.Querier(fromMins*time.Minute.Milliseconds(), toMins*time.Minute.Milliseconds())
|
||||
require.NoError(t, err)
|
||||
|
||||
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
|
@ -4661,7 +4661,7 @@ func Test_Querier_OOOQuery(t *testing.T) {
|
|||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
querier, err := db.Querier(context.TODO(), tc.queryMinT, tc.queryMaxT)
|
||||
querier, err := db.Querier(tc.queryMinT, tc.queryMaxT)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
|
@ -4746,7 +4746,7 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) {
|
|||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
querier, err := db.ChunkQuerier(context.TODO(), tc.queryMinT, tc.queryMaxT)
|
||||
querier, err := db.ChunkQuerier(tc.queryMinT, tc.queryMaxT)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
|
@ -4807,7 +4807,7 @@ func TestOOOAppendAndQuery(t *testing.T) {
|
|||
}
|
||||
|
||||
testQuery := func(from, to int64) {
|
||||
querier, err := db.Querier(context.TODO(), from, to)
|
||||
querier, err := db.Querier(from, to)
|
||||
require.NoError(t, err)
|
||||
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
|
||||
|
@ -4936,7 +4936,7 @@ func TestOOODisabled(t *testing.T) {
|
|||
addSample(s1, 59, 59, true) // Out of time window again.
|
||||
addSample(s1, 301, 310, false) // More in-order samples.
|
||||
|
||||
querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
|
||||
querier, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
|
||||
|
@ -4988,7 +4988,7 @@ func TestWBLAndMmapReplay(t *testing.T) {
|
|||
}
|
||||
|
||||
testQuery := func(exp map[string][]chunks.Sample) {
|
||||
querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
|
||||
querier, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar."))
|
||||
|
@ -5376,7 +5376,7 @@ func TestWBLCorruption(t *testing.T) {
|
|||
series1.String(): expSamples,
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
|
@ -5484,7 +5484,7 @@ func TestOOOMmapCorruption(t *testing.T) {
|
|||
series1.String(): expSamples,
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
|
@ -5602,7 +5602,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) {
|
|||
series1.String(): expSamples,
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
|
@ -5805,7 +5805,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) {
|
|||
series1.String(): expSamples,
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
|
@ -5924,7 +5924,7 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) {
|
|||
series1.String(): expSamples,
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
|
@ -6126,7 +6126,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
|
|||
|
||||
testQuery := func(name, value string, exp map[string][]chunks.Sample) {
|
||||
t.Helper()
|
||||
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, name, value))
|
||||
require.Equal(t, exp, act)
|
||||
|
@ -6362,8 +6362,6 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
|
|||
require.NoError(t, db.Close())
|
||||
})
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
var it chunkenc.Iterator
|
||||
exp := make(map[string][]chunks.Sample)
|
||||
for _, series := range blockSeries {
|
||||
|
@ -6399,7 +6397,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
|
|||
require.NoError(t, db.reload())
|
||||
require.Len(t, db.Blocks(), len(blockSeries))
|
||||
|
||||
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
compareSeries(t, exp, res)
|
||||
|
@ -6416,7 +6414,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
|
|||
require.NoError(t, db.reload())
|
||||
require.Len(t, db.Blocks(), 1)
|
||||
|
||||
q, err = db.Querier(ctx, math.MinInt64, math.MaxInt64)
|
||||
q, err = db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
res = query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
|
||||
|
@ -6543,7 +6541,7 @@ func TestNativeHistogramFlag(t *testing.T) {
|
|||
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
q, err := db.Querier(context.Background(), math.MinInt, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
require.Equal(t, map[string][]chunks.Sample{
|
||||
|
@ -6627,12 +6625,12 @@ func TestChunkQuerierReadWriteRace(t *testing.T) {
|
|||
}
|
||||
|
||||
reader := func() {
|
||||
querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
defer func(q storage.ChunkQuerier) {
|
||||
require.NoError(t, q.Close())
|
||||
}(querier)
|
||||
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
for ss.Next() {
|
||||
cs := ss.At()
|
||||
it := cs.Iterator(nil)
|
||||
|
|
|
@ -59,9 +59,9 @@ func Example() {
|
|||
// ... adding more samples.
|
||||
|
||||
// Open a querier for reading.
|
||||
querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
querier, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
noErr(err)
|
||||
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
|
||||
for ss.Next() {
|
||||
series := ss.At()
|
||||
|
|
|
@ -1140,7 +1140,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
|||
for _, h := range []*Head{head, reloadedHead} {
|
||||
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
|
||||
require.NoError(t, err)
|
||||
actSeriesSet := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
|
||||
actSeriesSet := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
|
||||
require.NoError(t, q.Close())
|
||||
expSeriesSet := newMockSeriesSet([]storage.Series{
|
||||
storage.NewListSeries(lblsDefault, func() []chunks.Sample {
|
||||
|
@ -1200,7 +1200,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
|
|||
// Test the series returns no samples. The series is cleared only after compaction.
|
||||
q, err := NewBlockQuerier(hb, 0, 100000)
|
||||
require.NoError(t, err)
|
||||
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
require.True(t, res.Next(), "series is not present")
|
||||
s := res.At()
|
||||
it := s.Iterator(nil)
|
||||
|
@ -1217,7 +1217,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
q, err = NewBlockQuerier(hb, 0, 100000)
|
||||
require.NoError(t, err)
|
||||
res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res = q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
require.True(t, res.Next(), "series don't exist")
|
||||
exps := res.At()
|
||||
it = exps.Iterator(nil)
|
||||
|
@ -1389,7 +1389,7 @@ func TestDelete_e2e(t *testing.T) {
|
|||
q, err := NewBlockQuerier(hb, 0, 100000)
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
ss := q.Select(true, nil, del.ms...)
|
||||
ss := q.Select(context.Background(), true, nil, del.ms...)
|
||||
// Build the mockSeriesSet.
|
||||
matchedSeries := make([]storage.Series, 0, len(matched))
|
||||
for _, m := range matched {
|
||||
|
@ -1835,7 +1835,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
|
||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
require.Equal(t, true, ss.Next())
|
||||
for ss.Next() {
|
||||
}
|
||||
|
@ -1864,7 +1864,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
|||
q, err := NewBlockQuerier(h, 1500, 2500)
|
||||
require.NoError(t, err)
|
||||
|
||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
require.Equal(t, false, ss.Next())
|
||||
require.Equal(t, 0, len(ss.Warnings()))
|
||||
require.NoError(t, q.Close())
|
||||
|
@ -2149,7 +2149,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
_, seriesSet, ws, err := expandSeriesSet(ss)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
|
@ -2521,7 +2521,7 @@ func testHeadSeriesChunkRace(t *testing.T) {
|
|||
h.gc()
|
||||
wg.Done()
|
||||
}()
|
||||
ss := q.Select(false, nil, matcher)
|
||||
ss := q.Select(context.Background(), false, nil, matcher)
|
||||
for ss.Next() {
|
||||
}
|
||||
require.NoError(t, ss.Err())
|
||||
|
@ -2926,11 +2926,11 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
|
||||
// Get a querier before compaction (or when compaction is about to begin).
|
||||
q, err := db.Querier(context.Background(), mint, maxt)
|
||||
q, err := db.Querier(mint, maxt)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Query the compacted range and get the first series before compaction.
|
||||
ss := q.Select(true, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
ss := q.Select(context.Background(), true, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
require.True(t, ss.Next())
|
||||
s := ss.At()
|
||||
|
||||
|
@ -2993,7 +2993,7 @@ func TestDataMissingOnQueryDuringCompaction(t *testing.T) {
|
|||
require.NoError(t, app.Commit())
|
||||
|
||||
// Get a querier before compaction (or when compaction is about to begin).
|
||||
q, err := db.Querier(context.Background(), mint, maxt)
|
||||
q, err := db.Querier(mint, maxt)
|
||||
require.NoError(t, err)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
@ -3107,11 +3107,11 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) {
|
|||
require.True(t, waitOver.Load())
|
||||
}
|
||||
|
||||
q, err := db.Querier(context.Background(), c.mint, c.maxt)
|
||||
q, err := db.Querier(c.mint, c.maxt)
|
||||
require.NoError(t, err)
|
||||
checkWaiting(q)
|
||||
|
||||
cq, err := db.ChunkQuerier(context.Background(), c.mint, c.maxt)
|
||||
cq, err := db.ChunkQuerier(c.mint, c.maxt)
|
||||
require.NoError(t, err)
|
||||
checkWaiting(cq)
|
||||
})
|
||||
|
@ -3191,7 +3191,7 @@ func TestAppendHistogram(t *testing.T) {
|
|||
require.NoError(t, q.Close())
|
||||
})
|
||||
|
||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
||||
require.True(t, ss.Next())
|
||||
s := ss.At()
|
||||
|
@ -3844,7 +3844,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
|
|||
require.NoError(t, q.Close())
|
||||
})
|
||||
|
||||
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
||||
require.True(t, ss.Next())
|
||||
s := ss.At()
|
||||
|
@ -4236,7 +4236,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
}
|
||||
|
||||
// Query back and expect same order of samples.
|
||||
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
|
@ -125,7 +126,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
|
|||
return &blockQuerier{blockBaseQuerier: q}, nil
|
||||
}
|
||||
|
||||
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
||||
func (q *blockQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
|
||||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
disableTrimming := false
|
||||
|
@ -165,7 +166,7 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier
|
|||
return &blockChunkQuerier{blockBaseQuerier: q}, nil
|
||||
}
|
||||
|
||||
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
func (q *blockChunkQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
disableTrimming := false
|
||||
|
|
|
@ -258,7 +258,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ss := q.Select(sorted, nil, matcher)
|
||||
ss := q.Select(context.Background(), sorted, nil, matcher)
|
||||
for ss.Next() { // nolint:revive
|
||||
}
|
||||
require.NoError(b, ss.Err())
|
||||
|
|
|
@ -182,7 +182,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
|
|||
},
|
||||
}
|
||||
|
||||
res := q.Select(false, c.hints, c.ms...)
|
||||
res := q.Select(context.Background(), false, c.hints, c.ms...)
|
||||
defer func() { require.NoError(t, q.Close()) }()
|
||||
|
||||
for {
|
||||
|
@ -217,7 +217,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
|
|||
maxt: c.maxt,
|
||||
},
|
||||
}
|
||||
res := q.Select(false, c.hints, c.ms...)
|
||||
res := q.Select(context.Background(), false, c.hints, c.ms...)
|
||||
defer func() { require.NoError(t, q.Close()) }()
|
||||
|
||||
for {
|
||||
|
@ -1689,7 +1689,7 @@ func BenchmarkQuerySeek(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
|
||||
var it chunkenc.Iterator
|
||||
ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
ss := sq.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
for ss.Next() {
|
||||
it = ss.At().Iterator(it)
|
||||
for t := mint; t <= maxt; t++ {
|
||||
|
@ -1822,7 +1822,7 @@ func BenchmarkSetMatcher(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for n := 0; n < b.N; n++ {
|
||||
ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
|
||||
ss := sq.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
|
||||
for ss.Next() {
|
||||
}
|
||||
require.NoError(b, ss.Err())
|
||||
|
@ -2234,7 +2234,7 @@ func TestQuerierIndexQueriesRace(t *testing.T) {
|
|||
t.Cleanup(cancel)
|
||||
|
||||
for i := 0; i < testRepeats; i++ {
|
||||
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
|
||||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
values, _, err := q.LabelValues("seq", c.matchers...)
|
||||
|
@ -2275,7 +2275,7 @@ func TestClose(t *testing.T) {
|
|||
require.NoError(t, db.Close())
|
||||
}()
|
||||
|
||||
q, err := db.Querier(context.TODO(), 0, 20)
|
||||
q, err := db.Querier(0, 20)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, q.Close())
|
||||
require.Error(t, q.Close())
|
||||
|
@ -2408,7 +2408,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
|
|||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ss := q.Select(false, nil, selectors...)
|
||||
ss := q.Select(context.Background(), false, nil, selectors...)
|
||||
var actualExpansions int
|
||||
var it chunkenc.Iterator
|
||||
for ss.Next() {
|
||||
|
@ -2628,7 +2628,7 @@ func BenchmarkHeadChunkQuerier(b *testing.B) {
|
|||
}
|
||||
require.NoError(b, app.Commit())
|
||||
|
||||
querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(b, err)
|
||||
defer func(q storage.ChunkQuerier) {
|
||||
require.NoError(b, q.Close())
|
||||
|
@ -2636,7 +2636,7 @@ func BenchmarkHeadChunkQuerier(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
total := 0
|
||||
for ss.Next() {
|
||||
cs := ss.At()
|
||||
|
@ -2673,7 +2673,7 @@ func BenchmarkHeadQuerier(b *testing.B) {
|
|||
}
|
||||
require.NoError(b, app.Commit())
|
||||
|
||||
querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
|
||||
querier, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(b, err)
|
||||
defer func(q storage.Querier) {
|
||||
require.NoError(b, q.Close())
|
||||
|
@ -2681,7 +2681,7 @@ func BenchmarkHeadQuerier(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*"))
|
||||
total := int64(0)
|
||||
for ss.Next() {
|
||||
cs := ss.At()
|
||||
|
@ -2746,10 +2746,10 @@ func TestQueryWithDeletedHistograms(t *testing.T) {
|
|||
err = db.Delete(80, 100, matcher)
|
||||
require.NoError(t, err)
|
||||
|
||||
chunkQuerier, err := db.ChunkQuerier(context.Background(), 0, 100)
|
||||
chunkQuerier, err := db.ChunkQuerier(0, 100)
|
||||
require.NoError(t, err)
|
||||
|
||||
css := chunkQuerier.Select(false, nil, matcher)
|
||||
css := chunkQuerier.Select(context.Background(), false, nil, matcher)
|
||||
|
||||
seriesCount := 0
|
||||
for css.Next() {
|
||||
|
|
|
@ -659,7 +659,7 @@ func (api *API) labelNames(r *http.Request) apiFuncResult {
|
|||
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
||||
}
|
||||
|
||||
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, returnAPIError(err), nil, nil}
|
||||
}
|
||||
|
@ -725,7 +725,7 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) {
|
|||
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
||||
}
|
||||
|
||||
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil}
|
||||
}
|
||||
|
@ -793,6 +793,8 @@ var (
|
|||
)
|
||||
|
||||
func (api *API) series(r *http.Request) (result apiFuncResult) {
|
||||
ctx := r.Context()
|
||||
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorBadData, errors.Wrapf(err, "error parsing form values")}, nil, nil}
|
||||
}
|
||||
|
@ -814,7 +816,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
|
|||
return invalidParamError(err, "match[]")
|
||||
}
|
||||
|
||||
q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end))
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, returnAPIError(err), nil, nil}
|
||||
}
|
||||
|
@ -841,13 +843,13 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
|
|||
var sets []storage.SeriesSet
|
||||
for _, mset := range matcherSets {
|
||||
// We need to sort this select results to merge (deduplicate) the series sets later.
|
||||
s := q.Select(true, hints, mset...)
|
||||
s := q.Select(ctx, true, hints, mset...)
|
||||
sets = append(sets, s)
|
||||
}
|
||||
set = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
|
||||
} else {
|
||||
// At this point at least one match exists.
|
||||
set = q.Select(false, hints, matcherSets[0]...)
|
||||
set = q.Select(ctx, false, hints, matcherSets[0]...)
|
||||
}
|
||||
|
||||
metrics := []labels.Labels{}
|
||||
|
|
|
@ -993,14 +993,14 @@ func setupRemote(s storage.Storage) *httptest.Server {
|
|||
}
|
||||
}
|
||||
|
||||
querier, err := s.Querier(r.Context(), query.StartTimestampMs, query.EndTimestampMs)
|
||||
querier, err := s.Querier(query.StartTimestampMs, query.EndTimestampMs)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer querier.Close()
|
||||
|
||||
set := querier.Select(false, hints, matchers...)
|
||||
set := querier.Select(r.Context(), false, hints, matchers...)
|
||||
resp.Results[i], _, err = remote.ToQueryResult(set, 1e6)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
|
|
@ -154,11 +154,11 @@ func (t errorTestQueryable) ExemplarQuerier(ctx context.Context) (storage.Exempl
|
|||
return nil, t.err
|
||||
}
|
||||
|
||||
func (t errorTestQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
func (t errorTestQueryable) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
|
||||
return nil, t.err
|
||||
}
|
||||
|
||||
func (t errorTestQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
func (t errorTestQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
|
||||
if t.q != nil {
|
||||
return t.q, nil
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ func (t errorTestQuerier) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t errorTestQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
func (t errorTestQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
if t.s != nil {
|
||||
return t.s
|
||||
}
|
||||
|
|
|
@ -57,6 +57,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
h.mtx.RLock()
|
||||
defer h.mtx.RUnlock()
|
||||
|
||||
ctx := req.Context()
|
||||
|
||||
if err := req.ParseForm(); err != nil {
|
||||
http.Error(w, fmt.Sprintf("error parsing form values: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
|
@ -80,7 +82,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
)
|
||||
w.Header().Set("Content-Type", string(format))
|
||||
|
||||
q, err := h.localStorage.Querier(req.Context(), mint, maxt)
|
||||
q, err := h.localStorage.Querier(mint, maxt)
|
||||
if err != nil {
|
||||
federationErrors.Inc()
|
||||
if errors.Cause(err) == tsdb.ErrNotReady {
|
||||
|
@ -98,7 +100,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
|
||||
var sets []storage.SeriesSet
|
||||
for _, mset := range matcherSets {
|
||||
s := q.Select(true, hints, mset...)
|
||||
s := q.Select(ctx, true, hints, mset...)
|
||||
sets = append(sets, s)
|
||||
}
|
||||
|
||||
|
|
|
@ -237,7 +237,7 @@ type notReadyReadStorage struct {
|
|||
LocalStorage
|
||||
}
|
||||
|
||||
func (notReadyReadStorage) Querier(context.Context, int64, int64) (storage.Querier, error) {
|
||||
func (notReadyReadStorage) Querier(int64, int64) (storage.Querier, error) {
|
||||
return nil, errors.Wrap(tsdb.ErrNotReady, "wrap")
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue