histograms: Improve tests and fix exposed bugs

This adds negative buckets and access of float histograms to
TestHistogramChunkSameBuckets and TestHistogramChunkBucketChanges.

It also exercises a specific pattern of reusing an iterator (one where
no access has happened).

This exposes two bugs (where entries for positive buckets where used
where the corresponding entries for negative buckets should have been
used). One was fixed in #11627 (not merged), which triggered the work
in this commit.

This commit fixes both issues, so #11627 can be closed.

It also simplifies the code in the histogramIterator.Next method that
aims to recycle existing slice capacity.

Furthermore, this is on top of the release-2.40 branch because we
should probably cut a bugfix release for this.

Signed-off-by: beorn7 <beorn@grafana.com>
pull/11699/head
beorn7 2022-12-11 23:39:53 +01:00
parent e1506e7be8
commit 5f366e9b62
2 changed files with 95 additions and 74 deletions

View File

@ -176,7 +176,7 @@ func newHistogramIterator(b []byte) *histogramIterator {
} }
func (c *HistogramChunk) iterator(it Iterator) *histogramIterator { func (c *HistogramChunk) iterator(it Iterator) *histogramIterator {
// This commet is copied from XORChunk.iterator: // This comment is copied from XORChunk.iterator:
// Should iterators guarantee to act on a copy of the data so it doesn't lock append? // Should iterators guarantee to act on a copy of the data so it doesn't lock append?
// When using striped locks to guard access to chunks, probably yes. // When using striped locks to guard access to chunks, probably yes.
// Could only copy data if the chunk is not completed yet. // Could only copy data if the chunk is not completed yet.
@ -651,7 +651,7 @@ func (it *histogramIterator) Reset(b []byte) {
} }
it.pBucketsDelta = it.pBucketsDelta[:0] it.pBucketsDelta = it.pBucketsDelta[:0]
it.pBucketsDelta = it.pBucketsDelta[:0] it.nBucketsDelta = it.nBucketsDelta[:0]
it.sum = 0 it.sum = 0
it.leading = 0 it.leading = 0
@ -677,36 +677,17 @@ func (it *histogramIterator) Next() ValueType {
it.zThreshold = zeroThreshold it.zThreshold = zeroThreshold
it.pSpans, it.nSpans = posSpans, negSpans it.pSpans, it.nSpans = posSpans, negSpans
numPBuckets, numNBuckets := countSpans(posSpans), countSpans(negSpans) numPBuckets, numNBuckets := countSpans(posSpans), countSpans(negSpans)
// Allocate bucket slices as needed, recycling existing slices // The code below recycles existing slices in case this iterator
// in case this iterator was reset and already has slices of a // was reset and already has slices of a sufficient capacity.
// sufficient capacity.
if numPBuckets > 0 { if numPBuckets > 0 {
if cap(it.pBuckets) < numPBuckets { it.pBuckets = append(it.pBuckets, make([]int64, numPBuckets)...)
it.pBuckets = make([]int64, numPBuckets) it.pBucketsDelta = append(it.pBucketsDelta, make([]int64, numPBuckets)...)
// If cap(it.pBuckets) isn't sufficient, neither is the cap of the others. it.pFloatBuckets = append(it.pFloatBuckets, make([]float64, numPBuckets)...)
it.pBucketsDelta = make([]int64, numPBuckets)
it.pFloatBuckets = make([]float64, numPBuckets)
} else {
for i := 0; i < numPBuckets; i++ {
it.pBuckets = append(it.pBuckets, 0)
it.pBucketsDelta = append(it.pBucketsDelta, 0)
it.pFloatBuckets = append(it.pFloatBuckets, 0)
}
}
} }
if numNBuckets > 0 { if numNBuckets > 0 {
if cap(it.nBuckets) < numNBuckets { it.nBuckets = append(it.nBuckets, make([]int64, numNBuckets)...)
it.nBuckets = make([]int64, numNBuckets) it.nBucketsDelta = append(it.nBucketsDelta, make([]int64, numNBuckets)...)
// If cap(it.nBuckets) isn't sufficient, neither is the cap of the others. it.nFloatBuckets = append(it.nFloatBuckets, make([]float64, numNBuckets)...)
it.nBucketsDelta = make([]int64, numNBuckets)
it.nFloatBuckets = make([]float64, numNBuckets)
} else {
for i := 0; i < numNBuckets; i++ {
it.nBuckets = append(it.nBuckets, 0)
it.nBucketsDelta = append(it.nBucketsDelta, 0)
it.pFloatBuckets = append(it.pFloatBuckets, 0)
}
}
} }
// Now read the actual data. // Now read the actual data.

View File

@ -21,9 +21,15 @@ import (
"github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/histogram"
) )
type result struct {
t int64
h *histogram.Histogram
fh *histogram.FloatHistogram
}
func TestHistogramChunkSameBuckets(t *testing.T) { func TestHistogramChunkSameBuckets(t *testing.T) {
c := NewHistogramChunk() c := NewHistogramChunk()
var exp []res var exp []result
// Create fresh appender and add the first histogram. // Create fresh appender and add the first histogram.
app, err := c.Appender() app, err := c.Appender()
@ -32,7 +38,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
ts := int64(1234567890) ts := int64(1234567890)
h := &histogram.Histogram{ h := &histogram.Histogram{
Count: 5, Count: 15,
ZeroCount: 2, ZeroCount: 2,
Sum: 18.4, Sum: 18.4,
ZeroThreshold: 1e-100, ZeroThreshold: 1e-100,
@ -42,20 +48,26 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
{Offset: 1, Length: 2}, {Offset: 1, Length: 2},
}, },
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5) PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 1},
{Offset: 2, Length: 3},
},
NegativeBuckets: []int64{2, 1, -1, -1}, // counts: 2, 3, 2, 1 (total 8)
} }
app.AppendHistogram(ts, h) app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h}) exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
require.Equal(t, 1, c.NumSamples()) require.Equal(t, 1, c.NumSamples())
// Add an updated histogram. // Add an updated histogram.
ts += 16 ts += 16
h = h.Copy() h = h.Copy()
h.Count += 9 h.Count = 32
h.ZeroCount++ h.ZeroCount++
h.Sum = 24.4 h.Sum = 24.4
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
app.AppendHistogram(ts, h) app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h}) exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
require.Equal(t, 2, c.NumSamples()) require.Equal(t, 2, c.NumSamples())
// Add update with new appender. // Add update with new appender.
@ -64,59 +76,77 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
ts += 14 ts += 14
h = h.Copy() h = h.Copy()
h.Count += 13 h.Count = 54
h.ZeroCount += 2 h.ZeroCount += 2
h.Sum = 24.4 h.Sum = 24.4
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
app.AppendHistogram(ts, h) app.AppendHistogram(ts, h)
exp = append(exp, res{t: ts, h: h}) exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
require.Equal(t, 3, c.NumSamples()) require.Equal(t, 3, c.NumSamples())
// 1. Expand iterator in simple case. // 1. Expand iterator in simple case.
it := c.iterator(nil) it := c.Iterator(nil)
require.NoError(t, it.Err()) require.NoError(t, it.Err())
var act []res var act []result
for it.Next() == ValHistogram { for it.Next() == ValHistogram {
ts, h := it.AtHistogram() ts, h := it.AtHistogram()
act = append(act, res{t: ts, h: h}) fts, fh := it.AtFloatHistogram()
require.Equal(t, ts, fts)
act = append(act, result{t: ts, h: h, fh: fh})
} }
require.NoError(t, it.Err()) require.NoError(t, it.Err())
require.Equal(t, exp, act) require.Equal(t, exp, act)
// 2. Expand second iterator while reusing first one. // 2. Expand second iterator while reusing first one.
it2 := c.Iterator(it) it2 := c.Iterator(it)
var res2 []res var act2 []result
for it2.Next() == ValHistogram { for it2.Next() == ValHistogram {
ts, h := it2.AtHistogram() ts, h := it2.AtHistogram()
res2 = append(res2, res{t: ts, h: h}) fts, fh := it2.AtFloatHistogram()
require.Equal(t, ts, fts)
act2 = append(act2, result{t: ts, h: h, fh: fh})
} }
require.NoError(t, it2.Err()) require.NoError(t, it2.Err())
require.Equal(t, exp, res2) require.Equal(t, exp, act2)
// 3. Test iterator Seek. // 3. Now recycle an iterator that was never used to access anything.
// mid := len(exp) / 2 itX := c.Iterator(nil)
for itX.Next() == ValHistogram {
// Just iterate through without accessing anything.
}
it3 := c.iterator(itX)
var act3 []result
for it3.Next() == ValHistogram {
ts, h := it3.AtHistogram()
fts, fh := it3.AtFloatHistogram()
require.Equal(t, ts, fts)
act3 = append(act3, result{t: ts, h: h, fh: fh})
}
require.NoError(t, it3.Err())
require.Equal(t, exp, act3)
// it3 := c.Iterator(nil) // 4. Test iterator Seek.
// var res3 []pair mid := len(exp) / 2
// require.Equal(t, true, it3.Seek(exp[mid].t)) it4 := c.Iterator(nil)
var act4 []result
require.Equal(t, ValHistogram, it4.Seek(exp[mid].t))
// Below ones should not matter. // Below ones should not matter.
// require.Equal(t, true, it3.Seek(exp[mid].t)) require.Equal(t, ValHistogram, it4.Seek(exp[mid].t))
// require.Equal(t, true, it3.Seek(exp[mid].t)) require.Equal(t, ValHistogram, it4.Seek(exp[mid].t))
// ts, v = it3.At() ts, h = it4.AtHistogram()
// res3 = append(res3, pair{t: ts, v: v}) fts, fh := it4.AtFloatHistogram()
require.Equal(t, ts, fts)
// for it3.Next() { act4 = append(act4, result{t: ts, h: h, fh: fh})
// ts, v := it3.At() for it4.Next() == ValHistogram {
// res3 = append(res3, pair{t: ts, v: v}) ts, h := it4.AtHistogram()
// } fts, fh := it4.AtFloatHistogram()
// require.NoError(t, it3.Err()) require.Equal(t, ts, fts)
// require.Equal(t, exp[mid:], res3) act4 = append(act4, result{t: ts, h: h, fh: fh})
// require.Equal(t, false, it3.Seek(exp[len(exp)-1].t+1)) }
} require.NoError(t, it4.Err())
require.Equal(t, exp[mid:], act4)
type res struct { require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1))
t int64
h *histogram.Histogram
} }
// Mimics the scenario described for compareSpans(). // Mimics the scenario described for compareSpans().
@ -130,7 +160,7 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
ts1 := int64(1234567890) ts1 := int64(1234567890)
h1 := &histogram.Histogram{ h1 := &histogram.Histogram{
Count: 5, Count: 27,
ZeroCount: 2, ZeroCount: 2,
Sum: 18.4, Sum: 18.4,
ZeroThreshold: 1e-125, ZeroThreshold: 1e-125,
@ -143,6 +173,8 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
{Offset: 1, Length: 1}, {Offset: 1, Length: 1},
}, },
PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24) PositiveBuckets: []int64{6, -3, 0, -1, 2, 1, -4}, // counts: 6, 3, 3, 2, 4, 5, 1 (total 24)
NegativeSpans: []histogram.Span{{Offset: 1, Length: 1}},
NegativeBuckets: []int64{1},
} }
app.AppendHistogram(ts1, h1) app.AppendHistogram(ts1, h1)
@ -157,19 +189,23 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
{Offset: 1, Length: 4}, {Offset: 1, Length: 4},
{Offset: 3, Length: 3}, {Offset: 3, Length: 3},
} }
h2.Count += 9 h2.NegativeSpans = []histogram.Span{{Offset: 0, Length: 2}}
h2.Count = 35
h2.ZeroCount++ h2.ZeroCount++
h2.Sum = 30 h2.Sum = 30
// Existing histogram should get values converted from the above to: // Existing histogram should get values converted from the above to:
// 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between) // 6 3 0 3 0 0 2 4 5 0 1 (previous values with some new empty buckets in between)
// so the new histogram should have new counts >= these per-bucket counts, e.g.: // so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30) h2.PositiveBuckets = []int64{7, -2, -4, 2, -2, -1, 2, 3, 0, -5, 1} // 7 5 1 3 1 0 2 5 5 0 1 (total 30)
// Existing histogram should get values converted from the above to:
// 0 1 (previous values with some new empty buckets in between)
// so the new histogram should have new counts >= these per-bucket counts, e.g.:
h2.NegativeBuckets = []int64{2, -1} // 2 1 (total 3)
// This is how span changes will be handled. // This is how span changes will be handled.
hApp, _ := app.(*HistogramAppender) hApp, _ := app.(*HistogramAppender)
posInterjections, negInterjections, ok, cr := hApp.Appendable(h2) posInterjections, negInterjections, ok, cr := hApp.Appendable(h2)
require.Greater(t, len(posInterjections), 0) require.Greater(t, len(posInterjections), 0)
require.Equal(t, 0, len(negInterjections)) require.Greater(t, len(negInterjections), 0)
require.True(t, ok) // Only new buckets came in. require.True(t, ok) // Only new buckets came in.
require.False(t, cr) require.False(t, cr)
c, app = hApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans) c, app = hApp.Recode(posInterjections, negInterjections, h2.PositiveSpans, h2.NegativeSpans)
@ -182,21 +218,25 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
// metadata as well as the expanded buckets. // metadata as well as the expanded buckets.
h1.PositiveSpans = h2.PositiveSpans h1.PositiveSpans = h2.PositiveSpans
h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1} h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
exp := []res{ h1.NegativeSpans = h2.NegativeSpans
{t: ts1, h: h1}, h1.NegativeBuckets = []int64{0, 1}
{t: ts2, h: h2}, exp := []result{
{t: ts1, h: h1, fh: h1.ToFloat()},
{t: ts2, h: h2, fh: h2.ToFloat()},
} }
it := c.Iterator(nil) it := c.Iterator(nil)
var act []res var act []result
for it.Next() == ValHistogram { for it.Next() == ValHistogram {
ts, h := it.AtHistogram() ts, h := it.AtHistogram()
act = append(act, res{t: ts, h: h}) fts, fh := it.AtFloatHistogram()
require.Equal(t, ts, fts)
act = append(act, result{t: ts, h: h, fh: fh})
} }
require.NoError(t, it.Err()) require.NoError(t, it.Err())
require.Equal(t, exp, act) require.Equal(t, exp, act)
} }
func TestHistoChunkAppendable(t *testing.T) { func TestHistogramChunkAppendable(t *testing.T) {
c := Chunk(NewHistogramChunk()) c := Chunk(NewHistogramChunk())
// Create fresh appender and add the first histogram. // Create fresh appender and add the first histogram.