feat: add limitk() and limit_ratio() operators (#12503)

* rebase 2024-07-01, picks previous renaming to `limitk()` and `limit_ratio()`

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* gofumpt -d -extra

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* more lint fixes

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* more lint fixes+

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* put limitk() and limit_ratio() behind --enable-feature=promql-experimental-functions

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* EnableExperimentalFunctions for TestConcurrentRangeQueries() also

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* use testutil.RequireEqual to fix tests, WIP equivalent thingie for require.Contains

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* lint fix

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* moar linting

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* rebase 2024-06-19

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* re-add limit(2, metric) testing for N=2 common series subset

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* move `ratio = param` to default switch case, for better readability

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* gofumpt -d -extra util/testutil/cmp.go

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* early break when reaching k elems in limitk(), should have always been so (!)

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* small typo fix

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* no-change small break-loop rearrange for readability

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* remove IsNan(ratio) condition in switch-case, already handled as input validation

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* no-change adding some comments

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* no-change simplify fullMatrix() helper functions used for tests

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* add `limitk(-1, metric)` testcase, which is handled as any k < 1 case

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* engine_test.go: no-change create `requireCommonSeries() helper func (moving code into it) for readability

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* rebase 2024-06-21

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* engine_test.go: HAPPY NOW about its code -> reorg, create and use simpleRangeQuery() function, less lines and more readable ftW \o/

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* move limitk(), limit_ratio() testing to promql/promqltest/testdata/limit.test

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* remove stale leftover after moving tests from engine_test.go to testdata/

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* fix flaky `limit_ratio(0.5, ...)` test case

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* Update promql/engine.go

Co-authored-by: Julius Volz <julius.volz@gmail.com>
Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* Update promql/engine.go

Co-authored-by: Julius Volz <julius.volz@gmail.com>
Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* Update promql/engine.go

Co-authored-by: Julius Volz <julius.volz@gmail.com>
Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* fix AddRatioSample() implementation to use a single conditional (instead of switch/case + fallback return)

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* docs/querying/operators.md: document r < 0

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* add negative limit_ratio() example to docs/querying/examples.md

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* move more extensive docu examples to docs/querying/operators.md

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* typo

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* small docu fix for poor-mans-normality-check, add it to limit.test ;)

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* limit.test: expand "Poor man's normality check" to whole eval range

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* restore mistakenly removed existing small comment

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* expand poors-man-normality-check case(s)

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* Revert "expand poors-man-normality-check case(s)"

This reverts commit f69e1603b2ebe69c0a100197cfbcf6f81644b564, indeed too
flaky 0:)

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* remove humor from docs/querying/operators.md

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* fix signoff

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* add web/ui missing changes

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* expand limit_ratio test cases, cross-fingering they'll not be flaky

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* remove flaky test

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* add missing warnings.Merge(ws) in instant-query return shortcut

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* add missing LimitK||LimitRatio case to codemirror-promql/src/parser/parser.ts

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* fix ui-lint

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

* actually fix returned warnings :]

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>

---------

Signed-off-by: JuanJo Ciarlante <juanjosec@gmail.com>
Co-authored-by: Julius Volz <julius.volz@gmail.com>
pull/14425/head
JuanJo Ciarlante 2024-07-03 17:18:57 -03:00 committed by GitHub
parent 82a8c6abe2
commit c94c5b64c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 785 additions and 421 deletions

View File

@ -95,3 +95,13 @@ Assuming this metric contains one time series per running instance, you could
count the number of running instances per application like this:
count by (app) (instance_cpu_time_ns)
If we are exploring some metrics for their labels, to e.g. be able to aggregate
over some of them, we could use the following:
limitk(10, app_foo_metric_bar)
Alternatively, if we wanted the returned timeseries to be more evenly sampled,
we could use the following to get approximately 10% of them:
limit_ratio(0.1, app_foo_metric_bar)

View File

@ -230,6 +230,8 @@ vector of fewer elements with aggregated values:
* `bottomk` (smallest k elements by sample value)
* `topk` (largest k elements by sample value)
* `quantile` (calculate φ-quantile (0 ≤ φ ≤ 1) over dimensions)
* `limitk` (sample n elements)
* `limit_ratio` (sample elements with approximately 𝑟 ratio if `𝑟 > 0`, and the complement of such samples if `𝑟 = -(1.0 - 𝑟)`)
These operators can either be used to aggregate over **all** label dimensions
or preserve distinct dimensions by including a `without` or `by` clause. These
@ -249,8 +251,8 @@ all other labels are preserved in the output. `by` does the opposite and drops
labels that are not listed in the `by` clause, even if their label values are
identical between all elements of the vector.
`parameter` is only required for `count_values`, `quantile`, `topk` and
`bottomk`.
`parameter` is only required for `count_values`, `quantile`, `topk`,
`bottomk`, `limitk` and `limit_ratio`.
`count_values` outputs one time series per unique sample value. Each series has
an additional label. The name of that label is given by the aggregation
@ -261,11 +263,16 @@ time series is the number of times that sample value was present.
the input samples, including the original labels, are returned in the result
vector. `by` and `without` are only used to bucket the input vector.
`limitk` and `limit_ratio` also return a subset of the input samples,
including the original labels in the result vector, these are experimental
operators that must be enabled with `--enable-feature=promql-experimental-functions`.
`quantile` calculates the φ-quantile, the value that ranks at number φ*N among
the N metric values of the dimensions aggregated over. φ is provided as the
aggregation parameter. For example, `quantile(0.5, ...)` calculates the median,
`quantile(0.95, ...)` the 95th percentile. For φ = `NaN`, `NaN` is returned. For φ < 0, `-Inf` is returned. For φ > 1, `+Inf` is returned.
Example:
If the metric `http_requests_total` had time series that fan out by
@ -291,6 +298,33 @@ To get the 5 largest HTTP requests counts across all instances we could write:
topk(5, http_requests_total)
To sample 10 timeseries, for example to inspect labels and their values, we
could write:
limitk(10, http_requests_total)
To deterministically sample approximately 10% of timeseries we could write:
limit_ratio(0.1, http_requests_total)
Given that `limit_ratio()` implements a deterministic sampling algorithm (based
on labels' hash), you can get the _complement_ of the above samples, i.e.
approximately 90%, but precisely those not returned by `limit_ratio(0.1, ...)`
with:
limit_ratio(-0.9, http_requests_total)
You can also use this feature to e.g. verify that `avg()` is a representative
aggregation for your samples' values, by checking that the difference between
averaging two samples' subsets is "small" when compared to the standard
deviation.
abs(
avg(limit_ratio(0.5, http_requests_total))
-
avg(limit_ratio(-0.5, http_requests_total))
) <= bool stddev(http_requests_total)
## Binary operator precedence
The following list shows the precedence of binary operators in Prometheus, from

View File

@ -187,6 +187,21 @@ func rangeQueryCases() []benchCase {
{
expr: "topk(5, a_X)",
},
{
expr: "limitk(1, a_X)",
},
{
expr: "limitk(5, a_X)",
},
{
expr: "limit_ratio(0.1, a_X)",
},
{
expr: "limit_ratio(0.5, a_X)",
},
{
expr: "limit_ratio(-0.5, a_X)",
},
// Combinations.
{
expr: "rate(a_X[1m]) + rate(b_X[1m])",

View File

@ -1318,7 +1318,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
index, ok := groupToResultIndex[groupingKey]
// Add a new group if it doesn't exist.
if !ok {
if aggExpr.Op != parser.TOPK && aggExpr.Op != parser.BOTTOMK {
if aggExpr.Op != parser.TOPK && aggExpr.Op != parser.BOTTOMK && aggExpr.Op != parser.LIMITK && aggExpr.Op != parser.LIMIT_RATIO {
m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping)
result = append(result, Series{Metric: m})
}
@ -1331,9 +1331,10 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
groups := make([]groupedAggregation, groupCount)
var k int
var ratio float64
var seriess map[uint64]Series
switch aggExpr.Op {
case parser.TOPK, parser.BOTTOMK:
case parser.TOPK, parser.BOTTOMK, parser.LIMITK:
if !convertibleToInt64(param) {
ev.errorf("Scalar value %v overflows int64", param)
}
@ -1345,6 +1346,23 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
return nil, warnings
}
seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash.
case parser.LIMIT_RATIO:
if math.IsNaN(param) {
ev.errorf("Ratio value %v is NaN", param)
}
switch {
case param == 0:
return nil, warnings
case param < -1.0:
ratio = -1.0
warnings.Add(annotations.NewInvalidRatioWarning(param, ratio, aggExpr.Param.PositionRange()))
case param > 1.0:
ratio = 1.0
warnings.Add(annotations.NewInvalidRatioWarning(param, ratio, aggExpr.Param.PositionRange()))
default:
ratio = param
}
seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash.
case parser.QUANTILE:
if math.IsNaN(param) || param < 0 || param > 1 {
warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange()))
@ -1362,11 +1380,12 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
enh.Ts = ts
var ws annotations.Annotations
switch aggExpr.Op {
case parser.TOPK, parser.BOTTOMK:
result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, groups, enh, seriess)
case parser.TOPK, parser.BOTTOMK, parser.LIMITK, parser.LIMIT_RATIO:
result, ws = ev.aggregationK(aggExpr, k, ratio, inputMatrix, seriesToResult, groups, enh, seriess)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
return result, ws
warnings.Merge(ws)
return result, warnings
}
default:
ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, groups, enh)
@ -1381,7 +1400,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
switch aggExpr.Op {
case parser.TOPK, parser.BOTTOMK:
case parser.TOPK, parser.BOTTOMK, parser.LIMITK, parser.LIMIT_RATIO:
result = make(Matrix, 0, len(seriess))
for _, ss := range seriess {
result = append(result, ss)
@ -2754,14 +2773,15 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram
}
type groupedAggregation struct {
seen bool // Was this output groups seen in the input at this timestamp.
hasFloat bool // Has at least 1 float64 sample aggregated.
hasHistogram bool // Has at least 1 histogram sample aggregated.
floatValue float64
histogramValue *histogram.FloatHistogram
floatMean float64 // Mean, or "compensating value" for Kahan summation.
groupCount int
heap vectorByValueHeap
seen bool // Was this output groups seen in the input at this timestamp.
hasFloat bool // Has at least 1 float64 sample aggregated.
hasHistogram bool // Has at least 1 histogram sample aggregated.
floatValue float64
histogramValue *histogram.FloatHistogram
floatMean float64 // Mean, or "compensating value" for Kahan summation.
groupCount int
groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group
heap vectorByValueHeap
}
// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix.
@ -2958,19 +2978,22 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
return annos
}
// aggregationK evaluates topk or bottomk at one timestep on inputMatrix.
// aggregationK evaluates topk, bottomk, limitk, or limit_ratio at one timestep on inputMatrix.
// Output that has the same labels as the input, but just k of them per group.
// seriesToResult maps inputMatrix indexes to groups indexes.
// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk.
// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk, or without any order for limitk / limit_ratio.
// For a range query, aggregates output in the seriess map.
func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) {
func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, r float64, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) {
op := e.Op
var s Sample
var annos annotations.Annotations
// Used to short-cut the loop for LIMITK if we already collected k elements for every group
groupsRemaining := len(groups)
for i := range groups {
groups[i].seen = false
}
seriesLoop:
for si := range inputMatrix {
f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si])
if !ok {
@ -2981,11 +3004,23 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma
group := &groups[seriesToResult[si]]
// Initialize this group if it's the first time we've seen it.
if !group.seen {
*group = groupedAggregation{
seen: true,
heap: make(vectorByValueHeap, 1, k),
// LIMIT_RATIO is a special case, as we may not add this very sample to the heap,
// while we also don't know the final size of it.
if op == parser.LIMIT_RATIO {
*group = groupedAggregation{
seen: true,
heap: make(vectorByValueHeap, 0),
}
if ratiosampler.AddRatioSample(r, &s) {
heap.Push(&group.heap, &s)
}
} else {
*group = groupedAggregation{
seen: true,
heap: make(vectorByValueHeap, 1, k),
}
group.heap[0] = s
}
group.heap[0] = s
continue
}
@ -3016,6 +3051,26 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma
}
}
case parser.LIMITK:
if len(group.heap) < k {
heap.Push(&group.heap, &s)
}
// LIMITK optimization: early break if we've added K elem to _every_ group,
// especially useful for large timeseries where the user is exploring labels via e.g.
// limitk(10, my_metric)
if !group.groupAggrComplete && len(group.heap) == k {
group.groupAggrComplete = true
groupsRemaining--
if groupsRemaining == 0 {
break seriesLoop
}
}
case parser.LIMIT_RATIO:
if ratiosampler.AddRatioSample(r, &s) {
heap.Push(&group.heap, &s)
}
default:
panic(fmt.Errorf("expected aggregation operator but got %q", op))
}
@ -3065,6 +3120,11 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma
for _, v := range aggr.heap {
add(v.Metric, v.F)
}
case parser.LIMITK, parser.LIMIT_RATIO:
for _, v := range aggr.heap {
add(v.Metric, v.F)
}
}
}
@ -3419,6 +3479,56 @@ func makeInt64Pointer(val int64) *int64 {
return valp
}
// Add RatioSampler interface to allow unit-testing (previously: Randomizer).
type RatioSampler interface {
// Return this sample "offset" between [0.0, 1.0]
sampleOffset(ts int64, sample *Sample) float64
AddRatioSample(r float64, sample *Sample) bool
}
// Use Hash(labels.String()) / maxUint64 as a "deterministic"
// value in [0.0, 1.0].
type HashRatioSampler struct{}
var ratiosampler RatioSampler = NewHashRatioSampler()
func NewHashRatioSampler() *HashRatioSampler {
return &HashRatioSampler{}
}
func (s *HashRatioSampler) sampleOffset(ts int64, sample *Sample) float64 {
const (
float64MaxUint64 = float64(math.MaxUint64)
)
return float64(sample.Metric.Hash()) / float64MaxUint64
}
func (s *HashRatioSampler) AddRatioSample(ratioLimit float64, sample *Sample) bool {
// If ratioLimit >= 0: add sample if sampleOffset is lesser than ratioLimit
//
// 0.0 ratioLimit 1.0
// [---------|--------------------------]
// [#########...........................]
//
// e.g.:
// sampleOffset==0.3 && ratioLimit==0.4
// 0.3 < 0.4 ? --> add sample
//
// Else if ratioLimit < 0: add sample if rand() return the "complement" of ratioLimit>=0 case
// (loosely similar behavior to negative array index in other programming languages)
//
// 0.0 1+ratioLimit 1.0
// [---------|--------------------------]
// [.........###########################]
//
// e.g.:
// sampleOffset==0.3 && ratioLimit==-0.6
// 0.3 >= 0.4 ? --> don't add sample
sampleOffset := s.sampleOffset(sample.T, sample)
return (ratioLimit >= 0 && sampleOffset < ratioLimit) ||
(ratioLimit < 0 && sampleOffset >= (1.0+ratioLimit))
}
type histogramStatsSeries struct {
storage.Series
}

View File

@ -49,6 +49,8 @@ const (
)
func TestMain(m *testing.M) {
// Enable experimental functions testing
parser.EnableExperimentalFunctions = true
goleak.VerifyTestMain(m)
}

View File

@ -126,6 +126,8 @@ STDDEV
STDVAR
SUM
TOPK
LIMITK
LIMIT_RATIO
%token aggregatorsEnd
// Keywords.
@ -609,7 +611,7 @@ metric : metric_identifier label_set
;
metric_identifier: AVG | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | IDENTIFIER | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | QUANTILE | STDDEV | STDVAR | SUM | TOPK | WITHOUT | START | END;
metric_identifier: AVG | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | IDENTIFIER | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | QUANTILE | STDDEV | STDVAR | SUM | TOPK | WITHOUT | START | END | LIMITK | LIMIT_RATIO;
label_set : LEFT_BRACE label_set_list RIGHT_BRACE
{ $$ = labels.New($2...) }
@ -851,10 +853,10 @@ bucket_set_list : bucket_set_list SPACE number
* Keyword lists.
*/
aggregate_op : AVG | BOTTOMK | COUNT | COUNT_VALUES | GROUP | MAX | MIN | QUANTILE | STDDEV | STDVAR | SUM | TOPK ;
aggregate_op : AVG | BOTTOMK | COUNT | COUNT_VALUES | GROUP | MAX | MIN | QUANTILE | STDDEV | STDVAR | SUM | TOPK | LIMITK | LIMIT_RATIO;
// Inside of grouping options label names can be recognized as keywords by the lexer. This is a list of keywords that could also be a label name.
maybe_label : AVG | BOOL | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | GROUP_LEFT | GROUP_RIGHT | IDENTIFIER | IGNORING | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | ON | QUANTILE | STDDEV | STDVAR | SUM | TOPK | START | END | ATAN2;
maybe_label : AVG | BOOL | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | GROUP_LEFT | GROUP_RIGHT | IDENTIFIER | IGNORING | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | ON | QUANTILE | STDDEV | STDVAR | SUM | TOPK | START | END | ATAN2 | LIMITK | LIMIT_RATIO;
unary_op : ADD | SUB;

File diff suppressed because it is too large Load Diff

View File

@ -65,7 +65,7 @@ func (i ItemType) IsAggregator() bool { return i > aggregatorsStart && i < aggre
// IsAggregatorWithParam returns true if the Item is an aggregator that takes a parameter.
// Returns false otherwise.
func (i ItemType) IsAggregatorWithParam() bool {
return i == TOPK || i == BOTTOMK || i == COUNT_VALUES || i == QUANTILE
return i == TOPK || i == BOTTOMK || i == COUNT_VALUES || i == QUANTILE || i == LIMITK || i == LIMIT_RATIO
}
// IsKeyword returns true if the Item corresponds to a keyword.
@ -118,6 +118,8 @@ var key = map[string]ItemType{
"bottomk": BOTTOMK,
"count_values": COUNT_VALUES,
"quantile": QUANTILE,
"limitk": LIMITK,
"limit_ratio": LIMIT_RATIO,
// Keywords.
"offset": OFFSET,

View File

@ -447,6 +447,10 @@ func (p *parser) newAggregateExpr(op Item, modifier, args Node) (ret *AggregateE
desiredArgs := 1
if ret.Op.IsAggregatorWithParam() {
if !EnableExperimentalFunctions && (ret.Op == LIMITK || ret.Op == LIMIT_RATIO) {
p.addParseErrf(ret.PositionRange(), "limitk() and limit_ratio() are experimental and must be enabled with --enable-feature=promql-experimental-functions")
return
}
desiredArgs = 2
ret.Param = arguments[0]
@ -672,7 +676,7 @@ func (p *parser) checkAST(node Node) (typ ValueType) {
p.addParseErrf(n.PositionRange(), "aggregation operator expected in aggregation expression but got %q", n.Op)
}
p.expectType(n.Expr, ValueTypeVector, "aggregation expression")
if n.Op == TOPK || n.Op == BOTTOMK || n.Op == QUANTILE {
if n.Op == TOPK || n.Op == BOTTOMK || n.Op == QUANTILE || n.Op == LIMITK || n.Op == LIMIT_RATIO {
p.expectType(n.Param, ValueTypeScalar, "aggregation parameter")
}
if n.Op == COUNT_VALUES {

View File

@ -23,6 +23,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/util/teststorage"
)
@ -45,6 +46,8 @@ func TestConcurrentRangeQueries(t *testing.T) {
MaxSamples: 50000000,
Timeout: 100 * time.Second,
}
// Enable experimental functions testing
parser.EnableExperimentalFunctions = true
engine := promql.NewEngine(opts)
const interval = 10000 // 10s interval.

119
promql/promqltest/testdata/limit.test vendored Normal file
View File

@ -0,0 +1,119 @@
# Tests for limitk
#
# NB: those many `and http_requests` are to ensure that the series _are_ indeed
# a subset of the original series.
load 5m
http_requests{job="api-server", instance="0", group="production"} 0+10x10
http_requests{job="api-server", instance="1", group="production"} 0+20x10
http_requests{job="api-server", instance="0", group="canary"} 0+30x10
http_requests{job="api-server", instance="1", group="canary"} 0+40x10
http_requests{job="api-server", instance="2", group="canary"} 0+50x10
http_requests{job="api-server", instance="3", group="canary"} 0+60x10
eval instant at 50m count(limitk by (group) (0, http_requests))
# empty
eval instant at 50m count(limitk by (group) (-1, http_requests))
# empty
# Exercise k==1 special case (as sample is added before the main series loop
eval instant at 50m count(limitk by (group) (1, http_requests) and http_requests)
{} 2
eval instant at 50m count(limitk by (group) (2, http_requests) and http_requests)
{} 4
eval instant at 50m count(limitk(100, http_requests) and http_requests)
{} 6
# Exercise k==1 special case (as sample is added before the main series loop
eval instant at 50m count(limitk by (group) (1, http_requests) and http_requests)
{} 2
eval instant at 50m count(limitk by (group) (2, http_requests) and http_requests)
{} 4
eval instant at 50m count(limitk(100, http_requests) and http_requests)
{} 6
# limit_ratio
eval range from 0 to 50m step 5m count(limit_ratio(0.0, http_requests))
# empty
# limitk(2, ...) should always return a 2-count subset of the timeseries (hence the AND'ing)
eval range from 0 to 50m step 5m count(limitk(2, http_requests) and http_requests)
{} 2+0x10
# Tests for limit_ratio
#
# NB: below 0.5 ratio will depend on some hashing "luck" (also there's no guarantee that
# an integer comes from: total number of series * ratio), as it depends on:
#
# * ratioLimit = [0.0, 1.0]:
# float64(sample.Metric.Hash()) / float64MaxUint64 < Ratio ?
# * ratioLimit = [-1.0, 1.0):
# float64(sample.Metric.Hash()) / float64MaxUint64 >= (1.0 + Ratio) ?
#
# See `AddRatioSample()` in promql/engine.go for more details.
# Half~ish samples: verify we get "near" 3 (of 0.5 * 6)
eval range from 0 to 50m step 5m count(limit_ratio(0.5, http_requests) and http_requests) <= bool (3+1)
{} 1+0x10
eval range from 0 to 50m step 5m count(limit_ratio(0.5, http_requests) and http_requests) >= bool (3-1)
{} 1+0x10
# All samples
eval range from 0 to 50m step 5m count(limit_ratio(1.0, http_requests) and http_requests)
{} 6+0x10
# All samples
eval range from 0 to 50m step 5m count(limit_ratio(-1.0, http_requests) and http_requests)
{} 6+0x10
# Capped to 1.0 -> all samples
eval_warn range from 0 to 50m step 5m count(limit_ratio(1.1, http_requests) and http_requests)
{} 6+0x10
# Capped to -1.0 -> all samples
eval_warn range from 0 to 50m step 5m count(limit_ratio(-1.1, http_requests) and http_requests)
{} 6+0x10
# Verify that limit_ratio(value) and limit_ratio(1.0-value) return the "complement" of each other
# Complement below for [0.2, -0.8]
#
# Complement 1of2: `or` should return all samples
eval range from 0 to 50m step 5m count(limit_ratio(0.2, http_requests) or limit_ratio(-0.8, http_requests))
{} 6+0x10
# Complement 2of2: `and` should return no samples
eval range from 0 to 50m step 5m count(limit_ratio(0.2, http_requests) and limit_ratio(-0.8, http_requests))
# empty
# Complement below for [0.5, -0.5]
eval range from 0 to 50m step 5m count(limit_ratio(0.5, http_requests) or limit_ratio(-0.5, http_requests))
{} 6+0x10
eval range from 0 to 50m step 5m count(limit_ratio(0.5, http_requests) and limit_ratio(-0.5, http_requests))
# empty
# Complement below for [0.8, -0.2]
eval range from 0 to 50m step 5m count(limit_ratio(0.8, http_requests) or limit_ratio(-0.2, http_requests))
{} 6+0x10
eval range from 0 to 50m step 5m count(limit_ratio(0.8, http_requests) and limit_ratio(-0.2, http_requests))
# empty
# Complement below for [some_ratio, 1.0 - some_ratio], some_ratio derived from time(),
# using a small prime number to avoid rounded ratio values, and a small set of them.
eval range from 0 to 50m step 5m count(limit_ratio(time() % 17/17, http_requests) or limit_ratio(1.0 - (time() % 17/17), http_requests))
{} 6+0x10
eval range from 0 to 50m step 5m count(limit_ratio(time() % 17/17, http_requests) and limit_ratio(1.0 - (time() % 17/17), http_requests))
# empty
# Poor man's normality check: ok (loaded samples follow a nice linearity over labels and time)
# The check giving: 1 (i.e. true)
eval range from 0 to 50m step 5m abs(avg(limit_ratio(0.5, http_requests)) - avg(limit_ratio(-0.5, http_requests))) <= bool stddev(http_requests)
{} 1+0x10

View File

@ -116,6 +116,7 @@ var (
PromQLInfo = errors.New("PromQL info")
PromQLWarning = errors.New("PromQL warning")
InvalidRatioWarning = fmt.Errorf("%w: ratio value should be between -1 and 1", PromQLWarning)
InvalidQuantileWarning = fmt.Errorf("%w: quantile value should be between 0 and 1", PromQLWarning)
BadBucketLabelWarning = fmt.Errorf("%w: bucket label %q is missing or has a malformed value", PromQLWarning, model.BucketLabel)
MixedFloatsHistogramsWarning = fmt.Errorf("%w: encountered a mix of histograms and floats for", PromQLWarning)
@ -155,6 +156,15 @@ func NewInvalidQuantileWarning(q float64, pos posrange.PositionRange) error {
}
}
// NewInvalidQuantileWarning is used when the user specifies an invalid ratio
// value, i.e. a float that is outside the range [-1, 1] or NaN.
func NewInvalidRatioWarning(q, to float64, pos posrange.PositionRange) error {
return annoErr{
PositionRange: pos,
Err: fmt.Errorf("%w, got %g, capping to %g", InvalidRatioWarning, q, to),
}
}
// NewBadBucketLabelWarning is used when there is an error parsing the bucket label
// of a classic histogram.
func NewBadBucketLabelWarning(metricName, label string, pos posrange.PositionRange) error {

View File

@ -544,6 +544,18 @@ export const aggregateOpTerms = [
info: 'Group series, while setting the sample value to 1',
type: 'keyword',
},
{
label: 'limitk',
detail: 'aggregation',
info: 'Sample k elements',
type: 'keyword',
},
{
label: 'limit_ratio',
detail: 'aggregation',
info: 'Sample given ratio of elements',
type: 'keyword',
},
{
label: 'max',
detail: 'aggregation',

View File

@ -28,6 +28,8 @@ import {
Gtr,
Identifier,
LabelMatchers,
LimitK,
LimitRatio,
Lss,
Lte,
MatrixSelector,
@ -167,7 +169,13 @@ export class Parser {
}
this.expectType(params[params.length - 1], ValueType.vector, 'aggregation expression');
// get the parameter of the aggregation operator
if (aggregateOp.type.id === Topk || aggregateOp.type.id === Bottomk || aggregateOp.type.id === Quantile) {
if (
aggregateOp.type.id === Topk ||
aggregateOp.type.id === Bottomk ||
aggregateOp.type.id === LimitK ||
aggregateOp.type.id === LimitRatio ||
aggregateOp.type.id === Quantile
) {
this.expectType(params[0], ValueType.scalar, 'aggregation parameter');
}
if (aggregateOp.type.id === CountValues) {

View File

@ -22,7 +22,7 @@ export const promQLHighLight = styleTags({
Identifier: tags.variableName,
'Abs Absent AbsentOverTime Acos Acosh Asin Asinh Atan Atanh AvgOverTime Ceil Changes Clamp ClampMax ClampMin Cos Cosh CountOverTime DaysInMonth DayOfMonth DayOfWeek DayOfYear Deg Delta Deriv Exp Floor HistogramAvg HistogramCount HistogramFraction HistogramQuantile HistogramSum HoltWinters Hour Idelta Increase Irate LabelReplace LabelJoin LastOverTime Ln Log10 Log2 MaxOverTime MinOverTime Minute Month Pi PredictLinear PresentOverTime QuantileOverTime Rad Rate Resets Round Scalar Sgn Sin Sinh Sort SortDesc SortByLabel SortByLabelDesc Sqrt StddevOverTime StdvarOverTime SumOverTime Tan Tanh Time Timestamp Vector Year':
tags.function(tags.variableName),
'Avg Bottomk Count Count_values Group Max Min Quantile Stddev Stdvar Sum Topk': tags.operatorKeyword,
'Avg Bottomk Count Count_values Group LimitK LimitRatio Max Min Quantile Stddev Stdvar Sum Topk': tags.operatorKeyword,
'By Without Bool On Ignoring GroupLeft GroupRight Offset Start End': tags.modifier,
'And Unless Or': tags.logicOperator,
'Sub Add Mul Mod Div Atan2 Eql Neq Lte Lss Gte Gtr EqlRegex EqlSingle NeqRegex Pow At': tags.operator,

View File

@ -54,6 +54,8 @@ AggregateOp {
Max |
Min |
Quantile |
LimitK |
LimitRatio |
Stddev |
Stdvar |
Sum |
@ -330,6 +332,8 @@ NumberLiteral {
Max,
Min,
Quantile,
LimitK,
LimitRatio,
Stddev,
Stdvar,
Sum,

View File

@ -33,6 +33,8 @@ import {
On,
Or,
Quantile,
LimitK,
LimitRatio,
Start,
Stddev,
Stdvar,
@ -67,6 +69,8 @@ const contextualKeywordTokens = {
max: Max,
min: Min,
quantile: Quantile,
limitk: LimitK,
limit_ratio: LimitRatio,
stddev: Stddev,
stdvar: Stdvar,
sum: Sum,