@@ -16,12 +16,21 @@ package chunks
 import (
 	"errors"
 	"sync"
+	"time"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 )
+const (
+	// Minimum recorded peak since the last shrinking of chunkWriteQueue.chunkRefMap to shrink it again.
+	chunkRefMapShrinkThreshold = 1000
+	// Minimum interval between shrinking of chunkWriteQueue.chunkRefMap.
+	chunkRefMapMinShrinkInterval = 10 * time.Minute
+)
 type chunkWriteJob struct {
 	cutFile   bool
 	seriesRef HeadSeriesRef
@@ -40,19 +49,26 @@ type chunkWriteQueue struct {
 	chunkRefMapMtx sync.RWMutex
 	chunkRefMap    map[ChunkDiskMapperRef]chunkenc.Chunk
-	isRunningMtx sync.Mutex // Protects the isRunning property.
+	chunkRefMapPeakSize   int       // Largest size that chunkRefMap has grown to since the last time we shrank it.
+	chunkRefMapLastShrink time.Time // The last time the chunkRefMap was shrunk.
+	// isRunningMtx serves two purposes:
+	// 1. It protects the isRunning field.
+	// 2. It serializes adding of jobs to the chunkRefMap in the addJob() method. If the jobs channel is full then addJob() will block
+	// while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1.
+	isRunningMtx sync.Mutex
 	isRunning    bool // Used to prevent new jobs from being added to the queue when the chan is already closed.
 	workerWg   sync.WaitGroup
 	writeChunk writeChunkF
-	// Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical
+	// Keeping separate counters instead of only a single CounterVec to improve the performance of the critical
 	// addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec.
 	adds      prometheus.Counter
 	gets      prometheus.Counter
 	completed prometheus.Counter
+	shrink    prometheus.Counter
 }
 // writeChunkF is a function which writes chunks; it is dynamic to allow mocking in tests.
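To make the isRunningMtx comment above concrete, here is a minimal, hypothetical sketch (not the Prometheus code; the names queue, addMtx, mapMtx, pending and peak are mine) of the bounding pattern it describes: a single "add" mutex serializes producers, and each producer inserts into the tracking map and then blocks on the bounded channel while still holding that mutex, so the map's peak size stays near the channel capacity instead of growing without limit.

package main

import (
	"fmt"
	"sync"
)

type queue struct {
	addMtx  sync.Mutex // Serializes add(); stands in for isRunningMtx.
	mapMtx  sync.Mutex // Protects pending and peak; stands in for chunkRefMapMtx.
	jobs    chan int
	pending map[int]struct{}
	peak    int // High-water mark of len(pending), like chunkRefMapPeakSize.
}

func (q *queue) add(job int) {
	q.addMtx.Lock()
	defer q.addMtx.Unlock()

	q.mapMtx.Lock()
	q.pending[job] = struct{}{}
	if len(q.pending) > q.peak {
		q.peak = len(q.pending)
	}
	q.mapMtx.Unlock()

	q.jobs <- job // Blocks while the channel is full, still holding addMtx.
}

func main() {
	q := &queue{jobs: make(chan int, 10), pending: map[int]struct{}{}}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // Single consumer: drain jobs and drop them from the tracking map.
		defer wg.Done()
		for job := range q.jobs {
			q.mapMtx.Lock()
			delete(q.pending, job)
			q.mapMtx.Unlock()
		}
	}()

	for i := 0; i < 10000; i++ {
		q.add(i)
	}
	close(q.jobs)
	wg.Wait()

	// The peak stays near cap(q.jobs) even though 10000 jobs went through.
	fmt.Println("peak pending entries:", q.peak, "capacity:", cap(q.jobs))
}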
@@ -69,12 +85,14 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
 	q := &chunkWriteQueue{
 		jobs:        make(chan chunkWriteJob, size),
-		chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
+		chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
+		chunkRefMapLastShrink: time.Now(),
 		writeChunk:  writeChunk,
 		adds:        counters.WithLabelValues("add"),
 		gets:        counters.WithLabelValues("get"),
 		completed:   counters.WithLabelValues("complete"),
+		shrink:      counters.WithLabelValues("shrink"),
 	}
 	if reg != nil {
@@ -112,6 +130,42 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
 	delete(c.chunkRefMap, job.ref)
 	c.completed.Inc()
+	c.shrinkChunkRefMap()
 }
+// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met,
+// and if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method.
+//
+// We do this because the Go runtime doesn't release the internal memory used by a map after the map has been emptied.
+// To actually release that memory we create a new map instead and throw the old one away.
+func (c *chunkWriteQueue) shrinkChunkRefMap() {
+	if len(c.chunkRefMap) > 0 {
+		// Can't shrink it while there is data in it.
+		return
+	}
+	if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold {
+		// Not shrinking it because it has not grown to the minimum threshold yet.
+		return
+	}
+	now := time.Now()
+	if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval {
+		// Not shrinking it because the minimum duration between shrink-events has not passed yet.
+		return
+	}
+	// Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event.
+	// We are trying to hit the sweet spot in the trade-off between initializing it to a very small size
+	// potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially
+	// resulting in unused allocated memory.
+	c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2)
+	c.chunkRefMapPeakSize = 0
+	c.chunkRefMapLastShrink = now
+	c.shrink.Inc()
+}
 func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
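The map re-initialization done by shrinkChunkRefMap above relies on the fact that deleting keys from a Go map does not return its bucket memory to the runtime. A small, self-contained illustration of that behavior (not part of the patch; heapMB and the 1,000,000-entry map are illustrative choices of mine):

package main

import (
	"fmt"
	"runtime"
)

// heapMB forces a GC and reports the current heap allocation in MiB.
func heapMB() uint64 {
	runtime.GC()
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	return ms.HeapAlloc / (1 << 20)
}

func main() {
	m := make(map[int]int)
	for i := 0; i < 1_000_000; i++ {
		m[i] = i
	}
	fmt.Println("after filling:  ", heapMB(), "MB")

	for k := range m {
		delete(m, k)
	}
	fmt.Println("after deleting: ", heapMB(), "MB") // The map is empty, but its buckets are still allocated.

	// Mirror the shrink heuristic: replace the map with a fresh one sized to about
	// half of the observed peak, letting the old, oversized map be garbage collected.
	peak := 1_000_000
	m = make(map[int]int, peak/2)
	fmt.Println("after replacing:", heapMB(), "MB")

	runtime.KeepAlive(m)
}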
@@ -125,11 +179,16 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
 	defer c.isRunningMtx.Unlock()
 	if !c.isRunning {
-		return errors.New("queue is not started")
+		return errors.New("queue is not running")
 	}
 	c.chunkRefMapMtx.Lock()
 	c.chunkRefMap[job.ref] = job.chk
+	// Keep track of the peak usage of c.chunkRefMap.
+	if len(c.chunkRefMap) > c.chunkRefMapPeakSize {
+		c.chunkRefMapPeakSize = len(c.chunkRefMap)
+	}
 	c.chunkRefMapMtx.Unlock()
 	c.jobs <- job
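Taken together: addJob records the high-water mark of the map, processJob attempts a shrink once the map empties, and the two constants keep reallocation rare. The following hypothetical, stand-alone condensation of that decision logic is mine, not the patch's (shouldShrink, shrinkThreshold and minShrinkInterval are invented names mirroring the real constants):

package main

import (
	"fmt"
	"time"
)

const (
	shrinkThreshold   = 1000             // Mirrors chunkRefMapShrinkThreshold.
	minShrinkInterval = 10 * time.Minute // Mirrors chunkRefMapMinShrinkInterval.
)

// shouldShrink reports whether a tracking map with the given length and recorded peak
// should be reallocated now, given the time of the last shrink.
func shouldShrink(mapLen, peak int, lastShrink, now time.Time) bool {
	if mapLen > 0 {
		return false // Never shrink while entries are still pending.
	}
	if peak < shrinkThreshold {
		return false // The map never grew enough to be worth reallocating.
	}
	return now.Sub(lastShrink) >= minShrinkInterval
}

func main() {
	last := time.Now().Add(-time.Hour)
	fmt.Println(shouldShrink(0, 5000, last, time.Now()))       // true: large peak, long enough since the last shrink.
	fmt.Println(shouldShrink(0, 200, last, time.Now()))        // false: peak below the threshold.
	fmt.Println(shouldShrink(0, 5000, time.Now(), time.Now())) // false: shrank too recently.
	fmt.Println(shouldShrink(3, 5000, last, time.Now()))       // false: map still has entries.
}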