// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"bufio"
"bytes"
"encoding/binary"
"hash"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"go.uber.org/atomic"
)
// Head chunk file header fields constants.
const (
// MagicHeadChunks is 4 bytes at the beginning of a head chunk file.
MagicHeadChunks = 0x0130BC91
headChunksFormatV1 = 1
writeBufferSize = 4 * 1024 * 1024 // 4 MiB.
)
var (
// ErrChunkDiskMapperClosed returned by any method indicates
// that the ChunkDiskMapper was closed.
ErrChunkDiskMapperClosed = errors . New ( "ChunkDiskMapper closed" )
)
const (
// MintMaxtSize is the size of the mint/maxt for head chunk file and chunks.
MintMaxtSize = 8
// SeriesRefSize is the size of series reference on disk.
SeriesRefSize = 8
// HeadChunkFileHeaderSize is the total size of the header for the head chunk file.
HeadChunkFileHeaderSize = SegmentHeaderSize
// MaxHeadChunkFileSize is the max size of a head chunk file.
MaxHeadChunkFileSize = 128 * 1024 * 1024 // 128 MiB.
// CRCSize is the size of crc32 sum on disk.
CRCSize = 4
// MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data.
// Max because the uvarint size can be smaller.
MaxHeadChunkMetaSize = SeriesRefSize + 2 * MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize
)
// corruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
Dir string
FileIndex int
Err error
}
func ( e * CorruptionErr ) Error ( ) string {
return errors . Wrapf ( e . Err , "corruption in head chunk file %s" , segmentFile ( e . Dir , e . FileIndex ) ) . Error ( )
}
// ChunkDiskMapper is for writing the Head block chunks to the disk
// and access chunks via mmapped file.
type ChunkDiskMapper struct {
curFileNumBytes atomic . Int64 // Bytes written in current open file.
/// Writer.
dir * os . File
curFile * os . File // File being written to.
curFileSequence int // Index of current open file being appended to.
curFileMaxt int64 // Used for the size retention.
byteBuf [ MaxHeadChunkMetaSize ] byte // Buffer used to write the header of the chunk.
chkWriter * bufio . Writer // Writer for the current open file.
crc32 hash . Hash
writePathMtx sync . Mutex
/// Reader.
// The int key in the map is the file number on the disk.
mmappedChunkFiles map [ int ] * mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
closers map [ int ] io . Closer // Closers for resources behind the byte slices.
readPathMtx sync . RWMutex // Mutex used to protect the above 2 maps.
pool chunkenc . Pool // This is used when fetching a chunk from the disk to allocate a chunk.
// Writer and Reader.
// We flush chunks to disk in batches. Hence, we store them in this buffer
// from which chunks are served till they are flushed and are ready for m-mapping.
chunkBuffer * chunkBuffer
// The total size of bytes in the closed files.
// Needed to calculate the total size of all segments on disk.
size atomic . Int64
// If 'true', it indicated that the maxt of all the on-disk files were set
// after iterating through all the chunks in those files.
fileMaxtSet bool
closed bool
}
type mmappedChunkFile struct {
byteSlice ByteSlice
maxt int64
}
// NewChunkDiskMapper returns a new writer against the given directory
// using the default head chunk file duration.
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func NewChunkDiskMapper ( dir string , pool chunkenc . Pool ) ( * ChunkDiskMapper , error ) {
if err := os . MkdirAll ( dir , 0777 ) ; err != nil {
return nil , err
}
dirFile , err := fileutil . OpenDir ( dir )
if err != nil {
return nil , err
}
m := & ChunkDiskMapper {
dir : dirFile ,
pool : pool ,
crc32 : newCRC32 ( ) ,
chunkBuffer : newChunkBuffer ( ) ,
}
if m . pool == nil {
m . pool = chunkenc . NewPool ( )
}
return m , m . openMMapFiles ( )
}
func ( cdm * ChunkDiskMapper ) openMMapFiles ( ) ( returnErr error ) {
cdm . mmappedChunkFiles = map [ int ] * mmappedChunkFile { }
cdm . closers = map [ int ] io . Closer { }
defer func ( ) {
if returnErr != nil {
var merr tsdb_errors . MultiError
merr . Add ( returnErr )
merr . Add ( closeAllFromMap ( cdm . closers ) )
returnErr = merr . Err ( )
cdm . mmappedChunkFiles = nil
cdm . closers = nil
}
} ( )
files , err := listChunkFiles ( cdm . dir . Name ( ) )
if err != nil {
return err
}
files , err = repairLastChunkFile ( files )
if err != nil {
return err
}
chkFileIndices := make ( [ ] int , 0 , len ( files ) )
for seq , fn := range files {
f , err := fileutil . OpenMmapFile ( fn )
if err != nil {
return errors . Wrapf ( err , "mmap files, file: %s" , fn )
}
cdm . closers [ seq ] = f
cdm . mmappedChunkFiles [ seq ] = & mmappedChunkFile { byteSlice : realByteSlice ( f . Bytes ( ) ) }
chkFileIndices = append ( chkFileIndices , seq )
}
cdm . size . Store ( int64 ( 0 ) )
// Check for gaps in the files.
sort . Ints ( chkFileIndices )
if len ( chkFileIndices ) == 0 {
return nil
}
lastSeq := chkFileIndices [ 0 ]
for _ , seq := range chkFileIndices [ 1 : ] {
if seq != lastSeq + 1 {
return errors . Errorf ( "found unsequential head chunk files %s (index: %d) and %s (index: %d)" , files [ lastSeq ] , lastSeq , files [ seq ] , seq )
}
lastSeq = seq
}
for i , b := range cdm . mmappedChunkFiles {
if b . byteSlice . Len ( ) < HeadChunkFileHeaderSize {
return errors . Wrapf ( errInvalidSize , "%s: invalid head chunk file header" , files [ i ] )
}
// Verify magic number.
if m := binary . BigEndian . Uint32 ( b . byteSlice . Range ( 0 , MagicChunksSize ) ) ; m != MagicHeadChunks {
return errors . Errorf ( "%s: invalid magic number %x" , files [ i ] , m )
}
// Verify chunk format version.
if v := int ( b . byteSlice . Range ( MagicChunksSize , MagicChunksSize + ChunksFormatVersionSize ) [ 0 ] ) ; v != chunksFormatV1 {
return errors . Errorf ( "%s: invalid chunk format version %d" , files [ i ] , v )
}
cdm . size . Add ( int64 ( b . byteSlice . Len ( ) ) )
}
return nil
}
func listChunkFiles ( dir string ) ( map [ int ] string , error ) {
files , err := ioutil . ReadDir ( dir )
if err != nil {
return nil , err
}
res := map [ int ] string { }
for _ , fi := range files {
seq , err := strconv . ParseUint ( fi . Name ( ) , 10 , 64 )
if err != nil {
continue
}
res [ int ( seq ) ] = filepath . Join ( dir , fi . Name ( ) )
}
return res , nil
}
// repairLastChunkFile deletes the last file if it's empty.
// Because we don't fsync when creating these file, we could end
// up with an empty file at the end during an abrupt shutdown.
func repairLastChunkFile ( files map [ int ] string ) ( _ map [ int ] string , returnErr error ) {
lastFile := - 1
for seq := range files {
if seq > lastFile {
lastFile = seq
}
}
if lastFile <= 0 {
return files , nil
}
info , err := os . Stat ( files [ lastFile ] )
if err != nil {
return files , errors . Wrap ( err , "file stat during last head chunk file repair" )
}
if info . Size ( ) == 0 {
// Corrupt file, hence remove it.
if err := os . RemoveAll ( files [ lastFile ] ) ; err != nil {
return files , errors . Wrap ( err , "delete corrupted, empty head chunk file during last file repair" )
}
delete ( files , lastFile )
}
return files , nil
}
// WriteChunk writes the chunk to the disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func ( cdm * ChunkDiskMapper ) WriteChunk ( seriesRef uint64 , mint , maxt int64 , chk chunkenc . Chunk ) ( chkRef uint64 , err error ) {
cdm . writePathMtx . Lock ( )
defer cdm . writePathMtx . Unlock ( )
if cdm . closed {
return 0 , ErrChunkDiskMapperClosed
}
if cdm . shouldCutNewFile ( len ( chk . Bytes ( ) ) ) {
if err := cdm . cut ( ) ; err != nil {
return 0 , err
}
}
// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
if len ( chk . Bytes ( ) ) + MaxHeadChunkMetaSize < writeBufferSize && cdm . chkWriter . Available ( ) < MaxHeadChunkMetaSize + len ( chk . Bytes ( ) ) {
if err := cdm . flushBuffer ( ) ; err != nil {
return 0 , err
}
}
cdm . crc32 . Reset ( )
bytesWritten := 0
// The upper 4 bytes are for the head chunk file index and
// the lower 4 bytes are for the head chunk file offset where to start reading this chunk.
chkRef = chunkRef ( uint64 ( cdm . curFileSequence ) , uint64 ( cdm . curFileSize ( ) ) )
binary . BigEndian . PutUint64 ( cdm . byteBuf [ bytesWritten : ] , seriesRef )
bytesWritten += SeriesRefSize
binary . BigEndian . PutUint64 ( cdm . byteBuf [ bytesWritten : ] , uint64 ( mint ) )
bytesWritten += MintMaxtSize
binary . BigEndian . PutUint64 ( cdm . byteBuf [ bytesWritten : ] , uint64 ( maxt ) )
bytesWritten += MintMaxtSize
cdm . byteBuf [ bytesWritten ] = byte ( chk . Encoding ( ) )
bytesWritten += ChunkEncodingSize
n := binary . PutUvarint ( cdm . byteBuf [ bytesWritten : ] , uint64 ( len ( chk . Bytes ( ) ) ) )
bytesWritten += n
if err := cdm . writeAndAppendToCRC32 ( cdm . byteBuf [ : bytesWritten ] ) ; err != nil {
return 0 , err
}
if err := cdm . writeAndAppendToCRC32 ( chk . Bytes ( ) ) ; err != nil {
return 0 , err
}
if err := cdm . writeCRC32 ( ) ; err != nil {
return 0 , err
}
if maxt > cdm . curFileMaxt {
cdm . curFileMaxt = maxt
}
cdm . chunkBuffer . put ( chkRef , chk )
if len ( chk . Bytes ( ) ) + MaxHeadChunkMetaSize >= writeBufferSize {
// The chunk was bigger than the buffer itself.
// Flushing to not keep partial chunks in buffer.
if err := cdm . flushBuffer ( ) ; err != nil {
return 0 , err
}
}
return chkRef , nil
}
func chunkRef ( seq , offset uint64 ) ( chunkRef uint64 ) {
return ( seq << 32 ) | offset
}
// shouldCutNewFile decides the cutting of a new file based on time and size retention.
// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
// Time retention: so that we can delete old chunks with some time guarantee in low load environments.
func ( cdm * ChunkDiskMapper ) shouldCutNewFile ( chunkSize int ) bool {
return cdm . curFileSize ( ) == 0 || // First head chunk file.
cdm . curFileSize ( ) + int64 ( chunkSize + MaxHeadChunkMetaSize ) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
}
// CutNewFile creates a new m-mapped file.
func ( cdm * ChunkDiskMapper ) CutNewFile ( ) ( returnErr error ) {
cdm . writePathMtx . Lock ( )
defer cdm . writePathMtx . Unlock ( )
return cdm . cut ( )
}
// cut creates a new m-mapped file. The write lock should be held before calling this.
func ( cdm * ChunkDiskMapper ) cut ( ) ( returnErr error ) {
// Sync current tail to disk and close.
if err := cdm . finalizeCurFile ( ) ; err != nil {
return err
}
n , newFile , seq , err := cutSegmentFile ( cdm . dir , MagicHeadChunks , headChunksFormatV1 , HeadChunkFilePreallocationSize )
if err != nil {
return err
}
defer func ( ) {
// The file should not be closed if there is no error,
// its kept open in the ChunkDiskMapper.
if returnErr != nil {
var merr tsdb_errors . MultiError
merr . Add ( returnErr )
merr . Add ( newFile . Close ( ) )
returnErr = merr . Err ( )
}
} ( )
cdm . size . Add ( cdm . curFileSize ( ) )
cdm . curFileNumBytes . Store ( int64 ( n ) )
if cdm . curFile != nil {
cdm . readPathMtx . Lock ( )
cdm . mmappedChunkFiles [ cdm . curFileSequence ] . maxt = cdm . curFileMaxt
cdm . readPathMtx . Unlock ( )
}
mmapFile , err := fileutil . OpenMmapFileWithSize ( newFile . Name ( ) , int ( MaxHeadChunkFileSize ) )
if err != nil {
return err
}
cdm . readPathMtx . Lock ( )
cdm . curFileSequence = seq
cdm . curFile = newFile
if cdm . chkWriter != nil {
cdm . chkWriter . Reset ( newFile )
} else {
cdm . chkWriter = bufio . NewWriterSize ( newFile , writeBufferSize )
}
cdm . closers [ cdm . curFileSequence ] = mmapFile
cdm . mmappedChunkFiles [ cdm . curFileSequence ] = & mmappedChunkFile { byteSlice : realByteSlice ( mmapFile . Bytes ( ) ) }
cdm . readPathMtx . Unlock ( )
cdm . curFileMaxt = 0
return nil
}
// finalizeCurFile writes all pending data to the current tail file,
// truncates its size, and closes it.
func ( cdm * ChunkDiskMapper ) finalizeCurFile ( ) error {
if cdm . curFile == nil {
return nil
}
if err := cdm . flushBuffer ( ) ; err != nil {
return err
}
if err := cdm . curFile . Sync ( ) ; err != nil {
return err
}
return cdm . curFile . Close ( )
}
func ( cdm * ChunkDiskMapper ) write ( b [ ] byte ) error {
n , err := cdm . chkWriter . Write ( b )
cdm . curFileNumBytes . Add ( int64 ( n ) )
return err
}
func ( cdm * ChunkDiskMapper ) writeAndAppendToCRC32 ( b [ ] byte ) error {
if err := cdm . write ( b ) ; err != nil {
return err
}
_ , err := cdm . crc32 . Write ( b )
return err
}
func ( cdm * ChunkDiskMapper ) writeCRC32 ( ) error {
return cdm . write ( cdm . crc32 . Sum ( cdm . byteBuf [ : 0 ] ) )
}
// flushBuffer flushes the current in-memory chunks.
// Assumes that writePathMtx is _write_ locked before calling this method.
func ( cdm * ChunkDiskMapper ) flushBuffer ( ) error {
if err := cdm . chkWriter . Flush ( ) ; err != nil {
return err
}
cdm . chunkBuffer . clear ( )
return nil
}
// Chunk returns a chunk from a given reference.
func ( cdm * ChunkDiskMapper ) Chunk ( ref uint64 ) ( chunkenc . Chunk , error ) {
cdm . readPathMtx . RLock ( )
// We hold this read lock for the entire duration because if the Close()
// is called, the data in the byte slice will get corrupted as the mmapped
// file will be closed.
defer cdm . readPathMtx . RUnlock ( )
var (
// Get the upper 4 bytes.
// These contain the head chunk file index.
sgmIndex = int ( ref >> 32 )
// Get the lower 4 bytes.
// These contain the head chunk file offset where the chunk starts.
// We skip the series ref and the mint/maxt beforehand.
chkStart = int ( ( ref << 32 ) >> 32 ) + SeriesRefSize + ( 2 * MintMaxtSize )
chkCRC32 = newCRC32 ( )
)
if cdm . closed {
return nil , ErrChunkDiskMapperClosed
}
// If it is the current open file, then the chunks can be in the buffer too.
if sgmIndex == cdm . curFileSequence {
chunk := cdm . chunkBuffer . get ( ref )
if chunk != nil {
return chunk , nil
}
}
mmapFile , ok := cdm . mmappedChunkFiles [ sgmIndex ]
if ! ok {
if sgmIndex > cdm . curFileSequence {
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : - 1 ,
Err : errors . Errorf ( "head chunk file index %d more than current open file" , sgmIndex ) ,
}
}
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . New ( "head chunk file index %d does not exist on disk" ) ,
}
}
if chkStart + MaxChunkLengthFieldSize > mmapFile . byteSlice . Len ( ) {
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v" , chkStart + MaxChunkLengthFieldSize , mmapFile . byteSlice . Len ( ) ) ,
}
}
// Encoding.
chkEnc := mmapFile . byteSlice . Range ( chkStart , chkStart + ChunkEncodingSize ) [ 0 ]
// Data length.
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
chkDataLenStart := chkStart + ChunkEncodingSize
c := mmapFile . byteSlice . Range ( chkDataLenStart , chkDataLenStart + MaxChunkLengthFieldSize )
chkDataLen , n := binary . Uvarint ( c )
if n <= 0 {
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "reading chunk length failed with %d" , n ) ,
}
}
// Verify the chunk data end.
chkDataEnd := chkDataLenStart + n + int ( chkDataLen )
if chkDataEnd > mmapFile . byteSlice . Len ( ) {
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v" , chkDataEnd , mmapFile . byteSlice . Len ( ) ) ,
}
}
// Check the CRC.
sum := mmapFile . byteSlice . Range ( chkDataEnd , chkDataEnd + CRCSize )
if _ , err := chkCRC32 . Write ( mmapFile . byteSlice . Range ( chkStart - ( SeriesRefSize + 2 * MintMaxtSize ) , chkDataEnd ) ) ; err != nil {
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : err ,
}
}
if act := chkCRC32 . Sum ( nil ) ; ! bytes . Equal ( act , sum ) {
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : errors . Errorf ( "checksum mismatch expected:%x, actual:%x" , sum , act ) ,
}
}
// The chunk data itself.
chkData := mmapFile . byteSlice . Range ( chkDataEnd - int ( chkDataLen ) , chkDataEnd )
chk , err := cdm . pool . Get ( chunkenc . Encoding ( chkEnc ) , chkData )
if err != nil {
return nil , & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : sgmIndex ,
Err : err ,
}
}
return chk , nil
}
// IterateAllChunks iterates on all the chunks in its byte slices in the order of the head chunk file sequence
// and runs the provided function on each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func ( cdm * ChunkDiskMapper ) IterateAllChunks ( f func ( seriesRef , chunkRef uint64 , mint , maxt int64 , numSamples uint16 ) error ) ( err error ) {
cdm . writePathMtx . Lock ( )
defer cdm . writePathMtx . Unlock ( )
defer func ( ) {
cdm . fileMaxtSet = true
} ( )
chkCRC32 := newCRC32 ( )
// Iterate files in ascending order.
segIDs := make ( [ ] int , 0 , len ( cdm . mmappedChunkFiles ) )
for seg := range cdm . mmappedChunkFiles {
segIDs = append ( segIDs , seg )
}
sort . Ints ( segIDs )
for _ , segID := range segIDs {
mmapFile := cdm . mmappedChunkFiles [ segID ]
fileEnd := mmapFile . byteSlice . Len ( )
if segID == cdm . curFileSequence {
fileEnd = int ( cdm . curFileSize ( ) )
}
idx := HeadChunkFileHeaderSize
for idx < fileEnd {
if fileEnd - idx < MaxHeadChunkMetaSize {
// Check for all 0s which marks the end of the file.
allZeros := true
for _ , b := range mmapFile . byteSlice . Range ( idx , fileEnd ) {
if b != byte ( 0 ) {
allZeros = false
break
}
}
if allZeros {
break
}
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d" , idx + MaxHeadChunkMetaSize , fileEnd , segID ) ,
}
}
chkCRC32 . Reset ( )
chunkRef := chunkRef ( uint64 ( segID ) , uint64 ( idx ) )
startIdx := idx
seriesRef := binary . BigEndian . Uint64 ( mmapFile . byteSlice . Range ( idx , idx + SeriesRefSize ) )
idx += SeriesRefSize
mint := int64 ( binary . BigEndian . Uint64 ( mmapFile . byteSlice . Range ( idx , idx + MintMaxtSize ) ) )
idx += MintMaxtSize
maxt := int64 ( binary . BigEndian . Uint64 ( mmapFile . byteSlice . Range ( idx , idx + MintMaxtSize ) ) )
idx += MintMaxtSize
// We preallocate file to help with m-mapping (especially windows systems).
// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
// We are not considering possible file corruption that can cause it to be 0.
// Additionally we are checking mint and maxt just to be sure.
if seriesRef == 0 && mint == 0 && maxt == 0 {
break
}
idx += ChunkEncodingSize // Skip encoding.
dataLen , n := binary . Uvarint ( mmapFile . byteSlice . Range ( idx , idx + MaxChunkLengthFieldSize ) )
idx += n
numSamples := binary . BigEndian . Uint16 ( mmapFile . byteSlice . Range ( idx , idx + 2 ) )
idx += int ( dataLen ) // Skip the data.
// In the beginning we only checked for the chunk meta size.
// Now that we have added the chunk data length, we check for sufficient bytes again.
if idx + CRCSize > fileEnd {
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d" , idx + CRCSize , fileEnd , segID ) ,
}
}
// Check CRC.
sum := mmapFile . byteSlice . Range ( idx , idx + CRCSize )
if _ , err := chkCRC32 . Write ( mmapFile . byteSlice . Range ( startIdx , idx ) ) ; err != nil {
return err
}
if act := chkCRC32 . Sum ( nil ) ; ! bytes . Equal ( act , sum ) {
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "checksum mismatch expected:%x, actual:%x" , sum , act ) ,
}
}
idx += CRCSize
if maxt > mmapFile . maxt {
mmapFile . maxt = maxt
}
if err := f ( seriesRef , chunkRef , mint , maxt , numSamples ) ; err != nil {
if cerr , ok := err . ( * CorruptionErr ) ; ok {
cerr . Dir = cdm . dir . Name ( )
cerr . FileIndex = segID
return cerr
}
return err
}
}
if idx > fileEnd {
// It should be equal to the slice length.
return & CorruptionErr {
Dir : cdm . dir . Name ( ) ,
FileIndex : segID ,
Err : errors . Errorf ( "head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d" , idx , fileEnd , segID ) ,
}
}
}
return nil
}
// Truncate deletes the head chunk files which are strictly below the mint.
// mint should be in milliseconds.
func ( cdm * ChunkDiskMapper ) Truncate ( mint int64 ) error {
if ! cdm . fileMaxtSet {
return errors . New ( "maxt of the files are not set" )
}
cdm . readPathMtx . RLock ( )
// Sort the file indices, else if files deletion fails in between,
// it can lead to unsequential files as the map is not sorted.
chkFileIndices := make ( [ ] int , 0 , len ( cdm . mmappedChunkFiles ) )
for seq := range cdm . mmappedChunkFiles {
chkFileIndices = append ( chkFileIndices , seq )
}
sort . Ints ( chkFileIndices )
var removedFiles [ ] int
for _ , seq := range chkFileIndices {
if seq == cdm . curFileSequence || cdm . mmappedChunkFiles [ seq ] . maxt >= mint {
break
}
if cdm . mmappedChunkFiles [ seq ] . maxt < mint {
removedFiles = append ( removedFiles , seq )
}
}
cdm . readPathMtx . RUnlock ( )
var merr tsdb_errors . MultiError
// Cut a new file only if the current file has some chunks.
if cdm . curFileSize ( ) > HeadChunkFileHeaderSize {
merr . Add ( cdm . CutNewFile ( ) )
}
merr . Add ( cdm . deleteFiles ( removedFiles ) )
return merr . Err ( )
}
func ( cdm * ChunkDiskMapper ) deleteFiles ( removedFiles [ ] int ) error {
cdm . readPathMtx . Lock ( )
for _ , seq := range removedFiles {
if err := cdm . closers [ seq ] . Close ( ) ; err != nil {
cdm . readPathMtx . Unlock ( )
return err
}
cdm . size . Sub ( int64 ( cdm . mmappedChunkFiles [ seq ] . byteSlice . Len ( ) ) )
delete ( cdm . mmappedChunkFiles , seq )
delete ( cdm . closers , seq )
}
cdm . readPathMtx . Unlock ( )
// We actually delete the files separately to not block the readPathMtx for long.
for _ , seq := range removedFiles {
if err := os . Remove ( segmentFile ( cdm . dir . Name ( ) , seq ) ) ; err != nil {
return err
}
}
return nil
}
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file).
func ( cdm * ChunkDiskMapper ) DeleteCorrupted ( originalErr error ) error {
err := errors . Cause ( originalErr ) // So that we can pick up errors even if wrapped.
cerr , ok := err . ( * CorruptionErr )
if ! ok {
return errors . Wrap ( originalErr , "cannot handle error" )
}
// Delete all the head chunk files following the corrupt head chunk file.
segs := [ ] int { }
cdm . readPathMtx . RLock ( )
for seg := range cdm . mmappedChunkFiles {
if seg >= cerr . FileIndex {
segs = append ( segs , seg )
}
}
cdm . readPathMtx . RUnlock ( )
return cdm . deleteFiles ( segs )
}
// Size returns the size of the chunk files.
func ( cdm * ChunkDiskMapper ) Size ( ) int64 {
return cdm . size . Load ( ) + cdm . curFileSize ( )
}
func ( cdm * ChunkDiskMapper ) curFileSize ( ) int64 {
return cdm . curFileNumBytes . Load ( )
}
// Close closes all the open files in ChunkDiskMapper.
// It is not longer safe to access chunks from this struct after calling Close.
func ( cdm * ChunkDiskMapper ) Close ( ) error {
// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
// The lock order should not be reversed here else it can cause deadlocks.
cdm . writePathMtx . Lock ( )
defer cdm . writePathMtx . Unlock ( )
cdm . readPathMtx . Lock ( )
defer cdm . readPathMtx . Unlock ( )
if cdm . closed {
return nil
}
cdm . closed = true
var merr tsdb_errors . MultiError
merr . Add ( closeAllFromMap ( cdm . closers ) )
merr . Add ( cdm . finalizeCurFile ( ) )
merr . Add ( cdm . dir . Close ( ) )
cdm . mmappedChunkFiles = map [ int ] * mmappedChunkFile { }
cdm . closers = map [ int ] io . Closer { }
return merr . Err ( )
}
func closeAllFromMap ( cs map [ int ] io . Closer ) error {
var merr tsdb_errors . MultiError
for _ , c := range cs {
merr . Add ( c . Close ( ) )
}
return merr . Err ( )
}
const inBufferShards = 128 // 128 is a randomly chosen number.
// chunkBuffer is a thread safe buffer for chunks.
type chunkBuffer struct {
inBufferChunks [ inBufferShards ] map [ uint64 ] chunkenc . Chunk
inBufferChunksMtxs [ inBufferShards ] sync . RWMutex
}
func newChunkBuffer ( ) * chunkBuffer {
cb := & chunkBuffer { }
for i := 0 ; i < inBufferShards ; i ++ {
cb . inBufferChunks [ i ] = make ( map [ uint64 ] chunkenc . Chunk )
}
return cb
}
func ( cb * chunkBuffer ) put ( ref uint64 , chk chunkenc . Chunk ) {
shardIdx := ref % inBufferShards
cb . inBufferChunksMtxs [ shardIdx ] . Lock ( )
cb . inBufferChunks [ shardIdx ] [ ref ] = chk
cb . inBufferChunksMtxs [ shardIdx ] . Unlock ( )
}
func ( cb * chunkBuffer ) get ( ref uint64 ) chunkenc . Chunk {
shardIdx := ref % inBufferShards
cb . inBufferChunksMtxs [ shardIdx ] . RLock ( )
defer cb . inBufferChunksMtxs [ shardIdx ] . RUnlock ( )
return cb . inBufferChunks [ shardIdx ] [ ref ]
}
func ( cb * chunkBuffer ) clear ( ) {
for i := 0 ; i < inBufferShards ; i ++ {
cb . inBufferChunksMtxs [ i ] . Lock ( )
cb . inBufferChunks [ i ] = make ( map [ uint64 ] chunkenc . Chunk )
cb . inBufferChunksMtxs [ i ] . Unlock ( )
}
}