Browse Source

storage: allow re-use of iterators

Patterned after `Chunk.Iterator()`: pass the old iterator in so it
can be re-used to avoid allocating a new object.

(This commit does not do any re-use; it is just changing all the method
signatures so re-use is possible in later commits.)

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
pull/11334/head
Bryan Boreham 2 years ago
parent
commit
3c7de69059
  1. 2
      cmd/promtool/backfill_test.go
  2. 2
      cmd/promtool/rules_test.go
  3. 2
      cmd/promtool/tsdb.go
  4. 16
      promql/engine.go
  5. 2
      promql/test_test.go
  6. 2
      promql/value.go
  7. 2
      rules/manager.go
  8. 3
      rules/manager_test.go
  9. 4
      scrape/scrape_test.go
  10. 6
      storage/fanout_test.go
  11. 13
      storage/interface.go
  12. 14
      storage/merge.go
  13. 19
      storage/merge_test.go
  14. 9
      storage/remote/codec.go
  15. 2
      storage/remote/codec_test.go
  16. 26
      storage/series.go
  17. 8
      tsdb/block_test.go
  18. 7
      tsdb/compact.go
  19. 50
      tsdb/db_test.go
  20. 2
      tsdb/example_test.go
  21. 24
      tsdb/head_test.go
  22. 4
      tsdb/querier.go
  23. 15
      tsdb/querier_test.go
  24. 3
      tsdb/tsdbblockutil.go
  25. 4
      web/federate.go

2
cmd/promtool/backfill_test.go

