From 0df3489275ed4357b12d98e737290992e19915ce Mon Sep 17 00:00:00 2001
From: Mauro Stettler
Date: Mon, 10 Jan 2022 10:36:45 -0300
Subject: [PATCH] Write chunks via queue, predicting the refs (#10051)

* Write chunks via queue, predicting the refs

Our load tests have shown that there is a latency spike in the remote write
handler whenever the head chunks need to be written, because
chunkDiskMapper.WriteChunk() blocks until the chunks are written to disk.

This adds a queue to the chunk disk mapper which makes the WriteChunk()
method non-blocking unless the queue is full. Reads can still be served
from the queue.

Signed-off-by: Mauro Stettler

* address PR feedback

Signed-off-by: Mauro Stettler

* initialize metrics without .Add(0)

Signed-off-by: Mauro Stettler

* change isRunningMtx to normal lock

Signed-off-by: Mauro Stettler

* do not re-initialize chunkrefmap

Signed-off-by: Mauro Stettler

* update metric outside of lock scope

Signed-off-by: Mauro Stettler

* add benchmark for adding job to chunk write queue

Signed-off-by: Mauro Stettler

* remove unnecessary "success" var

Signed-off-by: Mauro Stettler

* gofumpt -extra

Signed-off-by: Mauro Stettler

* avoid WithLabelValues call in addJob

Signed-off-by: Mauro Stettler

* format comments

Signed-off-by: Mauro Stettler

* addressing PR feedback

Signed-off-by: Mauro Stettler

* rename cutExpectRef to cutAndExpectRef

Signed-off-by: Mauro Stettler

* use head.Init() instead of .initTime()

Signed-off-by: Mauro Stettler

* address PR feedback

Signed-off-by: Mauro Stettler

* PR feedback

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

Signed-off-by: Mauro Stettler

* update test according to PR feedback

Signed-off-by: Mauro Stettler

* replace callbackWg -> awaitCb

Signed-off-by: Mauro Stettler

* better test of truncation with empty files

Signed-off-by: Mauro Stettler

* replace callbackWg -> awaitCb

Signed-off-by: Mauro Stettler

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
---
 tsdb/chunks/chunk_write_queue.go      | 165 +++++++++++++++
 tsdb/chunks/chunk_write_queue_test.go | 287 ++++++++++++++++++++++++++
 tsdb/chunks/head_chunks.go            | 229 +++++++++++++++-----
 tsdb/chunks/head_chunks_test.go       | 161 +++++++++------
 tsdb/db.go                            |   8 +
 tsdb/db_test.go                       |  10 +-
 tsdb/head.go                          |   5 +
 tsdb/head_append.go                   |  13 +-
 tsdb/head_test.go                     | 206 +++++++++++++++++-
 9 files changed, 960 insertions(+), 124 deletions(-)
 create mode 100644 tsdb/chunks/chunk_write_queue.go
 create mode 100644 tsdb/chunks/chunk_write_queue_test.go

diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go
new file mode 100644
index 000000000..ce9b94d85
--- /dev/null
+++ b/tsdb/chunks/chunk_write_queue.go
@@ -0,0 +1,165 @@
+// Copyright 2021 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
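+
+// Illustrative use of the queue defined in this file (error handling elided;
+// writeChunk, chk and ref are placeholders, not part of this change):
+//
+//	q := newChunkWriteQueue(nil, 1000, writeChunk)
+//	_ = q.addJob(chunkWriteJob{seriesRef: 1, mint: 0, maxt: 1000, chk: chk, ref: ref})
+//	c := q.get(ref) // Non-nil until the job has been fully processed.
+//	q.stop()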
+
+package chunks
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+)
+
+type chunkWriteJob struct {
+	cutFile   bool
+	seriesRef HeadSeriesRef
+	mint      int64
+	maxt      int64
+	chk       chunkenc.Chunk
+	ref       ChunkDiskMapperRef
+	callback  func(error)
+}
+
+// chunkWriteQueue is a queue for writing chunks to disk in a non-blocking fashion.
+// Chunks that shall be written get added to the queue, which is consumed asynchronously.
+// Adding jobs to the queue is non-blocking as long as the queue isn't full.
+type chunkWriteQueue struct {
+	jobs chan chunkWriteJob
+
+	chunkRefMapMtx sync.RWMutex
+	chunkRefMap    map[ChunkDiskMapperRef]chunkenc.Chunk
+
+	isRunningMtx sync.Mutex // Protects the isRunning property.
+	isRunning    bool       // Used to prevent new jobs from being added to the queue when the chan is already closed.
+
+	workerWg sync.WaitGroup
+
+	writeChunk writeChunkF
+
+	// Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical
+	// addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec.
+	adds      prometheus.Counter
+	gets      prometheus.Counter
+	completed prometheus.Counter
+}
+
+// writeChunkF is a function which writes chunks; it is dynamic to allow mocking in tests.
+type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool) error
+
+func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue {
+	counters := prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "prometheus_tsdb_chunk_write_queue_operations_total",
+			Help: "Number of operations on the chunk_write_queue.",
+		},
+		[]string{"operation"},
+	)
+
+	q := &chunkWriteQueue{
+		jobs:        make(chan chunkWriteJob, size),
+		chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
+		writeChunk:  writeChunk,
+
+		adds:      counters.WithLabelValues("add"),
+		gets:      counters.WithLabelValues("get"),
+		completed: counters.WithLabelValues("complete"),
+	}
+
+	if reg != nil {
+		reg.MustRegister(counters)
+	}
+
+	q.start()
+	return q
+}
+
+func (c *chunkWriteQueue) start() {
+	c.workerWg.Add(1)
+	go func() {
+		defer c.workerWg.Done()
+
+		for job := range c.jobs {
+			c.processJob(job)
+		}
+	}()
+
+	c.isRunningMtx.Lock()
+	c.isRunning = true
+	c.isRunningMtx.Unlock()
+}
+
+func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
+	err := c.writeChunk(job.seriesRef, job.mint, job.maxt, job.chk, job.ref, job.cutFile)
+	if job.callback != nil {
+		job.callback(err)
+	}
+
+	c.chunkRefMapMtx.Lock()
+	defer c.chunkRefMapMtx.Unlock()
+
+	delete(c.chunkRefMap, job.ref)
+
+	c.completed.Inc()
+}
+
+func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
+	defer func() {
+		if err == nil {
+			c.adds.Inc()
+		}
+	}()
+
+	c.isRunningMtx.Lock()
+	defer c.isRunningMtx.Unlock()
+
+	if !c.isRunning {
+		return errors.New("queue is not started")
+	}
+
+	c.chunkRefMapMtx.Lock()
+	c.chunkRefMap[job.ref] = job.chk
+	c.chunkRefMapMtx.Unlock()
+
+	c.jobs <- job
+
+	return nil
+}
+
+func (c *chunkWriteQueue) get(ref ChunkDiskMapperRef) chunkenc.Chunk {
+	c.chunkRefMapMtx.RLock()
+	defer c.chunkRefMapMtx.RUnlock()
+
+	chk, ok := c.chunkRefMap[ref]
+	if ok {
+		c.gets.Inc()
+	}
+
+	return chk
+}
+
+func (c *chunkWriteQueue) stop() {
+	c.isRunningMtx.Lock()
+	defer c.isRunningMtx.Unlock()
+
+	if !c.isRunning {
+		return
+	}
+
+	c.isRunning = false
+
+	close(c.jobs)
+
+	c.workerWg.Wait()
+}
diff --git
a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go new file mode 100644 index 000000000..251c96395 --- /dev/null +++ b/tsdb/chunks/chunk_write_queue_test.go @@ -0,0 +1,287 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunks + +import ( + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +func TestChunkWriteQueue_GettingChunkFromQueue(t *testing.T) { + var blockWriterWg sync.WaitGroup + blockWriterWg.Add(1) + + // blockingChunkWriter blocks until blockWriterWg is done. + blockingChunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error { + blockWriterWg.Wait() + return nil + } + + q := newChunkWriteQueue(nil, 1000, blockingChunkWriter) + + defer q.stop() + defer blockWriterWg.Done() + + testChunk := chunkenc.NewXORChunk() + var ref ChunkDiskMapperRef + job := chunkWriteJob{ + chk: testChunk, + ref: ref, + } + require.NoError(t, q.addJob(job)) + + // Retrieve chunk from the queue. + gotChunk := q.get(ref) + require.Equal(t, testChunk, gotChunk) +} + +func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) { + var ( + gotSeriesRef HeadSeriesRef + gotMint, gotMaxt int64 + gotChunk chunkenc.Chunk + gotRef ChunkDiskMapperRef + gotCutFile bool + ) + + blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error { + gotSeriesRef = seriesRef + gotMint = mint + gotMaxt = maxt + gotChunk = chunk + gotRef = ref + gotCutFile = cutFile + return nil + } + + q := newChunkWriteQueue(nil, 1000, blockingChunkWriter) + defer q.stop() + + seriesRef := HeadSeriesRef(1) + var mint, maxt int64 = 2, 3 + chunk := chunkenc.NewXORChunk() + ref := newChunkDiskMapperRef(321, 123) + cutFile := true + awaitCb := make(chan struct{}) + require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile, callback: func(err error) { + close(awaitCb) + }})) + <-awaitCb + + // Compare whether the write function has received all job attributes correctly. + require.Equal(t, seriesRef, gotSeriesRef) + require.Equal(t, mint, gotMint) + require.Equal(t, maxt, gotMaxt) + require.Equal(t, chunk, gotChunk) + require.Equal(t, ref, gotRef) + require.Equal(t, cutFile, gotCutFile) +} + +func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { + sizeLimit := 100 + unblockChunkWriterCh := make(chan struct{}, sizeLimit) + + // blockingChunkWriter blocks until the unblockChunkWriterCh channel returns a value. + blockingChunkWriter := func(seriesRef HeadSeriesRef, mint, maxt int64, chunk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) error { + <-unblockChunkWriterCh + return nil + } + + q := newChunkWriteQueue(nil, sizeLimit, blockingChunkWriter) + defer q.stop() + // Unblock writers when shutting down. 
+	defer close(unblockChunkWriterCh)
+
+	var chunkRef ChunkDiskMapperRef
+	var callbackWg sync.WaitGroup
+	addChunk := func() {
+		callbackWg.Add(1)
+		require.NoError(t, q.addJob(chunkWriteJob{
+			ref: chunkRef,
+			callback: func(err error) {
+				callbackWg.Done()
+			},
+		}))
+		chunkRef++
+	}
+
+	unblockChunkWriter := func() {
+		unblockChunkWriterCh <- struct{}{}
+	}
+
+	// Fill the queue to half of the size limit.
+	for job := 0; job < sizeLimit/2; job++ {
+		addChunk()
+	}
+
+	// Consume the jobs.
+	for job := 0; job < sizeLimit/2; job++ {
+		unblockChunkWriter()
+	}
+
+	// Add jobs until the queue is full.
+	// Note that one more job than sizeLimit can be added because one will be processed by the worker already
+	// and it will block on the chunk write function.
+	for job := 0; job < sizeLimit+1; job++ {
+		addChunk()
+	}
+
+	// The queue should be full.
+	require.True(t, queueIsFull(q))
+
+	// Adding another job should block as long as no job from the queue gets consumed.
+	addedJob := atomic.NewBool(false)
+	go func() {
+		addChunk()
+		addedJob.Store(true)
+	}()
+
+	// Wait for 10ms while the adding of a new job is blocked.
+	time.Sleep(time.Millisecond * 10)
+	require.False(t, addedJob.Load())
+
+	// Consume one job from the queue.
+	unblockChunkWriter()
+
+	// Wait until the job has been added to the queue.
+	require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10)
+
+	// The queue should be full again.
+	require.True(t, queueIsFull(q))
+
+	// Consume sizeLimit+1 jobs from the queue.
+	// To drain the queue we need to consume sizeLimit+1 jobs because 1 job
+	// is already in the state of being processed.
+	for job := 0; job < sizeLimit+1; job++ {
+		require.False(t, queueIsEmpty(q))
+		unblockChunkWriter()
+	}
+
+	// Wait until all jobs have been processed.
+	callbackWg.Wait()
+	require.True(t, queueIsEmpty(q))
+}
+
+func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
+	testError := errors.New("test error")
+	chunkWriter := func(_ HeadSeriesRef, _, _ int64, _ chunkenc.Chunk, _ ChunkDiskMapperRef, _ bool) error {
+		return testError
+	}
+
+	awaitCb := make(chan struct{})
+	var gotError error
+	callback := func(err error) {
+		gotError = err
+		close(awaitCb)
+	}
+
+	q := newChunkWriteQueue(nil, 1, chunkWriter)
+	defer q.stop()
+
+	job := chunkWriteJob{callback: callback}
+	require.NoError(t, q.addJob(job))
+
+	<-awaitCb
+
+	require.Equal(t, testError, gotError)
+}
+
+func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
+	for _, withReads := range []bool{false, true} {
+		b.Run(fmt.Sprintf("with reads %t", withReads), func(b *testing.B) {
+			for _, concurrentWrites := range []int{1, 10, 100, 1000} {
+				b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) {
+					issueReadSignal := make(chan struct{})
+					q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, b bool) error {
+						if withReads {
+							select {
+							case issueReadSignal <- struct{}{}:
+							default:
+								// Can't write to issueReadSignal, don't block but omit read instead.
+							}
+						}
+						return nil
+					})
+					b.Cleanup(func() {
+						// Stopped already, so no more writes will happen.
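+						// Closing issueReadSignal also lets the reader goroutine started below exit.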
+ close(issueReadSignal) + }) + b.Cleanup(q.stop) + + start := sync.WaitGroup{} + start.Add(1) + + jobs := make(chan chunkWriteJob, b.N) + for i := 0; i < b.N; i++ { + jobs <- chunkWriteJob{ + seriesRef: HeadSeriesRef(i), + ref: ChunkDiskMapperRef(i), + } + } + close(jobs) + + go func() { + for range issueReadSignal { + // We don't care about the ID we're getting, we just want to grab the lock. + _ = q.get(ChunkDiskMapperRef(0)) + } + }() + + done := sync.WaitGroup{} + done.Add(concurrentWrites) + for w := 0; w < concurrentWrites; w++ { + go func() { + start.Wait() + for j := range jobs { + _ = q.addJob(j) + } + done.Done() + }() + } + + b.ResetTimer() + start.Done() + done.Wait() + }) + } + }) + } +} + +func queueIsEmpty(q *chunkWriteQueue) bool { + return queueSize(q) == 0 +} + +func queueIsFull(q *chunkWriteQueue) bool { + // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh + // because one job is currently being processed and blocked in the writer. + return queueSize(q) == cap(q.jobs)+1 +} + +func queueSize(q *chunkWriteQueue) int { + q.chunkRefMapMtx.Lock() + defer q.chunkRefMapMtx.Unlock() + + // Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has + // been fully processed, it remains in the chunkRefMap until the processing is complete. + return len(q.chunkRefMap) +} diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 51e6e3232..11c175cbf 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -26,7 +26,9 @@ import ( "strconv" "sync" + "github.com/dennwc/varint" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -66,6 +68,8 @@ const ( MaxWriteBufferSize = 8 * 1024 * 1024 // 8 MiB. // DefaultWriteBufferSize is the default write buffer size. DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB. + // DefaultWriteQueueSize is the default size of the in-memory queue used before flushing chunks to the disk. + DefaultWriteQueueSize = 1000 ) // ChunkDiskMapperRef represents the location of a head chunk on disk. @@ -77,10 +81,10 @@ func newChunkDiskMapperRef(seq, offset uint64) ChunkDiskMapperRef { return ChunkDiskMapperRef((seq << 32) | offset) } -func (ref ChunkDiskMapperRef) Unpack() (sgmIndex, chkStart int) { - sgmIndex = int(ref >> 32) - chkStart = int((ref << 32) >> 32) - return sgmIndex, chkStart +func (ref ChunkDiskMapperRef) Unpack() (seq, offset int) { + seq = int(ref >> 32) + offset = int((ref << 32) >> 32) + return seq, offset } // CorruptionErr is an error that's returned when corruption is encountered. @@ -94,18 +98,97 @@ func (e *CorruptionErr) Error() string { return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error() } +// chunkPos keeps track of the position in the head chunk files. +// chunkPos is not thread-safe, a lock must be used to protect it. +type chunkPos struct { + seq uint64 // Index of chunk file. + offset uint64 // Offset within chunk file. + cutFile bool // When true then the next chunk will be written to a new file. +} + +// getNextChunkRef takes a chunk and returns the chunk reference which will refer to it once it has been written. +// getNextChunkRef also decides whether a new file should be cut before writing this chunk, and it returns the decision via the second return value. +// The order of calling getNextChunkRef must be the order in which chunks are written to the disk. 
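+//
+// For example (illustrative numbers, assuming SegmentHeaderSize is 8 bytes): right after a
+// file cut with seq=3, the next chunk gets offset 8, so its predicted ref packs to
+// (3 << 32) | 8; the offset then advances by bytesToWriteForChunk(uint64(len(chk.Bytes()))).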
+func (f *chunkPos) getNextChunkRef(chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef, cutFile bool) { + chkLen := uint64(len(chk.Bytes())) + bytesToWrite := f.bytesToWriteForChunk(chkLen) + + if f.shouldCutNewFile(chkLen) { + f.toNewFile() + f.cutFile = false + cutFile = true + } + + chkOffset := f.offset + f.offset += bytesToWrite + + return newChunkDiskMapperRef(f.seq, chkOffset), cutFile +} + +// toNewFile updates the seq/offset position to point to the beginning of a new chunk file. +func (f *chunkPos) toNewFile() { + f.seq++ + f.offset = SegmentHeaderSize +} + +// cutFileOnNextChunk triggers that the next chunk will be written in to a new file. +// Not thread safe, a lock must be held when calling this. +func (f *chunkPos) cutFileOnNextChunk() { + f.cutFile = true +} + +// initSeq sets the sequence number of the head chunk file. +// Should only be used for initialization, after that the sequence number will be managed by chunkPos. +func (f *chunkPos) initSeq(seq uint64) { + f.seq = seq +} + +// shouldCutNewFile returns whether a new file should be cut based on the file size. +// The read or write lock on chunkPos must be held when calling this. +func (f *chunkPos) shouldCutNewFile(chunkSize uint64) bool { + if f.cutFile { + return true + } + + return f.offset == 0 || // First head chunk file. + f.offset+chunkSize+MaxHeadChunkMetaSize > MaxHeadChunkFileSize // Exceeds the max head chunk file size. +} + +// bytesToWriteForChunk returns the number of bytes that will need to be written for the given chunk size, +// including all meta data before and after the chunk data. +// Head chunk format: https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/head_chunks.md#chunk +func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 { + // Headers. + bytes := uint64(SeriesRefSize) + 2*MintMaxtSize + ChunkEncodingSize + + // Size of chunk length encoded as uvarint. + bytes += uint64(varint.UvarintSize(chkLen)) + + // Chunk length. + bytes += chkLen + + // crc32. + bytes += CRCSize + + return bytes +} + // ChunkDiskMapper is for writing the Head block chunks to the disk // and access chunks via mmapped file. type ChunkDiskMapper struct { - curFileNumBytes atomic.Int64 // Bytes written in current open file. - /// Writer. dir *os.File writeBufferSize int - curFile *os.File // File being written to. - curFileSequence int // Index of current open file being appended to. - curFileMaxt int64 // Used for the size retention. + curFile *os.File // File being written to. + curFileSequence int // Index of current open file being appended to. + curFileOffset atomic.Uint64 // Bytes written in current open file. + curFileMaxt int64 // Used for the size retention. + + // The values in evtlPos represent the file position which will eventually be + // reached once the content of the write queue has been fully processed. + evtlPosMtx sync.Mutex + evtlPos chunkPos byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk. chkWriter *bufio.Writer // Writer for the current open file. @@ -128,6 +211,8 @@ type ChunkDiskMapper struct { // This is done after iterating through all the chunks in those files using the IterateAllChunks method. fileMaxtSet bool + writeQueue *chunkWriteQueue + closed bool } @@ -141,7 +226,7 @@ type mmappedChunkFile struct { // using the default head chunk file duration. // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper // to set the maxt of all the file. 
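+// Example construction with the new signature (illustrative; dir is a placeholder):
+//
+//	cdm, err := NewChunkDiskMapper(prometheus.NewRegistry(), dir, chunkenc.NewPool(),
+//		DefaultWriteBufferSize, DefaultWriteQueueSize)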
-func NewChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*ChunkDiskMapper, error) { +func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) { // Validate write buffer size. if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize { return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize) @@ -165,6 +250,7 @@ func NewChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*C crc32: newCRC32(), chunkBuffer: newChunkBuffer(), } + m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk) if m.pool == nil { m.pool = chunkenc.NewPool() @@ -235,6 +321,8 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { } } + cdm.evtlPos.initSeq(uint64(lastSeq)) + return nil } @@ -287,17 +375,44 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro // WriteChunk writes the chunk to the disk. // The returned chunk ref is the reference from where the chunk encoding starts for the chunk. -func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef, err error) { +func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { + var err error + defer func() { + if err != nil && callback != nil { + callback(err) + } + }() + + // cdm.evtlPosMtx must be held to serialize the calls to .getNextChunkRef() and .addJob(). + cdm.evtlPosMtx.Lock() + defer cdm.evtlPosMtx.Unlock() + + ref, cutFile := cdm.evtlPos.getNextChunkRef(chk) + err = cdm.writeQueue.addJob(chunkWriteJob{ + cutFile: cutFile, + seriesRef: seriesRef, + mint: mint, + maxt: maxt, + chk: chk, + ref: ref, + callback: callback, + }) + + return ref +} + +func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) (err error) { cdm.writePathMtx.Lock() defer cdm.writePathMtx.Unlock() if cdm.closed { - return 0, ErrChunkDiskMapperClosed + return ErrChunkDiskMapperClosed } - if cdm.shouldCutNewFile(len(chk.Bytes())) { - if err := cdm.cut(); err != nil { - return 0, err + if cutFile { + err := cdm.cutAndExpectRef(ref) + if err != nil { + return err } } @@ -305,15 +420,13 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64 // so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer). 
if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) { if err := cdm.flushBuffer(); err != nil { - return 0, err + return err } } cdm.crc32.Reset() bytesWritten := 0 - chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize())) - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef)) bytesWritten += SeriesRefSize binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint)) @@ -326,59 +439,69 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64 bytesWritten += n if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil { - return 0, err + return err } if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil { - return 0, err + return err } if err := cdm.writeCRC32(); err != nil { - return 0, err + return err } if maxt > cdm.curFileMaxt { cdm.curFileMaxt = maxt } - cdm.chunkBuffer.put(chkRef, chk) + cdm.chunkBuffer.put(ref, chk) if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize { // The chunk was bigger than the buffer itself. // Flushing to not keep partial chunks in buffer. if err := cdm.flushBuffer(); err != nil { - return 0, err + return err } } - return chkRef, nil + return nil } -// shouldCutNewFile returns whether a new file should be cut, based on time and size retention. -// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. -// Time retention: so that we can delete old chunks with some time guarantee in low load environments. -func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int) bool { - return cdm.curFileSize() == 0 || // First head chunk file. - cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. +// CutNewFile makes that a new file will be created the next time a chunk is written. +func (cdm *ChunkDiskMapper) CutNewFile() { + cdm.evtlPosMtx.Lock() + defer cdm.evtlPosMtx.Unlock() + + cdm.evtlPos.cutFileOnNextChunk() } -// CutNewFile creates a new m-mapped file. -func (cdm *ChunkDiskMapper) CutNewFile() (returnErr error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() +// cutAndExpectRef creates a new m-mapped file. +// The write lock should be held before calling this. +// It ensures that the position in the new file matches the given chunk reference, if not then it errors. +func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err error) { + seq, offset, err := cdm.cut() + if err != nil { + return err + } + + if expSeq, expOffset := chkRef.Unpack(); seq != expSeq || offset != expOffset { + return errors.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset) + } - return cdm.cut() + return nil } // cut creates a new m-mapped file. The write lock should be held before calling this. -func (cdm *ChunkDiskMapper) cut() (returnErr error) { +// It returns the file sequence and the offset in that file to start writing chunks. +func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) { // Sync current tail to disk and close. 
if err := cdm.finalizeCurFile(); err != nil { - return err + return 0, 0, err } - n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize) + offset, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize) if err != nil { - return err + return 0, 0, err } + defer func() { // The file should not be closed if there is no error, // its kept open in the ChunkDiskMapper. @@ -387,7 +510,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { } }() - cdm.curFileNumBytes.Store(int64(n)) + cdm.curFileOffset.Store(uint64(offset)) if cdm.curFile != nil { cdm.readPathMtx.Lock() @@ -397,7 +520,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize) if err != nil { - return err + return 0, 0, err } cdm.readPathMtx.Lock() @@ -415,7 +538,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { cdm.curFileMaxt = 0 - return nil + return seq, offset, nil } // finalizeCurFile writes all pending data to the current tail file, @@ -438,7 +561,7 @@ func (cdm *ChunkDiskMapper) finalizeCurFile() error { func (cdm *ChunkDiskMapper) write(b []byte) error { n, err := cdm.chkWriter.Write(b) - cdm.curFileNumBytes.Add(int64(n)) + cdm.curFileOffset.Add(uint64(n)) return err } @@ -476,6 +599,11 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error return nil, ErrChunkDiskMapperClosed } + chunk := cdm.writeQueue.get(ref) + if chunk != nil { + return chunk, nil + } + sgmIndex, chkStart := ref.Unpack() // We skip the series ref and the mint/maxt beforehand. chkStart += SeriesRefSize + (2 * MintMaxtSize) @@ -732,7 +860,10 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error { errs := tsdb_errors.NewMulti() // Cut a new file only if the current file has some chunks. if cdm.curFileSize() > HeadChunkFileHeaderSize { - errs.Add(cdm.CutNewFile()) + // There is a known race condition here because between the check of curFileSize() and the call to CutNewFile() + // a new file could already be cut, this is acceptable because it will simply result in an empty file which + // won't do any harm. + cdm.CutNewFile() } errs.Add(cdm.deleteFiles(removedFiles)) return errs.Err() @@ -787,13 +918,19 @@ func (cdm *ChunkDiskMapper) Size() (int64, error) { return fileutil.DirSize(cdm.dir.Name()) } -func (cdm *ChunkDiskMapper) curFileSize() int64 { - return cdm.curFileNumBytes.Load() +func (cdm *ChunkDiskMapper) curFileSize() uint64 { + return cdm.curFileOffset.Load() } // Close closes all the open files in ChunkDiskMapper. // It is not longer safe to access chunks from this struct after calling Close. func (cdm *ChunkDiskMapper) Close() error { + // Locking the eventual position lock blocks WriteChunk() + cdm.evtlPosMtx.Lock() + defer cdm.evtlPosMtx.Unlock() + + cdm.writeQueue.stop() + // 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file. // The lock order should not be reversed here else it can cause deadlocks. 
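+	// With the write queue, Close() additionally holds evtlPosMtx (acquired above), so the
+	// overall acquisition order is evtlPosMtx -> writePathMtx -> readPathMtx.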
cdm.writePathMtx.Lock() diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index 58c0f8ada..be1784137 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -28,7 +28,7 @@ import ( ) func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { - hrw := testChunkDiskMapper(t) + hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() @@ -129,8 +129,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { // Testing IterateAllChunks method. dir := hrw.dir.Name() require.NoError(t, hrw.Close()) - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) + hrw = createChunkDiskMapper(t, dir) idx := 0 require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error { @@ -156,10 +155,9 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { // TestChunkDiskMapper_Truncate tests // * If truncation is happening properly based on the time passed. // * The active file is not deleted even if the passed time makes it eligible to be deleted. -// * Empty current file does not lead to creation of another file after truncation. // * Non-empty current file leads to creation of another file after truncation. func TestChunkDiskMapper_Truncate(t *testing.T) { - hrw := testChunkDiskMapper(t) + hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() @@ -167,16 +165,20 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { timeRange := 0 fileTimeStep := 100 var thirdFileMinT, sixthFileMinT int64 - addChunk := func() int { - mint := timeRange + 1 // Just after the new file cut. - maxt := timeRange + fileTimeStep - 1 // Just before the next file. + t.Helper() - // Write a chunks to set maxt for the segment. - _, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t)) + step := 100 + mint, maxt := timeRange+1, timeRange+step-1 + var err error + awaitCb := make(chan struct{}) + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) { + err = cbErr + close(awaitCb) + }) + <-awaitCb require.NoError(t, err) - - timeRange += fileTimeStep + timeRange += step return mint } @@ -198,7 +200,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Create segments 1 to 7. for i := 1; i <= 7; i++ { - require.NoError(t, hrw.CutNewFile()) + hrw.CutNewFile() mint := int64(addChunk()) if i == 3 { thirdFileMinT = mint @@ -210,19 +212,17 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Truncating files. require.NoError(t, hrw.Truncate(thirdFileMinT)) + + // Add a chunk to trigger cutting of new file. + addChunk() + verifyFiles([]int{3, 4, 5, 6, 7, 8}) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarted. - var err error - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil })) - require.True(t, hrw.fileMaxtSet) + hrw = createChunkDiskMapper(t, dir) verifyFiles([]int{3, 4, 5, 6, 7, 8}) // New file is created after restart even if last file was empty. @@ -231,15 +231,23 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Truncating files after restart. 
require.NoError(t, hrw.Truncate(sixthFileMinT)) - verifyFiles([]int{6, 7, 8, 9, 10}) + verifyFiles([]int{6, 7, 8, 9}) - // As the last file was empty, this creates no new files. + // Truncating a second time without adding a chunk shouldn't create a new file. require.NoError(t, hrw.Truncate(sixthFileMinT+1)) - verifyFiles([]int{6, 7, 8, 9, 10}) + verifyFiles([]int{6, 7, 8, 9}) + + // Add a chunk to trigger cutting of new file. addChunk() + verifyFiles([]int{6, 7, 8, 9, 10}) + // Truncating till current time should not delete the current active file. require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep)))) + + // Add a chunk to trigger cutting of new file. + addChunk() + verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created. } @@ -248,23 +256,40 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation // simply deleted all empty files instead of stopping once it encountered a non-empty file. func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { - hrw := testChunkDiskMapper(t) + hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() - timeRange := 0 + addChunk := func() { + t.Helper() + + awaitCb := make(chan struct{}) + step := 100 mint, maxt := timeRange+1, timeRange+step-1 - _, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t)) - require.NoError(t, err) + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { + close(awaitCb) + require.NoError(t, err) + }) + <-awaitCb timeRange += step } + emptyFile := func() { - require.NoError(t, hrw.CutNewFile()) + t.Helper() + + _, _, err := hrw.cut() + require.NoError(t, err) + hrw.evtlPosMtx.Lock() + hrw.evtlPos.toNewFile() + hrw.evtlPosMtx.Unlock() } + nonEmptyFile := func() { + t.Helper() + emptyFile() addChunk() } @@ -297,42 +322,48 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { // though files 4 and 6 are empty. file2Maxt := hrw.mmappedChunkFiles[2].maxt require.NoError(t, hrw.Truncate(file2Maxt+1)) - // As 6 was empty, it should not create another file. verifyFiles([]int{3, 4, 5, 6}) + // Add chunk, so file 6 is not empty anymore. addChunk() - // Truncate creates another file as 6 is not empty now. - require.NoError(t, hrw.Truncate(file2Maxt+1)) - verifyFiles([]int{3, 4, 5, 6, 7}) + verifyFiles([]int{3, 4, 5, 6}) + + // Truncating till file 3 should also delete file 4, because it is empty. + file3Maxt := hrw.mmappedChunkFiles[3].maxt + require.NoError(t, hrw.Truncate(file3Maxt+1)) + addChunk() + verifyFiles([]int{5, 6, 7}) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) - // Restarting checks for unsequential files. - var err error - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - verifyFiles([]int{3, 4, 5, 6, 7}) + hrw = createChunkDiskMapper(t, dir) + verifyFiles([]int{5, 6, 7}) } // TestHeadReadWriter_TruncateAfterIterateChunksError tests for // https://github.com/prometheus/prometheus/issues/7753 func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { - hrw := testChunkDiskMapper(t) + hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() // Write a chunks to iterate on it later. 
- _, err := hrw.WriteChunk(1, 0, 1000, randomChunk(t)) + var err error + awaitCb := make(chan struct{}) + hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(cbErr error) { + err = cbErr + close(awaitCb) + }) + <-awaitCb require.NoError(t, err) dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) + hrw = createChunkDiskMapper(t, dir) // Forcefully failing IterateAllChunks. require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { @@ -344,21 +375,31 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { } func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { - hrw := testChunkDiskMapper(t) + hrw := createChunkDiskMapper(t, "") defer func() { require.NoError(t, hrw.Close()) }() timeRange := 0 addChunk := func() { + t.Helper() + step := 100 mint, maxt := timeRange+1, timeRange+step-1 - _, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t)) + var err error + awaitCb := make(chan struct{}) + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) { + err = cbErr + close(awaitCb) + }) + <-awaitCb require.NoError(t, err) timeRange += step } nonEmptyFile := func() { - require.NoError(t, hrw.CutNewFile()) + t.Helper() + + hrw.CutNewFile() addChunk() } @@ -388,11 +429,7 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { require.NoError(t, f.Close()) // Open chunk disk mapper again, corrupt file should be removed. - hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil })) - require.True(t, hrw.fileMaxtSet) + hrw = createChunkDiskMapper(t, dir) // Removed from memory. 
require.Equal(t, 3, len(hrw.mmappedChunkFiles)) @@ -411,18 +448,22 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { } } -func testChunkDiskMapper(t *testing.T) *ChunkDiskMapper { - tmpdir, err := ioutil.TempDir("", "data") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(tmpdir)) - }) +func createChunkDiskMapper(t *testing.T, dir string) *ChunkDiskMapper { + if dir == "" { + var err error + dir, err = ioutil.TempDir("", "data") + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(dir)) + }) + } - hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize) + hrw, err := NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), DefaultWriteBufferSize, DefaultWriteQueueSize) require.NoError(t, err) require.False(t, hrw.fileMaxtSet) require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil })) require.True(t, hrw.fileMaxtSet) + return hrw } @@ -443,7 +484,11 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer mint = int64((idx)*1000 + 1) maxt = int64((idx + 1) * 1000) chunk = randomChunk(t) - chunkRef, err = hrw.WriteChunk(seriesRef, mint, maxt, chunk) - require.NoError(t, err) + awaitCb := make(chan struct{}) + chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { + require.NoError(t, err) + close(awaitCb) + }) + <-awaitCb return } diff --git a/tsdb/db.go b/tsdb/db.go index f00bd39f9..c85cd84cf 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -80,6 +80,7 @@ func DefaultOptions() *Options { StripeSize: DefaultStripeSize, HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, IsolationDisabled: defaultIsolationDisabled, + HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, } } @@ -135,6 +136,9 @@ type Options struct { // HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper. HeadChunksWriteBufferSize int + // HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper. + HeadChunksWriteQueueSize int + // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. SeriesLifecycleCallback SeriesLifecycleCallback @@ -582,6 +586,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { if opts.HeadChunksWriteBufferSize <= 0 { opts.HeadChunksWriteBufferSize = chunks.DefaultWriteBufferSize } + if opts.HeadChunksWriteQueueSize < 0 { + opts.HeadChunksWriteQueueSize = chunks.DefaultWriteQueueSize + } if opts.MaxBlockChunkSegmentSize <= 0 { opts.MaxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize } @@ -704,6 +711,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.ChunkDirRoot = dir headOpts.ChunkPool = db.chunkPool headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize + headOpts.ChunkWriteQueueSize = opts.HeadChunksWriteQueueSize headOpts.StripeSize = opts.StripeSize headOpts.SeriesCallback = opts.SeriesLifecycleCallback headOpts.EnableExemplarStorage = opts.EnableExemplarStorage diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 063a90aba..45d2325ba 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -212,17 +212,13 @@ func TestNoPanicAfterWALCorruption(t *testing.T) { var maxt int64 ctx := context.Background() { - for { + // Appending 121 samples because on the 121st a new chunk will be created. 
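+		// (By default the head cuts a chunk after 120 samples, so the 121st append m-maps
+		// the completed chunk and starts a new one.)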
+ for i := 0; i < 121; i++ { app := db.Appender(ctx) _, err := app.Append(0, labels.FromStrings("foo", "bar"), maxt, 0) expSamples = append(expSamples, sample{t: maxt, v: 0}) require.NoError(t, err) require.NoError(t, app.Commit()) - mmapedChunks, err := ioutil.ReadDir(mmappedChunksDir(db.Dir())) - require.NoError(t, err) - if len(mmapedChunks) > 0 { - break - } maxt++ } require.NoError(t, db.Close()) @@ -2453,7 +2449,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { require.NoError(t, err) } require.NoError(t, app.Commit()) - defer func() { require.NoError(t, db.Close()) }() + require.NoError(t, db.Close()) } // Flush WAL. diff --git a/tsdb/head.go b/tsdb/head.go index be82310d6..878c11ac3 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -127,6 +127,8 @@ type HeadOptions struct { ChunkDirRoot string ChunkPool chunkenc.Pool ChunkWriteBufferSize int + ChunkWriteQueueSize int + // StripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. @@ -144,6 +146,7 @@ func DefaultHeadOptions() *HeadOptions { ChunkDirRoot: "", ChunkPool: chunkenc.NewPool(), ChunkWriteBufferSize: chunks.DefaultWriteBufferSize, + ChunkWriteQueueSize: chunks.DefaultWriteQueueSize, StripeSize: DefaultStripeSize, SeriesCallback: &noopSeriesLifecycleCallback{}, IsolationDisabled: defaultIsolationDisabled, @@ -208,9 +211,11 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti } h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( + r, mmappedChunksDir(opts.ChunkDirRoot), opts.ChunkPool, opts.ChunkWriteBufferSize, + opts.ChunkWriteQueueSize, ) if err != nil { return nil, err diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 4a14785d4..7cc5c3688 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -571,12 +571,7 @@ func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper return } - chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk) - if err != nil { - if err != chunks.ErrChunkDiskMapperClosed { - panic(err) - } - } + chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, handleChunkWriteError) s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ ref: chunkRef, numSamples: uint16(s.headChunk.chunk.NumSamples()), @@ -585,6 +580,12 @@ func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper }) } +func handleChunkWriteError(err error) { + if err != nil && err != chunks.ErrChunkDiskMapperClosed { + panic(err) + } +} + // Rollback removes the samples and exemplars from headAppender and writes any series to WAL. func (a *headAppender) Rollback() (err error) { if a.closed { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d5f6f6373..c37b04676 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -35,6 +35,7 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "golang.org/x/sync/errgroup" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/exemplar" @@ -224,7 +225,7 @@ func BenchmarkLoadWAL(b *testing.B) { // Write mmapped chunks. 
if c.mmappedChunkT != 0 { - chunkDiskMapper, err := chunks.NewChunkDiskMapper(mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize) + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) require.NoError(b, err) for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. @@ -270,6 +271,195 @@ func BenchmarkLoadWAL(b *testing.B) { } } +// TestHead_HighConcurrencyReadAndWrite generates 1000 series with a step of 15s and fills a whole block with samples, +// this means in total it generates 4000 chunks because with a step of 15s there are 4 chunks per block per series. +// While appending the samples to the head it concurrently queries them from multiple go routines and verifies that the +// returned results are correct. +func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { + head, _ := newTestHead(t, DefaultBlockDuration, false) + defer func() { + require.NoError(t, head.Close()) + }() + + seriesCnt := 1000 + readConcurrency := 2 + writeConcurrency := 10 + startTs := uint64(DefaultBlockDuration) // start at the second block relative to the unix epoch. + qryRange := uint64(5 * time.Minute.Milliseconds()) + step := uint64(15 * time.Second / time.Millisecond) + endTs := startTs + uint64(DefaultBlockDuration) + + labelSets := make([]labels.Labels, seriesCnt) + for i := 0; i < seriesCnt; i++ { + labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i)) + } + + head.Init(0) + + g, ctx := errgroup.WithContext(context.Background()) + whileNotCanceled := func(f func() (bool, error)) error { + for ctx.Err() == nil { + cont, err := f() + if err != nil { + return err + } + if !cont { + return nil + } + } + return nil + } + + // Create one channel for each write worker, the channels will be used by the coordinator + // go routine to coordinate which timestamps each write worker has to write. + writerTsCh := make([]chan uint64, writeConcurrency) + for writerTsChIdx := range writerTsCh { + writerTsCh[writerTsChIdx] = make(chan uint64) + } + + // workerReadyWg is used to synchronize the start of the test, + // we only start the test once all workers signal that they're ready. + var workerReadyWg sync.WaitGroup + workerReadyWg.Add(writeConcurrency + readConcurrency) + + // Start the write workers. + for wid := 0; wid < writeConcurrency; wid++ { + // Create copy of workerID to be used by worker routine. + workerID := wid + + g.Go(func() error { + // The label sets which this worker will write. + workerLabelSets := labelSets[(seriesCnt/writeConcurrency)*workerID : (seriesCnt/writeConcurrency)*(workerID+1)] + + // Signal that this worker is ready. + workerReadyWg.Done() + + return whileNotCanceled(func() (bool, error) { + ts, ok := <-writerTsCh[workerID] + if !ok { + return false, nil + } + + app := head.Appender(ctx) + for i := 0; i < len(workerLabelSets); i++ { + // We also use the timestamp as the sample value. + _, err := app.Append(0, workerLabelSets[i], int64(ts), float64(ts)) + if err != nil { + return false, fmt.Errorf("Error when appending to head: %w", err) + } + } + + return true, app.Commit() + }) + }) + } + + // queryHead is a helper to query the head for a given time range and labelset. 
+ queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]tsdbutil.Sample, error) { + q, err := NewBlockQuerier(head, int64(mint), int64(maxt)) + if err != nil { + return nil, err + } + return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil + } + + // readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read. + readerTsCh := make(chan uint64) + + // Start the read workers. + for wid := 0; wid < readConcurrency; wid++ { + // Create copy of threadID to be used by worker routine. + workerID := wid + + g.Go(func() error { + querySeriesRef := (seriesCnt / readConcurrency) * workerID + + // Signal that this worker is ready. + workerReadyWg.Done() + + return whileNotCanceled(func() (bool, error) { + ts, ok := <-readerTsCh + if !ok { + return false, nil + } + + querySeriesRef = (querySeriesRef + 1) % seriesCnt + lbls := labelSets[querySeriesRef] + samples, err := queryHead(ts-qryRange, ts, lbls[0]) + if err != nil { + return false, err + } + + if len(samples) != 1 { + return false, fmt.Errorf("expected 1 series, got %d", len(samples)) + } + + series := lbls.String() + expectSampleCnt := qryRange/step + 1 + if expectSampleCnt != uint64(len(samples[series])) { + return false, fmt.Errorf("expected %d samples, got %d", expectSampleCnt, len(samples[series])) + } + + for sampleIdx, sample := range samples[series] { + expectedValue := ts - qryRange + (uint64(sampleIdx) * step) + if sample.T() != int64(expectedValue) { + return false, fmt.Errorf("expected sample %d to have ts %d, got %d", sampleIdx, expectedValue, sample.T()) + } + if sample.V() != float64(expectedValue) { + return false, fmt.Errorf("expected sample %d to have value %d, got %f", sampleIdx, expectedValue, sample.V()) + } + } + + return true, nil + }) + }) + } + + // Start the coordinator go routine. + g.Go(func() error { + currTs := startTs + + defer func() { + // End of the test, close all channels to stop the workers. + for _, ch := range writerTsCh { + close(ch) + } + close(readerTsCh) + }() + + // Wait until all workers are ready to start the test. + workerReadyWg.Wait() + return whileNotCanceled(func() (bool, error) { + // Send the current timestamp to each of the writers. + for _, ch := range writerTsCh { + select { + case ch <- currTs: + case <-ctx.Done(): + return false, nil + } + } + + // Once data for at least has been ingested, send the current timestamp to the readers. + if currTs > startTs+qryRange { + select { + case readerTsCh <- currTs - step: + case <-ctx.Done(): + return false, nil + } + } + + currTs += step + if currTs > endTs { + return false, nil + } + + return true, nil + }) + }) + + require.NoError(t, g.Wait()) +} + func TestHead_ReadWAL(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { @@ -540,7 +730,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() // This is usually taken from the Head, but passing manually here. 
- chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) require.NoError(t, err) defer func() { require.NoError(t, chunkDiskMapper.Close()) @@ -1084,7 +1274,7 @@ func TestMemSeries_append(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) require.NoError(t, err) defer func() { require.NoError(t, chunkDiskMapper.Close()) @@ -1462,14 +1652,16 @@ func TestHeadReadWriterRepair(t *testing.T) { ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") - require.NoError(t, h.chunkDiskMapper.CutNewFile()) + h.chunkDiskMapper.CutNewFile() } require.NoError(t, h.Close()) - // Verify that there are 7 segment files. + // Verify that there are 6 segment files. + // It should only be 6 because the last call to .CutNewFile() won't + // take effect without another chunk being written. files, err := ioutil.ReadDir(mmappedChunksDir(dir)) require.NoError(t, err) - require.Equal(t, 7, len(files)) + require.Equal(t, 6, len(files)) // Corrupt the 4th file by writing a random byte to series ref. f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0o666) @@ -2270,7 +2462,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() // This is usually taken from the Head, but passing manually here. - chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) require.NoError(t, err) defer func() { require.NoError(t, chunkDiskMapper.Close())