From ecee5d82811cc9793d5f3c0ee5b928be98dc767c Mon Sep 17 00:00:00 2001
From: Bjoern Rabenstein
Date: Tue, 14 Oct 2014 17:44:01 +0200
Subject: [PATCH] Fix head chunk persisting and a chunkDesc race condition.

- Head chunk persisting only happens in evictOlderThan, so do it there.
  (With the previous code, it would never happen.)

- Raw accesses to chunkDesc.chunk are now done via isEvicted (with locking).

Change-Id: I48b07b56dfea4899b50df159b4ea566954396fcd
---
 storage/local/series.go  | 89 +++++++++++++++++++++++++---------------
 storage/local/storage.go |  6 ++-
 2 files changed, 59 insertions(+), 36 deletions(-)

diff --git a/storage/local/series.go b/storage/local/series.go
index 818f3db52..1481eb51a 100644
--- a/storage/local/series.go
+++ b/storage/local/series.go
@@ -183,6 +183,13 @@ func (cd *chunkDesc) lastTime() clientmodel.Timestamp {
 	return cd.chunk.lastTime()
 }
 
+func (cd *chunkDesc) isEvicted() bool {
+	cd.Lock()
+	defer cd.Unlock()
+
+	return cd.chunk == nil
+}
+
 func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
 	return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
 }
@@ -201,16 +208,21 @@ func (cd *chunkDesc) open(c chunk) {
 }
 
 // evictOnUnpin evicts the chunk once unpinned. If it is not pinned when this
-// method is called, it evicts the chunk immediately and returns true.
+// method is called, it evicts the chunk immediately and returns true. If the
+// chunk is already evicted when this method is called, it returns true, too.
 func (cd *chunkDesc) evictOnUnpin() bool {
 	cd.Lock()
 	defer cd.Unlock()
 
+	if cd.chunk == nil {
+		// Already evicted.
+		return true
+	}
+
+	cd.evict = true
 	if cd.refCount == 0 {
 		cd.evictNow()
 		return true
 	}
-	cd.evict = true
 	return false
 }
 
@@ -229,9 +241,9 @@ type memorySeries struct {
 	// (or all) chunkDescs are only on disk. These chunks are all contiguous
 	// and at the tail end.
 	chunkDescsLoaded bool
-	// Whether the current head chunk has already been persisted (or at
-	// least has been scheduled to be persisted). If true, the current head
-	// chunk must not be modified anymore.
+	// Whether the current head chunk has already been scheduled to be
+	// persisted. If true, the current head chunk must not be modified
+	// anymore.
 	headChunkPersisted bool
 }
 
@@ -286,35 +298,48 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 	}
 }
 
-// persistHeadChunk queues the head chunk for persisting if not already done.
-// The caller must have locked the fingerprint of the series.
-func (s *memorySeries) persistHeadChunk(fp clientmodel.Fingerprint, persistQueue chan *persistRequest) {
-	if s.headChunkPersisted {
-		return
-	}
-	s.headChunkPersisted = true
-	persistQueue <- &persistRequest{
-		fingerprint: fp,
-		chunkDesc:   s.head(),
-	}
-}
-
-// evictOlderThan evicts chunks whose latest sample is older than the given
-// timestamp. It returns true if all chunks in the series were immediately
-// evicted (i.e. all chunks are older than the timestamp, and none of the chunks
-// was pinned).
+// evictOlderThan marks for eviction all chunks whose latest sample is older
+// than the given timestamp. It returns true if all chunks in the series were
+// immediately evicted (i.e. all chunks are older than the timestamp, and none
+// of the chunks was pinned).
+//
+// Special considerations for the head chunk: If it has not been scheduled to be
+// persisted yet but is old enough for eviction, the scheduling happens now. (To
+// do that, the method needs the fingerprint and the persist queue.)
+// It is likely that the actual persisting will not happen soon enough to
+// immediately evict the head chunk, though. Thus, calling evictOlderThan for
+// a series with a non-persisted head chunk will most likely return false,
+// even if no chunk is pinned for other reasons. A series old enough for
+// archiving will therefore usually require at least two eviction runs: In
+// the first run, its head chunk is scheduled to be persisted. The next call
+// of evictOlderThan will then return true, provided that the series hasn't
+// received new samples in the meantime, the head chunk has now been
+// persisted, and no chunk is pinned for other reasons.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) bool {
+func (s *memorySeries) evictOlderThan(
+	t clientmodel.Timestamp,
+	fp clientmodel.Fingerprint,
+	persistQueue chan *persistRequest,
+) bool {
 	allEvicted := true
 	// For now, always drop the entire range from oldest to t.
-	for _, cd := range s.chunkDescs {
+	for i, cd := range s.chunkDescs {
 		if !cd.lastTime().Before(t) {
 			return false
 		}
-		if cd.chunk == nil {
+		if cd.isEvicted() {
 			continue
 		}
+		if !s.headChunkPersisted && i == len(s.chunkDescs)-1 {
+			// This is a non-persisted head chunk that is old enough
+			// for eviction. Queue it to be persisted:
+			s.headChunkPersisted = true
+			persistQueue <- &persistRequest{
+				fingerprint: fp,
+				chunkDesc:   cd,
+			}
+		}
 		if !cd.evictOnUnpin() {
 			allEvicted = false
 		}
@@ -323,6 +348,7 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) bool {
 }
 
 // purgeOlderThan returns true if all chunks have been purged.
+//
 // The caller must have locked the fingerprint of the series.
 func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
 	keepIdx := len(s.chunkDescs)
@@ -334,24 +360,19 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
 	}
 
 	for i := 0; i < keepIdx; i++ {
-		if s.chunkDescs[i].chunk != nil {
-			s.chunkDescs[i].evictOnUnpin()
-		}
+		s.chunkDescs[i].evictOnUnpin()
 	}
 	s.chunkDescs = s.chunkDescs[keepIdx:]
 	return len(s.chunkDescs) == 0
 }
 
 // preloadChunks is an internal helper method.
-// TODO: in this method (and other places), we just fudge around with chunkDesc
-// internals without grabbing the chunkDesc lock. Study how this needs to be
-// protected against other accesses that don't hold the fp lock.
 func (s *memorySeries) preloadChunks(indexes []int, p *persistence) (chunkDescs, error) {
 	loadIndexes := []int{}
 	pinnedChunkDescs := make(chunkDescs, 0, len(indexes))
 	for _, idx := range indexes {
 		pinnedChunkDescs = append(pinnedChunkDescs, s.chunkDescs[idx])
-		if s.chunkDescs[idx].chunk == nil {
+		if s.chunkDescs[idx].isEvicted() {
 			loadIndexes = append(loadIndexes, idx)
 		} else {
 			s.chunkDescs[idx].pin()
@@ -364,7 +385,7 @@ func (s *memorySeries) preloadChunks(indexes []int, p *persistence) (chunkDescs,
 	if err != nil {
 		// Unpin any pinned chunks that were already loaded.
 		for _, cd := range pinnedChunkDescs {
-			if cd.chunk != nil {
+			if !cd.isEvicted() {
 				cd.unpin()
 			}
 		}
@@ -459,7 +480,7 @@ func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through
 func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
 	chunks := make(chunks, 0, len(s.chunkDescs))
 	for i, cd := range s.chunkDescs {
-		if cd.chunk != nil {
+		if !cd.isEvicted() {
 			if i == len(s.chunkDescs)-1 {
 				chunks = append(chunks, cd.chunk.clone())
 			} else {
diff --git a/storage/local/storage.go b/storage/local/storage.go
index 050fd0073..60a58d726 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -205,8 +205,10 @@ func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
 	for m := range s.fingerprintToSeries.iter() {
 		s.fpLocker.Lock(m.fp)
-		if m.series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) {
-			m.series.persistHeadChunk(m.fp, s.persistQueue)
+		if m.series.evictOlderThan(
+			clientmodel.TimestampFromTime(time.Now()).Add(-1*ttl),
+			m.fp, s.persistQueue,
+		) {
 			s.fingerprintToSeries.del(m.fp)
 			if err := s.persistence.archiveMetric(
 				m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
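
The locking pattern this patch introduces can be shown in isolation. The
following is a minimal, self-contained Go sketch, not the real storage/local
code: chunk is an empty stand-in, pinning is reduced to a bare refCount
field, and evictNow is inlined as cd.chunk = nil. It illustrates why reads of
cd.chunk now go through isEvicted (the nil check is serialized with
concurrent eviction by the chunkDesc mutex) and why evictOnUnpin treats an
already-evicted chunk as success: a pinned chunk is only marked, the last
unpin completes the eviction, and a second call then returns true
immediately, mirroring the two eviction runs a series with a non-persisted
head chunk typically needs.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the real types in storage/local/series.go.
type chunk struct{}

type chunkDesc struct {
	sync.Mutex
	chunk    *chunk // nil once the chunk data is evicted from memory
	refCount int    // pin count; >0 means the chunk must stay in memory
	evict    bool   // set when eviction is requested while still pinned
}

// isEvicted reads cd.chunk under the lock, replacing the raw
// cd.chunk == nil accesses that raced with concurrent eviction.
func (cd *chunkDesc) isEvicted() bool {
	cd.Lock()
	defer cd.Unlock()
	return cd.chunk == nil
}

// evictOnUnpin mirrors the patched behavior: an already-evicted chunk
// reports success right away, an unpinned chunk is evicted now, and a
// pinned chunk is merely marked for eviction on its last unpin.
func (cd *chunkDesc) evictOnUnpin() bool {
	cd.Lock()
	defer cd.Unlock()
	if cd.chunk == nil {
		return true // already evicted
	}
	cd.evict = true
	if cd.refCount == 0 {
		cd.chunk = nil // evictNow, inlined for the sketch
		return true
	}
	return false
}

// unpin drops one pin and completes a pending eviction, a simplified
// stand-in for the real unpin/evictNow pair.
func (cd *chunkDesc) unpin() {
	cd.Lock()
	defer cd.Unlock()
	cd.refCount--
	if cd.refCount == 0 && cd.evict {
		cd.chunk = nil
	}
}

func main() {
	cd := &chunkDesc{chunk: &chunk{}, refCount: 1}

	// First eviction attempt: still pinned, so only marked.
	fmt.Println(cd.evictOnUnpin()) // false
	fmt.Println(cd.isEvicted())    // false

	// The last unpin triggers the pending eviction; a second
	// evictOnUnpin now succeeds immediately.
	cd.unpin()
	fmt.Println(cd.isEvicted())    // true
	fmt.Println(cd.evictOnUnpin()) // true
}

It prints false, false, true, true. Holding the chunkDesc mutex across the
whole check-and-modify sequence, rather than reading cd.chunk directly, is
what closes the race named in the commit message.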