Fix panics when m-mapping head chunks (#10316)

* Fix panics when m-mapping head chunks

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix review comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix reviews

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pull/10337/head
Ganesh Vernekar 3 years ago committed by GitHub
parent 56e14463bc
commit 24827782cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -137,9 +137,8 @@ func (f *chunkPos) cutFileOnNextChunk() {
f.cutFile = true f.cutFile = true
} }
// initSeq sets the sequence number of the head chunk file. // setSeq sets the sequence number of the head chunk file.
// Should only be used for initialization, after that the sequence number will be managed by chunkPos. func (f *chunkPos) setSeq(seq uint64) {
func (f *chunkPos) initSeq(seq uint64) {
f.seq = seq f.seq = seq
} }
@ -181,7 +180,7 @@ type ChunkDiskMapper struct {
writeBufferSize int writeBufferSize int
curFile *os.File // File being written to. curFile *os.File // File being written to.
curFileSequence int // Index of current open file being appended to. curFileSequence int // Index of current open file being appended to. 0 if no file is active.
curFileOffset atomic.Uint64 // Bytes written in current open file. curFileOffset atomic.Uint64 // Bytes written in current open file.
curFileMaxt int64 // Used for the size retention. curFileMaxt int64 // Used for the size retention.
@ -321,7 +320,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
} }
} }
cdm.evtlPos.initSeq(uint64(lastSeq)) cdm.evtlPos.setSeq(uint64(lastSeq))
return nil return nil
} }
@ -869,16 +868,33 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
// won't do any harm. // won't do any harm.
cdm.CutNewFile() cdm.CutNewFile()
} }
errs.Add(cdm.deleteFiles(removedFiles)) pendingDeletes, err := cdm.deleteFiles(removedFiles)
errs.Add(err)
if len(chkFileIndices) == len(removedFiles) {
// All files were deleted. Reset the current sequence.
cdm.evtlPosMtx.Lock()
if err == nil {
cdm.evtlPos.setSeq(0)
} else {
// In case of error, set it to the last file number on the disk that was not deleted.
cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
}
cdm.evtlPosMtx.Unlock()
}
return errs.Err() return errs.Err()
} }
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { // deleteFiles deletes the given file sequences in order of the sequence.
// In case of an error, it returns the sorted file sequences that were not deleted from the _disk_.
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) {
sort.Ints(removedFiles) // To delete them in order.
cdm.readPathMtx.Lock() cdm.readPathMtx.Lock()
for _, seq := range removedFiles { for _, seq := range removedFiles {
if err := cdm.closers[seq].Close(); err != nil { if err := cdm.closers[seq].Close(); err != nil {
cdm.readPathMtx.Unlock() cdm.readPathMtx.Unlock()
return err return removedFiles, err
} }
delete(cdm.mmappedChunkFiles, seq) delete(cdm.mmappedChunkFiles, seq)
delete(cdm.closers, seq) delete(cdm.closers, seq)
@ -886,13 +902,13 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
cdm.readPathMtx.Unlock() cdm.readPathMtx.Unlock()
// We actually delete the files separately to not block the readPathMtx for long. // We actually delete the files separately to not block the readPathMtx for long.
for _, seq := range removedFiles { for i, seq := range removedFiles {
if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil { if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
return err return removedFiles[i:], err
} }
} }
return nil return nil, nil
} }
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption // DeleteCorrupted deletes all the head chunk files after the one which had the corruption
@ -907,14 +923,27 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
// Delete all the head chunk files following the corrupt head chunk file. // Delete all the head chunk files following the corrupt head chunk file.
segs := []int{} segs := []int{}
cdm.readPathMtx.RLock() cdm.readPathMtx.RLock()
lastSeq := 0
for seg := range cdm.mmappedChunkFiles { for seg := range cdm.mmappedChunkFiles {
if seg >= cerr.FileIndex { if seg >= cerr.FileIndex {
segs = append(segs, seg) segs = append(segs, seg)
} else if seg > lastSeq {
lastSeq = seg
} }
} }
cdm.readPathMtx.RUnlock() cdm.readPathMtx.RUnlock()
return cdm.deleteFiles(segs) pendingDeletes, err := cdm.deleteFiles(segs)
cdm.evtlPosMtx.Lock()
if err == nil {
cdm.evtlPos.setSeq(uint64(lastSeq))
} else {
// In case of error, set it to the last file number on the disk that was not deleted.
cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
}
cdm.evtlPosMtx.Unlock()
return err
} }
// Size returns the size of the chunk files. // Size returns the size of the chunk files.

