From 24827782cbc546adba99c7645dc8a01750023597 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Tue, 22 Feb 2022 20:35:15 +0530 Subject: [PATCH] Fix panics when m-mapping head chunks (#10316) * Fix panics when m-mapping head chunks Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar * Fix reviews Signed-off-by: Ganesh Vernekar --- tsdb/chunks/head_chunks.go | 53 ++++++++++++++++++++++++++-------- tsdb/db_test.go | 58 ++++++++++++++++++++++++++++++++++++++ tsdb/head.go | 15 ++++++++-- tsdb/head_test.go | 55 ++++++++++++++++++++++++++++++++++++ 4 files changed, 166 insertions(+), 15 deletions(-) diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index b9393865f..94b20e129 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -137,9 +137,8 @@ func (f *chunkPos) cutFileOnNextChunk() { f.cutFile = true } -// initSeq sets the sequence number of the head chunk file. -// Should only be used for initialization, after that the sequence number will be managed by chunkPos. -func (f *chunkPos) initSeq(seq uint64) { +// setSeq sets the sequence number of the head chunk file. +func (f *chunkPos) setSeq(seq uint64) { f.seq = seq } @@ -181,7 +180,7 @@ type ChunkDiskMapper struct { writeBufferSize int curFile *os.File // File being written to. - curFileSequence int // Index of current open file being appended to. + curFileSequence int // Index of current open file being appended to. 0 if no file is active. curFileOffset atomic.Uint64 // Bytes written in current open file. curFileMaxt int64 // Used for the size retention. @@ -321,7 +320,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { } } - cdm.evtlPos.initSeq(uint64(lastSeq)) + cdm.evtlPos.setSeq(uint64(lastSeq)) return nil } @@ -869,16 +868,33 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error { // won't do any harm. cdm.CutNewFile() } - errs.Add(cdm.deleteFiles(removedFiles)) + pendingDeletes, err := cdm.deleteFiles(removedFiles) + errs.Add(err) + + if len(chkFileIndices) == len(removedFiles) { + // All files were deleted. Reset the current sequence. + cdm.evtlPosMtx.Lock() + if err == nil { + cdm.evtlPos.setSeq(0) + } else { + // In case of error, set it to the last file number on the disk that was not deleted. + cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1])) + } + cdm.evtlPosMtx.Unlock() + } + return errs.Err() } -func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { +// deleteFiles deletes the given file sequences in order of the sequence. +// In case of an error, it returns the sorted file sequences that were not deleted from the _disk_. +func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) { + sort.Ints(removedFiles) // To delete them in order. cdm.readPathMtx.Lock() for _, seq := range removedFiles { if err := cdm.closers[seq].Close(); err != nil { cdm.readPathMtx.Unlock() - return err + return removedFiles, err } delete(cdm.mmappedChunkFiles, seq) delete(cdm.closers, seq) @@ -886,13 +902,13 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { cdm.readPathMtx.Unlock() // We actually delete the files separately to not block the readPathMtx for long. - for _, seq := range removedFiles { + for i, seq := range removedFiles { if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil { - return err + return removedFiles[i:], err } } - return nil + return nil, nil } // DeleteCorrupted deletes all the head chunk files after the one which had the corruption @@ -907,14 +923,27 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error { // Delete all the head chunk files following the corrupt head chunk file. segs := []int{} cdm.readPathMtx.RLock() + lastSeq := 0 for seg := range cdm.mmappedChunkFiles { if seg >= cerr.FileIndex { segs = append(segs, seg) + } else if seg > lastSeq { + lastSeq = seg } } cdm.readPathMtx.RUnlock() - return cdm.deleteFiles(segs) + pendingDeletes, err := cdm.deleteFiles(segs) + cdm.evtlPosMtx.Lock() + if err == nil { + cdm.evtlPos.setSeq(uint64(lastSeq)) + } else { + // In case of error, set it to the last file number on the disk that was not deleted. + cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1])) + } + cdm.evtlPosMtx.Unlock() + + return err } // Size returns the size of the chunk files. diff --git a/tsdb/db_test.go b/tsdb/db_test.go index d94c387b3..c29179292 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3512,3 +3512,61 @@ func newTestDB(t *testing.T) *DB { }) return db } + +// Tests https://github.com/prometheus/prometheus/issues/10291#issuecomment-1044373110. +func TestDBPanicOnMmappingHeadChunk(t *testing.T) { + dir := t.TempDir() + + db, err := Open(dir, nil, nil, DefaultOptions(), nil) + require.NoError(t, err) + db.DisableCompactions() + + // Choosing scrape interval of 45s to have chunk larger than 1h. + itvl := int64(45 * time.Second / time.Millisecond) + + lastTs := int64(0) + addSamples := func(numSamples int) { + app := db.Appender(context.Background()) + var ref storage.SeriesRef + lbls := labels.FromStrings("__name__", "testing", "foo", "bar") + for i := 0; i < numSamples; i++ { + ref, err = app.Append(ref, lbls, lastTs, float64(lastTs)) + require.NoError(t, err) + lastTs += itvl + if i%10 == 0 { + require.NoError(t, app.Commit()) + app = db.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) + } + + // Ingest samples upto 2h50m to make the head "about to compact". + numSamples := int(170*time.Minute/time.Millisecond) / int(itvl) + addSamples(numSamples) + + require.Len(t, db.Blocks(), 0) + require.NoError(t, db.Compact()) + require.Len(t, db.Blocks(), 0) + + // Restarting. + require.NoError(t, db.Close()) + + db, err = Open(dir, nil, nil, DefaultOptions(), nil) + require.NoError(t, err) + db.DisableCompactions() + + // Ingest samples upto 20m more to make the head compact. + numSamples = int(20*time.Minute/time.Millisecond) / int(itvl) + addSamples(numSamples) + + require.Len(t, db.Blocks(), 0) + require.NoError(t, db.Compact()) + require.Len(t, db.Blocks(), 1) + + // More samples to m-map and panic. + numSamples = int(120*time.Minute/time.Millisecond) / int(itvl) + addSamples(numSamples) + + require.NoError(t, db.Close()) +} diff --git a/tsdb/head.go b/tsdb/head.go index 878c11ac3..e1ab11fd9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -627,7 +627,8 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) if !ok { slice := mmappedChunks[seriesRef] if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint { - return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef) + return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]", + seriesRef, slice[len(slice)-1].minTime, slice[len(slice)-1].maxTime, mint, maxt) } slice = append(slice, &mmappedChunk{ @@ -641,7 +642,9 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) } if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint { - return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef) + return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]", + seriesRef, ms.mmappedChunks[len(ms.mmappedChunks)-1].minTime, ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime, + mint, maxt) } h.metrics.chunks.Inc() @@ -673,7 +676,10 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head level.Info(h.logger).Log("msg", "Deleting mmapped chunk files") if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil { - level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err) + level.Info(h.logger).Log("msg", "Deletion of corrupted mmap chunk files failed, discarding chunk files completely", "err", err) + if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil { + level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed", "err", err) + } return map[chunks.HeadSeriesRef][]*mmappedChunk{} } @@ -681,6 +687,9 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head mmappedChunks, err := h.loadMmappedChunks(refSeries) if err != nil { level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err) + if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil { + level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed after failed loading", "err", err) + } mmappedChunks = map[chunks.HeadSeriesRef][]*mmappedChunk{} } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index c37b04676..4a7a620e6 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3143,3 +3143,58 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) { require.Equal(t, 0, idx) require.Greater(t, offset, 0) } + +// Tests https://github.com/prometheus/prometheus/issues/10277. +func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) { + dir := t.TempDir() + wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + require.NoError(t, err) + + opts := DefaultHeadOptions() + opts.ChunkRange = DefaultBlockDuration + opts.ChunkDirRoot = dir + opts.EnableExemplarStorage = true + opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) + + h, err := NewHead(nil, nil, wlog, opts, nil) + require.NoError(t, err) + require.NoError(t, h.Init(0)) + + lastTs := int64(0) + var ref storage.SeriesRef + lbls := labels.FromStrings("__name__", "testing", "foo", "bar") + addChunks := func() { + interval := DefaultBlockDuration / (4 * 120) + app := h.Appender(context.Background()) + for i := 0; i < 250; i++ { + ref, err = app.Append(ref, lbls, lastTs, float64(lastTs)) + lastTs += interval + if i%10 == 0 { + require.NoError(t, app.Commit()) + app = h.Appender(context.Background()) + } + } + require.NoError(t, app.Commit()) + } + + addChunks() + + require.NoError(t, h.Close()) + wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false) + require.NoError(t, err) + + mmapFilePath := filepath.Join(dir, "chunks_head", "000001") + f, err := os.OpenFile(mmapFilePath, os.O_WRONLY, 0o666) + require.NoError(t, err) + _, err = f.WriteAt([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 17) + require.NoError(t, err) + require.NoError(t, f.Close()) + + h, err = NewHead(nil, nil, wlog, opts, nil) + require.NoError(t, err) + require.NoError(t, h.Init(0)) + + addChunks() + + require.NoError(t, h.Close()) +}