diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 1a238ad4e..1a2a9a983 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -56,11 +56,11 @@ const ( // chunkDesc contains meta-data for a chunk. Many of its methods are // goroutine-safe proxies for chunk methods. type chunkDesc struct { - sync.Mutex + sync.Mutex // TODO(beorn7): Try out if an RWMutex would help here. c chunk // nil if chunk is evicted. rCnt int - chunkFirstTime model.Time // Used if chunk is evicted. - chunkLastTime model.Time // Used if chunk is evicted. + chunkFirstTime model.Time // Populated at creation. + chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. // evictListElement is nil if the chunk is not in the evict list. // evictListElement is _not_ protected by the chunkDesc mutex. @@ -71,11 +71,16 @@ type chunkDesc struct { // newChunkDesc creates a new chunkDesc pointing to the provided chunk. The // provided chunk is assumed to be not persisted yet. Therefore, the refCount of // the new chunkDesc is 1 (preventing eviction prior to persisting). -func newChunkDesc(c chunk) *chunkDesc { +func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { chunkOps.WithLabelValues(createAndPin).Inc() atomic.AddInt64(&numMemChunks, 1) numMemChunkDescs.Inc() - return &chunkDesc{c: c, rCnt: 1} + return &chunkDesc{ + c: c, + rCnt: 1, + chunkFirstTime: firstTime, + chunkLastTime: model.Earliest, + } } func (cd *chunkDesc) add(s *model.SamplePair) []chunk { @@ -124,23 +129,27 @@ func (cd *chunkDesc) refCount() int { } func (cd *chunkDesc) firstTime() model.Time { + // No lock required, will never be modified. + return cd.chunkFirstTime +} + +func (cd *chunkDesc) lastTime() model.Time { cd.Lock() defer cd.Unlock() - if cd.c == nil { - return cd.chunkFirstTime + if cd.chunkLastTime != model.Earliest || cd.c == nil { + return cd.chunkLastTime } - return cd.c.firstTime() + return cd.c.newIterator().lastTimestamp() } -func (cd *chunkDesc) lastTime() model.Time { +func (cd *chunkDesc) maybePopulateLastTime() { cd.Lock() defer cd.Unlock() - if cd.c == nil { - return cd.chunkLastTime + if cd.chunkLastTime == model.Earliest && cd.c != nil { + cd.chunkLastTime = cd.c.newIterator().lastTimestamp() } - return cd.c.newIterator().lastTimestamp() } func (cd *chunkDesc) lastSamplePair() *model.SamplePair { @@ -198,8 +207,10 @@ func (cd *chunkDesc) maybeEvict() bool { if cd.rCnt != 0 { return false } - cd.chunkFirstTime = cd.c.firstTime() - cd.chunkLastTime = cd.c.newIterator().lastTimestamp() + // Last opportunity to populate chunkLastTime. + if cd.chunkLastTime == model.Earliest { + cd.chunkLastTime = cd.c.newIterator().lastTimestamp() + } cd.c = nil chunkOps.WithLabelValues(evict).Inc() atomic.AddInt64(&numMemChunks, -1) diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 72b989ccb..8a1c178f5 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -856,7 +856,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in p.dirty = true return sm, chunksToPersist, nil } - chunkDescs[i] = newChunkDesc(chunk) + chunkDescs[i] = newChunkDesc(chunk, chunk.firstTime()) chunksToPersist++ } } diff --git a/storage/local/series.go b/storage/local/series.go index 0bfe68ac8..d73154881 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -209,7 +209,7 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) // The caller must have locked the fingerprint of the series. func (s *memorySeries) add(v *model.SamplePair) int { if len(s.chunkDescs) == 0 || s.headChunkClosed { - newHead := newChunkDesc(newChunk()) + newHead := newChunkDesc(newChunk(), v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkClosed = false } else if s.headChunkUsedByIterator && s.head().refCount() > 1 { @@ -233,7 +233,12 @@ func (s *memorySeries) add(v *model.SamplePair) int { s.head().c = chunks[0] for _, c := range chunks[1:] { - s.chunkDescs = append(s.chunkDescs, newChunkDesc(c)) + s.chunkDescs = append(s.chunkDescs, newChunkDesc(c, c.firstTime())) + } + + // Populate lastTime of now-closed chunks. + for _, cd := range s.chunkDescs[len(s.chunkDescs)-len(chunks) : len(s.chunkDescs)-1] { + cd.maybePopulateLastTime() } s.lastTime = v.Timestamp @@ -254,6 +259,7 @@ func (s *memorySeries) maybeCloseHeadChunk() bool { // Since we cannot modify the head chunk from now on, we // don't need to bother with cloning anymore. s.headChunkUsedByIterator = false + s.head().maybePopulateLastTime() return true } return false