Browse Source

Query histograms from TSDB and unit test for append+query (#9022)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/9027/head
Ganesh Vernekar 3 years ago committed by GitHub
parent
commit
f4d3af73f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      promql/value.go
  2. 4
      storage/buffer.go
  3. 8
      storage/buffer_test.go
  4. 7
      storage/merge.go
  5. 4
      storage/remote/codec.go
  6. 4
      storage/series.go
  7. 7
      tsdb/chunkenc/chunk.go
  8. 4
      tsdb/chunkenc/histo.go
  9. 4
      tsdb/chunkenc/xor.go
  10. 4
      tsdb/head.go
  11. 65
      tsdb/head_test.go
  12. 26
      tsdb/querier.go
  13. 4
      tsdb/tsdbutil/buffer.go
  14. 5
      tsdb/tsdbutil/buffer_test.go

4
promql/value.go

@ -300,6 +300,10 @@ func (ssi *storageSeriesIterator) AtHistogram() (int64, histogram.SparseHistogra
return 0, histogram.SparseHistogram{}
}
func (ssi *storageSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (ssi *storageSeriesIterator) Next() bool {
ssi.curr++
return ssi.curr < len(ssi.points)

4
storage/buffer.go

@ -202,6 +202,10 @@ func (it *sampleRingIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return 0, histogram.SparseHistogram{}
}
func (it *sampleRingIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (r *sampleRing) at(i int) (int64, float64) {
j := (r.f + i) % len(r.buf)
s := r.buf[j]

8
storage/buffer_test.go

@ -18,6 +18,7 @@ import (
"testing"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/require"
)
@ -198,6 +199,9 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
func (m *mockSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return 0, histogram.SparseHistogram{}
}
func (m *mockSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (m *mockSeriesIterator) Next() bool { return m.next() }
func (m *mockSeriesIterator) Err() error { return m.err() }
@ -219,6 +223,10 @@ func (it *fakeSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return it.idx * it.step, histogram.SparseHistogram{} // value doesn't matter
}
func (it *fakeSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (it *fakeSeriesIterator) Next() bool {
it.idx++
return it.idx < it.nsamples

7
storage/merge.go

@ -489,6 +489,13 @@ func (c *chainSampleIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return c.curr.AtHistogram()
}
func (c *chainSampleIterator) ChunkEncoding() chunkenc.Encoding {
if c.curr == nil {
panic("chainSampleIterator.ChunkEncoding() called before first .Next() or after .Next() returned false.")
}
return c.curr.ChunkEncoding()
}
func (c *chainSampleIterator) Next() bool {
if c.h == nil {
c.h = samplesIteratorHeap{}

4
storage/remote/codec.go

@ -373,6 +373,10 @@ func (c *concreteSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram
return 0, histogram.SparseHistogram{}
}
func (c *concreteSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
// Next implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Next() bool {
c.cur++

4
storage/series.go

@ -95,6 +95,10 @@ func (it *listSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return 0, histogram.SparseHistogram{}
}
func (it *listSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < it.samples.Len()

7
tsdb/chunkenc/chunk.go

@ -103,6 +103,8 @@ type Iterator interface {
// Err returns the current error. It should be used only after iterator is
// exhausted, that is `Next` or `Seek` returns false.
Err() error
// ChunkEncoding returns the encoding of the chunk that it is iterating.
ChunkEncoding() Encoding
}
// NewNopIterator returns a new chunk iterator that does not hold any data.
@ -117,8 +119,9 @@ func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 }
func (nopIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return math.MinInt64, histogram.SparseHistogram{}
}
func (nopIterator) Next() bool { return false }
func (nopIterator) Err() error { return nil }
func (nopIterator) Next() bool { return false }
func (nopIterator) Err() error { return nil }
func (nopIterator) ChunkEncoding() Encoding { return EncNone }
// Pool is used to create and reuse chunk references to avoid allocations.
type Pool interface {

4
tsdb/chunkenc/histo.go

@ -403,6 +403,10 @@ func (it *histoIterator) At() (int64, float64) {
panic("cannot call histoIterator.At().")
}
func (it *histoIterator) ChunkEncoding() Encoding {
return EncSHS
}
func (it *histoIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return it.t, histogram.SparseHistogram{
Count: it.cnt,

4
tsdb/chunkenc/xor.go

@ -281,6 +281,10 @@ func (it *xorIterator) AtHistogram() (int64, histogram.SparseHistogram) {
panic("cannot call xorIterator.AtHistogram().")
}
func (it *xorIterator) ChunkEncoding() Encoding {
return EncXOR
}
func (it *xorIterator) Err() error {
return it.err
}

4
tsdb/head.go

@ -2678,7 +2678,7 @@ func computeChunkEndTime(start, cur, max int64) int64 {
// iterator returns a chunk iterator.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) (ttt chunkenc.Iterator) {
c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
@ -2837,7 +2837,7 @@ func (it *memSafeIterator) Next() bool {
return false
}
it.i++
if it.total-it.i > 4 {
if it.Iterator.ChunkEncoding() == chunkenc.EncSHS || it.total-it.i > 4 {
return it.Iterator.Next()
}
return true

65
tsdb/head_test.go

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -2177,3 +2178,67 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
ok = it.Seek(7)
require.False(t, ok)
}
func TestAppendHistogram(t *testing.T) {
l := labels.Labels{{Name: "a", Value: "b"}}
for _, numHistograms := range []int{1, 10, 150, 200, 250, 300} {
t.Run(fmt.Sprintf("%d", numHistograms), func(t *testing.T) {
head, _ := newTestHead(t, 1000, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))
app := head.Appender(context.Background())
type timedHist struct {
t int64
h histogram.SparseHistogram
}
expHists := make([]timedHist, 0, numHistograms)
for i, h := range generateHistograms(numHistograms) {
_, err := app.AppendHistogram(0, l, int64(i), h)
require.NoError(t, err)
expHists = append(expHists, timedHist{int64(i), h})
}
require.NoError(t, app.Commit())
q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
require.NoError(t, err)
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator()
actHists := make([]timedHist, 0, len(expHists))
for it.Next() {
t, h := it.AtHistogram()
actHists = append(actHists, timedHist{t, h.Copy()})
}
require.Equal(t, expHists, actHists)
})
}
}
func generateHistograms(n int) (r []histogram.SparseHistogram) {
for i := 0; i < n; i++ {
r = append(r, histogram.SparseHistogram{
Count: 5 + uint64(i*4),
ZeroCount: 2 + uint64(i),
Sum: 18.4 * float64(i+1),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{int64(i + 1), 1, -1, 0},
NegativeBuckets: []int64{},
})
}
return r
}

26
tsdb/querier.go

@ -633,7 +633,10 @@ func (p *populateWithDelSeriesIterator) Seek(t int64) bool {
func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() }
func (p *populateWithDelSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return 0, histogram.SparseHistogram{}
return p.curr.AtHistogram()
}
func (p *populateWithDelSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return p.curr.ChunkEncoding()
}
func (p *populateWithDelSeriesIterator) Err() error {
@ -823,7 +826,12 @@ func (it *DeletedIterator) At() (int64, float64) {
}
func (it *DeletedIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return 0, histogram.SparseHistogram{}
t, h := it.Iter.AtHistogram()
return t, h
}
func (it *DeletedIterator) ChunkEncoding() chunkenc.Encoding {
return it.Iter.ChunkEncoding()
}
func (it *DeletedIterator) Seek(t int64) bool {
@ -835,7 +843,12 @@ func (it *DeletedIterator) Seek(t int64) bool {
}
// Now double check if the entry falls into a deleted interval.
ts, _ := it.At()
var ts int64
if it.ChunkEncoding() == chunkenc.EncSHS {
ts, _ = it.AtHistogram()
} else {
ts, _ = it.At()
}
for _, itv := range it.Intervals {
if ts < itv.Mint {
return true
@ -857,7 +870,12 @@ func (it *DeletedIterator) Seek(t int64) bool {
func (it *DeletedIterator) Next() bool {
Outer:
for it.Iter.Next() {
ts, _ := it.Iter.At()
var ts int64
if it.ChunkEncoding() == chunkenc.EncSHS {
ts, _ = it.AtHistogram()
} else {
ts, _ = it.At()
}
for _, tr := range it.Intervals {
if tr.InBounds(ts) {

4
tsdb/tsdbutil/buffer.go

@ -164,6 +164,10 @@ func (it *sampleRingIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return 0, histogram.SparseHistogram{}
}
func (it *sampleRingIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (r *sampleRing) at(i int) (int64, float64) {
j := (r.f + i) % len(r.buf)
s := r.buf[j]

5
tsdb/tsdbutil/buffer_test.go

@ -19,6 +19,7 @@ import (
"testing"
"github.com/prometheus/prometheus/pkg/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/require"
)
@ -155,6 +156,10 @@ func (it *listSeriesIterator) AtHistogram() (int64, histogram.SparseHistogram) {
return 0, histogram.SparseHistogram{}
}
func (it *listSeriesIterator) ChunkEncoding() chunkenc.Encoding {
return chunkenc.EncXOR
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.list)

Loading…
Cancel
Save