@ -49,7 +49,7 @@ func queryAllSeries(t testing.TB, q storage.Querier, expectedMinTime, expectedMa
samples := []backfillSample{}
for ss.Next() {
series := ss.At()
it := series.Iterator()
it := series.Iterator(nil)
require.NoError(t, it.Err())
for it.Next() == chunkenc.ValFloat {
ts, v := it.At()

2
cmd/promtool/rules_test.go

@ -139,7 +139,7 @@ func TestBackfillRuleIntegration(t *testing.T) {
} else {
require.Equal(t, 3, len(series.Labels()))
}
it := series.Iterator()
it := series.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
samplesCount++
ts, v := it.At()

2
cmd/promtool/tsdb.go

@ -644,7 +644,7 @@ func dumpSamples(path string, mint, maxt int64) (err error) {
for ss.Next() {
series := ss.At()
lbs := series.Labels()
it := series.Iterator()
it := series.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
ts, val := it.At()
fmt.Printf("%s %g %d\n", lbs, val, ts)

16
promql/engine.go

@ -1393,10 +1393,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
enh := &EvalNodeHelper{Out: make(Vector, 0, 1)}
// Process all the calls for one time series at a time.
it := storage.NewBuffer(selRange)
var chkIter chunkenc.Iterator
for i, s := range selVS.Series {
ev.currentSamples -= len(points)
points = points[:0]
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
metric := selVS.Series[i].Labels()
// The last_over_time function acts like offset; thus, it
// should keep the metric name. For all the other range
@ -1578,8 +1580,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
}
mat := make(Matrix, 0, len(e.Series))
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for i, s := range e.Series {
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
ss := Series{
Metric: e.Series[i].Labels(),
Points: getPointSlice(numSteps),
@ -1723,8 +1727,10 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
}
vec := make(Vector, 0, len(node.Series))
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator
for i, s := range node.Series {
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
t, v, h, ok := ev.vectorSelectorSingle(it, node, ts)
if ok {
@ -1812,12 +1818,14 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
}
var chkIter chunkenc.Iterator
series := vs.Series
for i, s := range series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
}
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
ss := Series{
Metric: series[i].Labels(),
}

2
promql/test_test.go

@ -143,7 +143,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
got := Series{
Metric: storageSeries.Labels(),
}
it := storageSeries.Iterator()
it := storageSeries.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
got.Points = append(got.Points, Point{T: t, V: v})

2
promql/value.go

@ -363,7 +363,7 @@ func (ss *StorageSeries) Labels() labels.Labels {
}
// Iterator returns a new iterator of the data of the series.
func (ss *StorageSeries) Iterator() chunkenc.Iterator {
func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
return newStorageSeriesIterator(ss.series)
}

2
rules/manager.go

@ -807,7 +807,7 @@ func (g *Group) RestoreForState(ts time.Time) {
// Series found for the 'for' state.
var t int64
var v float64
it := s.Iterator()
it := s.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v = it.At()
}

3
rules/manager_test.go

@ -592,12 +592,13 @@ func TestStaleness(t *testing.T) {
// Convert a SeriesSet into a form usable with require.Equal.
func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) {
result := map[string][]promql.Point{}
var it chunkenc.Iterator
for ss.Next() {
series := ss.At()
points := []promql.Point{}
it := series.Iterator()
it := series.Iterator(it)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
points = append(points, promql.Point{T: t, V: v})

4
scrape/scrape_test.go

@ -2959,7 +2959,7 @@ func TestScrapeReportSingleAppender(t *testing.T) {
c := 0
for series.Next() {
i := series.At().Iterator()
i := series.At().Iterator(nil)
for i.Next() != chunkenc.ValNone {
c++
}
@ -3032,7 +3032,7 @@ func TestScrapeReportLimit(t *testing.T) {
var found bool
for series.Next() {
i := series.At().Iterator()
i := series.At().Iterator(nil)
for i.Next() == chunkenc.ValFloat {
_, v := i.At()
require.Equal(t, 1.0, v)

6
storage/fanout_test.go

@ -86,11 +86,12 @@ func TestFanout_SelectSorted(t *testing.T) {
result := make(map[int64]float64)
var labelsResult labels.Labels
var iterator chunkenc.Iterator
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator()
iterator := series.Iterator(iterator)
for iterator.Next() == chunkenc.ValFloat {
timestamp, value := iterator.At()
result[timestamp] = value
@ -112,11 +113,12 @@ func TestFanout_SelectSorted(t *testing.T) {
result := make(map[int64]float64)
var labelsResult labels.Labels
var iterator chunkenc.Iterator
for seriesSet.Next() {
series := seriesSet.At()
seriesLabels := series.Labels()
labelsResult = seriesLabels
iterator := series.Iterator()
iterator := series.Iterator(iterator)
for iterator.Next() == chunkenc.ValFloat {
timestamp, value := iterator.At()
result[timestamp] = value

13
storage/interface.go

@ -382,7 +382,7 @@ func (s mockSeries) Labels() labels.Labels {
return labels.FromStrings(s.labelSet...)
}
func (s mockSeries) Iterator() chunkenc.Iterator {
func (s mockSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
return chunkenc.MockSeriesIterator(s.timestamps, s.values)
}
@ -421,14 +421,17 @@ type Labels interface {
}
type SampleIterable interface {
// Iterator returns a new, independent iterator of the data of the series.
Iterator() chunkenc.Iterator
// Iterator returns an iterator of the data of the series.
// The iterator passed as argument is for re-use.
// Depending on implementation, the iterator can
// be re-used or a new iterator can be allocated.
Iterator(chunkenc.Iterator) chunkenc.Iterator
}
type ChunkIterable interface {
// Iterator returns a new, independent iterator that iterates over potentially overlapping
// Iterator returns an iterator that iterates over potentially overlapping
// chunks of the series, sorted by min time.
Iterator() chunks.Iterator
Iterator(chunks.Iterator) chunks.Iterator
}
type Warnings []error

14
storage/merge.go

@ -425,10 +425,10 @@ func ChainedSeriesMerge(series ...Series) Series {
}
return &SeriesEntry{
Lset: series[0].Labels(),
SampleIteratorFn: func() chunkenc.Iterator {
SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
iterators = append(iterators, s.Iterator(nil))
}
return NewChainSampleIterator(iterators)
},
@ -607,10 +607,10 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalC
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
iterators = append(iterators, s.Iterator(nil))
}
return &compactChunkIterator{
mergeFunc: mergeFunc,
@ -693,7 +693,7 @@ func (c *compactChunkIterator) Next() bool {
}
// Add last as it's not yet included in overlap. We operate on same series, so labels does not matter here.
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator()
iter = NewSeriesToChunkEncoder(c.mergeFunc(append(overlapping, newChunkToSeriesDecoder(nil, c.curr))...)).Iterator(nil)
if !iter.Next() {
if c.err = iter.Err(); c.err != nil {
return false
@ -751,10 +751,10 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
}
return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func() chunks.Iterator {
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator())
iterators = append(iterators, s.Iterator(nil))
}
return &concatenatingChunkIterator{
iterators: iterators,

19
storage/merge_test.go

@ -202,8 +202,8 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
expectedSeries := tc.expected.At()
require.Equal(t, expectedSeries.Labels(), actualSeries.Labels())
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(), nil)
actSmpl, actErr := ExpandSamples(actualSeries.Iterator(), nil)
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator(nil), nil)
actSmpl, actErr := ExpandSamples(actualSeries.Iterator(nil), nil)
require.Equal(t, expErr, actErr)
require.Equal(t, expSmpl, actSmpl)
}
@ -370,8 +370,8 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
expectedSeries := tc.expected.At()
require.Equal(t, expectedSeries.Labels(), actualSeries.Labels())
expChks, expErr := ExpandChunks(expectedSeries.Iterator())
actChks, actErr := ExpandChunks(actualSeries.Iterator())
expChks, expErr := ExpandChunks(expectedSeries.Iterator(nil))
actChks, actErr := ExpandChunks(actualSeries.Iterator(nil))
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
@ -533,8 +533,8 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
merged := m(tc.input...)
require.Equal(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := ExpandChunks(merged.Iterator())
expChks, expErr := ExpandChunks(tc.expected.Iterator())
actChks, actErr := ExpandChunks(merged.Iterator(nil))
expChks, expErr := ExpandChunks(tc.expected.Iterator(nil))
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
@ -667,8 +667,8 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
merged := m(tc.input...)
require.Equal(t, tc.expected.Labels(), merged.Labels())
actChks, actErr := ExpandChunks(merged.Iterator())
expChks, expErr := ExpandChunks(tc.expected.Iterator())
actChks, actErr := ExpandChunks(merged.Iterator(nil))
expChks, expErr := ExpandChunks(tc.expected.Iterator(nil))
require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
@ -893,10 +893,11 @@ func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) {
var err error
var t int64
var v float64
var iter chunkenc.Iterator
for n := 0; n < b.N; n++ {
seriesSet := makeSeriesSet()
for seriesSet.Next() {
iter := seriesSet.At().Iterator()
iter = seriesSet.At().Iterator(iter)
for iter.Next() == chunkenc.ValFloat {
t, v = iter.At()
}

9
storage/remote/codec.go

@ -33,6 +33,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
)
// decodeReadLimit is the maximum size of a read request body in bytes.
@ -115,9 +116,10 @@ func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHi
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, storage.Warnings, error) {
numSamples := 0
resp := &prompb.QueryResult{}
var iter chunkenc.Iterator
for ss.Next() {
series := ss.At()
iter := series.Iterator()
iter = series.Iterator(iter)
samples := []prompb.Sample{}
for iter.Next() == chunkenc.ValFloat {
@ -199,11 +201,12 @@ func StreamChunkedReadResponses(
var (
chks []prompb.Chunk
lbls []prompb.Label
iter chunks.Iterator
)
for ss.Next() {
series := ss.At()
iter := series.Iterator()
iter = series.Iterator(iter)
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
frameBytesLeft := maxBytesInFrame
@ -346,7 +349,7 @@ func (c *concreteSeries) Labels() labels.Labels {
return labels.New(c.labels...)
}
func (c *concreteSeries) Iterator() chunkenc.Iterator {
func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
return newConcreteSeriersIterator(c)
}

2
storage/remote/codec_test.go

@ -215,7 +215,7 @@ func TestConcreteSeriesIterator(t *testing.T) {
{Value: 4, Timestamp: 4},
},
}
it := series.Iterator()
it := series.Iterator(nil)
// Seek to the first sample with ts=1.
require.Equal(t, chunkenc.ValFloat, it.Seek(1))

26
storage/series.go

@ -27,25 +27,25 @@ import (
type SeriesEntry struct {
Lset labels.Labels
SampleIteratorFn func() chunkenc.Iterator
SampleIteratorFn func(chunkenc.Iterator) chunkenc.Iterator
}
func (s *SeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *SeriesEntry) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() }
func (s *SeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return s.SampleIteratorFn(it) }
type ChunkSeriesEntry struct {
Lset labels.Labels
ChunkIteratorFn func() chunks.Iterator
ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
}
func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator() chunks.Iterator { return s.ChunkIteratorFn() }
func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
// NewListSeries returns series entry with iterator that allows to iterate over provided samples.
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
return &SeriesEntry{
Lset: lset,
SampleIteratorFn: func() chunkenc.Iterator {
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
return NewListSeriesIterator(samples(s))
},
}
@ -56,7 +56,7 @@ func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkSeriesEntry {
return &ChunkSeriesEntry{
Lset: lset,
ChunkIteratorFn: func() chunks.Iterator {
ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator {
chks := make([]chunks.Meta, 0, len(samples))
for _, s := range samples {
chks = append(chks, tsdbutil.ChunkFromSamples(s))
@ -178,7 +178,7 @@ func (c *chunkSetToSeriesSet) Next() bool {
return false
}
iter := c.ChunkSeriesSet.At().Iterator()
iter := c.ChunkSeriesSet.At().Iterator(nil)
c.sameSeriesChunks = c.sameSeriesChunks[:0]
for iter.Next() {
@ -210,9 +210,9 @@ func (c *chunkSetToSeriesSet) Err() error {
func newChunkToSeriesDecoder(labels labels.Labels, chk chunks.Meta) Series {
return &SeriesEntry{
Lset: labels,
SampleIteratorFn: func() chunkenc.Iterator {
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
// TODO(bwplotka): Can we provide any chunkenc buffer?
return chk.Chunk.Iterator(nil)
return chk.Chunk.Iterator(it)
},
}
}
@ -252,7 +252,7 @@ func NewSeriesToChunkEncoder(series Series) ChunkSeries {
return &seriesToChunkEncoder{series}
}
func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
var (
chk chunkenc.Chunk
app chunkenc.Appender
@ -263,7 +263,7 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
chks := []chunks.Meta{}
i := 0
seriesIter := s.Series.Iterator()
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
if typ != lastType || i >= seriesToChunkEncoderSplit {

8
tsdb/block_test.go

@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) {
// Check chunk errors during iter time.
require.True(t, set.Next())
it := set.At().Iterator()
it := set.At().Iterator(nil)
require.Equal(t, chunkenc.ValNone, it.Next())
require.Equal(t, tc.iterErr.Error(), it.Err().Error())
})
@ -505,11 +505,12 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str
head, err := NewHead(nil, nil, w, nil, opts, nil)
require.NoError(tb, err)
var it chunkenc.Iterator
ctx := context.Background()
app := head.Appender(ctx)
for _, s := range series {
ref := storage.SeriesRef(0)
it := s.Iterator()
it = s.Iterator(it)
lset := s.Labels()
typ := it.Next()
lastTyp := typ
@ -550,11 +551,12 @@ func createHeadWithOOOSamples(tb testing.TB, w *wlog.WL, series []storage.Series
oooSampleLabels := make([]labels.Labels, 0, len(series))
oooSamples := make([]tsdbutil.SampleSlice, 0, len(series))
var it chunkenc.Iterator
totalSamples := 0
app := head.Appender(context.Background())
for _, s := range series {
ref := storage.SeriesRef(0)
it := s.Iterator()
it = s.Iterator(it)
lset := s.Labels()
os := tsdbutil.SampleSlice{}
count := 0

7
tsdb/compact.go

@ -746,8 +746,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
var (
ref = storage.SeriesRef(0)
chks []chunks.Meta
ref = storage.SeriesRef(0)
chks []chunks.Meta
chksIter chunks.Iterator
)
set := sets[0]
@ -765,7 +766,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
default:
}
s := set.At()
chksIter := s.Iterator()
chksIter = s.Iterator(chksIter)
chks = chks[:0]
for chksIter.Next() {
// We are not iterating in streaming way over chunk as

50
tsdb/db_test.go

@ -93,12 +93,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
require.NoError(t, q.Close())
}()
var it chunkenc.Iterator
result := map[string][]tsdbutil.Sample{}
for ss.Next() {
series := ss.At()
samples := []tsdbutil.Sample{}
it := series.Iterator()
it = series.Iterator(it)
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {
case chunkenc.ValFloat:
@ -133,12 +134,13 @@ func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Match
require.NoError(t, q.Close())
}()
var it chunks.Iterator
result := map[string][]chunks.Meta{}
for ss.Next() {
series := ss.At()
chks := []chunks.Meta{}
it := series.Iterator()
it = series.Iterator(it)
for it.Next() {
chks = append(chks, it.At())
}
@ -454,8 +456,8 @@ Outer:
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -628,9 +630,10 @@ func TestDB_Snapshot(t *testing.T) {
// sum values
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var series chunkenc.Iterator
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
_, v := series.At()
sum += v
@ -676,9 +679,10 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
// Sum values.
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var series chunkenc.Iterator
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
_, v := series.At()
sum += v
@ -770,8 +774,8 @@ Outer:
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -921,7 +925,7 @@ func TestDB_e2e(t *testing.T) {
for ss.Next() {
x := ss.At()
smpls, err := storage.ExpandSamples(x.Iterator(), newSample)
smpls, err := storage.ExpandSamples(x.Iterator(nil), newSample)
require.NoError(t, err)
if len(smpls) > 0 {
@ -1108,12 +1112,13 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore
set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+"))
actualSeries := 0
var chunksIt chunks.Iterator
for set.Next() {
actualSeries++
actualChunks := 0
chunksIt := set.At().Iterator()
chunksIt = set.At().Iterator(chunksIt)
for chunksIt.Next() {
actualChunks++
}
@ -1205,8 +1210,8 @@ func TestTombstoneClean(t *testing.T) {
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -1479,11 +1484,12 @@ func TestSizeRetention(t *testing.T) {
// Add some data to the WAL.
headApp := db.Head().Appender(context.Background())
var aSeries labels.Labels
var it chunkenc.Iterator
for _, m := range headBlocks {
series := genSeries(100, 10, m.MinTime, m.MaxTime+1)
for _, s := range series {
aSeries = s.Labels()
it := s.Iterator()
it = s.Iterator(it)
for it.Next() == chunkenc.ValFloat {
tim, v := it.At()
_, err := headApp.Append(0, s.Labels(), tim, v)
@ -1691,10 +1697,11 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
func expandSeriesSet(ss storage.SeriesSet) ([]labels.Labels, map[string][]sample, storage.Warnings, error) {
resultLabels := []labels.Labels{}
resultSamples := map[string][]sample{}
var it chunkenc.Iterator
for ss.Next() {
series := ss.At()
samples := []sample{}
it := series.Iterator()
it = series.Iterator(it)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
samples = append(samples, sample{t: t, v: v})
@ -2500,10 +2507,11 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
// Sum the values.
seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
var series chunkenc.Iterator
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
_, v := series.At()
sum += v
@ -2946,10 +2954,11 @@ func TestCompactHead(t *testing.T) {
defer func() { require.NoError(t, querier.Close()) }()
seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"})
var series chunkenc.Iterator
var actSamples []sample
for seriesSet.Next() {
series := seriesSet.At().Iterator()
series = seriesSet.At().Iterator(series)
for series.Next() == chunkenc.ValFloat {
time, val := series.At()
actSamples = append(actSamples, sample{int64(time), val, nil, nil})
@ -3347,7 +3356,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t
actualSeries++
// Get the iterator and call Next() so that we're sure the chunk is loaded.
it := seriesSet.At().Iterator()
it := seriesSet.At().Iterator(nil)
it.Next()
it.At()
@ -3477,11 +3486,13 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun
seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+"))
// Iterate all series and get their chunks.
var it chunks.Iterator
var chunks []chunkenc.Chunk
actualSeries := 0
for seriesSet.Next() {
actualSeries++
for it := seriesSet.At().Iterator(); it.Next(); {
it = seriesSet.At().Iterator(it)
for it.Next() {
chunks = append(chunks, it.At().Chunk)
}
}
@ -6025,13 +6036,14 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
ctx := context.Background()
var it chunkenc.Iterator
exp := make(map[string][]tsdbutil.Sample)
for _, series := range blockSeries {
createBlock(t, db.Dir(), series)
for _, s := range series {
key := s.Labels().String()
it := s.Iterator()
it = s.Iterator(it)
slice := exp[key]
for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() {
switch typ {

2
tsdb/example_test.go

@ -67,7 +67,7 @@ func Example() {
series := ss.At()
fmt.Println("series:", series.Labels().String())
it := series.Iterator()
it := series.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
_, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below)
fmt.Println("sample", v)

24
tsdb/head_test.go

@ -924,8 +924,8 @@ func TestHeadDeleteSimple(t *testing.T) {
require.Equal(t, expSeries.Labels(), actSeries.Labels())
smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -959,7 +959,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, res.Next(), "series is not present")
s := res.At()
it := s.Iterator()
it := s.Iterator(nil)
require.Equal(t, chunkenc.ValNone, it.Next(), "expected no samples")
for res.Next() {
}
@ -976,7 +976,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, res.Next(), "series don't exist")
exps := res.At()
it = exps.Iterator()
it = exps.Iterator(nil)
resSamples, err := storage.ExpandSamples(it, newSample)
require.NoError(t, err)
require.Equal(t, []tsdbutil.Sample{sample{11, 1, nil, nil}}, resSamples)
@ -1163,7 +1163,7 @@ func TestDelete_e2e(t *testing.T) {
eok, rok := expSs.Next(), ss.Next()
// Skip a series if iterator is empty.
if rok {
for ss.At().Iterator().Next() == chunkenc.ValNone {
for ss.At().Iterator(nil).Next() == chunkenc.ValNone {
rok = ss.Next()
if !rok {
break
@ -1177,8 +1177,8 @@ func TestDelete_e2e(t *testing.T) {
sexp := expSs.At()
sres := ss.At()
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
}
@ -2635,7 +2635,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
<-time.After(3 * time.Second)
// Now consume after compaction when it's gone.
it := s.Iterator()
it := s.Iterator(nil)
for it.Next() == chunkenc.ValFloat {
_, _ = it.At()
}
@ -2643,7 +2643,7 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) {
require.NoError(t, it.Err())
for ss.Next() {
s = ss.At()
it := s.Iterator()
it = s.Iterator(it)
for it.Next() == chunkenc.ValFloat {
_, _ = it.At()
}
@ -2841,7 +2841,7 @@ func TestAppendHistogram(t *testing.T) {
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator()
it := s.Iterator(nil)
actHistograms := make([]timedHistogram, 0, len(expHistograms))
for it.Next() == chunkenc.ValHistogram {
t, h := it.AtHistogram()
@ -3304,7 +3304,7 @@ func TestHistogramStaleSample(t *testing.T) {
s := ss.At()
require.False(t, ss.Next())
it := s.Iterator()
it := s.Iterator(nil)
actHistograms := make([]timedHistogram, 0, len(expHistograms))
for it.Next() == chunkenc.ValHistogram {
t, h := it.AtHistogram()
@ -3581,7 +3581,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
it := s.Iterator()
it := s.Iterator(nil)
expIdx := 0
loop:
for {

4
tsdb/querier.go

@ -838,7 +838,7 @@ func (b *blockSeriesSet) At() storage.Series {
currIterFn := b.currIterFn
return &storage.SeriesEntry{
Lset: b.currLabels,
SampleIteratorFn: func() chunkenc.Iterator {
SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator {
return currIterFn().toSeriesIterator()
},
}
@ -872,7 +872,7 @@ func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
currIterFn := b.currIterFn
return &storage.ChunkSeriesEntry{
Lset: b.currLabels,
ChunkIteratorFn: func() chunks.Iterator {
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
return currIterFn().toChunkSeriesIterator()
},
}

15
tsdb/querier_test.go

@ -194,8 +194,8 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
sres := res.At()
require.Equal(t, sexp.Labels(), sres.Labels())
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
smplExp, errExp := storage.ExpandSamples(sexp.Iterator(nil), nil)
smplRes, errRes := storage.ExpandSamples(sres.Iterator(nil), nil)
require.Equal(t, errExp, errRes)
require.Equal(t, smplExp, smplRes)
@ -230,9 +230,9 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
require.Equal(t, sexpChks.Labels(), sres.Labels())
chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator())
chksExp, errExp := storage.ExpandChunks(sexpChks.Iterator(nil))
rmChunkRefs(chksExp)
chksRes, errRes := storage.ExpandChunks(sres.Iterator())
chksRes, errRes := storage.ExpandChunks(sres.Iterator(nil))
rmChunkRefs(chksRes)
require.Equal(t, errExp, errRes)
require.Equal(t, chksExp, chksRes)
@ -1433,9 +1433,10 @@ func BenchmarkQuerySeek(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
var it chunkenc.Iterator
ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
for ss.Next() {
it := ss.At().Iterator()
it = ss.At().Iterator(it)
for t := mint; t <= maxt; t++ {
it.Seek(t)
}
@ -2042,11 +2043,13 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
for i := 0; i < b.N; i++ {
ss := q.Select(false, nil, selectors...)
var actualExpansions int
var it chunkenc.Iterator
for ss.Next() {
s := ss.At()
s.Labels()
it := s.Iterator()
it = s.Iterator(it)
for it.Next() != chunkenc.ValNone {
_, _ = it.At()
}
actualExpansions++
}

3
tsdb/tsdbblockutil.go

@ -49,10 +49,11 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l
const commitAfter = 10000
ctx := context.Background()
app := w.Appender(ctx)
var it chunkenc.Iterator
for _, s := range series {
ref := storage.SeriesRef(0)
it := s.Iterator()
it = s.Iterator(it)
lset := s.Labels()
typ := it.Next()
lastTyp := typ

4
web/federate.go

@ -102,12 +102,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
var chkIter chunkenc.Iterator
for set.Next() {
s := set.At()
// TODO(fabxc): allow fast path for most recent sample either
// in the storage itself or caching layer in Prometheus.
it.Reset(s.Iterator())
chkIter = s.Iterator(chkIter)
it.Reset(chkIter)
var t int64
var v float64

Loading…
Cancel
Save