Merge pull request #12190 from bboreham/faster-topk

promql: use faster heap method for topk/bottomk
pull/9413/merge
Bryan Boreham 2023-03-30 14:05:53 +01:00 committed by GitHub
commit 1bb6b8b309
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 22 deletions

View File

@ -157,6 +157,9 @@ func rangeQueryCases() []benchCase {
{
expr: "topk(1, a_X)",
},
{
expr: "topk(5, a_X)",
},
// Combinations.
{
expr: "rate(a_X[1m]) + rate(b_X[1m])",

View File

@ -2507,39 +2507,39 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
group.value += delta * (s.V - group.mean)
case parser.TOPK:
if int64(len(group.heap)) < k || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) {
if int64(len(group.heap)) == k {
if k == 1 { // For k==1 we can replace in-situ.
group.heap[0] = Sample{
Point: Point{V: s.V},
Metric: s.Metric,
}
break
}
heap.Pop(&group.heap)
}
// We build a heap of up to k elements, with the smallest element at heap[0].
if int64(len(group.heap)) < k {
heap.Push(&group.heap, &Sample{
Point: Point{V: s.V},
Metric: s.Metric,
})
}
case parser.BOTTOMK:
if int64(len(group.reverseHeap)) < k || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) {
if int64(len(group.reverseHeap)) == k {
if k == 1 { // For k==1 we can replace in-situ.
group.reverseHeap[0] = Sample{
} else if group.heap[0].V < s.V || (math.IsNaN(group.heap[0].V) && !math.IsNaN(s.V)) {
// This new element is bigger than the previous smallest element - overwrite that.
group.heap[0] = Sample{
Point: Point{V: s.V},
Metric: s.Metric,
}
break
if k > 1 {
heap.Fix(&group.heap, 0) // Maintain the heap invariant.
}
heap.Pop(&group.reverseHeap)
}
case parser.BOTTOMK:
// We build a heap of up to k elements, with the biggest element at heap[0].
if int64(len(group.reverseHeap)) < k {
heap.Push(&group.reverseHeap, &Sample{
Point: Point{V: s.V},
Metric: s.Metric,
})
} else if group.reverseHeap[0].V > s.V || (math.IsNaN(group.reverseHeap[0].V) && !math.IsNaN(s.V)) {
// This new element is smaller than the previous biggest element - overwrite that.
group.reverseHeap[0] = Sample{
Point: Point{V: s.V},
Metric: s.Metric,
}
if k > 1 {
heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant.
}
}
case parser.QUANTILE: