Fix setting reset header to gauge histogram in seriesToChunkEncoder (#12329)

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Branch: pull/12340/head
Author: George Krajcsovits (committed via GitHub)
Parent: 7c2de14b0b
Commit: f5fcaa3872

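What the change does: seriesToChunkEncoder.Iterator now remembers whether the current sample forced a brand-new chunk (chunkCreated) and, when the appended histogram or float histogram carries the gauge hint, stamps that fresh chunk's counter reset header with chunkenc.GaugeType. Previously the header kept its default, so chunks cut for gauge histograms were read back as if they held counter histograms with an unknown reset hint. The Go sketch below is only a simplified illustration of that header rule, not the encoder's actual control flow; the helper name headerForNewChunk is invented for the example and is not part of the patch.

// headerrule_sketch.go
//
// A minimal sketch of the header rule introduced by this commit,
// built only from the chunkenc and histogram APIs that appear in the diff.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// headerForNewChunk (hypothetical helper, for illustration only) decides which
// counter reset header a freshly cut histogram chunk should get: a gauge hint
// on the sample wins, an observed counter reset is recorded otherwise, and
// everything else stays unknown.
func headerForNewChunk(hint histogram.CounterResetHint, counterReset bool) chunkenc.CounterResetHeader {
	switch {
	case hint == histogram.GaugeType:
		return chunkenc.GaugeType
	case counterReset:
		return chunkenc.CounterReset
	default:
		return chunkenc.UnknownCounterReset
	}
}

func main() {
	// A new chunk cut for a gauge histogram sample must carry the gauge header.
	gaugeChunk := chunkenc.NewHistogramChunk()
	gaugeChunk.SetCounterResetHeader(headerForNewChunk(histogram.GaugeType, false))

	// A new chunk cut because of a counter reset keeps the counter reset header.
	counterChunk := chunkenc.NewHistogramChunk()
	counterChunk.SetCounterResetHeader(headerForNewChunk(histogram.UnknownCounterReset, true))

	fmt.Println("gauge and counter reset headers set on the new chunks")
}
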
@@ -297,9 +297,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 	seriesIter := s.Series.Iterator(nil)
 	lastType := chunkenc.ValNone
 	for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
+		chunkCreated := false
 		if typ != lastType || i >= seriesToChunkEncoderSplit {
 			// Create a new chunk if the sample type changed or too many samples in the current one.
 			chks = appendChunk(chks, mint, maxt, chk)
+			chunkCreated = true
 			chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding())
 			if err != nil {
 				return errChunksIterator{err: err}
@@ -330,6 +332,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 			if ok, counterReset := app.AppendHistogram(t, h); !ok {
 				chks = appendChunk(chks, mint, maxt, chk)
 				histChunk := chunkenc.NewHistogramChunk()
+				chunkCreated = true
 				if counterReset {
 					histChunk.SetCounterResetHeader(chunkenc.CounterReset)
 				}
@@ -346,11 +349,15 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 					panic("unexpected error while appending histogram")
 				}
 			}
+			if chunkCreated && h.CounterResetHint == histogram.GaugeType {
+				chk.(*chunkenc.HistogramChunk).SetCounterResetHeader(chunkenc.GaugeType)
+			}
 		case chunkenc.ValFloatHistogram:
 			t, fh = seriesIter.AtFloatHistogram()
 			if ok, counterReset := app.AppendFloatHistogram(t, fh); !ok {
 				chks = appendChunk(chks, mint, maxt, chk)
 				floatHistChunk := chunkenc.NewFloatHistogramChunk()
+				chunkCreated = true
 				if counterReset {
 					floatHistChunk.SetCounterResetHeader(chunkenc.CounterReset)
 				}
@@ -366,6 +373,9 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
 					panic("unexpected error while float appending histogram")
 				}
 			}
+			if chunkCreated && fh.CounterResetHint == histogram.GaugeType {
+				chk.(*chunkenc.FloatHistogramChunk).SetCounterResetHeader(chunkenc.GaugeType)
+			}
 		default:
 			return errChunksIterator{err: fmt.Errorf("unknown sample type %s", typ.String())}
 		}

@@ -127,13 +127,12 @@ func TestChunkSeriesSetToSeriesSet(t *testing.T) {
 type histogramTest struct {
 	samples []tsdbutil.Sample
-	expectedChunks int
-	expectedCounterReset bool
+	expectedCounterResetHeaders []chunkenc.CounterResetHeader
 }

 func TestHistogramSeriesToChunks(t *testing.T) {
 	h1 := &histogram.Histogram{
-		Count: 3,
+		Count: 7,
 		ZeroCount: 2,
 		ZeroThreshold: 0.001,
 		Sum: 100,
@@ -158,7 +157,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 	}
 	// Implicit counter reset by reduction in buckets, not appendable.
 	h2down := &histogram.Histogram{
-		Count: 8,
+		Count: 10,
 		ZeroCount: 2,
 		ZeroThreshold: 0.001,
 		Sum: 100,
@@ -171,7 +170,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 	}
 	fh1 := &histogram.FloatHistogram{
-		Count: 4,
+		Count: 6,
 		ZeroCount: 2,
 		ZeroThreshold: 0.001,
 		Sum: 100,
@@ -183,7 +182,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 	}
 	// Appendable to fh1.
 	fh2 := &histogram.FloatHistogram{
-		Count: 15,
+		Count: 17,
 		ZeroCount: 2,
 		ZeroThreshold: 0.001,
 		Sum: 100,
@@ -196,7 +195,7 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 	}
 	// Implicit counter reset by reduction in buckets, not appendable.
 	fh2down := &histogram.FloatHistogram{
-		Count: 13,
+		Count: 15,
 		ZeroCount: 2,
 		ZeroThreshold: 0.001,
 		Sum: 100,
@@ -208,6 +207,60 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 		PositiveBuckets: []float64{2, 2, 7, 2},
 	}
+	// Gauge histogram.
+	gh1 := &histogram.Histogram{
+		CounterResetHint: histogram.GaugeType,
+		Count: 7,
+		ZeroCount: 2,
+		ZeroThreshold: 0.001,
+		Sum: 100,
+		Schema: 0,
+		PositiveSpans: []histogram.Span{
+			{Offset: 0, Length: 2},
+		},
+		PositiveBuckets: []int64{2, 1}, // Abs: 2, 3
+	}
+	gh2 := &histogram.Histogram{
+		CounterResetHint: histogram.GaugeType,
+		Count: 12,
+		ZeroCount: 2,
+		ZeroThreshold: 0.001,
+		Sum: 100,
+		Schema: 0,
+		PositiveSpans: []histogram.Span{
+			{Offset: 0, Length: 2},
+			{Offset: 1, Length: 2},
+		},
+		PositiveBuckets: []int64{2, 1, -2, 3}, // Abs: 2, 3, 1, 4
+	}
+	// Float gauge histogram.
+	gfh1 := &histogram.FloatHistogram{
+		CounterResetHint: histogram.GaugeType,
+		Count: 6,
+		ZeroCount: 2,
+		ZeroThreshold: 0.001,
+		Sum: 100,
+		Schema: 0,
+		PositiveSpans: []histogram.Span{
+			{Offset: 0, Length: 2},
+		},
+		PositiveBuckets: []float64{3, 1},
+	}
+	gfh2 := &histogram.FloatHistogram{
+		CounterResetHint: histogram.GaugeType,
+		Count: 17,
+		ZeroCount: 2,
+		ZeroThreshold: 0.001,
+		Sum: 100,
+		Schema: 0,
+		PositiveSpans: []histogram.Span{
+			{Offset: 0, Length: 2},
+			{Offset: 1, Length: 2},
+		},
+		PositiveBuckets: []float64{4, 2, 7, 2},
+	}

 	staleHistogram := &histogram.Histogram{
 		Sum: math.Float64frombits(value.StaleNaN),
 	}
@@ -220,74 +273,70 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 			samples: []tsdbutil.Sample{
 				hSample{t: 1, h: h1},
 			},
-			expectedChunks: 1,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
 		},
 		"two histograms encoded to a single chunk": {
 			samples: []tsdbutil.Sample{
 				hSample{t: 1, h: h1},
 				hSample{t: 2, h: h2},
 			},
-			expectedChunks: 1,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
 		},
 		"two histograms encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				hSample{t: 1, h: h2},
 				hSample{t: 2, h: h1},
 			},
-			expectedChunks: 2,
-			expectedCounterReset: true,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
 		},
 		"histogram and stale sample encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				hSample{t: 1, h: staleHistogram},
 				hSample{t: 2, h: h1},
 			},
-			expectedChunks: 2,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
 		},
 		"histogram and reduction in bucket encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				hSample{t: 1, h: h1},
 				hSample{t: 2, h: h2down},
 			},
-			expectedChunks: 2,
-			expectedCounterReset: true,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
 		},
 		// Float histograms.
 		"single float histogram to single chunk": {
 			samples: []tsdbutil.Sample{
 				fhSample{t: 1, fh: fh1},
 			},
-			expectedChunks: 1,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
 		},
 		"two float histograms encoded to a single chunk": {
 			samples: []tsdbutil.Sample{
 				fhSample{t: 1, fh: fh1},
 				fhSample{t: 2, fh: fh2},
 			},
-			expectedChunks: 1,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset},
 		},
 		"two float histograms encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				fhSample{t: 1, fh: fh2},
 				fhSample{t: 2, fh: fh1},
 			},
-			expectedChunks: 2,
-			expectedCounterReset: true,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
 		},
 		"float histogram and stale sample encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				fhSample{t: 1, fh: staleFloatHistogram},
 				fhSample{t: 2, fh: fh1},
 			},
-			expectedChunks: 2,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
 		},
 		"float histogram and reduction in bucket encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				fhSample{t: 1, fh: fh1},
 				fhSample{t: 2, fh: fh2down},
 			},
-			expectedChunks: 2,
-			expectedCounterReset: true,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset},
 		},
 		// Mixed.
 		"histogram and float histogram encoded to two chunks": {
@@ -295,21 +344,61 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 				hSample{t: 1, h: h1},
 				fhSample{t: 2, fh: fh2},
 			},
-			expectedChunks: 2,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
 		},
 		"float histogram and histogram encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				fhSample{t: 1, fh: fh1},
 				hSample{t: 2, h: h2},
 			},
-			expectedChunks: 2,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
 		},
 		"histogram and stale float histogram encoded to two chunks": {
 			samples: []tsdbutil.Sample{
 				hSample{t: 1, h: h1},
 				fhSample{t: 2, fh: staleFloatHistogram},
 			},
-			expectedChunks: 2,
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset},
+		},
+		"single gauge histogram encoded to one chunk": {
+			samples: []tsdbutil.Sample{
+				hSample{t: 1, h: gh1},
+			},
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
+		},
+		"two gauge histograms encoded to one chunk when counter increases": {
+			samples: []tsdbutil.Sample{
+				hSample{t: 1, h: gh1},
+				hSample{t: 2, h: gh2},
+			},
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
+		},
+		"two gauge histograms encoded to one chunk when counter decreases": {
+			samples: []tsdbutil.Sample{
+				hSample{t: 1, h: gh2},
+				hSample{t: 2, h: gh1},
+			},
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
+		},
+		"single gauge float histogram encoded to one chunk": {
+			samples: []tsdbutil.Sample{
+				fhSample{t: 1, fh: gfh1},
+			},
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
+		},
+		"two float gauge histograms encoded to one chunk when counter increases": {
+			samples: []tsdbutil.Sample{
+				fhSample{t: 1, fh: gfh1},
+				fhSample{t: 2, fh: gfh2},
+			},
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
+		},
+		"two float gauge histograms encoded to one chunk when counter decreases": {
+			samples: []tsdbutil.Sample{
+				fhSample{t: 1, fh: gfh2},
+				fhSample{t: 2, fh: gfh1},
+			},
+			expectedCounterResetHeaders: []chunkenc.CounterResetHeader{chunkenc.GaugeType},
 		},
 	}
@@ -322,13 +411,24 @@ func TestHistogramSeriesToChunks(t *testing.T) {
 func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 	lbs := labels.FromStrings("__name__", "up", "instance", "localhost:8080")
-	series := NewListSeries(lbs, test.samples)
+	copiedSamples := []tsdbutil.Sample{}
+	for _, s := range test.samples {
+		switch cs := s.(type) {
+		case hSample:
+			copiedSamples = append(copiedSamples, hSample{t: cs.t, h: cs.h.Copy()})
+		case fhSample:
+			copiedSamples = append(copiedSamples, fhSample{t: cs.t, fh: cs.fh.Copy()})
+		default:
+			t.Error("internal error, unexpected type")
+		}
+	}
+	series := NewListSeries(lbs, copiedSamples)
 	encoder := NewSeriesToChunkEncoder(series)
 	require.EqualValues(t, lbs, encoder.Labels())

 	chks, err := ExpandChunks(encoder.Iterator(nil))
 	require.NoError(t, err)
-	require.Equal(t, test.expectedChunks, len(chks))
+	require.Equal(t, len(test.expectedCounterResetHeaders), len(chks))

 	// Decode all encoded samples and assert they are equal to the original ones.
 	encodedSamples := expandHistogramSamples(chks)
@@ -339,8 +439,10 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 		case hSample:
 			encodedSample, ok := encodedSamples[i].(hSample)
 			require.True(t, ok, "expect histogram", fmt.Sprintf("at idx %d", i))
-			// Ignore counter reset here, will check on chunk level.
-			encodedSample.h.CounterResetHint = histogram.UnknownCounterReset
+			// Ignore counter reset if not gauge here, will check on chunk level.
+			if expectedSample.h.CounterResetHint != histogram.GaugeType {
+				encodedSample.h.CounterResetHint = histogram.UnknownCounterReset
+			}
 			if value.IsStaleNaN(expectedSample.h.Sum) {
 				require.True(t, value.IsStaleNaN(encodedSample.h.Sum), fmt.Sprintf("at idx %d", i))
 				continue
@@ -349,8 +451,10 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 		case fhSample:
 			encodedSample, ok := encodedSamples[i].(fhSample)
 			require.True(t, ok, "expect float histogram", fmt.Sprintf("at idx %d", i))
-			// Ignore counter reset here, will check on chunk level.
-			encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset
+			// Ignore counter reset if not gauge here, will check on chunk level.
+			if expectedSample.fh.CounterResetHint != histogram.GaugeType {
+				encodedSample.fh.CounterResetHint = histogram.UnknownCounterReset
+			}
 			if value.IsStaleNaN(expectedSample.fh.Sum) {
 				require.True(t, value.IsStaleNaN(encodedSample.fh.Sum), fmt.Sprintf("at idx %d", i))
 				continue
@@ -361,15 +465,8 @@ func testHistogramsSeriesToChunks(t *testing.T, test histogramTest) {
 		}
 	}

-	// If a counter reset hint is expected, it can only be found in the second chunk.
-	// Otherwise, we assert an unknown counter reset hint in all chunks.
-	if test.expectedCounterReset {
-		require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chks[0]))
-		require.Equal(t, chunkenc.CounterReset, getCounterResetHint(chks[1]))
-	} else {
-		for _, chk := range chks {
-			require.Equal(t, chunkenc.UnknownCounterReset, getCounterResetHint(chk))
-		}
-	}
+	for i, expectedCounterResetHint := range test.expectedCounterResetHeaders {
+		require.Equal(t, expectedCounterResetHint, getCounterResetHint(chks[i]), fmt.Sprintf("chunk at index %d", i))
+	}
 }

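As a quick sanity check outside the test suite, the headers can be verified to stick on an otherwise empty chunk, which is exactly the state the encoder is in right after cutting one. The sketch below assumes a GetCounterResetHeader accessor paired with the SetCounterResetHeader calls used throughout the diff (presumably what the test helper getCounterResetHint reads); if that accessor differs in your chunkenc version, adjust accordingly.

// headercheck_sketch.go
//
// A self-contained round-trip check of the counter reset header on freshly
// created (still empty) histogram chunks. Not part of the patch.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	// Integer-bucket histogram chunk: set and read back the gauge header.
	hc := chunkenc.NewHistogramChunk()
	hc.SetCounterResetHeader(chunkenc.GaugeType)
	if hc.GetCounterResetHeader() != chunkenc.GaugeType {
		panic("gauge header not preserved on histogram chunk")
	}

	// Float-bucket histogram chunk: the same rule applies.
	fhc := chunkenc.NewFloatHistogramChunk()
	fhc.SetCounterResetHeader(chunkenc.GaugeType)
	if fhc.GetCounterResetHeader() != chunkenc.GaugeType {
		panic("gauge header not preserved on float histogram chunk")
	}

	fmt.Println("counter reset headers round-trip on empty chunks")
}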