mirror of https://github.com/prometheus/prometheus
Merge pull request #2561 from prometheus/beorn7/storage2
storage: Evict unused chunk.Descs in crash recoverypull/2562/head
commit
50e4f49b7e
|
@ -304,6 +304,8 @@ func (p *persistence) sanitizeSeries(
|
||||||
}
|
}
|
||||||
s.persistWatermark = len(cds)
|
s.persistWatermark = len(cds)
|
||||||
s.modTime = modTime
|
s.modTime = modTime
|
||||||
|
// Finally, evict again all chunk.Descs except the latest one to save memory.
|
||||||
|
s.evictChunkDescs(len(cds) - 1)
|
||||||
return fp, true
|
return fp, true
|
||||||
}
|
}
|
||||||
// This is the tricky one: We have chunks from heads.db, but
|
// This is the tricky one: We have chunks from heads.db, but
|
||||||
|
@ -356,6 +358,8 @@ func (p *persistence) sanitizeSeries(
|
||||||
atomic.AddInt64(&chunk.NumMemChunks, int64(-len(s.chunkDescs)))
|
atomic.AddInt64(&chunk.NumMemChunks, int64(-len(s.chunkDescs)))
|
||||||
s.chunkDescs = cds
|
s.chunkDescs = cds
|
||||||
s.headChunkClosed = true
|
s.headChunkClosed = true
|
||||||
|
// Finally, evict again all chunk.Descs except the latest one to save memory.
|
||||||
|
s.evictChunkDescs(len(cds) - 1)
|
||||||
return fp, true
|
return fp, true
|
||||||
}
|
}
|
||||||
log.Warnf(
|
log.Warnf(
|
||||||
|
@ -364,11 +368,15 @@ func (p *persistence) sanitizeSeries(
|
||||||
)
|
)
|
||||||
chunk.NumMemDescs.Sub(float64(keepIdx))
|
chunk.NumMemDescs.Sub(float64(keepIdx))
|
||||||
atomic.AddInt64(&chunk.NumMemChunks, int64(-keepIdx))
|
atomic.AddInt64(&chunk.NumMemChunks, int64(-keepIdx))
|
||||||
|
chunkDescsToEvict := len(cds)
|
||||||
if keepIdx == len(s.chunkDescs) {
|
if keepIdx == len(s.chunkDescs) {
|
||||||
// No chunks from series file left, head chunk is evicted, so declare it closed.
|
// No chunks from series file left, head chunk is evicted, so declare it closed.
|
||||||
s.headChunkClosed = true
|
s.headChunkClosed = true
|
||||||
|
chunkDescsToEvict-- // Keep one chunk.Desc in this case to avoid a series with zero chunk.Descs.
|
||||||
}
|
}
|
||||||
s.chunkDescs = append(cds, s.chunkDescs[keepIdx:]...)
|
s.chunkDescs = append(cds, s.chunkDescs[keepIdx:]...)
|
||||||
|
// Finally, evict again chunk.Descs without chunk to save memory.
|
||||||
|
s.evictChunkDescs(chunkDescsToEvict)
|
||||||
return fp, true
|
return fp, true
|
||||||
}
|
}
|
||||||
// This series is supposed to be archived.
|
// This series is supposed to be archived.
|
||||||
|
@ -458,6 +466,8 @@ func (p *persistence) cleanUpArchiveIndexes(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fpToSeries[model.Fingerprint(fp)] = series
|
fpToSeries[model.Fingerprint(fp)] = series
|
||||||
|
// Evict all but one chunk.Desc to save memory.
|
||||||
|
series.evictChunkDescs(len(cds) - 1)
|
||||||
return nil
|
return nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1589,7 +1589,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries(
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// If we are here, the series is not archived, so check for Chunk.Desc
|
// If we are here, the series is not archived, so check for chunk.Desc
|
||||||
// eviction next.
|
// eviction next.
|
||||||
series.evictChunkDescs(iOldestNotEvicted)
|
series.evictChunkDescs(iOldestNotEvicted)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue