@ -17,6 +17,8 @@ import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash"
"io"
"os"
@ -25,7 +27,6 @@ import (
"sync"
"github.com/dennwc/varint"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
@ -107,7 +108,7 @@ type CorruptionErr struct {
}
func ( e * CorruptionErr ) Error ( ) string {
return errors . Wrapf ( e . Err , "corruption in head chunk file %s ", segmentFile ( e . Dir , e . FileIndex ) ) . Error ( )
return fmt . Errorf ( "corruption in head chunk file %s: %w ", segmentFile ( e . Dir , e . FileIndex ) , e . Err ) . Error ( )
}
// chunkPos keeps track of the position in the head chunk files.
@ -240,10 +241,10 @@ type mmappedChunkFile struct {
func NewChunkDiskMapper ( reg prometheus . Registerer , dir string , pool chunkenc . Pool , writeBufferSize , writeQueueSize int ) ( * ChunkDiskMapper , error ) {
// Validate write buffer size.
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
return nil , errors . Errorf ( "ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)" , MinWriteBufferSize , MaxWriteBufferSize , writeBufferSize )
return nil , fmt . Errorf ( "ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)" , MinWriteBufferSize , MaxWriteBufferSize , writeBufferSize )
}
if writeBufferSize % 1024 != 0 {
return nil , errors . Errorf ( "ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)" , writeBufferSize )
return nil , fmt . Errorf ( "ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)" , writeBufferSize )
}
if err := os . MkdirAll ( dir , 0 o777 ) ; err != nil {
@ -320,7 +321,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
for seq , fn := range files {
f , err := fileutil . OpenMmapFile ( fn )
if err != nil {
return errors . Wrapf ( err , "mmap files, file: %s" , fn )
return fmt . Errorf ( "mmap files, file: %s: %w" , fn , err )
}
cdm . closers [ seq ] = f
cdm . mmappedChunkFiles [ seq ] = & mmappedChunkFile { byteSlice : realByteSlice ( f . Bytes ( ) ) }
@ -335,23 +336,23 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
lastSeq := chkFileIndices [ 0 ]
for _ , seq := range chkFileIndices [ 1 : ] {
if seq != lastSeq + 1 {
return errors . Errorf ( "found unsequential head chunk files %s (index: %d) and %s (index: %d)" , files [ lastSeq ] , lastSeq , files [ seq ] , seq )
return fmt . Errorf ( "found unsequential head chunk files %s (index: %d) and %s (index: %d)" , files [ lastSeq ] , lastSeq , files [ seq ] , seq )
}
lastSeq = seq
}
for i , b := range cdm . mmappedChunkFiles {
if b . byteSlice . Len ( ) < HeadChunkFileHeaderSize {
return errors . Wrapf ( errInvalidSize , "%s: invalid head chunk file header ", files [ i ] )
return fmt . Errorf ( "%s: invalid head chunk file header: %w ", files [ i ] , errInvalidSize )
}
// Verify magic number.
if m := binary . BigEndian . Uint32 ( b . byteSlice . Range ( 0 , MagicChunksSize ) ) ; m != MagicHeadChunks {
return errors . Errorf ( "%s: invalid magic number %x" , files [ i ] , m )
return fmt . Errorf ( "%s: invalid magic number %x" , files [ i ] , m )
}
// Verify chunk format version.
if v := int ( b . byteSlice . Range ( MagicChunksSize , MagicChunksSize + ChunksFormatVersionSize ) [ 0 ] ) ; v != chunksFormatV1 {
return errors . Errorf ( "%s: invalid chunk format version %d" , files [ i ] , v )
return fmt . Errorf ( "%s: invalid chunk format version %d" , files [ i ] , v )
}
}
@ -394,16 +395,16 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
f , err := os . Open ( files [ lastFile ] )
if err != nil {
return files , errors . Wrap ( err , "open file during last head chunk file repair" )
return files , fmt . Errorf ( "open file during last head chunk file repair: %w " , err )
}
buf := make ( [ ] byte , MagicChunksSize )
size , err := f . Read ( buf )
if err != nil && err != io . EOF {
return files , errors . Wrap ( err , "failed to read magic number during last head chunk file repair" )
return files , fmt . Errorf ( "failed to read magic number during last head chunk file repair: %w " , err )
}
if err := f . Close ( ) ; err != nil {
return files , errors . Wrap ( err , "close file during last head chunk file repair" )
return files , fmt . Errorf ( "close file during last head chunk file repair: %w " , err )
}
// We either don't have enough bytes for the magic number or the magic number is 0.
@ -413,7 +414,7 @@ func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr erro
if size < MagicChunksSize || binary . BigEndian . Uint32 ( buf ) == 0 {
// Corrupt file, hence remove it.
if err := os . RemoveAll ( files [ lastFile ] ) ; err != nil {
return files , errors . Wrap ( err , "delete corrupted, empty head chunk file during last file repair" )
return files , fmt . Errorf ( "delete corrupted, empty head chunk file during last file repair: %w " , err )
}
delete ( files , lastFile )
}
@ -559,7 +560,7 @@ func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err erro
}
if expSeq , expOffset := chkRef . Unpack ( ) ; seq != expSeq || offset != expOffset {
return errors . Errorf ( "expected newly cut file to have sequence:offset %d:%d, got %d:%d" , expSeq , expOffset , seq , offset )
return fmt . Errorf ( "expected newly cut file to have sequence:offset %d:%d, got %d:%d" , expSeq , expOffset , seq , offset )
}
return nil
@ -701,13 +702,13 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : - 1 ,
Err : errors . Errorf ( "head chunk file index %d more than current open file" , sgmIndex ) ,
Err : fmt . Errorf ( "head chunk file index %d more than current open file" , sgmIndex ) ,
}
}
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "head chunk file index %d does not exist on disk" , sgmIndex ) ,
Err : fmt . Errorf ( "head chunk file index %d does not exist on disk" , sgmIndex ) ,
}
}
@ -715,7 +716,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v" , chkStart + MaxChunkLengthFieldSize , mmapFile . byteSlice . Len ( ) ) ,
Err : fmt . Errorf ( "head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v" , chkStart + MaxChunkLengthFieldSize , mmapFile . byteSlice . Len ( ) ) ,
}
}
@ -734,7 +735,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "reading chunk length failed with %d" , n ) ,
Err : fmt . Errorf ( "reading chunk length failed with %d" , n ) ,
}
}
@ -744,7 +745,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v" , chkDataEnd , mmapFile . byteSlice . Len ( ) ) ,
Err : fmt . Errorf ( "head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v" , chkDataEnd , mmapFile . byteSlice . Len ( ) ) ,
}
}
@ -761,7 +762,7 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "checksum mismatch expected:%x, actual:%x" , sum , act ) ,
Err : fmt . Errorf ( "checksum mismatch expected:%x, actual:%x" , sum , act ) ,
}
}
@ -829,7 +830,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "head chunk file has some unread data, but doesn't include enough bytes to read the chunk header" +
Err : fmt . Errorf ( "head chunk file has some unread data, but doesn't include enough bytes to read the chunk header" +
" - required:%v, available:%v, file:%d" , idx + MaxHeadChunkMetaSize , fileEnd , segID ) ,
}
}
@ -866,7 +867,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d" , idx + CRCSize , fileEnd , segID ) ,
Err : fmt . Errorf ( "head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d" , idx + CRCSize , fileEnd , segID ) ,
}
}
@ -879,7 +880,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "checksum mismatch expected:%x, actual:%x" , sum , act ) ,
Err : fmt . Errorf ( "checksum mismatch expected:%x, actual:%x" , sum , act ) ,
}
}
idx += CRCSize
@ -905,7 +906,7 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d" , idx , fileEnd , segID ) ,
Err : fmt . Errorf ( "head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d" , idx , fileEnd , segID ) ,
}
}
}
@ -998,10 +999,9 @@ func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) {
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file).
func ( cdm * ChunkDiskMapper ) DeleteCorrupted ( originalErr error ) error {
err := errors . Cause ( originalErr ) // So that we can pick up errors even if wrapped.
cerr , ok := err . ( * CorruptionErr )
if ! ok {
return errors . Wrap ( originalErr , "cannot handle error" )
var cerr * CorruptionErr
if ! errors . As ( originalErr , & cerr ) {
return fmt . Errorf ( "cannot handle error: %w" , originalErr )
}
// Delete all the head chunk files following the corrupt head chunk file.