mirror of https://github.com/prometheus/prometheus
storage: re-use iterators to save garbage
Re-use previous memory if it is already of the correct type. In `NewListSeries` we hoist the conversion to an interface value out so it only allocates once. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>pull/11334/head
parent
f0866c0774
commit
463f5cafdd
|
@ -364,6 +364,10 @@ func (ss *StorageSeries) Labels() labels.Labels {
|
||||||
|
|
||||||
// Iterator returns a new iterator of the data of the series.
|
// Iterator returns a new iterator of the data of the series.
|
||||||
func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
func (ss *StorageSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||||
|
if ssi, ok := it.(*storageSeriesIterator); ok {
|
||||||
|
ssi.reset(ss.series)
|
||||||
|
return ssi
|
||||||
|
}
|
||||||
return newStorageSeriesIterator(ss.series)
|
return newStorageSeriesIterator(ss.series)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,6 +383,11 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ssi *storageSeriesIterator) reset(series Series) {
|
||||||
|
ssi.points = series.Points
|
||||||
|
ssi.curr = -1
|
||||||
|
}
|
||||||
|
|
||||||
func (ssi *storageSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
func (ssi *storageSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
||||||
i := ssi.curr
|
i := ssi.curr
|
||||||
if i < 0 {
|
if i < 0 {
|
||||||
|
|
|
@ -425,12 +425,8 @@ func ChainedSeriesMerge(series ...Series) Series {
|
||||||
}
|
}
|
||||||
return &SeriesEntry{
|
return &SeriesEntry{
|
||||||
Lset: series[0].Labels(),
|
Lset: series[0].Labels(),
|
||||||
SampleIteratorFn: func(chunkenc.Iterator) chunkenc.Iterator {
|
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
|
||||||
iterators := make([]chunkenc.Iterator, 0, len(series))
|
return ChainSampleIteratorFromSeries(it, series)
|
||||||
for _, s := range series {
|
|
||||||
iterators = append(iterators, s.Iterator(nil))
|
|
||||||
}
|
|
||||||
return NewChainSampleIterator(iterators)
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -446,15 +442,42 @@ type chainSampleIterator struct {
|
||||||
lastT int64
|
lastT int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChainSampleIterator returns a single iterator that iterates over the samples from the given iterators in a sorted
|
// Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible.
|
||||||
// fashion. If samples overlap, one sample from overlapped ones is kept (randomly) and all others with the same
|
func getChainSampleIterator(it chunkenc.Iterator, length int) *chainSampleIterator {
|
||||||
// timestamp are dropped.
|
csi, ok := it.(*chainSampleIterator)
|
||||||
func NewChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
|
if !ok {
|
||||||
return &chainSampleIterator{
|
csi = &chainSampleIterator{}
|
||||||
iterators: iterators,
|
|
||||||
h: nil,
|
|
||||||
lastT: math.MinInt64,
|
|
||||||
}
|
}
|
||||||
|
if cap(csi.iterators) < length {
|
||||||
|
csi.iterators = make([]chunkenc.Iterator, length)
|
||||||
|
} else {
|
||||||
|
csi.iterators = csi.iterators[:length]
|
||||||
|
}
|
||||||
|
csi.h = nil
|
||||||
|
csi.lastT = math.MinInt64
|
||||||
|
return csi
|
||||||
|
}
|
||||||
|
|
||||||
|
func ChainSampleIteratorFromSeries(it chunkenc.Iterator, series []Series) chunkenc.Iterator {
|
||||||
|
csi := getChainSampleIterator(it, len(series))
|
||||||
|
for i, s := range series {
|
||||||
|
csi.iterators[i] = s.Iterator(csi.iterators[i])
|
||||||
|
}
|
||||||
|
return csi
|
||||||
|
}
|
||||||
|
|
||||||
|
func ChainSampleIteratorFromMetas(it chunkenc.Iterator, chunks []chunks.Meta) chunkenc.Iterator {
|
||||||
|
csi := getChainSampleIterator(it, len(chunks))
|
||||||
|
for i, c := range chunks {
|
||||||
|
csi.iterators[i] = c.Chunk.Iterator(csi.iterators[i])
|
||||||
|
}
|
||||||
|
return csi
|
||||||
|
}
|
||||||
|
|
||||||
|
func ChainSampleIteratorFromIterators(it chunkenc.Iterator, iterators []chunkenc.Iterator) chunkenc.Iterator {
|
||||||
|
csi := getChainSampleIterator(it, 0)
|
||||||
|
csi.iterators = iterators
|
||||||
|
return csi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
|
func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
|
||||||
|
|
|
@ -809,7 +809,7 @@ func TestChainSampleIterator(t *testing.T) {
|
||||||
expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}},
|
expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}},
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
merged := NewChainSampleIterator(tc.input)
|
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
||||||
actual, err := ExpandSamples(merged, nil)
|
actual, err := ExpandSamples(merged, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, tc.expected, actual)
|
require.Equal(t, tc.expected, actual)
|
||||||
|
@ -855,7 +855,7 @@ func TestChainSampleIteratorSeek(t *testing.T) {
|
||||||
expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}},
|
expected: []tsdbutil.Sample{sample{0, 0, nil, nil}, sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}},
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
merged := NewChainSampleIterator(tc.input)
|
merged := ChainSampleIteratorFromIterators(nil, tc.input)
|
||||||
actual := []tsdbutil.Sample{}
|
actual := []tsdbutil.Sample{}
|
||||||
if merged.Seek(tc.seek) == chunkenc.ValFloat {
|
if merged.Seek(tc.seek) == chunkenc.ValFloat {
|
||||||
t, v := merged.At()
|
t, v := merged.At()
|
||||||
|
|
|
@ -350,6 +350,10 @@ func (c *concreteSeries) Labels() labels.Labels {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
func (c *concreteSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||||
|
if csi, ok := it.(*concreteSeriesIterator); ok {
|
||||||
|
csi.reset(c)
|
||||||
|
return csi
|
||||||
|
}
|
||||||
return newConcreteSeriersIterator(c)
|
return newConcreteSeriersIterator(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,6 +370,11 @@ func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *concreteSeriesIterator) reset(series *concreteSeries) {
|
||||||
|
c.cur = -1
|
||||||
|
c.series = series
|
||||||
|
}
|
||||||
|
|
||||||
// Seek implements storage.SeriesIterator.
|
// Seek implements storage.SeriesIterator.
|
||||||
func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
func (c *concreteSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
||||||
if c.cur == -1 {
|
if c.cur == -1 {
|
||||||
|
|
|
@ -43,10 +43,15 @@ func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return
|
||||||
|
|
||||||
// NewListSeries returns series entry with iterator that allows to iterate over provided samples.
|
// NewListSeries returns series entry with iterator that allows to iterate over provided samples.
|
||||||
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
|
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
|
||||||
|
samplesS := Samples(samples(s))
|
||||||
return &SeriesEntry{
|
return &SeriesEntry{
|
||||||
Lset: lset,
|
Lset: lset,
|
||||||
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
|
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
|
||||||
return NewListSeriesIterator(samples(s))
|
if lsi, ok := it.(*listSeriesIterator); ok {
|
||||||
|
lsi.Reset(samplesS)
|
||||||
|
return lsi
|
||||||
|
}
|
||||||
|
return NewListSeriesIterator(samplesS)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,10 +62,20 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sam
|
||||||
return &ChunkSeriesEntry{
|
return &ChunkSeriesEntry{
|
||||||
Lset: lset,
|
Lset: lset,
|
||||||
ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator {
|
ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator {
|
||||||
chks := make([]chunks.Meta, 0, len(samples))
|
lcsi, existing := it.(*listChunkSeriesIterator)
|
||||||
|
var chks []chunks.Meta
|
||||||
|
if existing {
|
||||||
|
chks = lcsi.chks[:0]
|
||||||
|
} else {
|
||||||
|
chks = make([]chunks.Meta, 0, len(samples))
|
||||||
|
}
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
chks = append(chks, tsdbutil.ChunkFromSamples(s))
|
chks = append(chks, tsdbutil.ChunkFromSamples(s))
|
||||||
}
|
}
|
||||||
|
if existing {
|
||||||
|
lcsi.Reset(chks...)
|
||||||
|
return lcsi
|
||||||
|
}
|
||||||
return NewListChunkSeriesIterator(chks...)
|
return NewListChunkSeriesIterator(chks...)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -87,6 +102,11 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
|
||||||
return &listSeriesIterator{samples: samples, idx: -1}
|
return &listSeriesIterator{samples: samples, idx: -1}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (it *listSeriesIterator) Reset(samples Samples) {
|
||||||
|
it.samples = samples
|
||||||
|
it.idx = -1
|
||||||
|
}
|
||||||
|
|
||||||
func (it *listSeriesIterator) At() (int64, float64) {
|
func (it *listSeriesIterator) At() (int64, float64) {
|
||||||
s := it.samples.Get(it.idx)
|
s := it.samples.Get(it.idx)
|
||||||
return s.T(), s.V()
|
return s.T(), s.V()
|
||||||
|
@ -150,6 +170,11 @@ func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator {
|
||||||
return &listChunkSeriesIterator{chks: chks, idx: -1}
|
return &listChunkSeriesIterator{chks: chks, idx: -1}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (it *listChunkSeriesIterator) Reset(chks ...chunks.Meta) {
|
||||||
|
it.chks = chks
|
||||||
|
it.idx = -1
|
||||||
|
}
|
||||||
|
|
||||||
func (it *listChunkSeriesIterator) At() chunks.Meta {
|
func (it *listChunkSeriesIterator) At() chunks.Meta {
|
||||||
return it.chks[it.idx]
|
return it.chks[it.idx]
|
||||||
}
|
}
|
||||||
|
@ -164,6 +189,7 @@ func (it *listChunkSeriesIterator) Err() error { return nil }
|
||||||
type chunkSetToSeriesSet struct {
|
type chunkSetToSeriesSet struct {
|
||||||
ChunkSeriesSet
|
ChunkSeriesSet
|
||||||
|
|
||||||
|
iter chunks.Iterator
|
||||||
chkIterErr error
|
chkIterErr error
|
||||||
sameSeriesChunks []Series
|
sameSeriesChunks []Series
|
||||||
}
|
}
|
||||||
|
@ -178,18 +204,18 @@ func (c *chunkSetToSeriesSet) Next() bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
iter := c.ChunkSeriesSet.At().Iterator(nil)
|
c.iter = c.ChunkSeriesSet.At().Iterator(c.iter)
|
||||||
c.sameSeriesChunks = c.sameSeriesChunks[:0]
|
c.sameSeriesChunks = c.sameSeriesChunks[:0]
|
||||||
|
|
||||||
for iter.Next() {
|
for c.iter.Next() {
|
||||||
c.sameSeriesChunks = append(
|
c.sameSeriesChunks = append(
|
||||||
c.sameSeriesChunks,
|
c.sameSeriesChunks,
|
||||||
newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), iter.At()),
|
newChunkToSeriesDecoder(c.ChunkSeriesSet.At().Labels(), c.iter.At()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
if iter.Err() != nil {
|
if c.iter.Err() != nil {
|
||||||
c.chkIterErr = iter.Err()
|
c.chkIterErr = c.iter.Err()
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
@ -262,6 +288,11 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
||||||
maxt := int64(math.MinInt64)
|
maxt := int64(math.MinInt64)
|
||||||
|
|
||||||
chks := []chunks.Meta{}
|
chks := []chunks.Meta{}
|
||||||
|
lcsi, existing := it.(*listChunkSeriesIterator)
|
||||||
|
if existing {
|
||||||
|
chks = lcsi.chks[:0]
|
||||||
|
}
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
seriesIter := s.Series.Iterator(nil)
|
seriesIter := s.Series.Iterator(nil)
|
||||||
lastType := chunkenc.ValNone
|
lastType := chunkenc.ValNone
|
||||||
|
@ -323,6 +354,10 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if existing {
|
||||||
|
lcsi.Reset(chks...)
|
||||||
|
return lcsi
|
||||||
|
}
|
||||||
return NewListChunkSeriesIterator(chks...)
|
return NewListChunkSeriesIterator(chks...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -503,11 +503,7 @@ func (o mergedOOOChunks) Appender() (chunkenc.Appender, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
|
func (o mergedOOOChunks) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
|
||||||
iterators := make([]chunkenc.Iterator, 0, len(o.chunks))
|
return storage.ChainSampleIteratorFromMetas(iterator, o.chunks)
|
||||||
for _, c := range o.chunks {
|
|
||||||
iterators = append(iterators, c.Chunk.Iterator(nil))
|
|
||||||
}
|
|
||||||
return storage.NewChainSampleIterator(iterators)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o mergedOOOChunks) NumSamples() int {
|
func (o mergedOOOChunks) NumSamples() int {
|
||||||
|
|
Loading…
Reference in New Issue