convert classic histograms to int nhcb where possible instead

Signed-off-by: Jeanette Tan <jeanette.tan@grafana.com>
pull/14978/head
Jeanette Tan 5 months ago committed by György Krajcsovits
parent 62e7f0438d
commit 4503145c8b

@ -534,10 +534,17 @@ func (cmd *loadCmd) appendCustomHistogram(a storage.Appender) error {
// Convert the collated classic histogram data into native histograms // Convert the collated classic histogram data into native histograms
// with custom bounds and append them to the storage. // with custom bounds and append them to the storage.
for _, histogramWrapper := range histogramMap { for _, histogramWrapper := range histogramMap {
upperBounds, fhBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds) upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(histogramWrapper.upperBounds, true)
fhBase := hBase.ToFloat(nil)
samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs)) samples := make([]promql.Sample, 0, len(histogramWrapper.histogramByTs))
for t, histogram := range histogramWrapper.histogramByTs { for t, histogram := range histogramWrapper.histogramByTs {
fh := convertnhcb.ConvertHistogramWrapper(histogram, upperBounds, fhBase) h, fh := convertnhcb.ConvertHistogramWrapper(histogram, upperBounds, hBase, fhBase)
if fh == nil {
if err := h.Validate(); err != nil {
return err
}
fh = h.ToFloat(nil)
}
if err := fh.Validate(); err != nil { if err := fh.Validate(); err != nil {
return err return err
} }

@ -1812,15 +1812,24 @@ loop:
for b := range th.BucketCounts { for b := range th.BucketCounts {
ub = append(ub, b) ub = append(ub, b)
} }
upperBounds, fhBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub) upperBounds, hBase := convertnhcb.ProcessUpperBoundsAndCreateBaseHistogram(ub, false)
fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, fhBase) fhBase := hBase.ToFloat(nil)
if err := fh.Validate(); err != nil { h, fh := convertnhcb.ConvertHistogramWrapper(th, upperBounds, hBase, fhBase)
continue
}
// fmt.Printf("FINAL lset: %s, timestamp: %v, val: %v\n", lset, defTime, fh) // fmt.Printf("FINAL lset: %s, timestamp: %v, val: %v\n", lset, defTime, fh)
_, err = app.AppendHistogram(0, lset, defTime, nil, fh) if h != nil {
if err != nil { if err := h.Validate(); err != nil {
continue continue
}
if _, err = app.AppendHistogram(0, lset, defTime, h, nil); err != nil {
continue
}
} else if fh != nil {
if err := fh.Validate(); err != nil {
continue
}
if _, err = app.AppendHistogram(0, lset, defTime, nil, fh); err != nil {
continue
}
} }
} }
sl.cache.resetNhcb() sl.cache.resetNhcb()

