histograms: Return actually useful counter reset hints

This is a bit more conservative than we could be. As long as a chunk
isn't the first in a block, we can be pretty sure that the previous
chunk won't disappear. However, the incremental gain of returning
NotCounterReset in these cases is probably very small and might not be
worth the code complications.

Wwith this, we now also pay attention to an explicitly set counter
reset during ingestion. While the case doesn't show up in practice
yet, there could be scenarios where the metric source knows there was
a counter reset even if it might not be visible from the values in the
histogram. It is also useful for testing.

Signed-off-by: beorn7 <beorn@grafana.com>
pull/11864/head
beorn7 2023-01-18 17:59:29 +01:00
parent 57c18420ab
commit 1cfc8f65a3
13 changed files with 355 additions and 142 deletions

View File

@ -3125,7 +3125,7 @@ func TestRangeQuery(t *testing.T) {
}
}
func TestSparseHistogramRate(t *testing.T) {
func TestNativeHistogramRate(t *testing.T) {
// TODO(beorn7): Integrate histograms into the PromQL testing framework
// and write more tests there.
test, err := NewTest(t, "")
@ -3155,20 +3155,22 @@ func TestSparseHistogramRate(t *testing.T) {
require.Len(t, vector, 1)
actualHistogram := vector[0].H
expectedHistogram := &histogram.FloatHistogram{
Schema: 1,
ZeroThreshold: 0.001,
ZeroCount: 1. / 15.,
Count: 8. / 15.,
Sum: 1.226666666666667,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
// TODO(beorn7): This should be GaugeType. Change it once supported by PromQL.
CounterResetHint: histogram.NotCounterReset,
Schema: 1,
ZeroThreshold: 0.001,
ZeroCount: 1. / 15.,
Count: 8. / 15.,
Sum: 1.226666666666667,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
}
require.Equal(t, expectedHistogram, actualHistogram)
}
func TestSparseFloatHistogramRate(t *testing.T) {
func TestNativeFloatHistogramRate(t *testing.T) {
// TODO(beorn7): Integrate histograms into the PromQL testing framework
// and write more tests there.
test, err := NewTest(t, "")
@ -3198,20 +3200,22 @@ func TestSparseFloatHistogramRate(t *testing.T) {
require.Len(t, vector, 1)
actualHistogram := vector[0].H
expectedHistogram := &histogram.FloatHistogram{
Schema: 1,
ZeroThreshold: 0.001,
ZeroCount: 1. / 15.,
Count: 8. / 15.,
Sum: 1.226666666666667,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
// TODO(beorn7): This should be GaugeType. Change it once supported by PromQL.
CounterResetHint: histogram.NotCounterReset,
Schema: 1,
ZeroThreshold: 0.001,
ZeroCount: 1. / 15.,
Count: 8. / 15.,
Sum: 1.226666666666667,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
}
require.Equal(t, expectedHistogram, actualHistogram)
}
func TestSparseHistogram_HistogramCountAndSum(t *testing.T) {
func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
h := &histogram.Histogram{
@ -3290,7 +3294,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) {
}
}
func TestSparseHistogram_HistogramQuantile(t *testing.T) {
func TestNativeHistogram_HistogramQuantile(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
type subCase struct {
@ -3526,7 +3530,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) {
}
}
func TestSparseHistogram_HistogramFraction(t *testing.T) {
func TestNativeHistogram_HistogramFraction(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
type subCase struct {
@ -3961,7 +3965,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) {
}
}
func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) {
func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
// TODO(codesome): Integrate histograms into the PromQL testing framework
// and write more tests there.
cases := []struct {

View File

@ -440,6 +440,10 @@ type chainSampleIterator struct {
curr chunkenc.Iterator
lastT int64
// Whether the previous and the current sample are direct neighbors
// within the same base iterator.
consecutive bool
}
// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible.
@ -485,6 +489,9 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
if c.curr != nil && c.lastT >= t {
return c.curr.Seek(c.lastT)
}
// Don't bother to find out if the next sample is consecutive. Callers
// of Seek usually aren't interested anyway.
c.consecutive = false
c.h = samplesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) != chunkenc.ValNone {
@ -511,14 +518,26 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) {
if c.curr == nil {
panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.")
}
return c.curr.AtHistogram()
t, h := c.curr.AtHistogram()
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore that there was no counter reset.
if !c.consecutive && h.CounterResetHint == histogram.NotCounterReset {
h.CounterResetHint = histogram.UnknownCounterReset
}
return t, h
}
func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
if c.curr == nil {
panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.")
}
return c.curr.AtFloatHistogram()
t, fh := c.curr.AtFloatHistogram()
// If the current sample is not consecutive with the previous one, we
// cannot be sure anymore that there was no counter reset.
if !c.consecutive && fh.CounterResetHint == histogram.NotCounterReset {
fh.CounterResetHint = histogram.UnknownCounterReset
}
return t, fh
}
func (c *chainSampleIterator) AtT() int64 {
@ -529,7 +548,13 @@ func (c *chainSampleIterator) AtT() int64 {
}
func (c *chainSampleIterator) Next() chunkenc.ValueType {
var (
currT int64
currValueType chunkenc.ValueType
iteratorChanged bool
)
if c.h == nil {
iteratorChanged = true
c.h = samplesIteratorHeap{}
// We call c.curr.Next() as the first thing below.
// So, we don't call Next() on it here.
@ -545,8 +570,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
return chunkenc.ValNone
}
var currT int64
var currValueType chunkenc.ValueType
for {
currValueType = c.curr.Next()
if currValueType != chunkenc.ValNone {
@ -576,6 +599,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
}
c.curr = heap.Pop(&c.h).(chunkenc.Iterator)
iteratorChanged = true
currT = c.curr.AtT()
currValueType = c.curr.Seek(currT)
if currT != c.lastT {
@ -583,6 +607,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType {
}
}
c.consecutive = !iteratorChanged
c.lastT = currT
return currValueType
}

View File

@ -632,6 +632,16 @@ func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64, flo
},
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
}
if ts != mint {
// By setting the counter reset hint to "no counter
// reset" for all histograms but the first, we cover the
// most common cases. If the series is manipulated later
// or spans more than one block when ingested into the
// storage, the hint has to be adjusted. Note that the
// storage itself treats this particular hint the same
// as "unknown".
h.CounterResetHint = histogram.NotCounterReset
}
if floatHistogram {
return sample{t: ts, fh: h.ToFloat()}
}
@ -661,6 +671,13 @@ func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step in
},
PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0},
}
if count > 1 && count%5 != 1 {
// Same rationale for this as above in
// genHistogramSeries, just that we have to be
// smarter to find out if the previous sample
// was a histogram, too.
h.CounterResetHint = histogram.NotCounterReset
}
if floatHistogram {
s = sample{t: ts, fh: h.ToFloat()}
} else {

View File

@ -628,12 +628,8 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis
return it.t, &histogram.FloatHistogram{Sum: it.sum.value}
}
it.atFloatHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.FloatHistogram{
CounterResetHint: crHint,
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: it.cnt.value,
ZeroCount: it.zCnt.value,
Sum: it.sum.value,

View File

@ -66,7 +66,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
app.AppendFloatHistogram(ts, h.ToFloat())
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
expH := h.ToFloat()
expH.CounterResetHint = histogram.NotCounterReset
exp = append(exp, floatResult{t: ts, h: expH})
require.Equal(t, 2, c.NumSamples())
// Add update with new appender.
@ -81,7 +83,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) {
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
app.AppendFloatHistogram(ts, h.ToFloat())
exp = append(exp, floatResult{t: ts, h: h.ToFloat()})
expH = h.ToFloat()
expH.CounterResetHint = histogram.NotCounterReset
exp = append(exp, floatResult{t: ts, h: expH})
require.Equal(t, 3, c.NumSamples())
// 1. Expand iterator in simple case.
@ -209,9 +213,11 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) {
h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
h1.NegativeSpans = h2.NegativeSpans
h1.NegativeBuckets = []int64{0, 1}
expH2 := h2.ToFloat()
expH2.CounterResetHint = histogram.NotCounterReset
exp := []floatResult{
{t: ts1, h: h1.ToFloat()},
{t: ts2, h: h2.ToFloat()},
{t: ts2, h: expH2},
}
it := c.Iterator(nil)
var act []floatResult

View File

@ -667,12 +667,8 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) {
return it.t, &histogram.Histogram{Sum: it.sum}
}
it.atHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.Histogram{
CounterResetHint: crHint,
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: it.cnt,
ZeroCount: it.zCnt,
Sum: it.sum,
@ -690,12 +686,8 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra
return it.t, &histogram.FloatHistogram{Sum: it.sum}
}
it.atFloatHistogramCalled = true
crHint := histogram.UnknownCounterReset
if it.counterResetHeader == GaugeType {
crHint = histogram.GaugeType
}
return it.t, &histogram.FloatHistogram{
CounterResetHint: crHint,
CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead),
Count: float64(it.cnt),
ZeroCount: float64(it.zCnt),
Sum: it.sum,

View File

@ -452,3 +452,38 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV {
}
return out
}
// counterResetHint returns a CounterResetHint based on the CounterResetHeader
// and on the position into the chunk.
func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterResetHint {
switch {
case crh == GaugeType:
// A gauge histogram chunk only contains gauge histograms.
return histogram.GaugeType
case numRead > 1:
// In a counter histogram chunk, there will not be any counter
// resets after the first histogram.
return histogram.NotCounterReset
case crh == CounterReset:
// If the chunk was started because of a counter reset, we can
// safely return that hint. This histogram always has to be
// treated as a counter reset.
return histogram.CounterReset
default:
// Sadly, we have to return "unknown" as the hint for all other
// cases, even if we know that the chunk was started without a
// counter reset. But we cannot be sure that the previous chunk
// still exists in the TSDB, so we conservatively return
// "unknown". On the bright side, this case should be relatively
// rare.
//
// TODO(beorn7): Nevertheless, if the current chunk is in the
// middle of a block (not the first chunk in the block for this
// series), it's probably safe to assume that the previous chunk
// will exist in the TSDB for as long as the current chunk
// exist, and we could safely return
// "histogram.NotCounterReset". This needs some more work and
// might not be worth the effort and/or risk. To be vetted...
return histogram.UnknownCounterReset
}
}

View File

@ -67,7 +67,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14)
h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15)
app.AppendHistogram(ts, h)
exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
hExp := h.Copy()
hExp.CounterResetHint = histogram.NotCounterReset
exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()})
require.Equal(t, 2, c.NumSamples())
// Add update with new appender.
@ -82,7 +84,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) {
h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27)
h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22)
app.AppendHistogram(ts, h)
exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()})
hExp = h.Copy()
hExp.CounterResetHint = histogram.NotCounterReset
exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()})
require.Equal(t, 3, c.NumSamples())
// 1. Expand iterator in simple case.
@ -220,9 +224,11 @@ func TestHistogramChunkBucketChanges(t *testing.T) {
h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}
h1.NegativeSpans = h2.NegativeSpans
h1.NegativeBuckets = []int64{0, 1}
hExp := h2.Copy()
hExp.CounterResetHint = histogram.NotCounterReset
exp := []result{
{t: ts1, h: h1, fh: h1.ToFloat()},
{t: ts2, h: h2, fh: h2.ToFloat()},
{t: ts2, h: hExp, fh: hExp.ToFloat()},
}
it := c.Iterator(nil)
var act []result
@ -463,11 +469,12 @@ func TestAtFloatHistogram(t *testing.T) {
NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2},
},
{
Schema: 0,
Count: 36,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
CounterResetHint: histogram.NotCounterReset,
Schema: 0,
Count: 36,
Sum: 2345.6,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},
@ -482,11 +489,12 @@ func TestAtFloatHistogram(t *testing.T) {
NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2},
},
{
Schema: 0,
Count: 36,
Sum: 1111.1,
ZeroThreshold: 0.001,
ZeroCount: 5,
CounterResetHint: histogram.NotCounterReset,
Schema: 0,
Count: 36,
Sum: 1111.1,
ZeroThreshold: 0.001,
ZeroCount: 5,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 0, Length: 0},

View File

@ -1308,17 +1308,31 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() }
ctx := context.Background()
appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
appendHistogram := func(
lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample,
) {
t.Helper()
app := head.Appender(ctx)
for tsMinute := from; tsMinute <= to; tsMinute++ {
var err error
if floatTest {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat())
*exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()})
efh := h.ToFloat()
if tsMinute == from {
efh.CounterResetHint = histogram.UnknownCounterReset
} else {
efh.CounterResetHint = histogram.NotCounterReset
}
*exp = append(*exp, sample{t: minute(tsMinute), fh: efh})
} else {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), h, nil)
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
eh := h.Copy()
if tsMinute == from {
eh.CounterResetHint = histogram.UnknownCounterReset
} else {
eh.CounterResetHint = histogram.NotCounterReset
}
*exp = append(*exp, sample{t: minute(tsMinute), h: eh})
}
require.NoError(t, err)
}

View File

@ -5836,16 +5836,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
})
ctx := context.Background()
appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) {
appendHistogram := func(
lbls labels.Labels, tsMinute int, h *histogram.Histogram,
exp *[]tsdbutil.Sample, expCRH histogram.CounterResetHint,
) {
t.Helper()
var err error
app := db.Appender(ctx)
if floatHistogram {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat())
*exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()})
efh := h.ToFloat()
efh.CounterResetHint = expCRH
*exp = append(*exp, sample{t: minute(tsMinute), fh: efh})
} else {
_, err = app.AppendHistogram(0, lbls, minute(tsMinute), h.Copy(), nil)
*exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()})
eh := h.Copy()
eh.CounterResetHint = expCRH
*exp = append(*exp, sample{t: minute(tsMinute), h: eh})
}
require.NoError(t, err)
require.NoError(t, app.Commit())
@ -5897,23 +5904,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
t.Run("series with only histograms", func(t *testing.T) {
h := baseH.Copy() // This is shared across all sub tests.
appendHistogram(series1, 100, h, &exp1)
appendHistogram(series1, 100, h, &exp1, histogram.UnknownCounterReset)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
h.PositiveBuckets[0]++
h.NegativeBuckets[0] += 2
h.Count += 10
appendHistogram(series1, 101, h, &exp1)
appendHistogram(series1, 101, h, &exp1, histogram.NotCounterReset)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
t.Run("changing schema", func(t *testing.T) {
h.Schema = 2
appendHistogram(series1, 102, h, &exp1)
appendHistogram(series1, 102, h, &exp1, histogram.UnknownCounterReset)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// Schema back to old.
h.Schema = 1
appendHistogram(series1, 103, h, &exp1)
appendHistogram(series1, 103, h, &exp1, histogram.UnknownCounterReset)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
})
@ -5942,7 +5949,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
h.PositiveSpans[1].Length++
h.PositiveBuckets = append(h.PositiveBuckets, 1)
h.Count += 3
appendHistogram(series1, 104, h, &exp1)
appendHistogram(series1, 104, h, &exp1, histogram.NotCounterReset)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// Because of the previous two histograms being on the active chunk,
@ -5980,14 +5987,14 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
h.Count += 3
// {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1}
h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...)
appendHistogram(series1, 105, h, &exp1)
appendHistogram(series1, 105, h, &exp1, histogram.NotCounterReset)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
// We add 4 more histograms to clear out the buffer and see the re-encoded histograms.
appendHistogram(series1, 106, h, &exp1)
appendHistogram(series1, 107, h, &exp1)
appendHistogram(series1, 108, h, &exp1)
appendHistogram(series1, 109, h, &exp1)
appendHistogram(series1, 106, h, &exp1, histogram.NotCounterReset)
appendHistogram(series1, 107, h, &exp1, histogram.NotCounterReset)
appendHistogram(series1, 108, h, &exp1, histogram.NotCounterReset)
appendHistogram(series1, 109, h, &exp1, histogram.NotCounterReset)
// Update the expected histograms to reflect the re-encoding.
if floatHistogram {
@ -6020,7 +6027,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
t.Run("buckets disappearing", func(t *testing.T) {
h.PositiveSpans[1].Length--
h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1]
appendHistogram(series1, 110, h, &exp1)
appendHistogram(series1, 110, h, &exp1, histogram.CounterReset)
testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1})
})
})
@ -6032,9 +6039,9 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
h := baseH.Copy()
appendHistogram(series2, 103, h, &exp2)
appendHistogram(series2, 104, h, &exp2)
appendHistogram(series2, 105, h, &exp2)
appendHistogram(series2, 103, h, &exp2, histogram.UnknownCounterReset)
appendHistogram(series2, 104, h, &exp2, histogram.NotCounterReset)
appendHistogram(series2, 105, h, &exp2, histogram.NotCounterReset)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
// Switching between float and histograms again.
@ -6042,16 +6049,16 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
appendFloat(series2, 107, 107, &exp2)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
appendHistogram(series2, 108, h, &exp2)
appendHistogram(series2, 109, h, &exp2)
appendHistogram(series2, 108, h, &exp2, histogram.UnknownCounterReset)
appendHistogram(series2, 109, h, &exp2, histogram.NotCounterReset)
testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2})
})
t.Run("series starting with histogram and then getting float", func(t *testing.T) {
h := baseH.Copy()
appendHistogram(series3, 101, h, &exp3)
appendHistogram(series3, 102, h, &exp3)
appendHistogram(series3, 103, h, &exp3)
appendHistogram(series3, 101, h, &exp3, histogram.UnknownCounterReset)
appendHistogram(series3, 102, h, &exp3, histogram.NotCounterReset)
appendHistogram(series3, 103, h, &exp3, histogram.NotCounterReset)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
appendFloat(series3, 104, 100, &exp3)
@ -6060,8 +6067,8 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) {
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
// Switching between histogram and float again.
appendHistogram(series3, 107, h, &exp3)
appendHistogram(series3, 108, h, &exp3)
appendHistogram(series3, 107, h, &exp3, histogram.UnknownCounterReset)
appendHistogram(series3, 108, h, &exp3, histogram.NotCounterReset)
testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3})
appendFloat(series3, 109, 106, &exp3)
@ -6091,7 +6098,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
t.Helper()
opts := DefaultOptions()
opts.AllowOverlappingCompaction = true // TODO(jesus.vazquez) This replaced AllowOverlappingBlocks, make sure that works
opts.AllowOverlappingCompaction = true // TODO(jesusvazquez): This replaced AllowOverlappingBlocks, make sure that works.
db := openTestDB(t, opts, nil)
t.Cleanup(func() {
require.NoError(t, db.Close())
@ -6137,7 +6144,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
require.Equal(t, exp, res)
compareSeries(t, exp, res)
// Compact all the blocks together and query again.
blocks := db.Blocks()
@ -6154,10 +6161,23 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
q, err = db.Querier(ctx, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
res = query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
require.Equal(t, exp, res)
// After compaction, we do not require "unknown" counter resets
// due to origin from different overlapping chunks anymore.
for _, ss := range exp {
for i, s := range ss[1:] {
if s.H() != nil && ss[i].H() != nil && s.H().CounterResetHint == histogram.UnknownCounterReset {
s.H().CounterResetHint = histogram.NotCounterReset
}
if s.FH() != nil && ss[i].FH() != nil && s.FH().CounterResetHint == histogram.UnknownCounterReset {
s.FH().CounterResetHint = histogram.NotCounterReset
}
}
}
compareSeries(t, exp, res)
}
for _, floatHistogram := range []bool{true} {
for _, floatHistogram := range []bool{false, true} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHistogram), func(t *testing.T) {
t.Run("serial blocks with only histograms", func(t *testing.T) {
testBlockQuerying(t,
@ -6272,3 +6292,45 @@ func TestNativeHistogramFlag(t *testing.T) {
l.String(): {sample{t: 200, h: h}, sample{t: 205, fh: h.ToFloat()}},
}, act)
}
// compareSeries essentially replaces `require.Equal(t, expected, actual) in
// situations where the actual series might contain more counter reset hints
// "unknown" than the expected series. This can easily happen for long series
// that trigger new chunks. This function therefore tolerates counter reset
// hints "CounterReset" and "NotCounterReset" in an expected series where the
// actual series contains a counter reset hint "UnknownCounterReset".
// "GaugeType" hints are still strictly checked, and any "UnknownCounterReset"
// in an expected series has to be matched precisely by the actual series.
func compareSeries(t require.TestingT, expected, actual map[string][]tsdbutil.Sample) {
if len(expected) != len(actual) {
// The reason for the difference is not the counter reset hints
// (alone), so let's use the pretty diffing by the require
// package.
require.Equal(t, expected, actual, "number of series differs")
}
for key, eSamples := range expected {
aSamples, ok := actual[key]
if !ok {
require.Equal(t, expected, actual, "expected series %q not found", key)
}
if len(eSamples) != len(aSamples) {
require.Equal(t, eSamples, aSamples, "number of samples for series %q differs", key)
}
for i, eS := range eSamples {
aS := aSamples[i]
aH, eH := aS.H(), eS.H()
aFH, eFH := aS.FH(), eS.FH()
switch {
case aH != nil && eH != nil && aH.CounterResetHint == histogram.UnknownCounterReset && eH.CounterResetHint != histogram.GaugeType:
eH = eH.Copy()
eH.CounterResetHint = histogram.UnknownCounterReset
eS = sample{t: eS.T(), h: eH}
case aFH != nil && eFH != nil && aFH.CounterResetHint == histogram.UnknownCounterReset && eFH.CounterResetHint != histogram.GaugeType:
eFH = eFH.Copy()
eFH.CounterResetHint = histogram.UnknownCounterReset
eS = sample{t: eS.T(), fh: eFH}
}
require.Equal(t, eS, aS, "sample %d in series %q differs", i, key)
}
}
}

View File

@ -2040,7 +2040,7 @@ func (h *Head) updateWALReplayStatusRead(current int) {
func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
for i := 0; i < n; i++ {
r = append(r, &histogram.Histogram{
h := histogram.Histogram{
Count: 10 + uint64(i*8),
ZeroCount: 2 + uint64(i),
ZeroThreshold: 0.001,
@ -2056,9 +2056,12 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
{Offset: 1, Length: 2},
},
NegativeBuckets: []int64{int64(i + 1), 1, -1, 0},
})
}
if i > 0 {
h.CounterResetHint = histogram.NotCounterReset
}
r = append(r, &h)
}
return r
}
@ -2084,13 +2087,12 @@ func GenerateTestGaugeHistograms(n int) (r []*histogram.Histogram) {
NegativeBuckets: []int64{int64(i + 1), 1, -1, 0},
})
}
return r
}
func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
for i := 0; i < n; i++ {
r = append(r, &histogram.FloatHistogram{
h := histogram.FloatHistogram{
Count: 10 + float64(i*8),
ZeroCount: 2 + float64(i),
ZeroThreshold: 0.001,
@ -2106,7 +2108,11 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) {
{Offset: 1, Length: 2},
},
NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)},
})
}
if i > 0 {
h.CounterResetHint = histogram.NotCounterReset
}
r = append(r, &h)
}
return r

