mirror of https://github.com/prometheus/prometheus
promql: Refactored subquery hint tests and added todos. (#7636)
* promql: Refactorer subquery hint tests and added todos. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * fmt. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com> * Fixes. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>pull/7655/head
parent
50c261502e
commit
841b13641c
|
@ -650,9 +650,10 @@ func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) {
|
|||
}
|
||||
|
||||
// We need to make sure we select the timerange selected by the subquery.
|
||||
// TODO(gouthamve): cumulativeSubqueryOffset gives the sum of range and the offset
|
||||
// we can optimise it by separating out the range and offsets, and subtracting the offsets
|
||||
// from end also.
|
||||
// The cumulativeSubqueryOffset function gives the sum of range and the offset.
|
||||
// TODO(gouthamve): Consider optimising it by separating out the range and offsets, and subtracting the offsets
|
||||
// from end also. See: https://github.com/prometheus/prometheus/issues/7629.
|
||||
// TODO(bwplotka): Add support for better hints when subquerying. See: https://github.com/prometheus/prometheus/issues/7630.
|
||||
subqOffset := ng.cumulativeSubqueryOffset(path)
|
||||
offsetMilliseconds := durationMilliseconds(subqOffset)
|
||||
hints.Start = hints.Start - offsetMilliseconds
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/promql/parser"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
|
@ -218,36 +219,26 @@ func TestQueryError(t *testing.T) {
|
|||
testutil.Assert(t, errors.Is(res.Err, errStorage), "expected error doesn't match")
|
||||
}
|
||||
|
||||
// hintCheckerQuerier implements storage.Querier which checks the start and end times
|
||||
// in hints.
|
||||
type hintCheckerQuerier struct {
|
||||
start int64
|
||||
end int64
|
||||
grouping []string
|
||||
by bool
|
||||
selRange int64
|
||||
function string
|
||||
|
||||
t *testing.T
|
||||
type noopHintRecordingQueryable struct {
|
||||
hints []*storage.SelectHints
|
||||
}
|
||||
|
||||
func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet {
|
||||
testutil.Equals(q.t, q.start, sp.Start)
|
||||
testutil.Equals(q.t, q.end, sp.End)
|
||||
testutil.Equals(q.t, q.grouping, sp.Grouping)
|
||||
testutil.Equals(q.t, q.by, sp.By)
|
||||
testutil.Equals(q.t, q.selRange, sp.Range)
|
||||
testutil.Equals(q.t, q.function, sp.Func)
|
||||
|
||||
return errSeriesSet{err: nil}
|
||||
func (h *noopHintRecordingQueryable) Querier(context.Context, int64, int64) (storage.Querier, error) {
|
||||
return &hintRecordingQuerier{Querier: &errQuerier{}, h: h}, nil
|
||||
}
|
||||
func (*hintCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
func (*hintCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*hintCheckerQuerier) Close() error { return nil }
|
||||
|
||||
func TestParamsSetCorrectly(t *testing.T) {
|
||||
type hintRecordingQuerier struct {
|
||||
storage.Querier
|
||||
|
||||
h *noopHintRecordingQueryable
|
||||
}
|
||||
|
||||
func (h *hintRecordingQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
h.h.hints = append(h.h.hints, hints)
|
||||
return h.Querier.Select(sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
func TestSelectHintsSetCorrectly(t *testing.T) {
|
||||
opts := EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
@ -256,192 +247,139 @@ func TestParamsSetCorrectly(t *testing.T) {
|
|||
LookbackDelta: 5 * time.Second,
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
for _, tc := range []struct {
|
||||
query string
|
||||
|
||||
// All times are in seconds.
|
||||
// All times are in milliseconds.
|
||||
start int64
|
||||
end int64
|
||||
|
||||
paramStart int64
|
||||
paramEnd int64
|
||||
|
||||
paramGrouping []string
|
||||
paramBy bool
|
||||
paramRange int64
|
||||
paramFunc string
|
||||
// TODO(bwplotka): Add support for better hints when subquerying.
|
||||
expected []*storage.SelectHints
|
||||
}{{
|
||||
query: "foo",
|
||||
start: 10,
|
||||
|
||||
paramStart: 5,
|
||||
paramEnd: 10,
|
||||
query: "foo", start: 10000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 5000, End: 10000},
|
||||
},
|
||||
}, {
|
||||
query: "foo[2m]",
|
||||
start: 200,
|
||||
|
||||
paramStart: 80, // 200 - 120
|
||||
paramEnd: 200,
|
||||
paramRange: 120000,
|
||||
query: "foo[2m]", start: 200000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 80000, End: 200000, Range: 120000},
|
||||
},
|
||||
}, {
|
||||
query: "foo[2m] offset 2m",
|
||||
start: 300,
|
||||
|
||||
paramStart: 60,
|
||||
paramEnd: 180,
|
||||
paramRange: 120000,
|
||||
query: "foo[2m] offset 2m", start: 300000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 60000, End: 180000, Range: 120000},
|
||||
},
|
||||
}, {
|
||||
query: "foo[2m:1s]",
|
||||
start: 300,
|
||||
|
||||
paramStart: 175, // 300 - 120 - 5
|
||||
paramEnd: 300,
|
||||
query: "foo[2m:1s]", start: 300000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 175000, End: 300000},
|
||||
},
|
||||
}, {
|
||||
query: "count_over_time(foo[2m:1s])",
|
||||
start: 300,
|
||||
|
||||
paramStart: 175, // 300 - 120 - 5
|
||||
paramEnd: 300,
|
||||
paramFunc: "count_over_time",
|
||||
query: "count_over_time(foo[2m:1s])", start: 300000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 175000, End: 300000, Func: "count_over_time"},
|
||||
},
|
||||
}, {
|
||||
query: "count_over_time(foo[2m:1s] offset 10s)",
|
||||
start: 300,
|
||||
|
||||
paramStart: 165, // 300 - 120 - 5 - 10
|
||||
paramEnd: 300,
|
||||
paramFunc: "count_over_time",
|
||||
query: "count_over_time(foo[2m:1s] offset 10s)", start: 300000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 165000, End: 300000, Func: "count_over_time"},
|
||||
},
|
||||
}, {
|
||||
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
|
||||
start: 300,
|
||||
|
||||
paramStart: 155, // 300 - 120 - 5 - 10 - 10
|
||||
paramEnd: 290,
|
||||
paramFunc: "count_over_time",
|
||||
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)", start: 300000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 155000, End: 290000, Func: "count_over_time"},
|
||||
},
|
||||
}, {
|
||||
// Range queries now.
|
||||
query: "foo",
|
||||
start: 10,
|
||||
end: 20,
|
||||
|
||||
paramStart: 5,
|
||||
paramEnd: 20,
|
||||
query: "foo", start: 10000, end: 20000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 5000, End: 20000, Step: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "rate(foo[2m])",
|
||||
start: 200,
|
||||
end: 500,
|
||||
|
||||
paramStart: 80, // 200 - 120
|
||||
paramEnd: 500,
|
||||
paramRange: 120000,
|
||||
paramFunc: "rate",
|
||||
query: "rate(foo[2m])", start: 200000, end: 500000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 80000, End: 500000, Range: 120000, Func: "rate", Step: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "rate(foo[2m] offset 2m)",
|
||||
start: 300,
|
||||
end: 500,
|
||||
|
||||
paramStart: 60,
|
||||
paramEnd: 380,
|
||||
paramRange: 120000,
|
||||
paramFunc: "rate",
|
||||
query: "rate(foo[2m] offset 2m)", start: 300000, end: 500000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 60000, End: 380000, Range: 120000, Func: "rate", Step: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "rate(foo[2m:1s])",
|
||||
start: 300,
|
||||
end: 500,
|
||||
|
||||
paramStart: 175, // 300 - 120 - 5
|
||||
paramEnd: 500,
|
||||
paramFunc: "rate",
|
||||
query: "rate(foo[2m:1s])", start: 300000, end: 500000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 175000, End: 500000, Func: "rate", Step: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "count_over_time(foo[2m:1s])",
|
||||
start: 300,
|
||||
end: 500,
|
||||
|
||||
paramStart: 175, // 300 - 120 - 5
|
||||
paramEnd: 500,
|
||||
paramFunc: "count_over_time",
|
||||
query: "count_over_time(foo[2m:1s])", start: 300000, end: 500000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 175000, End: 500000, Func: "count_over_time", Step: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "count_over_time(foo[2m:1s] offset 10s)",
|
||||
start: 300,
|
||||
end: 500,
|
||||
|
||||
paramStart: 165, // 300 - 120 - 5 - 10
|
||||
paramEnd: 500,
|
||||
paramFunc: "count_over_time",
|
||||
query: "count_over_time(foo[2m:1s] offset 10s)", start: 300000, end: 500000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 165000, End: 500000, Func: "count_over_time", Step: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)",
|
||||
start: 300,
|
||||
end: 500,
|
||||
|
||||
paramStart: 155, // 300 - 120 - 5 - 10 - 10
|
||||
paramEnd: 490,
|
||||
paramFunc: "count_over_time",
|
||||
query: "count_over_time((foo offset 10s)[2m:1s] offset 10s)", start: 300000, end: 500000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 155000, End: 490000, Func: "count_over_time", Step: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "sum by (dim1) (foo)",
|
||||
start: 10,
|
||||
|
||||
paramStart: 5,
|
||||
paramEnd: 10,
|
||||
paramGrouping: []string{"dim1"},
|
||||
paramBy: true,
|
||||
paramFunc: "sum",
|
||||
query: "sum by (dim1) (foo)", start: 10000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 5000, End: 10000, Func: "sum", By: true, Grouping: []string{"dim1"}},
|
||||
},
|
||||
}, {
|
||||
query: "sum without (dim1) (foo)",
|
||||
start: 10,
|
||||
|
||||
paramStart: 5,
|
||||
paramEnd: 10,
|
||||
paramGrouping: []string{"dim1"},
|
||||
paramBy: false,
|
||||
paramFunc: "sum",
|
||||
query: "sum without (dim1) (foo)", start: 10000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 5000, End: 10000, Func: "sum", Grouping: []string{"dim1"}},
|
||||
},
|
||||
}, {
|
||||
query: "sum by (dim1) (avg_over_time(foo[1s]))",
|
||||
start: 10,
|
||||
|
||||
paramStart: 9,
|
||||
paramEnd: 10,
|
||||
paramGrouping: nil,
|
||||
paramBy: false,
|
||||
paramRange: 1000,
|
||||
paramFunc: "avg_over_time",
|
||||
query: "sum by (dim1) (avg_over_time(foo[1s]))", start: 10000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 9000, End: 10000, Func: "avg_over_time", Range: 1000},
|
||||
},
|
||||
}, {
|
||||
query: "sum by (dim1) (max by (dim2) (foo))",
|
||||
start: 10,
|
||||
|
||||
paramStart: 5,
|
||||
paramEnd: 10,
|
||||
paramGrouping: []string{"dim2"},
|
||||
paramBy: true,
|
||||
paramFunc: "max",
|
||||
query: "sum by (dim1) (max by (dim2) (foo))", start: 10000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 5000, End: 10000, Func: "max", By: true, Grouping: []string{"dim2"}},
|
||||
},
|
||||
}, {
|
||||
query: "(max by (dim1) (foo))[5s:1s]",
|
||||
start: 10,
|
||||
query: "(max by (dim1) (foo))[5s:1s]", start: 10000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 0, End: 10000, Func: "max", By: true, Grouping: []string{"dim1"}},
|
||||
},
|
||||
}, {
|
||||
query: "(sum(http_requests{group=~\"p.*\"})+max(http_requests{group=~\"c.*\"}))[20s:5s]", start: 120000,
|
||||
expected: []*storage.SelectHints{
|
||||
{Start: 95000, End: 120000, Func: "sum", By: true},
|
||||
{Start: 95000, End: 120000, Func: "max", By: true},
|
||||
},
|
||||
}} {
|
||||
t.Run(tc.query, func(t *testing.T) {
|
||||
engine := NewEngine(opts)
|
||||
hintsRecorder := &noopHintRecordingQueryable{}
|
||||
|
||||
paramStart: 0,
|
||||
paramEnd: 10,
|
||||
paramGrouping: []string{"dim1"},
|
||||
paramBy: true,
|
||||
paramFunc: "max",
|
||||
}}
|
||||
var (
|
||||
query Query
|
||||
err error
|
||||
)
|
||||
if tc.end == 0 {
|
||||
query, err = engine.NewInstantQuery(hintsRecorder, tc.query, timestamp.Time(tc.start))
|
||||
} else {
|
||||
query, err = engine.NewRangeQuery(hintsRecorder, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second)
|
||||
}
|
||||
testutil.Ok(t, err)
|
||||
|
||||
for _, tc := range cases {
|
||||
engine := NewEngine(opts)
|
||||
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
return &hintCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil
|
||||
res := query.Exec(context.Background())
|
||||
testutil.Ok(t, res.Err)
|
||||
|
||||
testutil.Equals(t, tc.expected, hintsRecorder.hints)
|
||||
})
|
||||
|
||||
var (
|
||||
query Query
|
||||
err error
|
||||
)
|
||||
if tc.end == 0 {
|
||||
query, err = engine.NewInstantQuery(queryable, tc.query, time.Unix(tc.start, 0))
|
||||
} else {
|
||||
query, err = engine.NewRangeQuery(queryable, tc.query, time.Unix(tc.start, 0), time.Unix(tc.end, 0), time.Second)
|
||||
}
|
||||
testutil.Ok(t, err)
|
||||
|
||||
res := query.Exec(context.Background())
|
||||
testutil.Ok(t, res.Err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -886,22 +824,20 @@ func TestRecoverEvaluatorErrorWithWarnings(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSubquerySelector(t *testing.T) {
|
||||
tests := []struct {
|
||||
type caseType struct {
|
||||
Query string
|
||||
Result Result
|
||||
Start time.Time
|
||||
}
|
||||
|
||||
for _, tst := range []struct {
|
||||
loadString string
|
||||
cases []struct {
|
||||
Query string
|
||||
Result Result
|
||||
Start time.Time
|
||||
}
|
||||
cases []caseType
|
||||
}{
|
||||
{
|
||||
loadString: `load 10s
|
||||
metric 1 2`,
|
||||
cases: []struct {
|
||||
Query string
|
||||
Result Result
|
||||
Start time.Time
|
||||
}{
|
||||
cases: []caseType{
|
||||
{
|
||||
Query: "metric[20s:10s]",
|
||||
Result: Result{
|
||||
|
@ -1006,11 +942,7 @@ func TestSubquerySelector(t *testing.T) {
|
|||
http_requests{job="api-server", instance="1", group="production"} 0+20x1000 200+30x1000
|
||||
http_requests{job="api-server", instance="0", group="canary"} 0+30x1000 300+80x1000
|
||||
http_requests{job="api-server", instance="1", group="canary"} 0+40x2000`,
|
||||
cases: []struct {
|
||||
Query string
|
||||
Result Result
|
||||
Start time.Time
|
||||
}{
|
||||
cases: []caseType{
|
||||
{ // Normal selector.
|
||||
Query: `http_requests{group=~"pro.*",instance="0"}[30s:10s]`,
|
||||
Result: Result{
|
||||
|
@ -1111,9 +1043,7 @@ func TestSubquerySelector(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tst := range tests {
|
||||
} {
|
||||
t.Run("", func(t *testing.T) {
|
||||
test, err := NewTest(t, tst.loadString)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -1127,7 +1057,8 @@ func TestSubquerySelector(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
testutil.Equals(t, c.Result.Err, res.Err, "errors do not match for query %s", c.Query)
|
||||
|
||||
testutil.Equals(t, c.Result.Err, res.Err)
|
||||
mat := res.Value.(Matrix)
|
||||
sort.Sort(mat)
|
||||
testutil.Equals(t, c.Result.Value, mat)
|
||||
|
|
Loading…
Reference in New Issue