mirror of https://github.com/prometheus/prometheus
Remove locking from series iterator. Cache chunk iterators.
parent
cd5574bf8a
commit
81b190bf45
|
@ -398,8 +398,6 @@ func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
|
|||
|
||||
// getValueAtTime implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||
// TODO(beorn7): Implement in a more efficient way making use of the
|
||||
// state of the iterator and internals of the doubleDeltaChunk.
|
||||
i := sort.Search(it.len, func(i int) bool {
|
||||
return !it.getTimestampAtIndex(i).Before(t)
|
||||
})
|
||||
|
@ -438,8 +436,6 @@ func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestam
|
|||
|
||||
// getRangeValues implements chunkIterator.
|
||||
func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
|
||||
// TODO(beorn7): Implement in a more efficient way making use of the
|
||||
// state of the iterator and internals of the doubleDeltaChunk.
|
||||
oldest := sort.Search(it.len, func(i int) bool {
|
||||
return !it.getTimestampAtIndex(i).Before(in.OldestInclusive)
|
||||
})
|
||||
|
|
|
@ -58,11 +58,11 @@ type Storage interface {
|
|||
WaitForIndexing()
|
||||
}
|
||||
|
||||
// SeriesIterator enables efficient access of sample values in a series. All
|
||||
// methods are goroutine-safe. A SeriesIterator iterates over a snapshot of a
|
||||
// series, i.e. it is safe to continue using a SeriesIterator after modifying
|
||||
// the corresponding series, but the iterator will represent the state of the
|
||||
// series prior the modification.
|
||||
// SeriesIterator enables efficient access of sample values in a series. Its
|
||||
// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of
|
||||
// a series, i.e. it is safe to continue using a SeriesIterator after or during
|
||||
// modifying the corresponding series, but the iterator will represent the state
|
||||
// of the series prior the modification.
|
||||
type SeriesIterator interface {
|
||||
// Gets the two values that are immediately adjacent to a given time. In
|
||||
// case a value exist at precisely the given time, only that single
|
||||
|
|
|
@ -415,7 +415,7 @@ func (s *memorySeries) preloadChunksForRange(
|
|||
|
||||
// newIterator returns a new SeriesIterator. The caller must have locked the
|
||||
// fingerprint of the memorySeries.
|
||||
func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
|
||||
func (s *memorySeries) newIterator() SeriesIterator {
|
||||
chunks := make([]chunk, 0, len(s.chunkDescs))
|
||||
for i, cd := range s.chunkDescs {
|
||||
if chunk := cd.getChunk(); chunk != nil {
|
||||
|
@ -427,9 +427,8 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
|
|||
}
|
||||
|
||||
return &memorySeriesIterator{
|
||||
lock: lockFunc,
|
||||
unlock: unlockFunc,
|
||||
chunks: chunks,
|
||||
chunks: chunks,
|
||||
chunkIts: make([]chunkIterator, len(chunks)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -471,16 +470,13 @@ func (s *memorySeries) getChunksToPersist() []*chunkDesc {
|
|||
|
||||
// memorySeriesIterator implements SeriesIterator.
|
||||
type memorySeriesIterator struct {
|
||||
lock, unlock func()
|
||||
chunkIt chunkIterator
|
||||
chunks []chunk
|
||||
chunkIt chunkIterator // Last chunkIterator used by GetValueAtTime.
|
||||
chunkIts []chunkIterator // Caches chunkIterators.
|
||||
chunks []chunk
|
||||
}
|
||||
|
||||
// GetValueAtTime implements SeriesIterator.
|
||||
func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||
it.lock()
|
||||
defer it.unlock()
|
||||
|
||||
// The most common case. We are iterating through a chunk.
|
||||
if it.chunkIt != nil && it.chunkIt.contains(t) {
|
||||
return it.chunkIt.getValueAtTime(t)
|
||||
|
@ -491,7 +487,7 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
|
|||
}
|
||||
|
||||
// Before or exactly on the first sample of the series.
|
||||
it.chunkIt = it.chunks[0].newIterator()
|
||||
it.chunkIt = it.getChunkIterator(0)
|
||||
ts := it.chunkIt.getTimestampAtIndex(0)
|
||||
if !t.After(ts) {
|
||||
// return first value of first chunk
|
||||
|
@ -502,7 +498,7 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
|
|||
}
|
||||
|
||||
// After or exactly on the last sample of the series.
|
||||
it.chunkIt = it.chunks[len(it.chunks)-1].newIterator()
|
||||
it.chunkIt = it.getChunkIterator(len(it.chunks) - 1)
|
||||
ts = it.chunkIt.getLastTimestamp()
|
||||
if !t.Before(ts) {
|
||||
// return last value of last chunk
|
||||
|
@ -520,7 +516,7 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
|
|||
if i == len(it.chunks) {
|
||||
panic("out of bounds")
|
||||
}
|
||||
it.chunkIt = it.chunks[l-i].newIterator()
|
||||
it.chunkIt = it.getChunkIterator(l - i)
|
||||
ts = it.chunkIt.getLastTimestamp()
|
||||
if t.After(ts) {
|
||||
// We ended up between two chunks.
|
||||
|
@ -528,12 +524,12 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
|
|||
Timestamp: ts,
|
||||
Value: it.chunkIt.getSampleValueAtIndex(it.chunkIt.length() - 1),
|
||||
}
|
||||
it.chunkIt = it.chunks[l-i+1].newIterator()
|
||||
it.chunkIt = it.getChunkIterator(l - i + 1)
|
||||
return metric.Values{
|
||||
sp1,
|
||||
metric.SamplePair{
|
||||
it.chunkIt.getTimestampAtIndex(0),
|
||||
it.chunkIt.getSampleValueAtIndex(0),
|
||||
Timestamp: it.chunkIt.getTimestampAtIndex(0),
|
||||
Value: it.chunkIt.getSampleValueAtIndex(0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -542,16 +538,13 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V
|
|||
|
||||
// GetBoundaryValues implements SeriesIterator.
|
||||
func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
|
||||
it.lock()
|
||||
defer it.unlock()
|
||||
|
||||
// Find the first chunk for which the first sample is within the interval.
|
||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
|
||||
})
|
||||
// Only now check the last timestamp of the previous chunk (which is
|
||||
// fairly expensive).
|
||||
if i > 0 && !it.chunks[i-1].newIterator().getLastTimestamp().Before(in.OldestInclusive) {
|
||||
if i > 0 && !it.getChunkIterator(i-1).getLastTimestamp().Before(in.OldestInclusive) {
|
||||
i--
|
||||
}
|
||||
|
||||
|
@ -563,7 +556,7 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
|
|||
// already past the last value. The value we
|
||||
// want must be the last value of the previous
|
||||
// chunk. So backtrack...
|
||||
chunkIt := it.chunks[j-1].newIterator()
|
||||
chunkIt := it.getChunkIterator(i + j - 1)
|
||||
values = append(values, metric.SamplePair{
|
||||
Timestamp: chunkIt.getLastTimestamp(),
|
||||
Value: chunkIt.getLastSampleValue(),
|
||||
|
@ -571,7 +564,7 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
|
|||
}
|
||||
break
|
||||
}
|
||||
chunkIt := c.newIterator()
|
||||
chunkIt := it.getChunkIterator(i + j)
|
||||
if len(values) == 0 {
|
||||
firstValues := chunkIt.getValueAtTime(in.OldestInclusive)
|
||||
switch len(firstValues) {
|
||||
|
@ -590,7 +583,7 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
|
|||
}
|
||||
if len(values) == 1 {
|
||||
// We found exactly one value. In that case, add the most recent we know.
|
||||
chunkIt := it.chunks[len(it.chunks)-1].newIterator()
|
||||
chunkIt := it.getChunkIterator(len(it.chunks) - 1)
|
||||
values = append(values, metric.SamplePair{
|
||||
Timestamp: chunkIt.getLastTimestamp(),
|
||||
Value: chunkIt.getLastSampleValue(),
|
||||
|
@ -604,32 +597,37 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
|
|||
|
||||
// GetRangeValues implements SeriesIterator.
|
||||
func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values {
|
||||
it.lock()
|
||||
defer it.unlock()
|
||||
|
||||
// Find the first chunk for which the first sample is within the interval.
|
||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||
// TODO: Avoid the expensive newIterator().getLastTimestamp() call.
|
||||
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
|
||||
})
|
||||
// Only now check the last timestamp of the previous chunk (which is
|
||||
// fairly expensive).
|
||||
if i > 0 && !it.chunks[i-1].newIterator().getLastTimestamp().Before(in.OldestInclusive) {
|
||||
if i > 0 && !it.getChunkIterator(i-1).getLastTimestamp().Before(in.OldestInclusive) {
|
||||
i--
|
||||
}
|
||||
|
||||
values := metric.Values{}
|
||||
for _, c := range it.chunks[i:] {
|
||||
for j, c := range it.chunks[i:] {
|
||||
if c.firstTime().After(in.NewestInclusive) {
|
||||
break
|
||||
}
|
||||
// TODO: actually reuse an iterator between calls if we get multiple ranges
|
||||
// from the same chunk.
|
||||
values = append(values, c.newIterator().getRangeValues(in)...)
|
||||
values = append(values, it.getChunkIterator(i+j).getRangeValues(in)...)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// getChunkIterator returns the chunkIterator for the chunk at position i (and
|
||||
// creates it if needed).
|
||||
func (it *memorySeriesIterator) getChunkIterator(i int) chunkIterator {
|
||||
chunkIt := it.chunkIts[i]
|
||||
if chunkIt == nil {
|
||||
chunkIt = it.chunks[i].newIterator()
|
||||
it.chunkIts[i] = chunkIt
|
||||
}
|
||||
return chunkIt
|
||||
}
|
||||
|
||||
// nopSeriesIterator implements Series Iterator. It never returns any values.
|
||||
type nopSeriesIterator struct{}
|
||||
|
||||
|
|
|
@ -277,10 +277,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
|
|||
// return any values.
|
||||
return nopSeriesIterator{}
|
||||
}
|
||||
return series.newIterator(
|
||||
func() { s.fpLocker.Lock(fp) },
|
||||
func() { s.fpLocker.Unlock(fp) },
|
||||
)
|
||||
return series.newIterator()
|
||||
}
|
||||
|
||||
// NewPreloader implements Storage.
|
||||
|
|
Loading…
Reference in New Issue