|
|
|
@ -137,9 +137,8 @@ func (f *chunkPos) cutFileOnNextChunk() {
|
|
|
|
|
f.cutFile = true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// initSeq sets the sequence number of the head chunk file.
|
|
|
|
|
// Should only be used for initialization, after that the sequence number will be managed by chunkPos.
|
|
|
|
|
func (f *chunkPos) initSeq(seq uint64) { |
|
|
|
|
// setSeq sets the sequence number of the head chunk file.
|
|
|
|
|
func (f *chunkPos) setSeq(seq uint64) { |
|
|
|
|
f.seq = seq |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -181,7 +180,7 @@ type ChunkDiskMapper struct {
|
|
|
|
|
writeBufferSize int |
|
|
|
|
|
|
|
|
|
curFile *os.File // File being written to.
|
|
|
|
|
curFileSequence int // Index of current open file being appended to.
|
|
|
|
|
curFileSequence int // Index of current open file being appended to. 0 if no file is active.
|
|
|
|
|
curFileOffset atomic.Uint64 // Bytes written in current open file.
|
|
|
|
|
curFileMaxt int64 // Used for the size retention.
|
|
|
|
|
|
|
|
|
@ -321,7 +320,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
cdm.evtlPos.initSeq(uint64(lastSeq)) |
|
|
|
|
cdm.evtlPos.setSeq(uint64(lastSeq)) |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -869,16 +868,33 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
|
|
|
|
|
// won't do any harm.
|
|
|
|
|
cdm.CutNewFile() |
|
|
|
|
} |
|
|
|
|
errs.Add(cdm.deleteFiles(removedFiles)) |
|
|
|
|
pendingDeletes, err := cdm.deleteFiles(removedFiles) |
|
|
|
|
errs.Add(err) |
|
|
|
|
|
|
|
|
|
if len(chkFileIndices) == len(removedFiles) { |
|
|
|
|
// All files were deleted. Reset the current sequence.
|
|
|
|
|
cdm.evtlPosMtx.Lock() |
|
|
|
|
if err == nil { |
|
|
|
|
cdm.evtlPos.setSeq(0) |
|
|
|
|
} else { |
|
|
|
|
// In case of error, set it to the last file number on the disk that was not deleted.
|
|
|
|
|
cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1])) |
|
|
|
|
} |
|
|
|
|
cdm.evtlPosMtx.Unlock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return errs.Err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error { |
|
|
|
|
// deleteFiles deletes the given file sequences in order of the sequence.
|
|
|
|
|
// In case of an error, it returns the sorted file sequences that were not deleted from the _disk_.
|
|
|
|
|
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) { |
|
|
|
|
sort.Ints(removedFiles) // To delete them in order.
|
|
|
|
|
cdm.readPathMtx.Lock() |
|
|
|
|
for _, seq := range removedFiles { |
|
|
|
|
if err := cdm.closers[seq].Close(); err != nil { |
|
|
|
|
cdm.readPathMtx.Unlock() |
|
|
|
|
return err |
|
|
|
|
return removedFiles, err |
|
|
|
|
} |
|
|
|
|
delete(cdm.mmappedChunkFiles, seq) |
|
|
|
|
delete(cdm.closers, seq) |
|
|
|
@ -886,13 +902,13 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
|
|
|
|
|
cdm.readPathMtx.Unlock() |
|
|
|
|
|
|
|
|
|
// We actually delete the files separately to not block the readPathMtx for long.
|
|
|
|
|
for _, seq := range removedFiles { |
|
|
|
|
for i, seq := range removedFiles { |
|
|
|
|
if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil { |
|
|
|
|
return err |
|
|
|
|
return removedFiles[i:], err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
return nil, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
|
|
|
|
@ -907,14 +923,27 @@ func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
|
|
|
|
|
// Delete all the head chunk files following the corrupt head chunk file.
|
|
|
|
|
segs := []int{} |
|
|
|
|
cdm.readPathMtx.RLock() |
|
|
|
|
lastSeq := 0 |
|
|
|
|
for seg := range cdm.mmappedChunkFiles { |
|
|
|
|
if seg >= cerr.FileIndex { |
|
|
|
|
segs = append(segs, seg) |
|
|
|
|
} else if seg > lastSeq { |
|
|
|
|
lastSeq = seg |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
cdm.readPathMtx.RUnlock() |
|
|
|
|
|
|
|
|
|
return cdm.deleteFiles(segs) |
|
|
|
|
pendingDeletes, err := cdm.deleteFiles(segs) |
|
|
|
|
cdm.evtlPosMtx.Lock() |
|
|
|
|
if err == nil { |
|
|
|
|
cdm.evtlPos.setSeq(uint64(lastSeq)) |
|
|
|
|
} else { |
|
|
|
|
// In case of error, set it to the last file number on the disk that was not deleted.
|
|
|
|
|
cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1])) |
|
|
|
|
} |
|
|
|
|
cdm.evtlPosMtx.Unlock() |
|
|
|
|
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Size returns the size of the chunk files.
|
|
|
|
|