mirror of https://github.com/prometheus/prometheus
Merge pull request #12711 from fatsheep9146/floathistogram-addsub-enhance
enhance float histogram add and sub methodpull/12755/head
commit
798c5737a0
|
@ -218,23 +218,17 @@ func (h *FloatHistogram) Add(other *FloatHistogram) *FloatHistogram {
|
|||
h.Count += other.Count
|
||||
h.Sum += other.Sum
|
||||
|
||||
// TODO(beorn7): If needed, this can be optimized by inspecting the
|
||||
// spans in other and create missing buckets in h in batches.
|
||||
var iInSpan, index int32
|
||||
for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); {
|
||||
b := it.At()
|
||||
h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
|
||||
b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index,
|
||||
)
|
||||
index = b.Index
|
||||
}
|
||||
for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); {
|
||||
b := it.At()
|
||||
h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
|
||||
b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index,
|
||||
)
|
||||
index = b.Index
|
||||
otherPositiveSpans := other.PositiveSpans
|
||||
otherPositiveBuckets := other.PositiveBuckets
|
||||
otherNegativeSpans := other.NegativeSpans
|
||||
otherNegativeBuckets := other.NegativeBuckets
|
||||
if other.Schema != h.Schema {
|
||||
otherPositiveSpans, otherPositiveBuckets = mergeToSchema(other.PositiveSpans, other.PositiveBuckets, other.Schema, h.Schema)
|
||||
otherNegativeSpans, otherNegativeBuckets = mergeToSchema(other.NegativeSpans, other.NegativeBuckets, other.Schema, h.Schema)
|
||||
}
|
||||
|
||||
h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.PositiveSpans, h.PositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
|
||||
h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, false, h.NegativeSpans, h.NegativeBuckets, otherNegativeSpans, otherNegativeBuckets)
|
||||
return h
|
||||
}
|
||||
|
||||
|
@ -245,25 +239,17 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) *FloatHistogram {
|
|||
h.Count -= other.Count
|
||||
h.Sum -= other.Sum
|
||||
|
||||
// TODO(beorn7): If needed, this can be optimized by inspecting the
|
||||
// spans in other and create missing buckets in h in batches.
|
||||
var iInSpan, index int32
|
||||
for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(true, h.ZeroThreshold, h.Schema); it.Next(); {
|
||||
b := it.At()
|
||||
b.Count *= -1
|
||||
h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan = addBucket(
|
||||
b, h.PositiveSpans, h.PositiveBuckets, iSpan, iBucket, iInSpan, index,
|
||||
)
|
||||
index = b.Index
|
||||
}
|
||||
for iSpan, iBucket, it := -1, -1, other.floatBucketIterator(false, h.ZeroThreshold, h.Schema); it.Next(); {
|
||||
b := it.At()
|
||||
b.Count *= -1
|
||||
h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan = addBucket(
|
||||
b, h.NegativeSpans, h.NegativeBuckets, iSpan, iBucket, iInSpan, index,
|
||||
)
|
||||
index = b.Index
|
||||
otherPositiveSpans := other.PositiveSpans
|
||||
otherPositiveBuckets := other.PositiveBuckets
|
||||
otherNegativeSpans := other.NegativeSpans
|
||||
otherNegativeBuckets := other.NegativeBuckets
|
||||
if other.Schema != h.Schema {
|
||||
otherPositiveSpans, otherPositiveBuckets = mergeToSchema(other.PositiveSpans, other.PositiveBuckets, other.Schema, h.Schema)
|
||||
otherNegativeSpans, otherNegativeBuckets = mergeToSchema(other.NegativeSpans, other.NegativeBuckets, other.Schema, h.Schema)
|
||||
}
|
||||
|
||||
h.PositiveSpans, h.PositiveBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.PositiveSpans, h.PositiveBuckets, otherPositiveSpans, otherPositiveBuckets)
|
||||
h.NegativeSpans, h.NegativeBuckets = addBuckets(h.Schema, h.ZeroThreshold, true, h.NegativeSpans, h.NegativeBuckets, otherNegativeSpans, otherNegativeBuckets)
|
||||
return h
|
||||
}
|
||||
|
||||
|
@ -298,103 +284,6 @@ func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// addBucket takes the "coordinates" of the last bucket that was handled and
|
||||
// adds the provided bucket after it. If a corresponding bucket exists, the
|
||||
// count is added. If not, the bucket is inserted. The updated slices and the
|
||||
// coordinates of the inserted or added-to bucket are returned.
|
||||
func addBucket(
|
||||
b Bucket[float64],
|
||||
spans []Span, buckets []float64,
|
||||
iSpan, iBucket int,
|
||||
iInSpan, index int32,
|
||||
) (
|
||||
newSpans []Span, newBuckets []float64,
|
||||
newISpan, newIBucket int, newIInSpan int32,
|
||||
) {
|
||||
if iSpan == -1 {
|
||||
// First add, check if it is before all spans.
|
||||
if len(spans) == 0 || spans[0].Offset > b.Index {
|
||||
// Add bucket before all others.
|
||||
buckets = append(buckets, 0)
|
||||
copy(buckets[1:], buckets)
|
||||
buckets[0] = b.Count
|
||||
if len(spans) > 0 && spans[0].Offset == b.Index+1 {
|
||||
spans[0].Length++
|
||||
spans[0].Offset--
|
||||
return spans, buckets, 0, 0, 0
|
||||
}
|
||||
spans = append(spans, Span{})
|
||||
copy(spans[1:], spans)
|
||||
spans[0] = Span{Offset: b.Index, Length: 1}
|
||||
if len(spans) > 1 {
|
||||
// Convert the absolute offset in the formerly
|
||||
// first span to a relative offset.
|
||||
spans[1].Offset -= b.Index + 1
|
||||
}
|
||||
return spans, buckets, 0, 0, 0
|
||||
}
|
||||
if spans[0].Offset == b.Index {
|
||||
// Just add to first bucket.
|
||||
buckets[0] += b.Count
|
||||
return spans, buckets, 0, 0, 0
|
||||
}
|
||||
// We are behind the first bucket, so set everything to the
|
||||
// first bucket and continue normally.
|
||||
iSpan, iBucket, iInSpan = 0, 0, 0
|
||||
index = spans[0].Offset
|
||||
}
|
||||
deltaIndex := b.Index - index
|
||||
for {
|
||||
remainingInSpan := int32(spans[iSpan].Length) - iInSpan
|
||||
if deltaIndex < remainingInSpan {
|
||||
// Bucket is in current span.
|
||||
iBucket += int(deltaIndex)
|
||||
iInSpan += deltaIndex
|
||||
buckets[iBucket] += b.Count
|
||||
return spans, buckets, iSpan, iBucket, iInSpan
|
||||
}
|
||||
deltaIndex -= remainingInSpan
|
||||
iBucket += int(remainingInSpan)
|
||||
iSpan++
|
||||
if iSpan == len(spans) || deltaIndex < spans[iSpan].Offset {
|
||||
// Bucket is in gap behind previous span (or there are no further spans).
|
||||
buckets = append(buckets, 0)
|
||||
copy(buckets[iBucket+1:], buckets[iBucket:])
|
||||
buckets[iBucket] = b.Count
|
||||
if deltaIndex == 0 {
|
||||
// Directly after previous span, extend previous span.
|
||||
if iSpan < len(spans) {
|
||||
spans[iSpan].Offset--
|
||||
}
|
||||
iSpan--
|
||||
iInSpan = int32(spans[iSpan].Length)
|
||||
spans[iSpan].Length++
|
||||
return spans, buckets, iSpan, iBucket, iInSpan
|
||||
}
|
||||
if iSpan < len(spans) && deltaIndex == spans[iSpan].Offset-1 {
|
||||
// Directly before next span, extend next span.
|
||||
iInSpan = 0
|
||||
spans[iSpan].Offset--
|
||||
spans[iSpan].Length++
|
||||
return spans, buckets, iSpan, iBucket, iInSpan
|
||||
}
|
||||
// No next span, or next span is not directly adjacent to new bucket.
|
||||
// Add new span.
|
||||
iInSpan = 0
|
||||
if iSpan < len(spans) {
|
||||
spans[iSpan].Offset -= deltaIndex + 1
|
||||
}
|
||||
spans = append(spans, Span{})
|
||||
copy(spans[iSpan+1:], spans[iSpan:])
|
||||
spans[iSpan] = Span{Length: 1, Offset: deltaIndex}
|
||||
return spans, buckets, iSpan, iBucket, iInSpan
|
||||
}
|
||||
// Try start of next span.
|
||||
deltaIndex -= spans[iSpan].Offset
|
||||
iInSpan = 0
|
||||
}
|
||||
}
|
||||
|
||||
// Compact eliminates empty buckets at the beginning and end of each span, then
|
||||
// merges spans that are consecutive or at most maxEmptyBuckets apart, and
|
||||
// finally splits spans that contain more consecutive empty buckets than
|
||||
|
@ -1033,3 +922,133 @@ func mergeToSchema(originSpans []Span, originBuckets []float64, originSchema, ta
|
|||
|
||||
return targetSpans, targetBuckets
|
||||
}
|
||||
|
||||
// addBuckets adds the buckets described by spansB/bucketsB to the buckets described by spansA/bucketsA,
|
||||
// creating missing buckets in spansA/bucketsA as needed.
|
||||
// It returns the resulting spans/buckets (which must be used instead of the original spansA/bucketsA,
|
||||
// although spansA/bucketsA might get modified by this function).
|
||||
// All buckets must use the same provided schema.
|
||||
// Buckets in spansB/bucketsB with an absolute upper limit ≤ threshold are ignored.
|
||||
// If negative is true, the buckets in spansB/bucketsB are subtracted rather than added.
|
||||
func addBuckets(
|
||||
schema int32, threshold float64, negative bool,
|
||||
spansA []Span, bucketsA []float64,
|
||||
spansB []Span, bucketsB []float64,
|
||||
) ([]Span, []float64) {
|
||||
var (
|
||||
iSpan int = -1
|
||||
iBucket int = -1
|
||||
iInSpan int32
|
||||
indexA int32
|
||||
indexB int32
|
||||
bIdxB int
|
||||
bucketB float64
|
||||
deltaIndex int32
|
||||
lowerThanThreshold = true
|
||||
)
|
||||
|
||||
for _, spanB := range spansB {
|
||||
indexB += spanB.Offset
|
||||
for j := 0; j < int(spanB.Length); j++ {
|
||||
if lowerThanThreshold && getBound(indexB, schema) <= threshold {
|
||||
goto nextLoop
|
||||
}
|
||||
lowerThanThreshold = false
|
||||
|
||||
bucketB = bucketsB[bIdxB]
|
||||
if negative {
|
||||
bucketB *= -1
|
||||
}
|
||||
|
||||
if iSpan == -1 {
|
||||
if len(spansA) == 0 || spansA[0].Offset > indexB {
|
||||
// Add bucket before all others.
|
||||
bucketsA = append(bucketsA, 0)
|
||||
copy(bucketsA[1:], bucketsA)
|
||||
bucketsA[0] = bucketB
|
||||
if len(spansA) > 0 && spansA[0].Offset == indexB+1 {
|
||||
spansA[0].Length++
|
||||
spansA[0].Offset--
|
||||
goto nextLoop
|
||||
} else {
|
||||
spansA = append(spansA, Span{})
|
||||
copy(spansA[1:], spansA)
|
||||
spansA[0] = Span{Offset: indexB, Length: 1}
|
||||
if len(spansA) > 1 {
|
||||
// Convert the absolute offset in the formerly
|
||||
// first span to a relative offset.
|
||||
spansA[1].Offset -= indexB + 1
|
||||
}
|
||||
goto nextLoop
|
||||
}
|
||||
} else if spansA[0].Offset == indexB {
|
||||
// Just add to first bucket.
|
||||
bucketsA[0] += bucketB
|
||||
goto nextLoop
|
||||
}
|
||||
iSpan, iBucket, iInSpan = 0, 0, 0
|
||||
indexA = spansA[0].Offset
|
||||
}
|
||||
deltaIndex = indexB - indexA
|
||||
for {
|
||||
remainingInSpan := int32(spansA[iSpan].Length) - iInSpan
|
||||
if deltaIndex < remainingInSpan {
|
||||
// Bucket is in current span.
|
||||
iBucket += int(deltaIndex)
|
||||
iInSpan += deltaIndex
|
||||
bucketsA[iBucket] += bucketB
|
||||
break
|
||||
} else {
|
||||
deltaIndex -= remainingInSpan
|
||||
iBucket += int(remainingInSpan)
|
||||
iSpan++
|
||||
if iSpan == len(spansA) || deltaIndex < spansA[iSpan].Offset {
|
||||
// Bucket is in gap behind previous span (or there are no further spans).
|
||||
bucketsA = append(bucketsA, 0)
|
||||
copy(bucketsA[iBucket+1:], bucketsA[iBucket:])
|
||||
bucketsA[iBucket] = bucketB
|
||||
switch {
|
||||
case deltaIndex == 0:
|
||||
// Directly after previous span, extend previous span.
|
||||
if iSpan < len(spansA) {
|
||||
spansA[iSpan].Offset--
|
||||
}
|
||||
iSpan--
|
||||
iInSpan = int32(spansA[iSpan].Length)
|
||||
spansA[iSpan].Length++
|
||||
goto nextLoop
|
||||
case iSpan < len(spansA) && deltaIndex == spansA[iSpan].Offset-1:
|
||||
// Directly before next span, extend next span.
|
||||
iInSpan = 0
|
||||
spansA[iSpan].Offset--
|
||||
spansA[iSpan].Length++
|
||||
goto nextLoop
|
||||
default:
|
||||
// No next span, or next span is not directly adjacent to new bucket.
|
||||
// Add new span.
|
||||
iInSpan = 0
|
||||
if iSpan < len(spansA) {
|
||||
spansA[iSpan].Offset -= deltaIndex + 1
|
||||
}
|
||||
spansA = append(spansA, Span{})
|
||||
copy(spansA[iSpan+1:], spansA[iSpan:])
|
||||
spansA[iSpan] = Span{Length: 1, Offset: deltaIndex}
|
||||
goto nextLoop
|
||||
}
|
||||
} else {
|
||||
// Try start of next span.
|
||||
deltaIndex -= spansA[iSpan].Offset
|
||||
iInSpan = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nextLoop:
|
||||
indexA = indexB
|
||||
indexB++
|
||||
bIdxB++
|
||||
}
|
||||
}
|
||||
|
||||
return spansA, bucketsA
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue