Browse Source

histogram: Remove code replication via generics (#11361)

* histogram: Simplify iterators

We don't really need currLower and currUpper and can calculate it when
needed (as already done for the floatBucketIterator). The calculation
is cheap, while keeping those extra variables around costs RAM
(potentially a lot with many iterators).

* histogram: Convert Bucket/FloatBucket to one generic type

* histogram: Move some bucket iterator code into generic base iterator

* histogram: Remove cumulative iterator for FloatHistogram

We added it in the past for completeness (Histogram has one), but it
has never been used. Plus, even the cumulative iterator for Histogram
is only there for test reasons.

We can always add it back, and then maybe even using generics.

Signed-off-by: beorn7 <beorn@grafana.com>
pull/11418/head
Björn Rabenstein 2 years ago committed by GitHub
parent
commit
dccfb9db4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 347
      model/histogram/float_histogram.go
  2. 10
      model/histogram/float_histogram_test.go
  3. 110
      model/histogram/generic.go
  4. 112
      model/histogram/histogram.go
  5. 52
      model/histogram/histogram_test.go
  6. 2
      promql/quantile.go

347
model/histogram/float_histogram.go

@ -15,7 +15,6 @@ package histogram
import (
"fmt"
"math"
"strings"
)
@ -121,7 +120,7 @@ func (h *FloatHistogram) String() string {
var sb strings.Builder
fmt.Fprintf(&sb, "{count:%g, sum:%g", h.Count, h.Sum)
var nBuckets []FloatBucket
var nBuckets []Bucket[float64]
for it := h.NegativeBucketIterator(); it.Next(); {
bucket := it.At()
if bucket.Count != 0 {
@ -148,8 +147,8 @@ func (h *FloatHistogram) String() string {
}
// ZeroBucket returns the zero bucket.
func (h *FloatHistogram) ZeroBucket() FloatBucket {
return FloatBucket{
func (h *FloatHistogram) ZeroBucket() Bucket[float64] {
return Bucket[float64]{
Lower: -h.ZeroThreshold,
Upper: h.ZeroThreshold,
LowerInclusive: true,
@ -250,7 +249,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram {
// count is added. If not, the bucket is inserted. The updated slices and the
// coordinates of the inserted or added-to bucket are returned.
func addBucket(
b FloatBucket,
b Bucket[float64],
spans []Span, buckets []float64,
iSpan, iBucket int,
iInSpan, index int32,
@ -435,7 +434,7 @@ func (h *FloatHistogram) DetectReset(previous *FloatHistogram) bool {
return detectReset(currIt, prevIt)
}
func detectReset(currIt, prevIt FloatBucketIterator) bool {
func detectReset(currIt, prevIt BucketIterator[float64]) bool {
if !prevIt.Next() {
return false // If no buckets in previous histogram, nothing can be reset.
}
@ -495,40 +494,39 @@ func detectReset(currIt, prevIt FloatBucketIterator) bool {
}
}
// PositiveBucketIterator returns a FloatBucketIterator to iterate over all
// positive buckets in ascending order (starting next to the zero bucket and
// going up).
func (h *FloatHistogram) PositiveBucketIterator() FloatBucketIterator {
// PositiveBucketIterator returns a BucketIterator to iterate over all positive
// buckets in ascending order (starting next to the zero bucket and going up).
func (h *FloatHistogram) PositiveBucketIterator() BucketIterator[float64] {
return h.floatBucketIterator(true, 0, h.Schema)
}
// NegativeBucketIterator returns a FloatBucketIterator to iterate over all
// negative buckets in descending order (starting next to the zero bucket and
// going down).
func (h *FloatHistogram) NegativeBucketIterator() FloatBucketIterator {
// NegativeBucketIterator returns a BucketIterator to iterate over all negative
// buckets in descending order (starting next to the zero bucket and going
// down).
func (h *FloatHistogram) NegativeBucketIterator() BucketIterator[float64] {
return h.floatBucketIterator(false, 0, h.Schema)
}
// PositiveReverseBucketIterator returns a FloatBucketIterator to iterate over all
// positive buckets in descending order (starting at the highest bucket and going
// down towards the zero bucket).
func (h *FloatHistogram) PositiveReverseBucketIterator() FloatBucketIterator {
return h.reverseFloatBucketIterator(true)
// PositiveReverseBucketIterator returns a BucketIterator to iterate over all
// positive buckets in descending order (starting at the highest bucket and
// going down towards the zero bucket).
func (h *FloatHistogram) PositiveReverseBucketIterator() BucketIterator[float64] {
return newReverseFloatBucketIterator(h.PositiveSpans, h.PositiveBuckets, h.Schema, true)
}
// NegativeReverseBucketIterator returns a FloatBucketIterator to iterate over all
// negative buckets in ascending order (starting at the lowest bucket and going up
// towards the zero bucket).
func (h *FloatHistogram) NegativeReverseBucketIterator() FloatBucketIterator {
return h.reverseFloatBucketIterator(false)
// NegativeReverseBucketIterator returns a BucketIterator to iterate over all
// negative buckets in ascending order (starting at the lowest bucket and going
// up towards the zero bucket).
func (h *FloatHistogram) NegativeReverseBucketIterator() BucketIterator[float64] {
return newReverseFloatBucketIterator(h.NegativeSpans, h.NegativeBuckets, h.Schema, false)
}
// AllBucketIterator returns a FloatBucketIterator to iterate over all negative,
// AllBucketIterator returns a BucketIterator to iterate over all negative,
// zero, and positive buckets in ascending order (starting at the lowest bucket
// and going up). If the highest negative bucket or the lowest positive bucket
// overlap with the zero bucket, their upper or lower boundary, respectively, is
// set to the zero threshold.
func (h *FloatHistogram) AllBucketIterator() FloatBucketIterator {
func (h *FloatHistogram) AllBucketIterator() BucketIterator[float64] {
return &allFloatBucketIterator{
h: h,
negIter: h.NegativeReverseBucketIterator(),
@ -537,17 +535,6 @@ func (h *FloatHistogram) AllBucketIterator() FloatBucketIterator {
}
}
// CumulativeBucketIterator returns a FloatBucketIterator to iterate over a
// cumulative view of the buckets. This method currently only supports
// FloatHistograms without negative buckets and panics if the FloatHistogram has
// negative buckets. It is currently only used for testing.
func (h *FloatHistogram) CumulativeBucketIterator() FloatBucketIterator {
if len(h.NegativeBuckets) > 0 {
panic("CumulativeBucketIterator called on FloatHistogram with negative buckets")
}
return &cumulativeFloatBucketIterator{h: h, posSpansIdx: -1}
}
// zeroCountForLargerThreshold returns what the histogram's zero count would be
// if the ZeroThreshold had the provided larger (or equal) value. If the
// provided value is less than the histogram's ZeroThreshold, the method panics.
@ -659,52 +646,6 @@ func (h *FloatHistogram) reconcileZeroBuckets(other *FloatHistogram) float64 {
return otherZeroCount
}
// FloatBucketIterator iterates over the buckets of a FloatHistogram, returning
// decoded buckets.
type FloatBucketIterator interface {
// Next advances the iterator by one.
Next() bool
// At returns the current bucket.
At() FloatBucket
}
// FloatBucket represents a bucket with lower and upper limit and the count of
// samples in the bucket as a float64. It also specifies if each limit is
// inclusive or not. (Mathematically, inclusive limits create a closed interval,
// and non-inclusive limits an open interval.)
//
// To represent cumulative buckets, Lower is set to -Inf, and the Count is then
// cumulative (including the counts of all buckets for smaller values).
type FloatBucket struct {
Lower, Upper float64
LowerInclusive, UpperInclusive bool
Count float64
// Index within schema. To easily compare buckets that share the same
// schema and sign (positive or negative). Irrelevant for the zero bucket.
Index int32
}
// String returns a string representation of a FloatBucket, using the usual
// mathematical notation of '['/']' for inclusive bounds and '('/')' for
// non-inclusive bounds.
func (b FloatBucket) String() string {
var sb strings.Builder
if b.LowerInclusive {
sb.WriteRune('[')
} else {
sb.WriteRune('(')
}
fmt.Fprintf(&sb, "%g,%g", b.Lower, b.Upper)
if b.UpperInclusive {
sb.WriteRune(']')
} else {
sb.WriteRune(')')
}
fmt.Fprintf(&sb, ":%g", b.Count)
return sb.String()
}
// floatBucketIterator is a low-level constructor for bucket iterators.
//
// If positive is true, the returned iterator iterates through the positive
@ -725,9 +666,11 @@ func (h *FloatHistogram) floatBucketIterator(
panic(fmt.Errorf("cannot merge from schema %d to %d", h.Schema, targetSchema))
}
i := &floatBucketIterator{
schema: h.Schema,
baseBucketIterator: baseBucketIterator[float64, float64]{
schema: h.Schema,
positive: positive,
},
targetSchema: targetSchema,
positive: positive,
absoluteStartValue: absoluteStartValue,
}
if positive {
@ -741,14 +684,16 @@ func (h *FloatHistogram) floatBucketIterator(
}
// reverseFloatbucketiterator is a low-level constructor for reverse bucket iterators.
func (h *FloatHistogram) reverseFloatBucketIterator(positive bool) *reverseFloatBucketIterator {
r := &reverseFloatBucketIterator{schema: h.Schema, positive: positive}
if positive {
r.spans = h.PositiveSpans
r.buckets = h.PositiveBuckets
} else {
r.spans = h.NegativeSpans
r.buckets = h.NegativeBuckets
func newReverseFloatBucketIterator(
spans []Span, buckets []float64, schema int32, positive bool,
) *reverseFloatBucketIterator {
r := &reverseFloatBucketIterator{
baseBucketIterator: baseBucketIterator[float64, float64]{
schema: schema,
spans: spans,
buckets: buckets,
positive: positive,
},
}
r.spansIdx = len(r.spans) - 1
@ -765,21 +710,10 @@ func (h *FloatHistogram) reverseFloatBucketIterator(positive bool) *reverseFloat
}
type floatBucketIterator struct {
// targetSchema is the schema to merge to and must be ≤ schema.
schema, targetSchema int32
spans []Span
buckets []float64
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount float64 // Count in the current bucket.
currIdx int32 // The bucket index within the targetSchema.
origIdx int32 // The bucket index within the original schema.
baseBucketIterator[float64, float64]
targetSchema int32 // targetSchema is the schema to merge to and must be ≤ schema.
origIdx int32 // The bucket index within the original schema.
absoluteStartValue float64 // Never return buckets with an upper bound ≤ this value.
}
@ -844,23 +778,6 @@ mergeLoop: // Merge together all buckets from the original schema that fall into
return true
}
func (i *floatBucketIterator) At() FloatBucket {
b := FloatBucket{
Count: i.currCount,
Index: i.currIdx,
}
if i.positive {
b.Upper = getBound(i.currIdx, i.targetSchema)
b.Lower = getBound(i.currIdx-1, i.targetSchema)
} else {
b.Lower = -getBound(i.currIdx, i.targetSchema)
b.Upper = -getBound(i.currIdx-1, i.targetSchema)
}
b.LowerInclusive = b.Lower < 0
b.UpperInclusive = b.Upper > 0
return b
}
// targetIdx returns the bucket index within i.targetSchema for the given bucket
// index within i.schema.
func (i *floatBucketIterator) targetIdx(idx int32) int32 {
@ -873,192 +790,82 @@ func (i *floatBucketIterator) targetIdx(idx int32) int32 {
}
type reverseFloatBucketIterator struct {
schema int32
spans []Span
buckets []float64
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan int32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount float64 // Count in the current bucket.
currIdx int32 // The actual bucket index.
currLower, currUpper float64 // Limits of the current bucket.
baseBucketIterator[float64, float64]
idxInSpan int32 // Changed from uint32 to allow negative values for exhaustion detection.
}
func (r *reverseFloatBucketIterator) Next() bool {
r.currIdx--
if r.bucketsIdx < 0 {
func (i *reverseFloatBucketIterator) Next() bool {
i.currIdx--
if i.bucketsIdx < 0 {
return false
}
for r.idxInSpan < 0 {
for i.idxInSpan < 0 {
// We have exhausted the current span and have to find a new
// one. We'll even handle pathologic spans of length 0.
r.spansIdx--
r.idxInSpan = int32(r.spans[r.spansIdx].Length) - 1
r.currIdx -= r.spans[r.spansIdx+1].Offset
i.spansIdx--
i.idxInSpan = int32(i.spans[i.spansIdx].Length) - 1
i.currIdx -= i.spans[i.spansIdx+1].Offset
}
r.currCount = r.buckets[r.bucketsIdx]
if r.positive {
r.currUpper = getBound(r.currIdx, r.schema)
r.currLower = getBound(r.currIdx-1, r.schema)
} else {
r.currLower = -getBound(r.currIdx, r.schema)
r.currUpper = -getBound(r.currIdx-1, r.schema)
}
r.bucketsIdx--
r.idxInSpan--
i.currCount = i.buckets[i.bucketsIdx]
i.bucketsIdx--
i.idxInSpan--
return true
}
func (r *reverseFloatBucketIterator) At() FloatBucket {
return FloatBucket{
Count: r.currCount,
Lower: r.currLower,
Upper: r.currUpper,
LowerInclusive: r.currLower < 0,
UpperInclusive: r.currUpper > 0,
Index: r.currIdx,
}
}
type allFloatBucketIterator struct {
h *FloatHistogram
negIter, posIter FloatBucketIterator
negIter, posIter BucketIterator[float64]
// -1 means we are iterating negative buckets.
// 0 means it is time for the zero bucket.
// 1 means we are iterating positive buckets.
// Anything else means iteration is over.
state int8
currBucket FloatBucket
currBucket Bucket[float64]
}
func (r *allFloatBucketIterator) Next() bool {
switch r.state {
func (i *allFloatBucketIterator) Next() bool {
switch i.state {
case -1:
if r.negIter.Next() {
r.currBucket = r.negIter.At()
if r.currBucket.Upper > -r.h.ZeroThreshold {
r.currBucket.Upper = -r.h.ZeroThreshold
if i.negIter.Next() {
i.currBucket = i.negIter.At()
if i.currBucket.Upper > -i.h.ZeroThreshold {
i.currBucket.Upper = -i.h.ZeroThreshold
}
return true
}
r.state = 0
return r.Next()
i.state = 0
return i.Next()
case 0:
r.state = 1
if r.h.ZeroCount > 0 {
r.currBucket = FloatBucket{
Lower: -r.h.ZeroThreshold,
Upper: r.h.ZeroThreshold,
i.state = 1
if i.h.ZeroCount > 0 {
i.currBucket = Bucket[float64]{
Lower: -i.h.ZeroThreshold,
Upper: i.h.ZeroThreshold,
LowerInclusive: true,
UpperInclusive: true,
Count: r.h.ZeroCount,
Count: i.h.ZeroCount,
// Index is irrelevant for the zero bucket.
}
return true
}
return r.Next()
return i.Next()
case 1:
if r.posIter.Next() {
r.currBucket = r.posIter.At()
if r.currBucket.Lower < r.h.ZeroThreshold {
r.currBucket.Lower = r.h.ZeroThreshold
if i.posIter.Next() {
i.currBucket = i.posIter.At()
if i.currBucket.Lower < i.h.ZeroThreshold {
i.currBucket.Lower = i.h.ZeroThreshold
}
return true
}
r.state = 42
i.state = 42
return false
}
return false
}
func (r *allFloatBucketIterator) At() FloatBucket {
return r.currBucket
}
type cumulativeFloatBucketIterator struct {
h *FloatHistogram
posSpansIdx int // Index in h.PositiveSpans we are in. -1 means 0 bucket.
posBucketsIdx int // Index in h.PositiveBuckets.
idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length.
initialized bool
currIdx int32 // The actual bucket index after decoding from spans.
currUpper float64 // The upper boundary of the current bucket.
currCumulativeCount float64 // Current "cumulative" count for the current bucket.
// Between 2 spans there could be some empty buckets which
// still needs to be counted for cumulative buckets.
// When we hit the end of a span, we use this to iterate
// through the empty buckets.
emptyBucketCount int32
}
func (c *cumulativeFloatBucketIterator) Next() bool {
if c.posSpansIdx == -1 {
// Zero bucket.
c.posSpansIdx++
if c.h.ZeroCount == 0 {
return c.Next()
}
c.currUpper = c.h.ZeroThreshold
c.currCumulativeCount = c.h.ZeroCount
return true
}
if c.posSpansIdx >= len(c.h.PositiveSpans) {
return false
}
if c.emptyBucketCount > 0 {
// We are traversing through empty buckets at the moment.
c.currUpper = getBound(c.currIdx, c.h.Schema)
c.currIdx++
c.emptyBucketCount--
return true
}
span := c.h.PositiveSpans[c.posSpansIdx]
if c.posSpansIdx == 0 && !c.initialized {
// Initializing.
c.currIdx = span.Offset
c.initialized = true
}
c.currCumulativeCount += c.h.PositiveBuckets[c.posBucketsIdx]
c.currUpper = getBound(c.currIdx, c.h.Schema)
c.posBucketsIdx++
c.idxInSpan++
c.currIdx++
if c.idxInSpan >= span.Length {
// Move to the next span. This one is done.
c.posSpansIdx++
c.idxInSpan = 0
if c.posSpansIdx < len(c.h.PositiveSpans) {
c.emptyBucketCount = c.h.PositiveSpans[c.posSpansIdx].Offset
}
}
return true
}
func (c *cumulativeFloatBucketIterator) At() FloatBucket {
return FloatBucket{
Upper: c.currUpper,
Lower: math.Inf(-1),
UpperInclusive: true,
LowerInclusive: true,
Count: c.currCumulativeCount,
Index: c.currIdx - 1,
}
func (i *allFloatBucketIterator) At() Bucket[float64] {
return i.currBucket
}

10
model/histogram/float_histogram_test.go

@ -1578,11 +1578,11 @@ func TestReverseFloatBucketIterator(t *testing.T) {
// Assuming that the regular iterator is correct.
// Positive buckets.
var expBuckets, actBuckets []FloatBucket
var expBuckets, actBuckets []Bucket[float64]
it := h.PositiveBucketIterator()
for it.Next() {
// Append in reverse to check reversed list.
expBuckets = append([]FloatBucket{it.At()}, expBuckets...)
expBuckets = append([]Bucket[float64]{it.At()}, expBuckets...)
}
it = h.PositiveReverseBucketIterator()
for it.Next() {
@ -1598,7 +1598,7 @@ func TestReverseFloatBucketIterator(t *testing.T) {
it = h.NegativeBucketIterator()
for it.Next() {
// Append in reverse to check reversed list.
expBuckets = append([]FloatBucket{it.At()}, expBuckets...)
expBuckets = append([]Bucket[float64]{it.At()}, expBuckets...)
}
it = h.NegativeReverseBucketIterator()
for it.Next() {
@ -1793,7 +1793,7 @@ func TestAllFloatBucketIterator(t *testing.T) {
for i, c := range cases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
var expBuckets, actBuckets []FloatBucket
var expBuckets, actBuckets []Bucket[float64]
if c.includeNeg {
it := c.h.NegativeReverseBucketIterator()
@ -1806,7 +1806,7 @@ func TestAllFloatBucketIterator(t *testing.T) {
}
}
if c.includeZero {
expBuckets = append(expBuckets, FloatBucket{
expBuckets = append(expBuckets, Bucket[float64]{
Lower: -c.h.ZeroThreshold,
Upper: c.h.ZeroThreshold,
LowerInclusive: true,

110
model/histogram/compact.go → model/histogram/generic.go

@ -13,16 +13,120 @@
package histogram
import (
"fmt"
"strings"
)
// BucketCount is a type constraint for the count in a bucket, which can be
// float64 (for type FloatHistogram) or uint64 (for type Histogram).
type BucketCount interface {
float64 | uint64
}
// internalBucketCount is used internally by Histogram and FloatHistogram. The
// difference to the BucketCount above is that Histogram internally uses deltas
// between buckets rather than absolute counts (while FloatHistogram uses
// absolute counts directly). Go type parameters don't allow type
// specialization. Therefore, where special treatment of deltas between buckets
// vs. absolute counts is important, this information has to be provided as a
// separate boolean parameter "deltaBuckets"
type internalBucketCount interface {
float64 | int64
}
// Bucket represents a bucket with lower and upper limit and the absolute count
// of samples in the bucket. It also specifies if each limit is inclusive or
// not. (Mathematically, inclusive limits create a closed interval, and
// non-inclusive limits an open interval.)
//
// To represent cumulative buckets, Lower is set to -Inf, and the Count is then
// cumulative (including the counts of all buckets for smaller values).
type Bucket[BC BucketCount] struct {
Lower, Upper float64
LowerInclusive, UpperInclusive bool
Count BC
// Index within schema. To easily compare buckets that share the same
// schema and sign (positive or negative). Irrelevant for the zero bucket.
Index int32
}
// String returns a string representation of a Bucket, using the usual
// mathematical notation of '['/']' for inclusive bounds and '('/')' for
// non-inclusive bounds.
func (b Bucket[BC]) String() string {
var sb strings.Builder
if b.LowerInclusive {
sb.WriteRune('[')
} else {
sb.WriteRune('(')
}
fmt.Fprintf(&sb, "%g,%g", b.Lower, b.Upper)
if b.UpperInclusive {
sb.WriteRune(']')
} else {
sb.WriteRune(')')
}
fmt.Fprintf(&sb, ":%v", b.Count)
return sb.String()
}
// BucketIterator iterates over the buckets of a Histogram, returning decoded
// buckets.
type BucketIterator[BC BucketCount] interface {
// Next advances the iterator by one.
Next() bool
// At returns the current bucket.
At() Bucket[BC]
}
// baseBucketIterator provides a struct that is shared by most BucketIterator
// implementations, together with an implementation of the At method. This
// iterator can be embedded in full implementations of BucketIterator to save on
// code replication.
type baseBucketIterator[BC BucketCount, IBC internalBucketCount] struct {
schema int32
spans []Span
buckets []IBC
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount IBC // Count in the current bucket.
currIdx int32 // The actual bucket index.
}
func (b baseBucketIterator[BC, IBC]) At() Bucket[BC] {
bucket := Bucket[BC]{
Count: BC(b.currCount),
Index: b.currIdx,
}
if b.positive {
bucket.Upper = getBound(b.currIdx, b.schema)
bucket.Lower = getBound(b.currIdx-1, b.schema)
} else {
bucket.Lower = -getBound(b.currIdx, b.schema)
bucket.Upper = -getBound(b.currIdx-1, b.schema)
}
bucket.LowerInclusive = bucket.Lower < 0
bucket.UpperInclusive = bucket.Upper > 0
return bucket
}
// compactBuckets is a generic function used by both Histogram.Compact and
// FloatHistogram.Compact. Set deltaBuckets to true if the provided buckets are
// deltas. Set it to false if the buckets contain absolute counts.
func compactBuckets[Bucket float64 | int64](buckets []Bucket, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]Bucket, []Span) {
func compactBuckets[IBC internalBucketCount](buckets []IBC, spans []Span, maxEmptyBuckets int, deltaBuckets bool) ([]IBC, []Span) {
// Fast path: If there are no empty buckets AND no offset in any span is
// <= maxEmptyBuckets AND no span has length 0, there is nothing to do and we can return
// immediately. We check that first because it's cheap and presumably
// common.
nothingToDo := true
var currentBucketAbsolute Bucket
var currentBucketAbsolute IBC
for _, bucket := range buckets {
if deltaBuckets {
currentBucketAbsolute += bucket
@ -204,7 +308,7 @@ func compactBuckets[Bucket float64 | int64](buckets []Bucket, spans []Span, maxE
offset := int(spans[iSpan].Offset)
spans[iSpan-1].Length += uint32(offset) + spans[iSpan].Length
spans = append(spans[:iSpan], spans[iSpan+1:]...)
newBuckets := make([]Bucket, len(buckets)+offset)
newBuckets := make([]IBC, len(buckets)+offset)
copy(newBuckets, buckets[:iBucket])
copy(newBuckets[iBucket+offset:], buckets[iBucket:])
if deltaBuckets {

112
model/histogram/histogram.go

@ -95,7 +95,7 @@ func (h *Histogram) String() string {
var sb strings.Builder
fmt.Fprintf(&sb, "{count:%d, sum:%g", h.Count, h.Sum)
var nBuckets []Bucket
var nBuckets []Bucket[uint64]
for it := h.NegativeBucketIterator(); it.Next(); {
bucket := it.At()
if bucket.Count != 0 {
@ -122,8 +122,8 @@ func (h *Histogram) String() string {
}
// ZeroBucket returns the zero bucket.
func (h *Histogram) ZeroBucket() Bucket {
return Bucket{
func (h *Histogram) ZeroBucket() Bucket[uint64] {
return Bucket[uint64]{
Lower: -h.ZeroThreshold,
Upper: h.ZeroThreshold,
LowerInclusive: true,
@ -134,21 +134,21 @@ func (h *Histogram) ZeroBucket() Bucket {
// PositiveBucketIterator returns a BucketIterator to iterate over all positive
// buckets in ascending order (starting next to the zero bucket and going up).
func (h *Histogram) PositiveBucketIterator() BucketIterator {
return newRegularBucketIterator(h, true)
func (h *Histogram) PositiveBucketIterator() BucketIterator[uint64] {
return newRegularBucketIterator(h.PositiveSpans, h.PositiveBuckets, h.Schema, true)
}
// NegativeBucketIterator returns a BucketIterator to iterate over all negative
// buckets in descending order (starting next to the zero bucket and going down).
func (h *Histogram) NegativeBucketIterator() BucketIterator {
return newRegularBucketIterator(h, false)
func (h *Histogram) NegativeBucketIterator() BucketIterator[uint64] {
return newRegularBucketIterator(h.NegativeSpans, h.NegativeBuckets, h.Schema, false)
}
// CumulativeBucketIterator returns a BucketIterator to iterate over a
// cumulative view of the buckets. This method currently only supports
// Histograms without negative buckets and panics if the Histogram has negative
// buckets. It is currently only used for testing.
func (h *Histogram) CumulativeBucketIterator() BucketIterator {
func (h *Histogram) CumulativeBucketIterator() BucketIterator[uint64] {
if len(h.NegativeBuckets) > 0 {
panic("CumulativeBucketIterator called on Histogram with negative buckets")
}
@ -319,75 +319,18 @@ func (h *Histogram) ToFloat() *FloatHistogram {
}
}
// BucketIterator iterates over the buckets of a Histogram, returning decoded
// buckets.
type BucketIterator interface {
// Next advances the iterator by one.
Next() bool
// At returns the current bucket.
At() Bucket
}
// Bucket represents a bucket with lower and upper limit and the count of
// samples in the bucket. It also specifies if each limit is inclusive or
// not. (Mathematically, inclusive limits create a closed interval, and
// non-inclusive limits an open interval.)
//
// To represent cumulative buckets, Lower is set to -Inf, and the Count is then
// cumulative (including the counts of all buckets for smaller values).
type Bucket struct {
Lower, Upper float64
LowerInclusive, UpperInclusive bool
Count uint64
Index int32 // Index within schema. To easily compare buckets that share the same schema.
}
// String returns a string representation of a Bucket, using the usual
// mathematical notation of '['/']' for inclusive bounds and '('/')' for
// non-inclusive bounds.
func (b Bucket) String() string {
var sb strings.Builder
if b.LowerInclusive {
sb.WriteRune('[')
} else {
sb.WriteRune('(')
}
fmt.Fprintf(&sb, "%g,%g", b.Lower, b.Upper)
if b.UpperInclusive {
sb.WriteRune(']')
} else {
sb.WriteRune(')')
}
fmt.Fprintf(&sb, ":%d", b.Count)
return sb.String()
}
type regularBucketIterator struct {
schema int32
spans []Span
buckets []int64
positive bool // Whether this is for positive buckets.
spansIdx int // Current span within spans slice.
idxInSpan uint32 // Index in the current span. 0 <= idxInSpan < span.Length.
bucketsIdx int // Current bucket within buckets slice.
currCount int64 // Count in the current bucket.
currIdx int32 // The actual bucket index.
currLower, currUpper float64 // Limits of the current bucket.
baseBucketIterator[uint64, int64]
}
func newRegularBucketIterator(h *Histogram, positive bool) *regularBucketIterator {
r := &regularBucketIterator{schema: h.Schema, positive: positive}
if positive {
r.spans = h.PositiveSpans
r.buckets = h.PositiveBuckets
} else {
r.spans = h.NegativeSpans
r.buckets = h.NegativeBuckets
func newRegularBucketIterator(spans []Span, buckets []int64, schema int32, positive bool) *regularBucketIterator {
i := baseBucketIterator[uint64, int64]{
schema: schema,
spans: spans,
buckets: buckets,
positive: positive,
}
return r
return &regularBucketIterator{i}
}
func (r *regularBucketIterator) Next() bool {
@ -414,30 +357,11 @@ func (r *regularBucketIterator) Next() bool {
}
r.currCount += r.buckets[r.bucketsIdx]
if r.positive {
r.currUpper = getBound(r.currIdx, r.schema)
r.currLower = getBound(r.currIdx-1, r.schema)
} else {
r.currLower = -getBound(r.currIdx, r.schema)
r.currUpper = -getBound(r.currIdx-1, r.schema)
}
r.idxInSpan++
r.bucketsIdx++
return true
}
func (r *regularBucketIterator) At() Bucket {
return Bucket{
Count: uint64(r.currCount),
Lower: r.currLower,
Upper: r.currUpper,
LowerInclusive: r.currLower < 0,
UpperInclusive: r.currUpper > 0,
Index: r.currIdx,
}
}
type cumulativeBucketIterator struct {
h *Histogram
@ -512,8 +436,8 @@ func (c *cumulativeBucketIterator) Next() bool {
return true
}
func (c *cumulativeBucketIterator) At() Bucket {
return Bucket{
func (c *cumulativeBucketIterator) At() Bucket[uint64] {
return Bucket[uint64]{
Upper: c.currUpper,
Lower: math.Inf(-1),
UpperInclusive: true,

52
model/histogram/histogram_test.go

@ -80,7 +80,7 @@ func TestHistogramString(t *testing.T) {
func TestCumulativeBucketIterator(t *testing.T) {
cases := []struct {
histogram Histogram
expectedBuckets []Bucket
expectedBuckets []Bucket[uint64]
}{
{
histogram: Histogram{
@ -91,7 +91,7 @@ func TestCumulativeBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 1, -1, 0},
},
expectedBuckets: []Bucket{
expectedBuckets: []Bucket[uint64]{
{Lower: math.Inf(-1), Upper: 1, Count: 1, LowerInclusive: true, UpperInclusive: true, Index: 0},
{Lower: math.Inf(-1), Upper: 2, Count: 3, LowerInclusive: true, UpperInclusive: true, Index: 1},
@ -110,7 +110,7 @@ func TestCumulativeBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
},
expectedBuckets: []Bucket{
expectedBuckets: []Bucket[uint64]{
{Lower: math.Inf(-1), Upper: 1, Count: 1, LowerInclusive: true, UpperInclusive: true, Index: 0},
{Lower: math.Inf(-1), Upper: 2, Count: 4, LowerInclusive: true, UpperInclusive: true, Index: 1},
{Lower: math.Inf(-1), Upper: 4, Count: 5, LowerInclusive: true, UpperInclusive: true, Index: 2},
@ -130,7 +130,7 @@ func TestCumulativeBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 0},
},
expectedBuckets: []Bucket{
expectedBuckets: []Bucket[uint64]{
{Lower: math.Inf(-1), Upper: 1, Count: 1, LowerInclusive: true, UpperInclusive: true, Index: 0},
{Lower: math.Inf(-1), Upper: 2, Count: 4, LowerInclusive: true, UpperInclusive: true, Index: 1},
{Lower: math.Inf(-1), Upper: 4, Count: 5, LowerInclusive: true, UpperInclusive: true, Index: 2},
@ -150,7 +150,7 @@ func TestCumulativeBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 3},
},
expectedBuckets: []Bucket{
expectedBuckets: []Bucket[uint64]{
{Lower: math.Inf(-1), Upper: 0.6484197773255048, Count: 1, LowerInclusive: true, UpperInclusive: true, Index: -5},
{Lower: math.Inf(-1), Upper: 0.7071067811865475, Count: 4, LowerInclusive: true, UpperInclusive: true, Index: -4},
@ -177,7 +177,7 @@ func TestCumulativeBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
},
expectedBuckets: []Bucket{
expectedBuckets: []Bucket[uint64]{
{Lower: math.Inf(-1), Upper: 0.00390625, Count: 1, LowerInclusive: true, UpperInclusive: true, Index: -2},
{Lower: math.Inf(-1), Upper: 0.0625, Count: 4, LowerInclusive: true, UpperInclusive: true, Index: -1},
{Lower: math.Inf(-1), Upper: 1, Count: 5, LowerInclusive: true, UpperInclusive: true, Index: 0},
@ -198,7 +198,7 @@ func TestCumulativeBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1},
},
expectedBuckets: []Bucket{
expectedBuckets: []Bucket[uint64]{
{Lower: math.Inf(-1), Upper: 0.0625, Count: 1, LowerInclusive: true, UpperInclusive: true, Index: -2},
{Lower: math.Inf(-1), Upper: 0.25, Count: 4, LowerInclusive: true, UpperInclusive: true, Index: -1},
{Lower: math.Inf(-1), Upper: 1, Count: 5, LowerInclusive: true, UpperInclusive: true, Index: 0},
@ -211,7 +211,7 @@ func TestCumulativeBucketIterator(t *testing.T) {
for i, c := range cases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
it := c.histogram.CumulativeBucketIterator()
actualBuckets := make([]Bucket, 0, len(c.expectedBuckets))
actualBuckets := make([]Bucket[uint64], 0, len(c.expectedBuckets))
for it.Next() {
actualBuckets = append(actualBuckets, it.At())
}
@ -223,15 +223,15 @@ func TestCumulativeBucketIterator(t *testing.T) {
func TestRegularBucketIterator(t *testing.T) {
cases := []struct {
histogram Histogram
expectedPositiveBuckets []Bucket
expectedNegativeBuckets []Bucket
expectedPositiveBuckets []Bucket[uint64]
expectedNegativeBuckets []Bucket[uint64]
}{
{
histogram: Histogram{
Schema: 0,
},
expectedPositiveBuckets: []Bucket{},
expectedNegativeBuckets: []Bucket{},
expectedPositiveBuckets: []Bucket[uint64]{},
expectedNegativeBuckets: []Bucket[uint64]{},
},
{
histogram: Histogram{
@ -242,14 +242,14 @@ func TestRegularBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 1, -1, 0},
},
expectedPositiveBuckets: []Bucket{
expectedPositiveBuckets: []Bucket[uint64]{
{Lower: 0.5, Upper: 1, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 0},
{Lower: 1, Upper: 2, Count: 2, LowerInclusive: false, UpperInclusive: true, Index: 1},
{Lower: 4, Upper: 8, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 3},
{Lower: 8, Upper: 16, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 4},
},
expectedNegativeBuckets: []Bucket{},
expectedNegativeBuckets: []Bucket[uint64]{},
},
{
histogram: Histogram{
@ -260,8 +260,8 @@ func TestRegularBucketIterator(t *testing.T) {
},
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0},
},
expectedPositiveBuckets: []Bucket{},
expectedNegativeBuckets: []Bucket{
expectedPositiveBuckets: []Bucket[uint64]{},
expectedNegativeBuckets: []Bucket[uint64]{
{Lower: -1, Upper: -0.5, Count: 1, LowerInclusive: true, UpperInclusive: false, Index: 0},
{Lower: -2, Upper: -1, Count: 3, LowerInclusive: true, UpperInclusive: false, Index: 1},
{Lower: -4, Upper: -2, Count: 1, LowerInclusive: true, UpperInclusive: false, Index: 2},
@ -287,7 +287,7 @@ func TestRegularBucketIterator(t *testing.T) {
},
NegativeBuckets: []int64{1, 2, -2, 1, -1, 0},
},
expectedPositiveBuckets: []Bucket{
expectedPositiveBuckets: []Bucket[uint64]{
{Lower: 0.5, Upper: 1, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 0},
{Lower: 1, Upper: 2, Count: 3, LowerInclusive: false, UpperInclusive: true, Index: 1},
{Lower: 2, Upper: 4, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 2},
@ -296,7 +296,7 @@ func TestRegularBucketIterator(t *testing.T) {
{Lower: 16, Upper: 32, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 5},
{Lower: 32, Upper: 64, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 6},
},
expectedNegativeBuckets: []Bucket{
expectedNegativeBuckets: []Bucket[uint64]{
{Lower: -1, Upper: -0.5, Count: 1, LowerInclusive: true, UpperInclusive: false, Index: 0},
{Lower: -2, Upper: -1, Count: 3, LowerInclusive: true, UpperInclusive: false, Index: 1},
{Lower: -4, Upper: -2, Count: 1, LowerInclusive: true, UpperInclusive: false, Index: 2},
@ -316,7 +316,7 @@ func TestRegularBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0, 3},
},
expectedPositiveBuckets: []Bucket{
expectedPositiveBuckets: []Bucket[uint64]{
{Lower: 0.5946035575013605, Upper: 0.6484197773255048, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: -5},
{Lower: 0.6484197773255048, Upper: 0.7071067811865475, Count: 3, LowerInclusive: false, UpperInclusive: true, Index: -4},
@ -327,7 +327,7 @@ func TestRegularBucketIterator(t *testing.T) {
{Lower: 1.2968395546510096, Upper: 1.414213562373095, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 4},
{Lower: 1.414213562373095, Upper: 1.5422108254079407, Count: 4, LowerInclusive: false, UpperInclusive: true, Index: 5},
},
expectedNegativeBuckets: []Bucket{},
expectedNegativeBuckets: []Bucket[uint64]{},
},
{
histogram: Histogram{
@ -338,7 +338,7 @@ func TestRegularBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1, 0},
},
expectedPositiveBuckets: []Bucket{
expectedPositiveBuckets: []Bucket[uint64]{
{Lower: 0.000244140625, Upper: 0.00390625, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: -2},
{Lower: 0.00390625, Upper: 0.0625, Count: 3, LowerInclusive: false, UpperInclusive: true, Index: -1},
{Lower: 0.0625, Upper: 1, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 0},
@ -347,7 +347,7 @@ func TestRegularBucketIterator(t *testing.T) {
{Lower: 4096, Upper: 65536, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 4},
{Lower: 65536, Upper: 1048576, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 5},
},
expectedNegativeBuckets: []Bucket{},
expectedNegativeBuckets: []Bucket[uint64]{},
},
{
histogram: Histogram{
@ -357,27 +357,27 @@ func TestRegularBucketIterator(t *testing.T) {
},
PositiveBuckets: []int64{1, 2, -2, 1, -1},
},
expectedPositiveBuckets: []Bucket{
expectedPositiveBuckets: []Bucket[uint64]{
{Lower: 0.015625, Upper: 0.0625, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: -2},
{Lower: 0.0625, Upper: 0.25, Count: 3, LowerInclusive: false, UpperInclusive: true, Index: -1},
{Lower: 0.25, Upper: 1, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 0},
{Lower: 1, Upper: 4, Count: 2, LowerInclusive: false, UpperInclusive: true, Index: 1},
{Lower: 4, Upper: 16, Count: 1, LowerInclusive: false, UpperInclusive: true, Index: 2},
},
expectedNegativeBuckets: []Bucket{},
expectedNegativeBuckets: []Bucket[uint64]{},
},
}
for i, c := range cases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
it := c.histogram.PositiveBucketIterator()
actualPositiveBuckets := make([]Bucket, 0, len(c.expectedPositiveBuckets))
actualPositiveBuckets := make([]Bucket[uint64], 0, len(c.expectedPositiveBuckets))
for it.Next() {
actualPositiveBuckets = append(actualPositiveBuckets, it.At())
}
require.Equal(t, c.expectedPositiveBuckets, actualPositiveBuckets)
it = c.histogram.NegativeBucketIterator()
actualNegativeBuckets := make([]Bucket, 0, len(c.expectedNegativeBuckets))
actualNegativeBuckets := make([]Bucket[uint64], 0, len(c.expectedNegativeBuckets))
for it.Next() {
actualNegativeBuckets = append(actualNegativeBuckets, it.At())
}

2
promql/quantile.go

@ -156,7 +156,7 @@ func histogramQuantile(q float64, h *histogram.FloatHistogram) float64 {
}
var (
bucket histogram.FloatBucket
bucket histogram.Bucket[float64]
count float64
it = h.AllBucketIterator()
rank = q * h.Count

Loading…
Cancel
Save