diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d1563b1d5..6e8fad806 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -305,6 +305,9 @@ func main() { serverOnlyFlag(a, "storage.tsdb.wal-compression", "Compress the tsdb WAL."). Hidden().Default("true").BoolVar(&cfg.tsdb.WALCompression) + serverOnlyFlag(a, "storage.tsdb.head-chunks-write-queue-size", "Size of the queue through which head chunks are written to the disk to be m-mapped, 0 disables the queue completely. Experimental."). + Default("0").IntVar(&cfg.tsdb.HeadChunksWriteQueueSize) + agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage."). Default("data-agent/").StringVar(&cfg.agentStoragePath) @@ -1484,6 +1487,7 @@ type tsdbOptions struct { NoLockfile bool AllowOverlappingBlocks bool WALCompression bool + HeadChunksWriteQueueSize int StripeSize int MinBlockDuration model.Duration MaxBlockDuration model.Duration @@ -1501,6 +1505,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { NoLockfile: opts.NoLockfile, AllowOverlappingBlocks: opts.AllowOverlappingBlocks, WALCompression: opts.WALCompression, + HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, StripeSize: opts.StripeSize, MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond), diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 94b20e129..307ecfc96 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -69,7 +69,8 @@ const ( // DefaultWriteBufferSize is the default write buffer size. DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB. // DefaultWriteQueueSize is the default size of the in-memory queue used before flushing chunks to the disk. - DefaultWriteQueueSize = 1000 + // A value of 0 completely disables this feature. + DefaultWriteQueueSize = 0 ) // ChunkDiskMapperRef represents the location of a head chunk on disk. @@ -249,7 +250,10 @@ func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Poo crc32: newCRC32(), chunkBuffer: newChunkBuffer(), } - m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk) + + if writeQueueSize > 0 { + m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk) + } if m.pool == nil { m.pool = chunkenc.NewPool() @@ -375,18 +379,33 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro // WriteChunk writes the chunk to the disk. // The returned chunk ref is the reference from where the chunk encoding starts for the chunk. func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { - var err error - defer func() { - if err != nil && callback != nil { - callback(err) - } - }() - - // cdm.evtlPosMtx must be held to serialize the calls to .getNextChunkRef() and .addJob(). + // cdm.evtlPosMtx must be held to serialize the calls to cdm.evtlPos.getNextChunkRef() and the writing of the chunk (either with or without queue). cdm.evtlPosMtx.Lock() defer cdm.evtlPosMtx.Unlock() - ref, cutFile := cdm.evtlPos.getNextChunkRef(chk) + + if cdm.writeQueue != nil { + return cdm.writeChunkViaQueue(ref, cutFile, seriesRef, mint, maxt, chk, callback) + } + + err := cdm.writeChunk(seriesRef, mint, maxt, chk, ref, cutFile) + if callback != nil { + callback(err) + } + + return ref +} + +func (cdm *ChunkDiskMapper) writeChunkViaQueue(ref ChunkDiskMapperRef, cutFile bool, seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { + var err error + if callback != nil { + defer func() { + if err != nil { + callback(err) + } + }() + } + err = cdm.writeQueue.addJob(chunkWriteJob{ cutFile: cutFile, seriesRef: seriesRef, @@ -473,6 +492,10 @@ func (cdm *ChunkDiskMapper) CutNewFile() { } func (cdm *ChunkDiskMapper) IsQueueEmpty() bool { + if cdm.writeQueue == nil { + return true + } + return cdm.writeQueue.queueIsEmpty() } @@ -602,9 +625,11 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error return nil, ErrChunkDiskMapperClosed } - chunk := cdm.writeQueue.get(ref) - if chunk != nil { - return chunk, nil + if cdm.writeQueue != nil { + chunk := cdm.writeQueue.get(ref) + if chunk != nil { + return chunk, nil + } } sgmIndex, chkStart := ref.Unpack() @@ -962,7 +987,9 @@ func (cdm *ChunkDiskMapper) Close() error { cdm.evtlPosMtx.Lock() defer cdm.evtlPosMtx.Unlock() - cdm.writeQueue.stop() + if cdm.writeQueue != nil { + cdm.writeQueue.stop() + } // 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file. // The lock order should not be reversed here else it can cause deadlocks. diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index 9dca41d78..57c9c3401 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -27,6 +27,22 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" ) +var writeQueueSize int + +func TestMain(m *testing.M) { + // Run all tests with the chunk write queue disabled. + writeQueueSize = 0 + exitVal := m.Run() + if exitVal != 0 { + os.Exit(exitVal) + } + + // Re-run all tests with the chunk write queue size of 1e6. + writeQueueSize = 1000000 + exitVal = m.Run() + os.Exit(exitVal) +} + func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { hrw := createChunkDiskMapper(t, "") defer func() { @@ -453,7 +469,7 @@ func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper { dir = t.TempDir() } - hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, DefaultWriteQueueSize) + hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, writeQueueSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))