View File

@ -1152,17 +1152,27 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
pForwardInserts, nForwardInserts []chunkenc.Insert
pBackwardInserts, nBackwardInserts []chunkenc.Insert
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset bool
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
gauge := h.CounterResetHint == histogram.GaugeType
if app != nil {
if gauge {
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h)
} else {
switch h.CounterResetHint {
case histogram.GaugeType:
gauge = true
if app != nil {
pForwardInserts, nForwardInserts,
pBackwardInserts, nBackwardInserts,
pMergedSpans, nMergedSpans,
okToAppend = app.AppendableGauge(h)
}
case histogram.CounterReset:
// The caller tells us this is a counter reset, even if it
// doesn't look like one.
counterReset = true
default:
if app != nil {
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h)
}
}
@ -1235,18 +1245,27 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
pForwardInserts, nForwardInserts []chunkenc.Insert
pBackwardInserts, nBackwardInserts []chunkenc.Insert
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset bool
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
gauge := fh.CounterResetHint == histogram.GaugeType
if app != nil {
if gauge {
pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts,
pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh)
} else {
switch fh.CounterResetHint {
case histogram.GaugeType:
gauge = true
if app != nil {
pForwardInserts, nForwardInserts,
pBackwardInserts, nBackwardInserts,
pMergedSpans, nMergedSpans,
okToAppend = app.AppendableGauge(fh)
}
case histogram.CounterReset:
// The caller tells us this is a counter reset, even if it
// doesn't look like one.
counterReset = true
default:
if app != nil {
pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh)
}
}

View File

@ -197,7 +197,6 @@ func BenchmarkLoadWAL(b *testing.B) {
continue
}
lastExemplarsPerSeries = exemplarsPerSeries
// fmt.Println("exemplars per series: ", exemplarsPerSeries)
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT),
func(b *testing.B) {
dir := b.TempDir()
@ -2834,17 +2833,13 @@ func TestAppendHistogram(t *testing.T) {
ingestTs := int64(0)
app := head.Appender(context.Background())
type timedHistogram struct {
t int64
h *histogram.Histogram
}
expHistograms := make([]timedHistogram, 0, 2*numHistograms)
expHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms)
// Counter integer histograms.
for _, h := range GenerateTestHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{ingestTs, h})
expHistograms = append(expHistograms, sample{t: ingestTs, h: h})
ingestTs++
if ingestTs%50 == 0 {
require.NoError(t, app.Commit())
@ -2856,7 +2851,7 @@ func TestAppendHistogram(t *testing.T) {
for _, h := range GenerateTestGaugeHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, h, nil)
require.NoError(t, err)
expHistograms = append(expHistograms, timedHistogram{ingestTs, h})
expHistograms = append(expHistograms, sample{t: ingestTs, h: h})
ingestTs++
if ingestTs%50 == 0 {
require.NoError(t, app.Commit())
@ -2864,17 +2859,13 @@ func TestAppendHistogram(t *testing.T) {
}
}
type timedFloatHistogram struct {
t int64
h *histogram.FloatHistogram
}
expFloatHistograms := make([]timedFloatHistogram, 0, 2*numHistograms)
expFloatHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms)
// Counter float histograms.
for _, fh := range GenerateTestFloatHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
require.NoError(t, err)
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh})
ingestTs++
if ingestTs%50 == 0 {
require.NoError(t, app.Commit())
@ -2886,7 +2877,7 @@ func TestAppendHistogram(t *testing.T) {
for _, fh := range GenerateTestGaugeFloatHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, ingestTs, nil, fh)
require.NoError(t, err)
expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh})
expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh})
ingestTs++
if ingestTs%50 == 0 {
require.NoError(t, app.Commit())
@ -2909,20 +2900,28 @@ func TestAppendHistogram(t *testing.T) {
require.False(t, ss.Next())
it := s.Iterator(nil)
actHistograms := make([]timedHistogram, 0, len(expHistograms))
actFloatHistograms := make([]timedFloatHistogram, 0, len(expFloatHistograms))
actHistograms := make([]tsdbutil.Sample, 0, len(expHistograms))
actFloatHistograms := make([]tsdbutil.Sample, 0, len(expFloatHistograms))
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
if typ == chunkenc.ValHistogram {
ts, h := it.AtHistogram()
actHistograms = append(actHistograms, timedHistogram{ts, h})
actHistograms = append(actHistograms, sample{t: ts, h: h})
} else if typ == chunkenc.ValFloatHistogram {
ts, fh := it.AtFloatHistogram()
actFloatHistograms = append(actFloatHistograms, timedFloatHistogram{ts, fh})
actFloatHistograms = append(actFloatHistograms, sample{t: ts, fh: fh})
}
}
require.Equal(t, expHistograms, actHistograms)
require.Equal(t, expFloatHistograms, actFloatHistograms)
compareSeries(
t,
map[string][]tsdbutil.Sample{"dummy": expHistograms},
map[string][]tsdbutil.Sample{"dummy": actHistograms},
)
compareSeries(
t,
map[string][]tsdbutil.Sample{"dummy": expFloatHistograms},
map[string][]tsdbutil.Sample{"dummy": actFloatHistograms},
)
})
}
}
@ -3019,7 +3018,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s2, int64(ts), h, nil)
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()})
eh := h.Copy()
if !gauge && ts > 30 && (ts-10)%20 == 1 {
// Need "unknown" hint after float sample.
eh.CounterResetHint = histogram.UnknownCounterReset
}
exp[k2] = append(exp[k2], sample{t: int64(ts), h: eh})
if ts%20 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
@ -3051,7 +3055,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
h.NegativeBuckets = h.PositiveBuckets
_, err := app.AppendHistogram(0, s2, int64(ts), nil, h)
require.NoError(t, err)
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()})
eh := h.Copy()
if !gauge && ts > 30 && (ts-10)%20 == 1 {
// Need "unknown" hint after float sample.
eh.CounterResetHint = histogram.UnknownCounterReset
}
exp[k2] = append(exp[k2], sample{t: int64(ts), fh: eh})
if ts%20 == 0 {
require.NoError(t, app.Commit())
app = head.Appender(context.Background())
@ -3089,7 +3098,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) {
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
require.NoError(t, err)
act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*"))
require.Equal(t, exp, act)
compareSeries(t, exp, act)
}
testQuery()
@ -3506,6 +3515,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
ah.fh.Sum = 0
eh.fh = eh.fh.Copy()
eh.fh.Sum = 0
} else if i > 0 {
prev := expHistograms[i-1]
if prev.fh == nil || value.IsStaleNaN(prev.fh.Sum) {
eh.fh.CounterResetHint = histogram.UnknownCounterReset
}
}
require.Equal(t, eh, ah)
} else {
@ -3516,6 +3530,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) {
ah.h.Sum = 0
eh.h = eh.h.Copy()
eh.h.Sum = 0
} else if i > 0 {
prev := expHistograms[i-1]
if prev.h == nil || value.IsStaleNaN(prev.h.Sum) {
eh.h.CounterResetHint = histogram.UnknownCounterReset
}
}
require.Equal(t, eh, ah)
}
@ -3730,8 +3749,10 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
// If this is empty, samples above will be taken instead of this.
addToExp []tsdbutil.Sample
}{
// Histograms that end up in the expected samples are copied here so that we
// can independently set the CounterResetHint later.
{
samples: []tsdbutil.Sample{sample{t: 100, h: hists[1]}},
samples: []tsdbutil.Sample{sample{t: 100, h: hists[0].Copy()}},
expChunks: 1,
},
{
@ -3739,23 +3760,23 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
expChunks: 2,
},
{
samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[1]}},
samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[0].Copy()}},
expChunks: 3,
},
{
samples: []tsdbutil.Sample{sample{t: 220, h: hists[1]}},
samples: []tsdbutil.Sample{sample{t: 220, h: hists[1].Copy()}},
expChunks: 4,
},
{
samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3]}},
samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3].Copy()}},
expChunks: 5,
},
{
samples: []tsdbutil.Sample{sample{t: 100, h: hists[2]}},
samples: []tsdbutil.Sample{sample{t: 100, h: hists[2].Copy()}},
err: storage.ErrOutOfOrderSample,
},
{
samples: []tsdbutil.Sample{sample{t: 300, h: hists[3]}},
samples: []tsdbutil.Sample{sample{t: 300, h: hists[3].Copy()}},
expChunks: 6,
},
{
@ -3763,7 +3784,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
err: storage.ErrOutOfOrderSample,
},
{
samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4]}},
samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4].Copy()}},
err: storage.ErrOutOfOrderSample,
},
{
@ -3789,7 +3810,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
},
addToExp: []tsdbutil.Sample{
sample{t: 800, v: 8},
sample{t: 900, h: hists[9]},
sample{t: 900, h: hists[9].Copy()},
},
expChunks: 8, // float64 added to old chunk, only 1 new for histograms.
},
@ -3800,7 +3821,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
sample{t: 1100, h: hists[9]},
},
addToExp: []tsdbutil.Sample{
sample{t: 1100, h: hists[9]},
sample{t: 1100, h: hists[9].Copy()},
},
expChunks: 8,
},
@ -3830,6 +3851,14 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
require.NoError(t, app.Rollback())
}
}
for i, s := range expResult[1:] {
switch {
case s.H() != nil && expResult[i].H() == nil:
s.(sample).h.CounterResetHint = histogram.UnknownCounterReset
case s.FH() != nil && expResult[i].FH() == nil:
s.(sample).fh.CounterResetHint = histogram.UnknownCounterReset
}
}
// Query back and expect same order of samples.
q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)