
Merge pull request #15025 from prometheus/notreset-bug

Fix bug in rate vs float and histogram mixup
Björn Rabenstein, 2 months ago (merge committed by GitHub)
commit 78f792135c
Changed files:
1. promql/engine_test.go (115 lines changed)
2. storage/buffer.go (71 lines changed)
3. storage/buffer_test.go (50 lines changed)
4. storage/series.go (28 lines changed)
5. tsdb/chunks/samples.go (12 lines changed)
6. tsdb/head.go (11 lines changed)

promql/engine_test.go (115 lines changed)

@@ -17,6 +17,7 @@ import (
    "context"
    "errors"
    "fmt"
    "math"
    "sort"
    "strconv"
    "strings"
@@ -29,11 +30,13 @@ import (
    "github.com/prometheus/prometheus/model/histogram"
    "github.com/prometheus/prometheus/model/labels"
    "github.com/prometheus/prometheus/model/timestamp"
    "github.com/prometheus/prometheus/model/value"
    "github.com/prometheus/prometheus/promql"
    "github.com/prometheus/prometheus/promql/parser"
    "github.com/prometheus/prometheus/promql/parser/posrange"
    "github.com/prometheus/prometheus/promql/promqltest"
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/util/annotations"
    "github.com/prometheus/prometheus/util/stats"
    "github.com/prometheus/prometheus/util/teststorage"
@@ -3781,3 +3784,115 @@ func TestRateAnnotations(t *testing.T) {
        })
    }
}

func TestHistogramRateWithFloatStaleness(t *testing.T) {
    // Make a chunk with two normal histograms of the same value.
    h1 := histogram.Histogram{
        Schema:          2,
        Count:           10,
        Sum:             100,
        PositiveSpans:   []histogram.Span{{Offset: 0, Length: 1}},
        PositiveBuckets: []int64{100},
    }

    c1 := chunkenc.NewHistogramChunk()
    app, err := c1.Appender()
    require.NoError(t, err)

    var (
        newc    chunkenc.Chunk
        recoded bool
    )

    newc, recoded, app, err = app.AppendHistogram(nil, 0, h1.Copy(), false)
    require.NoError(t, err)
    require.False(t, recoded)
    require.Nil(t, newc)

    newc, recoded, _, err = app.AppendHistogram(nil, 10, h1.Copy(), false)
    require.NoError(t, err)
    require.False(t, recoded)
    require.Nil(t, newc)

    // Make a chunk with a single float stale marker.
    c2 := chunkenc.NewXORChunk()
    app, err = c2.Appender()
    require.NoError(t, err)
    app.Append(20, math.Float64frombits(value.StaleNaN))

    // Make a chunk with two normal histograms that have zero value.
    h2 := histogram.Histogram{
        Schema: 2,
    }

    c3 := chunkenc.NewHistogramChunk()
    app, err = c3.Appender()
    require.NoError(t, err)

    newc, recoded, app, err = app.AppendHistogram(nil, 30, h2.Copy(), false)
    require.NoError(t, err)
    require.False(t, recoded)
    require.Nil(t, newc)

    newc, recoded, _, err = app.AppendHistogram(nil, 40, h2.Copy(), false)
    require.NoError(t, err)
    require.False(t, recoded)
    require.Nil(t, newc)

    querier := storage.MockQuerier{
        SelectMockFunction: func(_ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet {
            return &singleSeriesSet{
                series: mockSeries{chunks: []chunkenc.Chunk{c1, c2, c3}, labelSet: []string{"__name__", "foo"}},
            }
        },
    }

    queriable := storage.MockQueryable{MockQuerier: &querier}

    engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)

    q, err := engine.NewInstantQuery(context.Background(), &queriable, nil, "rate(foo[40s])", timestamp.Time(45))
    require.NoError(t, err)
    defer q.Close()

    res := q.Exec(context.Background())
    require.NoError(t, res.Err)

    vec, err := res.Vector()
    require.NoError(t, err)

    // Single sample result.
    require.Len(t, vec, 1)
    // The result is a histogram.
    require.NotNil(t, vec[0].H)
    // The result should be zero as the histogram has not increased, so the rate is zero.
    require.Equal(t, 0.0, vec[0].H.Count)
    require.Equal(t, 0.0, vec[0].H.Sum)
}

type singleSeriesSet struct {
    series   storage.Series
    consumed bool
}

func (s *singleSeriesSet) Next() bool                       { c := s.consumed; s.consumed = true; return !c }
func (s singleSeriesSet) At() storage.Series                { return s.series }
func (s singleSeriesSet) Err() error                        { return nil }
func (s singleSeriesSet) Warnings() annotations.Annotations { return nil }

type mockSeries struct {
    chunks   []chunkenc.Chunk
    labelSet []string
}

func (s mockSeries) Labels() labels.Labels {
    return labels.FromStrings(s.labelSet...)
}

func (s mockSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
    iterables := []chunkenc.Iterator{}
    for _, c := range s.chunks {
        iterables = append(iterables, c.Iterator(nil))
    }
    return storage.ChainSampleIteratorFromIterators(it, iterables)
}

storage/buffer.go (71 lines changed)

@@ -187,6 +187,10 @@ func (s fSample) Type() chunkenc.ValueType {
    return chunkenc.ValFloat
}

func (s fSample) Copy() chunks.Sample {
    return s
}

type hSample struct {
    t int64
    h *histogram.Histogram
@@ -212,6 +216,10 @@ func (s hSample) Type() chunkenc.ValueType {
    return chunkenc.ValHistogram
}

func (s hSample) Copy() chunks.Sample {
    return hSample{t: s.t, h: s.h.Copy()}
}

type fhSample struct {
    t  int64
    fh *histogram.FloatHistogram
@@ -237,6 +245,10 @@ func (s fhSample) Type() chunkenc.ValueType {
    return chunkenc.ValFloatHistogram
}

func (s fhSample) Copy() chunks.Sample {
    return fhSample{t: s.t, fh: s.fh.Copy()}
}

type sampleRing struct {
    delta int64
@@ -535,55 +547,8 @@ func (r *sampleRing) addFH(s fhSample) {
    }
}
// genericAdd is a generic implementation of adding a chunks.Sample
// implementation to a buffer of a sample ring. However, the Go compiler
// currently (go1.20) decides to not expand the code during compile time, but
// creates dynamic code to handle the different types. That has a significant
// overhead during runtime, noticeable in PromQL benchmarks. For example, the
// "RangeQuery/expr=rate(a_hundred[1d]),steps=.*" benchmarks show about 7%
// longer runtime, 9% higher allocation size, and 10% more allocations.
// Therefore, genericAdd has been manually implemented for all the types
// (addSample, addF, addH, addFH) below.
//
// func genericAdd[T chunks.Sample](s T, buf []T, r *sampleRing) []T {
//     l := len(buf)
//     // Grow the ring buffer if it fits no more elements.
//     if l == 0 {
//         buf = make([]T, 16)
//         l = 16
//     }
//     if l == r.l {
//         newBuf := make([]T, 2*l)
//         copy(newBuf[l+r.f:], buf[r.f:])
//         copy(newBuf, buf[:r.f])
//
//         buf = newBuf
//         r.i = r.f
//         r.f += l
//         l = 2 * l
//     } else {
//         r.i++
//         if r.i >= l {
//             r.i -= l
//         }
//     }
//
//     buf[r.i] = s
//     r.l++
//
//     // Free head of the buffer of samples that just fell out of the range.
//     tmin := s.T() - r.delta
//     for buf[r.f].T() < tmin {
//         r.f++
//         if r.f >= l {
//             r.f -= l
//         }
//         r.l--
//     }
//     return buf
// }
// addSample is a handcoded specialization of genericAdd (see above).
// addSample adds a sample to a buffer of chunks.Sample, i.e. the general case
// using an interface as the type.
func addSample(s chunks.Sample, buf []chunks.Sample, r *sampleRing) []chunks.Sample {
    l := len(buf)
    // Grow the ring buffer if it fits no more elements.
@@ -607,7 +572,7 @@ func addSample(s chunks.Sample, buf []chunks.Sample, r *sampleRing) []chunks.Sample {
        }
    }
    buf[r.i] = s
    buf[r.i] = s.Copy()
    r.l++
    // Free head of the buffer of samples that just fell out of the range.
@@ -622,7 +587,7 @@ func addSample(s chunks.Sample, buf []chunks.Sample, r *sampleRing) []chunks.Sample {
    return buf
}
// addF is a handcoded specialization of genericAdd (see above).
// addF adds an fSample to a (specialized) fSample buffer.
func addF(s fSample, buf []fSample, r *sampleRing) []fSample {
    l := len(buf)
    // Grow the ring buffer if it fits no more elements.
@@ -661,7 +626,7 @@ func addF(s fSample, buf []fSample, r *sampleRing) []fSample {
    return buf
}
// addH is a handcoded specialization of genericAdd (see above).
// addH adds an hSample to a (specialized) hSample buffer.
func addH(s hSample, buf []hSample, r *sampleRing) []hSample {
    l := len(buf)
    // Grow the ring buffer if it fits no more elements.
@@ -705,7 +670,7 @@ func addH(s hSample, buf []hSample, r *sampleRing) []hSample {
    return buf
}
// addFH is a handcoded specialization of genericAdd (see above).
// addFH adds an fhSample to a (specialized) fhSample buffer.
func addFH(s fhSample, buf []fhSample, r *sampleRing) []fhSample {
    l := len(buf)
    // Grow the ring buffer if it fits no more elements.
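For context on why addSample now stores s.Copy() instead of s: histogram samples carry a pointer to their histogram, and an iterator may reuse that object for the next sample it returns, so buffering the sample without a deep copy lets later samples silently overwrite earlier buffered values (this is the overwrite bug the new buffer_test.go case below exercises). A minimal, self-contained sketch of the aliasing; the bufferedSample type and the main package are illustrative and not part of this change:

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/model/histogram"
)

type bufferedSample struct {
    t int64
    h *histogram.Histogram
}

func main() {
    // Simulates an iterator's scratch histogram that is reused for every sample.
    scratch := &histogram.Histogram{Schema: 2}

    var buggy, fixed []bufferedSample

    for t := int64(0); t < 3; t++ {
        scratch.Count = uint64(10 * (t + 1)) // The iterator overwrites its scratch object in place.

        // Buggy: buffering the pointer aliases the scratch object across samples.
        buggy = append(buggy, bufferedSample{t: t, h: scratch})
        // Fixed (what hSample.Copy now does): take a deep copy before buffering.
        fixed = append(fixed, bufferedSample{t: t, h: scratch.Copy()})
    }

    for i := range buggy {
        fmt.Println(buggy[i].h.Count, fixed[i].h.Count) // buggy column: 30, 30, 30; fixed column: 10, 20, 30.
    }
}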

storage/buffer_test.go (50 lines changed)

@@ -314,6 +314,56 @@ func TestBufferedSeriesIteratorMixedHistograms(t *testing.T) {
    require.Equal(t, histograms[1].ToFloat(nil), fh)
}

func TestBufferedSeriesIteratorMixedFloatsAndHistograms(t *testing.T) {
    histograms := tsdbutil.GenerateTestHistograms(5)

    it := NewBufferIterator(NewListSeriesIteratorWithCopy(samples{
        hSample{t: 1, h: histograms[0].Copy()},
        fSample{t: 2, f: 2},
        hSample{t: 3, h: histograms[1].Copy()},
        hSample{t: 4, h: histograms[2].Copy()},
        fhSample{t: 3, fh: histograms[3].ToFloat(nil)},
        fhSample{t: 4, fh: histograms[4].ToFloat(nil)},
    }), 6)

    require.Equal(t, chunkenc.ValNone, it.Seek(7))
    require.NoError(t, it.Err())

    buf := it.Buffer()

    require.Equal(t, chunkenc.ValHistogram, buf.Next())
    _, h0 := buf.AtHistogram()
    require.Equal(t, histograms[0], h0)

    require.Equal(t, chunkenc.ValFloat, buf.Next())
    _, v := buf.At()
    require.Equal(t, 2.0, v)

    require.Equal(t, chunkenc.ValHistogram, buf.Next())
    _, h1 := buf.AtHistogram()
    require.Equal(t, histograms[1], h1)

    require.Equal(t, chunkenc.ValHistogram, buf.Next())
    _, h2 := buf.AtHistogram()
    require.Equal(t, histograms[2], h2)

    require.Equal(t, chunkenc.ValFloatHistogram, buf.Next())
    _, h3 := buf.AtFloatHistogram(nil)
    require.Equal(t, histograms[3].ToFloat(nil), h3)

    require.Equal(t, chunkenc.ValFloatHistogram, buf.Next())
    _, h4 := buf.AtFloatHistogram(nil)
    require.Equal(t, histograms[4].ToFloat(nil), h4)

    // Test for overwrite bug where the buffered histogram was reused
    // between items in the buffer.
    require.Equal(t, histograms[0], h0)
    require.Equal(t, histograms[1], h1)
    require.Equal(t, histograms[2], h2)
    require.Equal(t, histograms[3].ToFloat(nil), h3)
    require.Equal(t, histograms[4].ToFloat(nil), h4)
}

func BenchmarkBufferedSeriesIterator(b *testing.B) {
    // Simulate a 5 minute rate.
    it := NewBufferIterator(newFakeSeriesIterator(int64(b.N), 30), 5*60)

storage/series.go (28 lines changed)

@@ -171,6 +171,34 @@ func (it *listSeriesIterator) Seek(t int64) chunkenc.ValueType {

func (it *listSeriesIterator) Err() error { return nil }

type listSeriesIteratorWithCopy struct {
    *listSeriesIterator
}

func NewListSeriesIteratorWithCopy(samples Samples) chunkenc.Iterator {
    return &listSeriesIteratorWithCopy{
        listSeriesIterator: &listSeriesIterator{samples: samples, idx: -1},
    }
}

func (it *listSeriesIteratorWithCopy) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
    t, ih := it.listSeriesIterator.AtHistogram(nil)
    if h == nil || ih == nil {
        return t, ih
    }
    ih.CopyTo(h)
    return t, h
}

func (it *listSeriesIteratorWithCopy) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
    t, ih := it.listSeriesIterator.AtFloatHistogram(nil)
    if fh == nil || ih == nil {
        return t, ih
    }
    ih.CopyTo(fh)
    return t, fh
}

type listChunkSeriesIterator struct {
    chks []chunks.Meta
    idx  int

tsdb/chunks/samples.go (12 lines changed)

@@ -29,6 +29,7 @@ type Sample interface {
    H() *histogram.Histogram
    FH() *histogram.FloatHistogram
    Type() chunkenc.ValueType
    Copy() Sample // Returns a deep copy.
}

type SampleSlice []Sample
@@ -70,6 +71,17 @@ func (s sample) Type() chunkenc.ValueType {
    }
}

func (s sample) Copy() Sample {
    c := sample{t: s.t, f: s.f}
    if s.h != nil {
        c.h = s.h.Copy()
    }
    if s.fh != nil {
        c.fh = s.fh.Copy()
    }
    return c
}

// GenerateSamples starting at start and counting up numSamples.
func GenerateSamples(start, numSamples int) []Sample {
    return generateSamples(start, numSamples, func(i int) Sample {
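Since Copy() is now part of the chunks.Sample interface, any implementation outside this repository has to grow the method as well. A rough sketch of what that looks like for a hypothetical float-only sample type (the mysamples package and floatSample type are made-up names for illustration):

package mysamples

import (
    "github.com/prometheus/prometheus/model/histogram"
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/chunks"
)

// floatSample is a hypothetical float-only implementation of chunks.Sample.
type floatSample struct {
    t int64
    f float64
}

func (s floatSample) T() int64                      { return s.t }
func (s floatSample) F() float64                    { return s.f }
func (s floatSample) H() *histogram.Histogram       { return nil }
func (s floatSample) FH() *histogram.FloatHistogram { return nil }
func (s floatSample) Type() chunkenc.ValueType      { return chunkenc.ValFloat }

// Copy satisfies the new interface method. With no pointer fields, returning
// the receiver already yields an independent copy, mirroring fSample.Copy in
// storage/buffer.go above.
func (s floatSample) Copy() chunks.Sample { return s }

// Compile-time check that the interface is implemented in full.
var _ chunks.Sample = floatSample{}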

tsdb/head.go (11 lines changed)

@@ -2081,6 +2081,17 @@ func (s sample) Type() chunkenc.ValueType {
    }
}

func (s sample) Copy() chunks.Sample {
    c := sample{t: s.t, f: s.f}
    if s.h != nil {
        c.h = s.h.Copy()
    }
    if s.fh != nil {
        c.fh = s.fh.Copy()
    }
    return c
}

// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct {
