diff --git a/promql/value.go b/promql/value.go index 78342859e..4db976e97 100644 --- a/promql/value.go +++ b/promql/value.go @@ -364,6 +364,10 @@ func (ss *StorageSeries) Labels() labels.Labels { // Iterator returns a new iterator of the data of the series. func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + if ssi, ok := it.(*storageSeriesIterator); ok { + ssi.reset(ss.series) + return ssi + } return newStorageSeriesIterator(ss.series) } @@ -379,6 +383,11 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator { } } +func (ssi *storageSeriesIterator) reset(series Series) { + ssi.points = series.Points + ssi.curr = -1 +} + func (ssi *storageSeriesIterator) Seek(t int64) chunkenc.ValueType { i := ssi.curr if i < 0 { diff --git a/storage/merge.go b/storage/merge.go index 336d82c6f..78a0125db 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -425,12 +425,8 @@ func ChainedSeriesMerge(series ...Series) Series { } return &SeriesEntry{ Lset: series[0].Labels(), - SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator { - iterators := make([]chunkenc.Iterator, 0, len(series)) - for _, s := range series { - iterators = append(iterators, s.Iterator(nil)) - } - return NewChainSampleIterator(iterators) + SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { + return ChainSampleIteratorFromSeries(it, series) }, } } @@ -446,15 +442,42 @@ type chainSampleIterator struct { lastT int64 } -// NewChainSampleIterator returns a single iterator that iterates over the samples from the given iterators in a sorted -// fashion. If samples overlap, one sample from overlapped ones is kept (randomly) and all others with the same -// timestamp are dropped. -func NewChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator { - return &chainSampleIterator{ - iterators: iterators, - h: nil, - lastT: math.MinInt64, +// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible. +func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator { + csi, ok := it.(*chainSampleIterator) + if !ok { + csi = &chainSampleIterator{} } + if cap(csi.iterators) < length { + csi.iterators = make([]chunkenc.Iterator, length) + } else { + csi.iterators = csi.iterators[:length] + } + csi.h = nil + csi.lastT = math.MinInt64 + return csi +} + +func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator { + csi := getChainSampleIterator(it, len(series)) + for i, s := range series { + csi.iterators[i] = s.Iterator(csi.iterators[i]) + } + return csi +} + +func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator { + csi := getChainSampleIterator(it, len(chunks)) + for i, c := range chunks { + csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i]) + } + return csi +} + +func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator { + csi := getChainSampleIterator(it, 0) + csi.iterators = iterators + return csi } func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { diff --git a/storage/merge_test.go b/storage/merge_test.go index 407fc4ea5..ad68684c0 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -809,7 +809,7 @@ func TestChainSampleIterator(t *testing.T) { expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}}, }, } { - merged := NewChainSampleIterator(tc.input) + merged := ChainSampleIteratorFromIterators(nil, tc.input) actual, err := ExpandSamples(merged, nil) require.NoError(t, err) require.Equal(t, tc.expected, actual) @@ -855,7 +855,7 @@ func TestChainSampleIteratorSeek(t *testing.T) { expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}}, }, } { - merged := NewChainSampleIterator(tc.input) + merged := ChainSampleIteratorFromIterators(nil, tc.input) actual := []tsdbutil.Sample{} if merged.Seek(tc.seek) == chunkenc.ValFloat { t, v := merged.At() diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 9b7516b87..a74ad2b7b 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -350,6 +350,10 @@ func (c *concreteSeries) Labels() labels.Labels { } func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + if csi, ok := it.(*concreteSeriesIterator); ok { + csi.reset(c) + return csi + } return newConcreteSeriersIterator(c) } @@ -366,6 +370,11 @@ func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator { } } +func (c *concreteSeriesIterator) reset(series *concreteSeries) { + c.cur = -1 + c.series = series +} + // Seek implements storage.SeriesIterator. func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType { if c.cur == -1 { diff --git a/storage/series.go b/storage/series.go index 87b1256f6..339beb2c9 100644 --- a/storage/series.go +++ b/storage/series.go @@ -43,10 +43,15 @@ func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return // NewListSeries returns series entry with iterator that allows to iterate over provided samples. func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry { + samplesS := Samples(samples(s)) return &SeriesEntry{ Lset: lset, SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator { - return NewListSeriesIterator(samples(s)) + if lsi, ok := it.(*listSeriesIterator); ok { + lsi.Reset(samplesS) + return lsi + } + return NewListSeriesIterator(samplesS) }, } } @@ -57,10 +62,20 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam return &ChunkSeriesEntry{ Lset: lset, ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator { - chks := make([]chunks.Meta, 0, len(samples)) + lcsi, existing := it.(*listChunkSeriesIterator) + var chks []chunks.Meta + if existing { + chks = lcsi.chks[:0] + } else { + chks = make([]chunks.Meta, 0, len(samples)) + } for _, s := range samples { chks = append(chks, tsdbutil.ChunkFromSamples(s)) } + if existing { + lcsi.Reset(chks...) + return lcsi + } return NewListChunkSeriesIterator(chks...) }, } @@ -87,6 +102,11 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator { return &listSeriesIterator{samples: samples, idx: -1} } +func (it *listSeriesIterator) Reset(samples Samples) { + it.samples = samples + it.idx = -1 +} + func (it *listSeriesIterator) At() (int64, float64) { s := it.samples.Get(it.idx) return s.T(), s.V() @@ -150,6 +170,11 @@ func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator { return &listChunkSeriesIterator{chks: chks, idx: -1} } +func (it *listChunkSeriesIterator) Reset(chks ...chunks.Meta) { + it.chks = chks + it.idx = -1 +} + func (it *listChunkSeriesIterator) At() chunks.Meta { return it.chks[it.idx] } @@ -164,6 +189,7 @@ func (it *listChunkSeriesIterator) Err() error { return nil } type chunkSetToSeriesSet struct { ChunkSeriesSet + iter chunks.Iterator chkIterErr error sameSeriesChunks []Series } @@ -178,18 +204,18 @@ func (c *chunkSetToSeriesSet) Next() bool { return false } - iter := c.ChunkSeriesSet.At().Iterator(nil) + c.iter = c.ChunkSeriesSet.At().Iterator(c.iter) c.sameSeriesChunks = c.sameSeriesChunks[:0] - for iter.Next() { + for c.iter.Next() { c.sameSeriesChunks = append( c.sameSeriesChunks, - newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), iter.At()), + newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), c.iter.At()), ) } - if iter.Err() != nil { - c.chkIterErr = iter.Err() + if c.iter.Err() != nil { + c.chkIterErr = c.iter.Err() return false } return true @@ -262,6 +288,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { maxt := int64(math.MinInt64) chks := []chunks.Meta{} + lcsi, existing := it.(*listChunkSeriesIterator) + if existing { + chks = lcsi.chks[:0] + } + i := 0 seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone @@ -323,6 +354,10 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { }) } + if existing { + lcsi.Reset(chks...) + return lcsi + } return NewListChunkSeriesIterator(chks...) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 6a273a0fd..985a15792 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -503,11 +503,7 @@ func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) { } func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { - iterators := make([]chunkenc.Iterator, 0, len(o.chunks)) - for _, c := range o.chunks { - iterators = append(iterators, c.Chunk.Iterator(nil)) - } - return storage.NewChainSampleIterator(iterators) + return storage.ChainSampleIteratorFromMetas(iterator, o.chunks) } func (o mergedOOOChunks) NumSamples() int {