From 15fa680117dc7cc7f8046daf4a7258868385223b Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 28 Jun 2023 15:35:02 +1000 Subject: [PATCH 01/12] Add benchmark for query using timestamp() Signed-off-by: Charles Korn --- promql/bench_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/promql/bench_test.go b/promql/bench_test.go index 6818498bf..fb5f3a06d 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -186,6 +186,10 @@ func rangeQueryCases() []benchCase { expr: "count({__name__!=\"\",l=\"\"})", steps: 1, }, + // timestamp() function + { + expr: "timestamp(a_X)", + }, } // X in an expr will be replaced by different metric sizes. From a2a2cc757e48de31cfbb5e099814232712e1c8b2 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 28 Jun 2023 15:08:48 +1000 Subject: [PATCH 02/12] Extract timestamp special case to its own method. Signed-off-by: Charles Korn --- promql/engine.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 83bbdeff8..64f69df5a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1387,15 +1387,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { unwrapParenExpr(&arg) vs, ok := arg.(*parser.VectorSelector) if ok { - return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { - if vs.Timestamp != nil { - // This is a special case only for "timestamp" since the offset - // needs to be adjusted for every point. - vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond - } - val, ws := ev.vectorSelector(vs, enh.Ts) - return call([]parser.Value{val}, e.Args, enh), ws - }) + return ev.evalTimestampFunctionOverVectorSelector(vs, call, e) } } @@ -1833,6 +1825,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(fmt.Errorf("unhandled expression of type: %T", expr)) } +func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) { + return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { + if vs.Timestamp != nil { + // This is a special case only for "timestamp" since the offset + // needs to be adjusted for every point. + vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond + } + val, ws := ev.vectorSelector(vs, enh.Ts) + return call([]parser.Value{val}, e.Args, enh), ws + }) +} + // vectorSelector evaluates a *parser.VectorSelector expression. func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { ws, err := checkAndExpandSeriesSet(ev.ctx, node) From eeface2e1779d8593cdb859ea637d6aa05595318 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 28 Jun 2023 15:09:58 +1000 Subject: [PATCH 03/12] Inline method Signed-off-by: Charles Korn --- promql/engine.go | 60 ++++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 64f69df5a..2702e1779 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1832,43 +1832,37 @@ func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSe // needs to be adjusted for every point. vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } - val, ws := ev.vectorSelector(vs, enh.Ts) - return call([]parser.Value{val}, e.Args, enh), ws - }) -} - -// vectorSelector evaluates a *parser.VectorSelector expression. -func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { - ws, err := checkAndExpandSeriesSet(ev.ctx, node) - if err != nil { - ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) - } - vec := make(Vector, 0, len(node.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) - var chkIter chunkenc.Iterator - for i, s := range node.Series { - chkIter = s.Iterator(chkIter) - it.Reset(chkIter) + ws, err := checkAndExpandSeriesSet(ev.ctx, vs) + if err != nil { + ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) + } + vec := make(Vector, 0, len(vs.Series)) + it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + var chkIter chunkenc.Iterator + for i, s := range vs.Series { + chkIter = s.Iterator(chkIter) + it.Reset(chkIter) - t, f, h, ok := ev.vectorSelectorSingle(it, node, ts) - if ok { - vec = append(vec, Sample{ - Metric: node.Series[i].Labels(), - T: t, - F: f, - H: h, - }) + t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) + if ok { + vec = append(vec, Sample{ + Metric: vs.Series[i].Labels(), + T: t, + F: f, + H: h, + }) - ev.currentSamples++ - ev.samplesStats.IncrementSamplesAtTimestamp(ts, 1) - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } } - } - } - ev.samplesStats.UpdatePeak(ev.currentSamples) - return vec, ws + } + ev.samplesStats.UpdatePeak(ev.currentSamples) + return call([]parser.Value{vec}, e.Args, enh), ws + }) } // vectorSelectorSingle evaluates an instant vector for the iterator of one time series. From a14299805208ed7da43e482cf04055635bfb634a Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 28 Jun 2023 15:11:01 +1000 Subject: [PATCH 04/12] Expand series set just once Signed-off-by: Charles Korn --- promql/engine.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 2702e1779..1d7483d2f 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1826,16 +1826,17 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { } func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) { + ws, err := checkAndExpandSeriesSet(ev.ctx, vs) + if err != nil { + ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) + } + return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { if vs.Timestamp != nil { // This is a special case only for "timestamp" since the offset // needs to be adjusted for every point. vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } - ws, err := checkAndExpandSeriesSet(ev.ctx, vs) - if err != nil { - ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) - } vec := make(Vector, 0, len(vs.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) var chkIter chunkenc.Iterator From b114c0888d295e97f0a531fc8547cc44c836149c Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 28 Jun 2023 15:12:34 +1000 Subject: [PATCH 05/12] Simplify loop Signed-off-by: Charles Korn --- promql/engine.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 1d7483d2f..6fbaba505 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1840,14 +1840,14 @@ func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSe vec := make(Vector, 0, len(vs.Series)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) var chkIter chunkenc.Iterator - for i, s := range vs.Series { + for _, s := range vs.Series { chkIter = s.Iterator(chkIter) it.Reset(chkIter) t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) if ok { vec = append(vec, Sample{ - Metric: vs.Series[i].Labels(), + Metric: s.Labels(), T: t, F: f, H: h, @@ -1859,7 +1859,6 @@ func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSe ev.error(ErrTooManySamples(env)) } } - } ev.samplesStats.UpdatePeak(ev.currentSamples) return call([]parser.Value{vec}, e.Args, enh), ws From 993618adea442fb8ba19a3c36a3c01c434b3cfc5 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 28 Jun 2023 15:13:58 +1000 Subject: [PATCH 06/12] Don't create a new iterator for every time step. Signed-off-by: Charles Korn --- promql/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/promql/engine.go b/promql/engine.go index 6fbaba505..ed801a683 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1830,6 +1830,7 @@ func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSe if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } + it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { if vs.Timestamp != nil { @@ -1838,7 +1839,6 @@ func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSe vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } vec := make(Vector, 0, len(vs.Series)) - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) var chkIter chunkenc.Iterator for _, s := range vs.Series { chkIter = s.Iterator(chkIter) From fde6ebb17df9ee50e4a18533169b9ef84882a99f Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 28 Jun 2023 15:27:44 +1000 Subject: [PATCH 07/12] Create per-series iterators only once per selector, rather than recreating it for each time step. Signed-off-by: Charles Korn --- promql/engine.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index ed801a683..b6c856ba7 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1830,7 +1830,12 @@ func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSe if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } - it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) + + seriesIterators := make([]*storage.MemoizedSeriesIterator, len(vs.Series)) + for i, s := range vs.Series { + it := s.Iterator(nil) + seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)) + } return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { if vs.Timestamp != nil { @@ -1838,12 +1843,10 @@ func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSe // needs to be adjusted for every point. vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } - vec := make(Vector, 0, len(vs.Series)) - var chkIter chunkenc.Iterator - for _, s := range vs.Series { - chkIter = s.Iterator(chkIter) - it.Reset(chkIter) + vec := make(Vector, 0, len(vs.Series)) + for i, s := range vs.Series { + it := seriesIterators[i] t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) if ok { vec = append(vec, Sample{ From 6903d6edd882a8439a5e96dde028def9a7a46d8c Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Thu, 29 Jun 2023 13:34:26 +1000 Subject: [PATCH 08/12] Add test to confirm `timestamp()` behaves correctly when evaluating a range query. Signed-off-by: Charles Korn --- promql/engine_test.go | 94 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/promql/engine_test.go b/promql/engine_test.go index ca4a022e0..1b14e8a5d 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -1977,6 +1977,100 @@ func TestSubquerySelector(t *testing.T) { } } +func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) { + test, err := NewTest(t, ` +load 1m + metric 0+1x1000 +`) + require.NoError(t, err) + defer test.Close() + + err = test.Run() + require.NoError(t, err) + + query := "timestamp(metric)" + start := time.Unix(0, 0) + end := time.Unix(61, 0) + interval := time.Second + + expectedResult := Matrix{ + Series{ + Floats: []FPoint{ + {F: 0, T: 0}, + {F: 0, T: 1_000}, + {F: 0, T: 2_000}, + {F: 0, T: 3_000}, + {F: 0, T: 4_000}, + {F: 0, T: 5_000}, + {F: 0, T: 6_000}, + {F: 0, T: 7_000}, + {F: 0, T: 8_000}, + {F: 0, T: 9_000}, + {F: 0, T: 10_000}, + {F: 0, T: 11_000}, + {F: 0, T: 12_000}, + {F: 0, T: 13_000}, + {F: 0, T: 14_000}, + {F: 0, T: 15_000}, + {F: 0, T: 16_000}, + {F: 0, T: 17_000}, + {F: 0, T: 18_000}, + {F: 0, T: 19_000}, + {F: 0, T: 20_000}, + {F: 0, T: 21_000}, + {F: 0, T: 22_000}, + {F: 0, T: 23_000}, + {F: 0, T: 24_000}, + {F: 0, T: 25_000}, + {F: 0, T: 26_000}, + {F: 0, T: 27_000}, + {F: 0, T: 28_000}, + {F: 0, T: 29_000}, + {F: 0, T: 30_000}, + {F: 0, T: 31_000}, + {F: 0, T: 32_000}, + {F: 0, T: 33_000}, + {F: 0, T: 34_000}, + {F: 0, T: 35_000}, + {F: 0, T: 36_000}, + {F: 0, T: 37_000}, + {F: 0, T: 38_000}, + {F: 0, T: 39_000}, + {F: 0, T: 40_000}, + {F: 0, T: 41_000}, + {F: 0, T: 42_000}, + {F: 0, T: 43_000}, + {F: 0, T: 44_000}, + {F: 0, T: 45_000}, + {F: 0, T: 46_000}, + {F: 0, T: 47_000}, + {F: 0, T: 48_000}, + {F: 0, T: 49_000}, + {F: 0, T: 50_000}, + {F: 0, T: 51_000}, + {F: 0, T: 52_000}, + {F: 0, T: 53_000}, + {F: 0, T: 54_000}, + {F: 0, T: 55_000}, + {F: 0, T: 56_000}, + {F: 0, T: 57_000}, + {F: 0, T: 58_000}, + {F: 0, T: 59_000}, + {F: 60, T: 60_000}, + {F: 60, T: 61_000}, + }, + Metric: labels.EmptyLabels(), + }, + } + + qry, err := test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, query, start, end, interval) + require.NoError(t, err) + + res := qry.Exec(test.Context()) + require.NoError(t, res.Err) + require.Equal(t, expectedResult, res.Value) +} + type FakeQueryLogger struct { closed bool logs []interface{} From fb3935e8f96a6b97cbf7c8637655ca724b727021 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 1 Aug 2023 13:08:57 +1000 Subject: [PATCH 09/12] Address PR feedback: rename method Signed-off-by: Charles Korn --- promql/engine.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index b6c856ba7..ac8c0f566 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1387,7 +1387,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { unwrapParenExpr(&arg) vs, ok := arg.(*parser.VectorSelector) if ok { - return ev.evalTimestampFunctionOverVectorSelector(vs, call, e) + return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e) } } @@ -1825,7 +1825,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(fmt.Errorf("unhandled expression of type: %T", expr)) } -func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) { +func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) { ws, err := checkAndExpandSeriesSet(ev.ctx, vs) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) From 6087c555eda562ee2702540176bbd26e4287e6ef Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 1 Aug 2023 13:17:49 +1000 Subject: [PATCH 10/12] Address PR feedback: clarify comment Signed-off-by: Charles Korn --- promql/engine.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index ac8c0f566..37e9d425b 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1839,8 +1839,9 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) { if vs.Timestamp != nil { - // This is a special case only for "timestamp" since the offset - // needs to be adjusted for every point. + // This is a special case for "timestamp()" when the @ modifier is used, to ensure that + // we return a point for each time step in this case. + // See https://github.com/prometheus/prometheus/issues/8433. vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } From 145d7457feb4a9b6d6c597e173c6a03e54f4ecfc Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Tue, 1 Aug 2023 13:27:46 +1000 Subject: [PATCH 11/12] Address PR feedback: use loop to create expected test result Signed-off-by: Charles Korn --- promql/engine_test.go | 78 ++++++++----------------------------------- 1 file changed, 14 insertions(+), 64 deletions(-) diff --git a/promql/engine_test.go b/promql/engine_test.go index 1b14e8a5d..54567d154 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -1993,72 +1993,22 @@ load 1m end := time.Unix(61, 0) interval := time.Second + // We expect the value to be 0 for t=0s to t=59s (inclusive), then 60 for t=60s and t=61s. + expectedPoints := []FPoint{} + + for t := 0; t <= 59; t++ { + expectedPoints = append(expectedPoints, FPoint{F: 0, T: int64(t * 1000)}) + } + + expectedPoints = append( + expectedPoints, + FPoint{F: 60, T: 60_000}, + FPoint{F: 60, T: 61_000}, + ) + expectedResult := Matrix{ Series{ - Floats: []FPoint{ - {F: 0, T: 0}, - {F: 0, T: 1_000}, - {F: 0, T: 2_000}, - {F: 0, T: 3_000}, - {F: 0, T: 4_000}, - {F: 0, T: 5_000}, - {F: 0, T: 6_000}, - {F: 0, T: 7_000}, - {F: 0, T: 8_000}, - {F: 0, T: 9_000}, - {F: 0, T: 10_000}, - {F: 0, T: 11_000}, - {F: 0, T: 12_000}, - {F: 0, T: 13_000}, - {F: 0, T: 14_000}, - {F: 0, T: 15_000}, - {F: 0, T: 16_000}, - {F: 0, T: 17_000}, - {F: 0, T: 18_000}, - {F: 0, T: 19_000}, - {F: 0, T: 20_000}, - {F: 0, T: 21_000}, - {F: 0, T: 22_000}, - {F: 0, T: 23_000}, - {F: 0, T: 24_000}, - {F: 0, T: 25_000}, - {F: 0, T: 26_000}, - {F: 0, T: 27_000}, - {F: 0, T: 28_000}, - {F: 0, T: 29_000}, - {F: 0, T: 30_000}, - {F: 0, T: 31_000}, - {F: 0, T: 32_000}, - {F: 0, T: 33_000}, - {F: 0, T: 34_000}, - {F: 0, T: 35_000}, - {F: 0, T: 36_000}, - {F: 0, T: 37_000}, - {F: 0, T: 38_000}, - {F: 0, T: 39_000}, - {F: 0, T: 40_000}, - {F: 0, T: 41_000}, - {F: 0, T: 42_000}, - {F: 0, T: 43_000}, - {F: 0, T: 44_000}, - {F: 0, T: 45_000}, - {F: 0, T: 46_000}, - {F: 0, T: 47_000}, - {F: 0, T: 48_000}, - {F: 0, T: 49_000}, - {F: 0, T: 50_000}, - {F: 0, T: 51_000}, - {F: 0, T: 52_000}, - {F: 0, T: 53_000}, - {F: 0, T: 54_000}, - {F: 0, T: 55_000}, - {F: 0, T: 56_000}, - {F: 0, T: 57_000}, - {F: 0, T: 58_000}, - {F: 0, T: 59_000}, - {F: 60, T: 60_000}, - {F: 60, T: 61_000}, - }, + Floats: expectedPoints, Metric: labels.EmptyLabels(), }, } From d39628294195a9328bc02af815b1deb264a65576 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 2 Aug 2023 11:48:17 +1000 Subject: [PATCH 12/12] Address PR feedback: clarify comment Signed-off-by: Charles Korn --- promql/bench_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/promql/bench_test.go b/promql/bench_test.go index fb5f3a06d..c6a528f7b 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -186,7 +186,7 @@ func rangeQueryCases() []benchCase { expr: "count({__name__!=\"\",l=\"\"})", steps: 1, }, - // timestamp() function + // Functions which have special handling inside eval() { expr: "timestamp(a_X)", },