From 9801f52b0a21a22dea6071772a2980eca5368b28 Mon Sep 17 00:00:00 2001 From: johncming Date: Thu, 23 Jul 2020 20:35:19 +0800 Subject: [PATCH] tsdb/chunks: fix data race bug (#7643). (#7646) Signed-off-by: johncming --- tsdb/chunks/head_chunks.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 40b14afcd..a654682d1 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -260,7 +260,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk c // The upper 4 bytes are for the head chunk file index and // the lower 4 bytes are for the head chunk file offset where to start reading this chunk. - chkRef = chunkRef(uint64(cdm.curFileSequence), uint64(cdm.curFileNumBytes)) + chkRef = chunkRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize())) binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], seriesRef) bytesWritten += SeriesRefSize @@ -308,8 +308,8 @@ func chunkRef(seq, offset uint64) (chunkRef uint64) { // Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. // Time retention: so that we can delete old chunks with some time guarantee in low load environments. func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int) bool { - return cdm.curFileNumBytes == 0 || // First head chunk file. - cdm.curFileNumBytes+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. + return cdm.curFileSize() == 0 || // First head chunk file. + cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. } // CutNewFile creates a new m-mapped file. 
@@ -342,7 +342,7 @@ func (cdm *ChunkDiskMapper) cut() (returnErr error) { } }() - cdm.size += cdm.curFileNumBytes + cdm.size += cdm.curFileSize() atomic.StoreInt64(&cdm.curFileNumBytes, int64(n)) if cdm.curFile != nil { @@ -558,7 +558,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mmapFile := cdm.mmappedChunkFiles[segID] fileEnd := mmapFile.byteSlice.Len() if segID == cdm.curFileSequence { - fileEnd = int(cdm.curFileNumBytes) + fileEnd = int(cdm.curFileSize()) } idx := HeadChunkFileHeaderSize for idx < fileEnd { @@ -681,7 +681,7 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error { var merr tsdb_errors.MultiError // Cut a new file only if the current file has some chunks. - if cdm.curFileNumBytes > HeadChunkFileHeaderSize { + if cdm.curFileSize() > HeadChunkFileHeaderSize { merr.Add(cdm.CutNewFile()) } merr.Add(cdm.deleteFiles(removedFiles)) @@ -732,8 +732,11 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error { // Size returns the size of the chunk files. func (cdm *ChunkDiskMapper) Size() int64 { - n := atomic.LoadInt64(&cdm.curFileNumBytes) - return cdm.size + n + return cdm.size + cdm.curFileSize() +} + +func (cdm *ChunkDiskMapper) curFileSize() int64 { + return atomic.LoadInt64(&cdm.curFileNumBytes) } // Close closes all the open files in ChunkDiskMapper.