@ -3512,3 +3512,61 @@ func newTestDB(t *testing.T) *DB {
}) })
return db return db
} }
// Tests https://github.com/prometheus/prometheus/issues/10291#issuecomment-1044373110.
func TestDBPanicOnMmappingHeadChunk(t *testing.T) {
dir := t.TempDir()
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
db.DisableCompactions()
// Choosing scrape interval of 45s to have chunk larger than 1h.
itvl := int64(45 * time.Second / time.Millisecond)
lastTs := int64(0)
addSamples := func(numSamples int) {
app := db.Appender(context.Background())
var ref storage.SeriesRef
lbls := labels.FromStrings("__name__", "testing", "foo", "bar")
for i := 0; i < numSamples; i++ {
ref, err = app.Append(ref, lbls, lastTs, float64(lastTs))
require.NoError(t, err)
lastTs += itvl
if i%10 == 0 {
require.NoError(t, app.Commit())
app = db.Appender(context.Background())
}
}
require.NoError(t, app.Commit())
}
// Ingest samples upto 2h50m to make the head "about to compact".
numSamples := int(170*time.Minute/time.Millisecond) / int(itvl)
addSamples(numSamples)
require.Len(t, db.Blocks(), 0)
require.NoError(t, db.Compact())
require.Len(t, db.Blocks(), 0)
// Restarting.
require.NoError(t, db.Close())
db, err = Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
db.DisableCompactions()
// Ingest samples upto 20m more to make the head compact.
numSamples = int(20*time.Minute/time.Millisecond) / int(itvl)
addSamples(numSamples)
require.Len(t, db.Blocks(), 0)
require.NoError(t, db.Compact())
require.Len(t, db.Blocks(), 1)
// More samples to m-map and panic.
numSamples = int(120*time.Minute/time.Millisecond) / int(itvl)
addSamples(numSamples)
require.NoError(t, db.Close())
}

@ -627,7 +627,8 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
if !ok { if !ok {
slice := mmappedChunks[seriesRef] slice := mmappedChunks[seriesRef]
if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint { if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint {
return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef) return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]",
seriesRef, slice[len(slice)-1].minTime, slice[len(slice)-1].maxTime, mint, maxt)
} }
slice = append(slice, &mmappedChunk{ slice = append(slice, &mmappedChunk{
@ -641,7 +642,9 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
} }
if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint { if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint {
return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef) return errors.Errorf("out of sequence m-mapped chunk for series ref %d, last chunk: [%d, %d], new: [%d, %d]",
seriesRef, ms.mmappedChunks[len(ms.mmappedChunks)-1].minTime, ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime,
mint, maxt)
} }
h.metrics.chunks.Inc() h.metrics.chunks.Inc()
@ -673,7 +676,10 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head
level.Info(h.logger).Log("msg", "Deleting mmapped chunk files") level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil { if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err) level.Info(h.logger).Log("msg", "Deletion of corrupted mmap chunk files failed, discarding chunk files completely", "err", err)
if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed", "err", err)
}
return map[chunks.HeadSeriesRef][]*mmappedChunk{} return map[chunks.HeadSeriesRef][]*mmappedChunk{}
} }
@ -681,6 +687,9 @@ func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[chunks.Head
mmappedChunks, err := h.loadMmappedChunks(refSeries) mmappedChunks, err := h.loadMmappedChunks(refSeries)
if err != nil { if err != nil {
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err) level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
if err := h.chunkDiskMapper.Truncate(math.MaxInt64); err != nil {
level.Error(h.logger).Log("msg", "Deletion of all mmap chunk files failed after failed loading", "err", err)
}
mmappedChunks = map[chunks.HeadSeriesRef][]*mmappedChunk{} mmappedChunks = map[chunks.HeadSeriesRef][]*mmappedChunk{}
} }

@ -3143,3 +3143,58 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
require.Equal(t, 0, idx) require.Equal(t, 0, idx)
require.Greater(t, offset, 0) require.Greater(t, offset, 0)
} }
// Tests https://github.com/prometheus/prometheus/issues/10277.
func TestMmapPanicAfterMmapReplayCorruption(t *testing.T) {
dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
require.NoError(t, err)
opts := DefaultHeadOptions()
opts.ChunkRange = DefaultBlockDuration
opts.ChunkDirRoot = dir
opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
h, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err)
require.NoError(t, h.Init(0))
lastTs := int64(0)
var ref storage.SeriesRef
lbls := labels.FromStrings("__name__", "testing", "foo", "bar")
addChunks := func() {
interval := DefaultBlockDuration / (4 * 120)
app := h.Appender(context.Background())
for i := 0; i < 250; i++ {
ref, err = app.Append(ref, lbls, lastTs, float64(lastTs))
lastTs += interval
if i%10 == 0 {
require.NoError(t, app.Commit())
app = h.Appender(context.Background())
}
}
require.NoError(t, app.Commit())
}
addChunks()
require.NoError(t, h.Close())
wlog, err = wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, false)
require.NoError(t, err)
mmapFilePath := filepath.Join(dir, "chunks_head", "000001")
f, err := os.OpenFile(mmapFilePath, os.O_WRONLY, 0o666)
require.NoError(t, err)
_, err = f.WriteAt([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 17)
require.NoError(t, err)
require.NoError(t, f.Close())
h, err = NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err)
require.NoError(t, h.Init(0))
addChunks()
require.NoError(t, h.Close())
}

Loading…
Cancel
Save