@ -14,6 +14,7 @@
package convertnhcb package convertnhcb
import ( import (
"fmt"
"math" "math"
"sort" "sort"
"strings" "strings"
@ -26,6 +27,7 @@ type TempHistogram struct {
BucketCounts map[float64]float64 BucketCounts map[float64]float64
Count float64 Count float64
Sum float64 Sum float64
HasFloat bool
} }
func NewTempHistogram() TempHistogram { func NewTempHistogram() TempHistogram {
@ -34,15 +36,32 @@ func NewTempHistogram() TempHistogram {
} }
} }
func ProcessUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64, *histogram.FloatHistogram) { func (h TempHistogram) getIntBucketCounts() (map[float64]int64, error) {
bucketCounts := map[float64]int64{}
for le, count := range h.BucketCounts {
intCount := int64(math.Round(count))
if float64(intCount) != count {
return nil, fmt.Errorf("bucket count %f for le %g is not an integer", count, le)
}
bucketCounts[le] = intCount
}
return bucketCounts, nil
}
func ProcessUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64, needsDedup bool) ([]float64, *histogram.Histogram) {
sort.Float64s(upperBounds0) sort.Float64s(upperBounds0)
upperBounds := make([]float64, 0, len(upperBounds0)) var upperBounds []float64
prevLE := math.Inf(-1) if needsDedup {
for _, le := range upperBounds0 { upperBounds = make([]float64, 0, len(upperBounds0))
if le != prevLE { // deduplicate prevLE := math.Inf(-1)
upperBounds = append(upperBounds, le) for _, le := range upperBounds0 {
prevLE = le if le != prevLE {
upperBounds = append(upperBounds, le)
prevLE = le
}
} }
} else {
upperBounds = upperBounds0
} }
var customBounds []float64 var customBounds []float64
if upperBounds[len(upperBounds)-1] == math.Inf(1) { if upperBounds[len(upperBounds)-1] == math.Inf(1) {
@ -50,23 +69,57 @@ func ProcessUpperBoundsAndCreateBaseHistogram(upperBounds0 []float64) ([]float64
} else { } else {
customBounds = upperBounds customBounds = upperBounds
} }
return upperBounds, &histogram.FloatHistogram{ return upperBounds, &histogram.Histogram{
Count: 0, Count: 0,
Sum: 0, Sum: 0,
Schema: histogram.CustomBucketsSchema, Schema: histogram.CustomBucketsSchema,
PositiveSpans: []histogram.Span{ PositiveSpans: []histogram.Span{
{Offset: 0, Length: uint32(len(upperBounds))}, {Offset: 0, Length: uint32(len(upperBounds))},
}, },
PositiveBuckets: make([]float64, len(upperBounds)), PositiveBuckets: make([]int64, len(upperBounds)),
CustomValues: customBounds, CustomValues: customBounds,
} }
} }
func ConvertHistogramWrapper(hist TempHistogram, upperBounds []float64, fhBase *histogram.FloatHistogram) *histogram.FloatHistogram { func ConvertHistogramWrapper(histogram TempHistogram, upperBounds []float64, hBase *histogram.Histogram, fhBase *histogram.FloatHistogram) (*histogram.Histogram, *histogram.FloatHistogram) {
intBucketCounts, err := histogram.getIntBucketCounts()
if err != nil {
return nil, convertFloatHistogramWrapper(histogram, upperBounds, histogram.BucketCounts, fhBase)
}
return convertIntHistogramWrapper(histogram, upperBounds, intBucketCounts, hBase), nil
}
func convertIntHistogramWrapper(histogram TempHistogram, upperBounds []float64, bucketCounts map[float64]int64, hBase *histogram.Histogram) *histogram.Histogram {
h := hBase.Copy()
absBucketCounts := make([]int64, len(h.PositiveBuckets))
var prevCount, total int64
for i, le := range upperBounds {
currCount, exists := bucketCounts[le]
if !exists {
currCount = 0
}
count := currCount - prevCount
absBucketCounts[i] = count
total += count
prevCount = currCount
}
h.PositiveBuckets[0] = absBucketCounts[0]
for i := 1; i < len(h.PositiveBuckets); i++ {
h.PositiveBuckets[i] = absBucketCounts[i] - absBucketCounts[i-1]
}
h.Sum = histogram.Sum
if histogram.Count != 0 {
total = int64(histogram.Count)
}
h.Count = uint64(total)
return h.Compact(0)
}
func convertFloatHistogramWrapper(histogram TempHistogram, upperBounds []float64, bucketCounts map[float64]float64, fhBase *histogram.FloatHistogram) *histogram.FloatHistogram {
fh := fhBase.Copy() fh := fhBase.Copy()
var prevCount, total float64 var prevCount, total float64
for i, le := range upperBounds { for i, le := range upperBounds {
currCount, exists := hist.BucketCounts[le] currCount, exists := bucketCounts[le]
if !exists { if !exists {
currCount = 0 currCount = 0
} }
@ -75,9 +128,9 @@ func ConvertHistogramWrapper(hist TempHistogram, upperBounds []float64, fhBase *
total += count total += count
prevCount = currCount prevCount = currCount
} }
fh.Sum = hist.Sum fh.Sum = histogram.Sum
if hist.Count != 0 { if histogram.Count != 0 {
total = hist.Count total = histogram.Count
} }
fh.Count = total fh.Count = total
return fh.Compact(0) return fh.Compact(0)

Loading…
Cancel
Save