// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"hash"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"sync"

	"github.com/dennwc/varint"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/atomic"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/fileutil"
)

// Head chunk file header fields constants.
const (
	// MagicHeadChunks is 4 bytes at the beginning of a head chunk file.
	MagicHeadChunks = 0x0130BC91

	headChunksFormatV1 = 1
)

// ErrChunkDiskMapperClosed is returned by any method of the ChunkDiskMapper
// after it has been closed.
var ErrChunkDiskMapperClosed = errors.New("ChunkDiskMapper closed")

const (
	// MintMaxtSize is the size of the mint/maxt for head chunk file and chunks.
	MintMaxtSize = 8
	// SeriesRefSize is the size of series reference on disk.
	SeriesRefSize = 8
	// HeadChunkFileHeaderSize is the total size of the header for a head chunk file.
	HeadChunkFileHeaderSize = SegmentHeaderSize
	// MaxHeadChunkFileSize is the max size of a head chunk file.
	MaxHeadChunkFileSize = 128 * 1024 * 1024 // 128 MiB.
	// CRCSize is the size of crc32 sum on disk.
	CRCSize = 4
	// MaxHeadChunkMetaSize is the max size of an mmapped chunk's metadata,
	// i.e. everything written for a chunk except the chunk data itself.
	// Max because the uvarint size can be smaller.
	MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + MaxChunkLengthFieldSize + CRCSize
	// MinWriteBufferSize is the minimum write buffer size allowed.
	MinWriteBufferSize = 64 * 1024 // 64KB.
	// MaxWriteBufferSize is the maximum write buffer size allowed.
	MaxWriteBufferSize = 8 * 1024 * 1024 // 8 MiB.
	// DefaultWriteBufferSize is the default write buffer size.
	DefaultWriteBufferSize = 4 * 1024 * 1024 // 4 MiB.
	// DefaultWriteQueueSize is the default size of the in-memory queue used before flushing chunks to the disk.
	DefaultWriteQueueSize = 1000
)

// ChunkDiskMapperRef represents the location of a head chunk on disk.
// The upper 4 bytes hold the index of the head chunk file and
// the lower 4 bytes hold the byte offset in the head chunk file where the chunk starts.
type ChunkDiskMapperRef uint64

func newChunkDiskMapperRef(seq, offset uint64) ChunkDiskMapperRef {
	return ChunkDiskMapperRef((seq << 32) | offset)
}

func (ref ChunkDiskMapperRef) Unpack() (seq, offset int) {
	seq = int(ref >> 32)
	offset = int((ref << 32) >> 32)
	return seq, offset
}
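
// An illustrative sketch (made-up values) of how a reference round-trips
// through the helpers above:
//
//	ref := newChunkDiskMapperRef(3, 512) // Chunk starting at byte offset 512 of file 3.
//	seq, offset := ref.Unpack()          // seq == 3, offset == 512.
//
// Since the offset occupies the lower 32 bits, a single head chunk file could
// address up to 4 GiB, well above MaxHeadChunkFileSize.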

// CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
	Dir       string
	FileIndex int
	Err       error
}

func (e *CorruptionErr) Error() string {
	return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error()
}

// chunkPos keeps track of the position in the head chunk files.
// chunkPos is not thread-safe; a lock must be used to protect it.
type chunkPos struct {
	seq     uint64 // Index of chunk file.
	offset  uint64 // Offset within chunk file.
	cutFile bool   // When true then the next chunk will be written to a new file.
}

// getNextChunkRef takes a chunk and returns the chunk reference which will refer to it once it has been written.
// getNextChunkRef also decides whether a new file should be cut before writing this chunk, and it returns the decision via the second return value.
// The order of calling getNextChunkRef must be the order in which chunks are written to the disk.
func (f *chunkPos) getNextChunkRef(chk chunkenc.Chunk) (chkRef ChunkDiskMapperRef, cutFile bool) {
	chkLen := uint64(len(chk.Bytes()))
	bytesToWrite := f.bytesToWriteForChunk(chkLen)

	if f.shouldCutNewFile(bytesToWrite) {
		f.toNewFile()
		f.cutFile = false
		cutFile = true
	}

	chkOffset := f.offset
	f.offset += bytesToWrite

	return newChunkDiskMapperRef(f.seq, chkOffset), cutFile
}

// toNewFile updates the seq/offset position to point to the beginning of a new chunk file.
func (f *chunkPos) toNewFile() {
	f.seq++
	f.offset = SegmentHeaderSize
}

// cutFileOnNextChunk triggers that the next chunk will be written to a new file.
// Not thread safe, a lock must be held when calling this.
func (f *chunkPos) cutFileOnNextChunk() {
	f.cutFile = true
}

// setSeq sets the sequence number of the head chunk file.
func (f *chunkPos) setSeq(seq uint64) {
	f.seq = seq
}

// shouldCutNewFile returns whether a new file should be cut based on the file size.
// Not thread safe, a lock must be held when calling this.
func (f *chunkPos) shouldCutNewFile(bytesToWrite uint64) bool {
	if f.cutFile {
		return true
	}

	return f.offset == 0 || // First head chunk file.
		f.offset+bytesToWrite > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
}

// bytesToWriteForChunk returns the number of bytes that will need to be written for the given chunk size,
// including all metadata before and after the chunk data.
// Head chunk format: https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/head_chunks.md#chunk
func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 {
	// Headers.
	bytes := uint64(SeriesRefSize) + 2*MintMaxtSize + ChunkEncodingSize

	// Size of chunk length encoded as uvarint.
	bytes += uint64(varint.UvarintSize(chkLen))

	// Chunk length.
	bytes += chkLen

	// crc32.
	bytes += CRCSize

	return bytes
}
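
// As a worked example (hypothetical numbers) of the accounting above: a chunk
// whose data is 150 bytes needs 8 (series ref) + 16 (mint/maxt) + 1 (encoding)
// + 2 (uvarint of 150) + 150 (data) + 4 (crc32) = 181 bytes on disk, assuming
// SeriesRefSize = 8, MintMaxtSize = 8, ChunkEncodingSize = 1 and CRCSize = 4.
//
//	var pos chunkPos
//	n := pos.bytesToWriteForChunk(150) // n == 181 with the sizes above.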

// ChunkDiskMapper is for writing the Head block chunks to disk
// and accessing chunks via mmapped files.
type ChunkDiskMapper struct {
	/// Writer.
	dir             *os.File
	writeBufferSize int

	curFile         *os.File      // File being written to.
	curFileSequence int           // Index of current open file being appended to. 0 if no file is active.
	curFileOffset   atomic.Uint64 // Bytes written in current open file.
	curFileMaxt     int64         // Used for the size retention.

	// The values in evtlPos represent the file position which will eventually be
	// reached once the content of the write queue has been fully processed.
	evtlPosMtx sync.Mutex
	evtlPos    chunkPos

	byteBuf      [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
	chkWriter    *bufio.Writer              // Writer for the current open file.
	crc32        hash.Hash
	writePathMtx sync.Mutex

	/// Reader.
	// The int key in the map is the file number on the disk.
	mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
	closers           map[int]io.Closer         // Closers for resources behind the byte slices.
	readPathMtx       sync.RWMutex              // Mutex used to protect the above 2 maps.
	pool              chunkenc.Pool             // This is used when fetching a chunk from the disk to allocate a chunk.

	// Writer and Reader.
	// We flush chunks to disk in batches. Hence, we store them in this buffer
	// from which chunks are served till they are flushed and are ready for m-mapping.
	chunkBuffer *chunkBuffer

	// Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map.
	// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
	fileMaxtSet bool

	writeQueue *chunkWriteQueue

	closed bool
}

// mmappedChunkFile provides mmap access to an entire head chunks file that holds many chunks.
type mmappedChunkFile struct {
	byteSlice ByteSlice
	maxt      int64 // Max timestamp among all of this file's chunks.
}

// NewChunkDiskMapper returns a new ChunkDiskMapper against the given directory
// using the default head chunk file duration.
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the files.
func NewChunkDiskMapper(reg prometheus.Registerer, dir string, pool chunkenc.Pool, writeBufferSize, writeQueueSize int) (*ChunkDiskMapper, error) {
	// Validate write buffer size.
	if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
		return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
	}
	if writeBufferSize%1024 != 0 {
		return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
	}

	if err := os.MkdirAll(dir, 0o777); err != nil {
		return nil, err
	}
	dirFile, err := fileutil.OpenDir(dir)
	if err != nil {
		return nil, err
	}

	m := &ChunkDiskMapper{
		dir:             dirFile,
		pool:            pool,
		writeBufferSize: writeBufferSize,
		crc32:           newCRC32(),
		chunkBuffer:     newChunkBuffer(),
	}

	m.writeQueue = newChunkWriteQueue(reg, writeQueueSize, m.writeChunk)

	if m.pool == nil {
		m.pool = chunkenc.NewPool()
	}

	return m, m.openMMapFiles()
}
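
// A minimal usage sketch (error handling elided, mmappedChunksDir is a
// hypothetical caller-provided directory): create a mapper with the default
// sizes, remembering that IterateAllChunks must be called once before the
// mapper is fully usable.
//
//	cdm, err := NewChunkDiskMapper(nil, mmappedChunksDir, chunkenc.NewPool(), DefaultWriteBufferSize, DefaultWriteQueueSize)
//	if err != nil {
//		// Handle error.
//	}
//	err = cdm.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil })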

// openMMapFiles opens all files within dir for mmapping.
func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
	cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
	cdm.closers = map[int]io.Closer{}
	defer func() {
		if returnErr != nil {
			returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()

			cdm.mmappedChunkFiles = nil
			cdm.closers = nil
		}
	}()

	files, err := listChunkFiles(cdm.dir.Name())
	if err != nil {
		return err
	}

	files, err = repairLastChunkFile(files)
	if err != nil {
		return err
	}

	chkFileIndices := make([]int, 0, len(files))
	for seq, fn := range files {
		f, err := fileutil.OpenMmapFile(fn)
		if err != nil {
			return errors.Wrapf(err, "mmap files, file: %s", fn)
		}
		cdm.closers[seq] = f
		cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
		chkFileIndices = append(chkFileIndices, seq)
	}

	// Check for gaps in the files.
	sort.Ints(chkFileIndices)
	if len(chkFileIndices) == 0 {
		return nil
	}
	lastSeq := chkFileIndices[0]
	for _, seq := range chkFileIndices[1:] {
		if seq != lastSeq+1 {
			return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
		}
		lastSeq = seq
	}

	for i, b := range cdm.mmappedChunkFiles {
		if b.byteSlice.Len() < HeadChunkFileHeaderSize {
			return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i])
		}
		// Verify magic number.
		if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
			return errors.Errorf("%s: invalid magic number %x", files[i], m)
		}

		// Verify chunk format version.
		if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
			return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
		}
	}

	cdm.evtlPos.setSeq(uint64(lastSeq))

	return nil
}

func listChunkFiles(dir string) (map[int]string, error) {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	res := map[int]string{}
	for _, fi := range files {
		seq, err := strconv.ParseUint(fi.Name(), 10, 64)
		if err != nil {
			continue
		}
		res[int(seq)] = filepath.Join(dir, fi.Name())
	}

	return res, nil
}

// repairLastChunkFile deletes the last file if it's empty.
// Because we don't fsync when creating these files, we could end
// up with an empty file at the end during an abrupt shutdown.
func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) {
	lastFile := -1
	for seq := range files {
		if seq > lastFile {
			lastFile = seq
		}
	}

	if lastFile <= 0 {
		return files, nil
	}

	info, err := os.Stat(files[lastFile])
	if err != nil {
		return files, errors.Wrap(err, "file stat during last head chunk file repair")
	}
	if info.Size() == 0 {
		// Corrupt file, hence remove it.
		if err := os.RemoveAll(files[lastFile]); err != nil {
			return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair")
		}
		delete(files, lastFile)
	}

	return files, nil
}

// WriteChunk writes the chunk to the disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
	var err error
	defer func() {
		if err != nil && callback != nil {
			callback(err)
		}
	}()

	// cdm.evtlPosMtx must be held to serialize the calls to .getNextChunkRef() and .addJob().
	cdm.evtlPosMtx.Lock()
	defer cdm.evtlPosMtx.Unlock()

	ref, cutFile := cdm.evtlPos.getNextChunkRef(chk)
	err = cdm.writeQueue.addJob(chunkWriteJob{
		cutFile:   cutFile,
		seriesRef: seriesRef,
		mint:      mint,
		maxt:      maxt,
		chk:       chk,
		ref:       ref,
		callback:  callback,
	})
	return ref
}
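
// A usage sketch (hypothetical values): the returned reference can be stored
// immediately, while the callback reports the asynchronous result of the
// queued write.
//
//	ref := cdm.WriteChunk(seriesRef, mint, maxt, chk, func(err error) {
//		if err != nil {
//			// The chunk could not be written to disk.
//		}
//	})
//	_ = ref // Keep the reference to read the chunk back later via Chunk(ref).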

func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, ref ChunkDiskMapperRef, cutFile bool) (err error) {
	cdm.writePathMtx.Lock()
	defer cdm.writePathMtx.Unlock()

	if cdm.closed {
		return ErrChunkDiskMapperClosed
	}

	if cutFile {
		err := cdm.cutAndExpectRef(ref)
		if err != nil {
			return err
		}
	}

	// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
	// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
	if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
		if err := cdm.flushBuffer(); err != nil {
			return err
		}
	}

	cdm.crc32.Reset()
	bytesWritten := 0

	binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef))
	bytesWritten += SeriesRefSize
	binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
	bytesWritten += MintMaxtSize
	binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
	bytesWritten += MintMaxtSize
	cdm.byteBuf[bytesWritten] = byte(chk.Encoding())
	bytesWritten += ChunkEncodingSize
	n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
	bytesWritten += n

	if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
		return err
	}
	if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
		return err
	}
	if err := cdm.writeCRC32(); err != nil {
		return err
	}

	if maxt > cdm.curFileMaxt {
		cdm.curFileMaxt = maxt
	}

	cdm.chunkBuffer.put(ref, chk)

	if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
		// The chunk was bigger than the buffer itself.
		// Flushing to not keep partial chunks in buffer.
		if err := cdm.flushBuffer(); err != nil {
			return err
		}
	}

	return nil
}

// CutNewFile ensures that a new file will be created the next time a chunk is written.
func (cdm *ChunkDiskMapper) CutNewFile() {
	cdm.evtlPosMtx.Lock()
	defer cdm.evtlPosMtx.Unlock()

	cdm.evtlPos.cutFileOnNextChunk()
}

func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
	return cdm.writeQueue.queueIsEmpty()
}

// cutAndExpectRef creates a new m-mapped file.
// The write lock should be held before calling this.
// It ensures that the position in the new file matches the given chunk reference, if not then it errors.
func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err error) {
	seq, offset, err := cdm.cut()
	if err != nil {
		return err
	}

	if expSeq, expOffset := chkRef.Unpack(); seq != expSeq || offset != expOffset {
		return errors.Errorf("expected newly cut file to have sequence:offset %d:%d, got %d:%d", expSeq, expOffset, seq, offset)
	}

	return nil
}

// cut creates a new m-mapped file. The write lock should be held before calling this.
// It returns the file sequence and the offset in that file to start writing chunks.
func (cdm *ChunkDiskMapper) cut() (seq, offset int, returnErr error) {
	// Sync current tail to disk and close.
	if err := cdm.finalizeCurFile(); err != nil {
		return 0, 0, err
	}

	offset, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
	if err != nil {
		return 0, 0, err
	}

	defer func() {
		// The file should not be closed if there is no error;
		// it is kept open in the ChunkDiskMapper.
		if returnErr != nil {
			returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
		}
	}()

	cdm.curFileOffset.Store(uint64(offset))

	if cdm.curFile != nil {
		cdm.readPathMtx.Lock()
		cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt
		cdm.readPathMtx.Unlock()
	}

	mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
	if err != nil {
		return 0, 0, err
	}

	cdm.readPathMtx.Lock()
	cdm.curFileSequence = seq
	cdm.curFile = newFile
	if cdm.chkWriter != nil {
		cdm.chkWriter.Reset(newFile)
	} else {
		cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
	}

	cdm.closers[cdm.curFileSequence] = mmapFile
	cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
	cdm.readPathMtx.Unlock()

	cdm.curFileMaxt = 0

	return seq, offset, nil
}

// finalizeCurFile writes all pending data to the current tail file,
// truncates its size, and closes it.
func (cdm *ChunkDiskMapper) finalizeCurFile() error {
	if cdm.curFile == nil {
		return nil
	}

	if err := cdm.flushBuffer(); err != nil {
		return err
	}

	if err := cdm.curFile.Sync(); err != nil {
		return err
	}

	return cdm.curFile.Close()
}

func (cdm *ChunkDiskMapper) write(b []byte) error {
	n, err := cdm.chkWriter.Write(b)
	cdm.curFileOffset.Add(uint64(n))
	return err
}

func (cdm *ChunkDiskMapper) writeAndAppendToCRC32(b []byte) error {
	if err := cdm.write(b); err != nil {
		return err
	}
	_, err := cdm.crc32.Write(b)
	return err
}

func (cdm *ChunkDiskMapper) writeCRC32() error {
	return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0]))
}

// flushBuffer flushes the current in-memory chunks.
// Assumes that writePathMtx is _write_ locked before calling this method.
func (cdm *ChunkDiskMapper) flushBuffer() error {
	if err := cdm.chkWriter.Flush(); err != nil {
		return err
	}
	cdm.chunkBuffer.clear()
	return nil
}

// Chunk returns a chunk from a given reference.
func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
	cdm.readPathMtx.RLock()
	// We hold this read lock for the entire duration because if Close()
	// is called, the data in the byte slice will get corrupted as the mmapped
	// file will be closed.
	defer cdm.readPathMtx.RUnlock()

	if cdm.closed {
		return nil, ErrChunkDiskMapperClosed
	}

	chunk := cdm.writeQueue.get(ref)
	if chunk != nil {
		return chunk, nil
	}

	sgmIndex, chkStart := ref.Unpack()
	// We skip the series ref and the mint/maxt beforehand.
	chkStart += SeriesRefSize + (2 * MintMaxtSize)
	chkCRC32 := newCRC32()

	// If it is the current open file, then the chunks can be in the buffer too.
	if sgmIndex == cdm.curFileSequence {
		chunk := cdm.chunkBuffer.get(ref)
		if chunk != nil {
			return chunk, nil
		}
	}

	mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
	if !ok {
		if sgmIndex > cdm.curFileSequence {
			return nil, &CorruptionErr{
				Dir:       cdm.dir.Name(),
				FileIndex: -1,
				Err:       errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
			}
		}
		return nil, &CorruptionErr{
			Dir:       cdm.dir.Name(),
			FileIndex: sgmIndex,
			Err:       errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex),
		}
	}

	if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
		return nil, &CorruptionErr{
			Dir:       cdm.dir.Name(),
			FileIndex: sgmIndex,
			Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
		}
	}

	// Encoding.
	chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]

	// Data length.
	// With the minimum chunk length this should never cause us reading
	// over the end of the slice.
	chkDataLenStart := chkStart + ChunkEncodingSize
	c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
	chkDataLen, n := binary.Uvarint(c)
	if n <= 0 {
		return nil, &CorruptionErr{
			Dir:       cdm.dir.Name(),
			FileIndex: sgmIndex,
			Err:       errors.Errorf("reading chunk length failed with %d", n),
		}
	}

	// Verify the chunk data end.
	chkDataEnd := chkDataLenStart + n + int(chkDataLen)
	if chkDataEnd > mmapFile.byteSlice.Len() {
		return nil, &CorruptionErr{
			Dir:       cdm.dir.Name(),
			FileIndex: sgmIndex,
			Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
		}
	}

	// Check the CRC.
	sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
	if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
		return nil, &CorruptionErr{
			Dir:       cdm.dir.Name(),
			FileIndex: sgmIndex,
			Err:       err,
		}
	}
	if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
		return nil, &CorruptionErr{
			Dir:       cdm.dir.Name(),
			FileIndex: sgmIndex,
			Err:       errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
		}
	}

	// The chunk data itself.
	chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)

	// Make a copy of the chunk data to prevent a panic occurring because the returned
	// chunk data slice references an mmap-ed file which could be closed after the
	// function returns but while the chunk is still in use.
	chkDataCopy := make([]byte, len(chkData))
	copy(chkDataCopy, chkData)

	chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy)
	if err != nil {
		return nil, &CorruptionErr{
			Dir:       cdm.dir.Name(),
			FileIndex: sgmIndex,
			Err:       err,
		}
	}
	return chk, nil
}
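
// A read-back sketch (ref is a hypothetical reference returned by WriteChunk):
// Chunk either serves the chunk from the write queue / in-memory buffer or
// decodes it from the mmapped file.
//
//	chk, err := cdm.Chunk(ref)
//	if err != nil {
//		if _, ok := errors.Cause(err).(*CorruptionErr); ok {
//			// Possibly call DeleteCorrupted(err) to drop the damaged files.
//		}
//	}
//	_ = chk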

// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within them
// and runs the provided function with information about each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the files.
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
	cdm.writePathMtx.Lock()
	defer cdm.writePathMtx.Unlock()

	defer func() {
		cdm.fileMaxtSet = true
	}()

	chkCRC32 := newCRC32()

	// Iterate files in ascending order.
	segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
	for seg := range cdm.mmappedChunkFiles {
		segIDs = append(segIDs, seg)
	}
	sort.Ints(segIDs)
	for _, segID := range segIDs {
		mmapFile := cdm.mmappedChunkFiles[segID]
		fileEnd := mmapFile.byteSlice.Len()
		if segID == cdm.curFileSequence {
			fileEnd = int(cdm.curFileSize())
		}
		idx := HeadChunkFileHeaderSize
		for idx < fileEnd {
			if fileEnd-idx < MaxHeadChunkMetaSize {
				// Check for all 0s which marks the end of the file.
				allZeros := true
				for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) {
					if b != byte(0) {
						allZeros = false
						break
					}
				}
				if allZeros {
					// End of segment chunk file content.
					break
				}
				return &CorruptionErr{
					Dir:       cdm.dir.Name(),
					FileIndex: segID,
					Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
						" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
				}
			}
			chkCRC32.Reset()
			chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))

			startIdx := idx
			seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize)))
			idx += SeriesRefSize
			mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
			idx += MintMaxtSize
			maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
			idx += MintMaxtSize

			// We preallocate file to help with m-mapping (especially windows systems).
			// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
			// We are not considering possible file corruption that can cause it to be 0.
			// Additionally we are checking mint and maxt just to be sure.
			if seriesRef == 0 && mint == 0 && maxt == 0 {
				break
			}

			idx += ChunkEncodingSize // Skip encoding.
			dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
			idx += n

			numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))

			idx += int(dataLen) // Skip the data.

			// In the beginning we only checked for the chunk meta size.
			// Now that we have added the chunk data length, we check for sufficient bytes again.
			if idx+CRCSize > fileEnd {
				return &CorruptionErr{
					Dir:       cdm.dir.Name(),
					FileIndex: segID,
					Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
				}
			}

			// Check CRC.
			sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
			if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
				return err
			}
			if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
				return &CorruptionErr{
					Dir:       cdm.dir.Name(),
					FileIndex: segID,
					Err:       errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
				}
			}
			idx += CRCSize

			if maxt > mmapFile.maxt {
				mmapFile.maxt = maxt
			}

			if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
				if cerr, ok := err.(*CorruptionErr); ok {
					cerr.Dir = cdm.dir.Name()
					cerr.FileIndex = segID
					return cerr
				}
				return err
			}
		}

		if idx > fileEnd {
			// It should be equal to the slice length.
			return &CorruptionErr{
				Dir:       cdm.dir.Name(),
				FileIndex: segID,
				Err:       errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
			}
		}
	}

	return nil
}
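
// A sketch of typical startup usage (callback body hypothetical): the caller
// rebuilds its in-memory series state from every m-mapped chunk and, as a side
// effect, the per-file maxt gets populated so that Truncate can work.
//
//	err := cdm.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
//		// Record that seriesRef has an m-mapped chunk at chunkRef covering [mint, maxt].
//		return nil
//	})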

// Truncate deletes the head chunk files whose maxt is strictly below the given mint.
// mint should be in milliseconds.
func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
	if !cdm.fileMaxtSet {
		return errors.New("maxt of the files are not set")
	}
	cdm.readPathMtx.RLock()

	// Sort the file indices, else if files deletion fails in between,
	// it can lead to unsequential files as the map is not sorted.
	chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles))
	for seq := range cdm.mmappedChunkFiles {
		chkFileIndices = append(chkFileIndices, seq)
	}
	sort.Ints(chkFileIndices)

	var removedFiles []int
	for _, seq := range chkFileIndices {
		if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint {
			break
		}
		if cdm.mmappedChunkFiles[seq].maxt < mint {
			removedFiles = append(removedFiles, seq)
		}
	}
	cdm.readPathMtx.RUnlock()

	errs := tsdb_errors.NewMulti()
	// Cut a new file only if the current file has some chunks.
	if cdm.curFileSize() > HeadChunkFileHeaderSize {
		// There is a known race condition here because between the check of curFileSize() and the call to CutNewFile()
		// a new file could already be cut, this is acceptable because it will simply result in an empty file which
		// won't do any harm.
		cdm.CutNewFile()
	}
	pendingDeletes, err := cdm.deleteFiles(removedFiles)
	errs.Add(err)

	if len(chkFileIndices) == len(removedFiles) {
		// All files were deleted. Reset the current sequence.
		cdm.evtlPosMtx.Lock()
		if err == nil {
			cdm.evtlPos.setSeq(0)
		} else {
			// In case of error, set it to the last file number on the disk that was not deleted.
			cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
		}
		cdm.evtlPosMtx.Unlock()
	}

	return errs.Err()
}
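
// A sketch (hypothetical retention boundary): everything whose file maxt is
// strictly below mint becomes eligible for deletion, file by file.
//
//	retentionBoundaryMs := time.Now().Add(-2*time.Hour).UnixNano() / int64(time.Millisecond)
//	if err := cdm.Truncate(retentionBoundaryMs); err != nil {
//		// Handle partial deletion; the remaining files stay sequential.
//	}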

// deleteFiles deletes the given file sequences in order of the sequence.
// In case of an error, it returns the sorted file sequences that were not deleted from the _disk_.
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) ([]int, error) {
	sort.Ints(removedFiles) // To delete them in order.
	cdm.readPathMtx.Lock()
	for _, seq := range removedFiles {
		if err := cdm.closers[seq].Close(); err != nil {
			cdm.readPathMtx.Unlock()
			return removedFiles, err
		}
		delete(cdm.mmappedChunkFiles, seq)
		delete(cdm.closers, seq)
	}
	cdm.readPathMtx.Unlock()

	// We actually delete the files separately to not block the readPathMtx for long.
	for i, seq := range removedFiles {
		if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
			return removedFiles[i:], err
		}
	}

	return nil, nil
}

// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file).
func (cdm *ChunkDiskMapper) DeleteCorrupted(originalErr error) error {
	err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
	cerr, ok := err.(*CorruptionErr)
	if !ok {
		return errors.Wrap(originalErr, "cannot handle error")
	}

	// Delete all the head chunk files following the corrupt head chunk file.
	segs := []int{}
	cdm.readPathMtx.RLock()
	lastSeq := 0
	for seg := range cdm.mmappedChunkFiles {
		if seg >= cerr.FileIndex {
			segs = append(segs, seg)
		} else if seg > lastSeq {
			lastSeq = seg
		}
	}
	cdm.readPathMtx.RUnlock()

	pendingDeletes, err := cdm.deleteFiles(segs)
	cdm.evtlPosMtx.Lock()
	if err == nil {
		cdm.evtlPos.setSeq(uint64(lastSeq))
	} else {
		// In case of error, set it to the last file number on the disk that was not deleted.
		cdm.evtlPos.setSeq(uint64(pendingDeletes[len(pendingDeletes)-1]))
	}
	cdm.evtlPosMtx.Unlock()

	return err
}

// Size returns the size of the chunk files.
func (cdm *ChunkDiskMapper) Size() (int64, error) {
	return fileutil.DirSize(cdm.dir.Name())
}

func (cdm *ChunkDiskMapper) curFileSize() uint64 {
	return cdm.curFileOffset.Load()
}

// Close closes all the open files in ChunkDiskMapper.
// It is no longer safe to access chunks from this struct after calling Close.
func (cdm *ChunkDiskMapper) Close() error {
	// Locking the eventual position lock blocks WriteChunk().
	cdm.evtlPosMtx.Lock()
	defer cdm.evtlPosMtx.Unlock()

	cdm.writeQueue.stop()

	// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
	// The lock order should not be reversed here else it can cause deadlocks.
	cdm.writePathMtx.Lock()
	defer cdm.writePathMtx.Unlock()
	cdm.readPathMtx.Lock()
	defer cdm.readPathMtx.Unlock()

	if cdm.closed {
		return nil
	}
	cdm.closed = true

	errs := tsdb_errors.NewMulti(
		closeAllFromMap(cdm.closers),
		cdm.finalizeCurFile(),
		cdm.dir.Close(),
	)
	cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
	cdm.closers = map[int]io.Closer{}

	return errs.Err()
}

func closeAllFromMap(cs map[int]io.Closer) error {
	errs := tsdb_errors.NewMulti()
	for _, c := range cs {
		errs.Add(c.Close())
	}
	return errs.Err()
}

const inBufferShards = 128 // 128 is a randomly chosen number.

// chunkBuffer is a thread safe lookup table for chunks by their ref.
type chunkBuffer struct {
	inBufferChunks     [inBufferShards]map[ChunkDiskMapperRef]chunkenc.Chunk
	inBufferChunksMtxs [inBufferShards]sync.RWMutex
}

func newChunkBuffer() *chunkBuffer {
	cb := &chunkBuffer{}
	for i := 0; i < inBufferShards; i++ {
		cb.inBufferChunks[i] = make(map[ChunkDiskMapperRef]chunkenc.Chunk)
	}
	return cb
}

func (cb *chunkBuffer) put(ref ChunkDiskMapperRef, chk chunkenc.Chunk) {
	shardIdx := ref % inBufferShards

	cb.inBufferChunksMtxs[shardIdx].Lock()
	cb.inBufferChunks[shardIdx][ref] = chk
	cb.inBufferChunksMtxs[shardIdx].Unlock()
}

func (cb *chunkBuffer) get(ref ChunkDiskMapperRef) chunkenc.Chunk {
	shardIdx := ref % inBufferShards

	cb.inBufferChunksMtxs[shardIdx].RLock()
	defer cb.inBufferChunksMtxs[shardIdx].RUnlock()

	return cb.inBufferChunks[shardIdx][ref]
}

func (cb *chunkBuffer) clear() {
	for i := 0; i < inBufferShards; i++ {
		cb.inBufferChunksMtxs[i].Lock()
		cb.inBufferChunks[i] = make(map[ChunkDiskMapperRef]chunkenc.Chunk)
		cb.inBufferChunksMtxs[i].Unlock()
	}
}
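
// A sketch of the sharding idea (values hypothetical, chk1/chk2 stand for any
// chunkenc.Chunk): refs that differ in their low bits land in different shards,
// so concurrent put/get calls rarely contend on the same mutex.
//
//	cb := newChunkBuffer()
//	cb.put(ChunkDiskMapperRef(1), chk1) // Stored in shard 1.
//	cb.put(ChunkDiskMapperRef(2), chk2) // Stored in shard 2.
//	_ = cb.get(ChunkDiskMapperRef(1))   // Read-locks only shard 1.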