mirror of https://github.com/prometheus/prometheus
Live m-mapping of chunks on disk (#6830)
* Live m-mapping of chunks on disk Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Part 2 Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Part 3 Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Fix review comments Part 4 Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> * Attempt to fix windows bug Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>pull/7012/head
parent
1da83305be
commit
e50fdbc70c
|
@ -190,7 +190,7 @@ func (w *Writer) cut() error {
|
|||
return err
|
||||
}
|
||||
|
||||
n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, w.segmentSize)
|
||||
n, f, _, err := cutSegmentFile(w.dirFile, MagicChunks, chunksFormatV1, w.segmentSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -206,7 +206,7 @@ func (w *Writer) cut() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) {
|
||||
func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, allocSize int64) (headerSize int, newFile *os.File, seq int, err error) {
|
||||
p, seq, err := nextSequenceFile(dirFile.Name())
|
||||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
|
@ -215,8 +215,8 @@ func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (hea
|
|||
if err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
if segmentSize > 0 {
|
||||
if err = fileutil.Preallocate(f, segmentSize, true); err != nil {
|
||||
if allocSize > 0 {
|
||||
if err = fileutil.Preallocate(f, allocSize, true); err != nil {
|
||||
return 0, nil, 0, err
|
||||
}
|
||||
}
|
||||
|
@ -226,7 +226,7 @@ func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (hea
|
|||
|
||||
// Write header metadata for new file.
|
||||
metab := make([]byte, SegmentHeaderSize)
|
||||
binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
|
||||
binary.BigEndian.PutUint32(metab[:MagicChunksSize], magicNumber)
|
||||
metab[4] = chunksFormat
|
||||
|
||||
n, err := f.Write(metab)
|
||||
|
|
|
@ -0,0 +1,779 @@
|
|||
// Copyright 2020 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||
)
|
||||
|
||||
// Head chunk file header fields constants.
|
||||
const (
|
||||
// MagicHeadChunks is 4 bytes at the beginning of a head chunk file.
|
||||
MagicHeadChunks = 0x0130BC91
|
||||
|
||||
headChunksFormatV1 = 1
|
||||
writeBufferSize = 4 * 1024 * 1024 // 4 MiB.
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrChunkDiskMapperClosed returned by any method indicates
|
||||
// that the ChunkDiskMapper was closed.
|
||||
ErrChunkDiskMapperClosed = errors.New("ChunkDiskMapper closed")
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultHeadChunkFileMaxTimeRange is the default head chunk file time range.
|
||||
// Assuming a general scrape interval of 15s, a chunk with 120 samples would
|
||||
// be cut every 30m, so anything <30m will cause lots of empty files. And keeping
|
||||
// it exactly 30m also has a chance of having empty files as its near that border.
|
||||
// Hence keeping it a little more than 30m, i.e. 40m.
|
||||
DefaultHeadChunkFileMaxTimeRange = 40 * int64(time.Minute/time.Millisecond)
|
||||
// MintMaxtSize is the size of the mint/maxt for head chunk file and chunks.
|
||||
MintMaxtSize = 8
|
||||
// SeriesRefSize is the size of series reference on disk.
|
||||
SeriesRefSize = 8
|
||||
// HeadChunkFileHeaderSize is the total size of the header for the head chunk file.
|
||||
HeadChunkFileHeaderSize = SegmentHeaderSize
|
||||
// MaxHeadChunkFileSize is the max size of a head chunk file.
|
||||
MaxHeadChunkFileSize = 512 * 1024 * 1024 // 512 MiB.
|
||||
// CRCSize is the size of crc32 sum on disk.
|
||||
CRCSize = 4
|
||||
// MaxHeadChunkMetaSize is the max size of an mmapped chunks minus the chunks data.
|
||||
// Max because the uvarint size can be smaller.
|
||||
MaxHeadChunkMetaSize = SeriesRefSize + 2*MintMaxtSize + ChunksFormatVersionSize + MaxChunkLengthFieldSize + CRCSize
|
||||
)
|
||||
|
||||
// corruptionErr is an error that's returned when corruption is encountered.
|
||||
type corruptionErr struct {
|
||||
Dir string
|
||||
FileIndex int
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e *corruptionErr) Error() string {
|
||||
return errors.Wrapf(e.Err, "corruption in head chunk file %s", segmentFile(e.Dir, e.FileIndex)).Error()
|
||||
}
|
||||
|
||||
// ChunkDiskMapper is for writing the Head block chunks to the disk
|
||||
// and access chunks via mmapped file.
|
||||
type ChunkDiskMapper struct {
|
||||
/// Writer.
|
||||
dir *os.File
|
||||
|
||||
curFile *os.File // File being written to.
|
||||
curFileSequence int // Index of current open file being appended to.
|
||||
curFileMint int64 // Used to check for a chunk crossing the max file time range.
|
||||
curFileMaxt int64 // Used for the size retention.
|
||||
curFileNumBytes int64 // Bytes written in current open file.
|
||||
maxFileTime int64 // Max time range (curFileMaxt-curFileMint) for a file.
|
||||
|
||||
byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
|
||||
chkWriter *bufio.Writer // Writer for the current open file.
|
||||
crc32 hash.Hash
|
||||
writePathMtx sync.RWMutex
|
||||
|
||||
/// Reader.
|
||||
// The int key in the map is the file number on the disk.
|
||||
mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
|
||||
closers map[int]io.Closer // Closers for resources behind the byte slices.
|
||||
readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps.
|
||||
pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk.
|
||||
|
||||
// Writer and Reader.
|
||||
// We flush chunks to disk in batches. Hence, we store them in this buffer
|
||||
// from which chunks are served till they are flushed and are ready for m-mapping.
|
||||
chunkBuffer *chunkBuffer
|
||||
|
||||
// The total size of bytes in the closed files.
|
||||
// Needed to calculate the total size of all segments on disk.
|
||||
size int64
|
||||
|
||||
// If 'true', it indicated that the maxt of all the on-disk files were set
|
||||
// after iterating through all the chunks in those files.
|
||||
fileMaxtSet bool
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
type mmappedChunkFile struct {
|
||||
byteSlice ByteSlice
|
||||
maxt int64
|
||||
}
|
||||
|
||||
// NewChunkDiskMapper returns a new writer against the given directory
|
||||
// using the default head chunk file duration.
|
||||
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
|
||||
// to set the maxt of all the file.
|
||||
func NewChunkDiskMapper(dir string, pool chunkenc.Pool) (*ChunkDiskMapper, error) {
|
||||
return newChunkDiskMapper(dir, DefaultHeadChunkFileMaxTimeRange, pool)
|
||||
}
|
||||
|
||||
func newChunkDiskMapper(dir string, maxFileDuration int64, pool chunkenc.Pool) (*ChunkDiskMapper, error) {
|
||||
if maxFileDuration <= 0 {
|
||||
maxFileDuration = DefaultHeadChunkFileMaxTimeRange
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dirFile, err := fileutil.OpenDir(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := &ChunkDiskMapper{
|
||||
dir: dirFile,
|
||||
maxFileTime: maxFileDuration,
|
||||
pool: pool,
|
||||
crc32: newCRC32(),
|
||||
chunkBuffer: newChunkBuffer(),
|
||||
}
|
||||
|
||||
if m.pool == nil {
|
||||
m.pool = chunkenc.NewPool()
|
||||
}
|
||||
|
||||
return m, m.openMMapFiles()
|
||||
}
|
||||
|
||||
func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
||||
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
|
||||
cdm.closers = map[int]io.Closer{}
|
||||
defer func() {
|
||||
if returnErr != nil {
|
||||
var merr tsdb_errors.MultiError
|
||||
merr.Add(returnErr)
|
||||
merr.Add(closeAllFromMap(cdm.closers))
|
||||
returnErr = merr.Err()
|
||||
|
||||
cdm.mmappedChunkFiles = nil
|
||||
cdm.closers = nil
|
||||
}
|
||||
}()
|
||||
|
||||
files, err := listChunkFiles(cdm.dir.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
chkFileIndices := make([]int, 0, len(files))
|
||||
for seq, fn := range files {
|
||||
f, err := fileutil.OpenMmapFile(fn)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "mmap files")
|
||||
}
|
||||
cdm.closers[seq] = f
|
||||
cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
|
||||
chkFileIndices = append(chkFileIndices, seq)
|
||||
}
|
||||
|
||||
cdm.size = 0
|
||||
|
||||
// Check for gaps in the files.
|
||||
sort.Ints(chkFileIndices)
|
||||
if len(chkFileIndices) == 0 {
|
||||
return nil
|
||||
}
|
||||
lastSeq := chkFileIndices[0]
|
||||
for _, seq := range chkFileIndices[1:] {
|
||||
if seq != lastSeq+1 {
|
||||
return errors.Errorf("found unsequential head chunk files %d and %d", lastSeq, seq)
|
||||
}
|
||||
lastSeq = seq
|
||||
}
|
||||
|
||||
for i, b := range cdm.mmappedChunkFiles {
|
||||
if b.byteSlice.Len() < HeadChunkFileHeaderSize {
|
||||
return errors.Wrapf(errInvalidSize, "invalid head chunk file header in file %d", i)
|
||||
}
|
||||
// Verify magic number.
|
||||
if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
|
||||
return errors.Errorf("invalid magic number %x", m)
|
||||
}
|
||||
|
||||
// Verify chunk format version.
|
||||
if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
|
||||
return errors.Errorf("invalid chunk format version %d", v)
|
||||
}
|
||||
|
||||
cdm.size += int64(b.byteSlice.Len())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func listChunkFiles(dir string) (map[int]string, error) {
|
||||
files, err := ioutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := map[int]string{}
|
||||
for _, fi := range files {
|
||||
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
res[int(seq)] = filepath.Join(dir, fi.Name())
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// WriteChunk writes the chunk to the disk.
|
||||
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
|
||||
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) {
|
||||
cdm.writePathMtx.Lock()
|
||||
defer cdm.writePathMtx.Unlock()
|
||||
|
||||
if cdm.closed {
|
||||
return 0, ErrChunkDiskMapperClosed
|
||||
}
|
||||
|
||||
if cdm.shouldCutNewFile(len(chk.Bytes()), maxt) {
|
||||
if err := cdm.cut(mint); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
|
||||
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
|
||||
if len(chk.Bytes())+MaxHeadChunkMetaSize < writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
|
||||
if err := cdm.flushBuffer(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
cdm.crc32.Reset()
|
||||
bytesWritten := 0
|
||||
|
||||
// The upper 4 bytes are for the head chunk file index and
|
||||
// the lower 4 bytes are for the head chunk file offset where to start reading this chunk.
|
||||
chkRef = chunkRef(uint64(cdm.curFileSequence), uint64(cdm.curFileNumBytes))
|
||||
|
||||
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], seriesRef)
|
||||
bytesWritten += SeriesRefSize
|
||||
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
|
||||
bytesWritten += MintMaxtSize
|
||||
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
|
||||
bytesWritten += MintMaxtSize
|
||||
cdm.byteBuf[bytesWritten] = byte(chk.Encoding())
|
||||
bytesWritten += ChunkEncodingSize
|
||||
n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
|
||||
bytesWritten += n
|
||||
|
||||
if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := cdm.writeCRC32(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if maxt > cdm.curFileMaxt {
|
||||
cdm.curFileMaxt = maxt
|
||||
}
|
||||
if mint < cdm.curFileMint {
|
||||
cdm.curFileMint = mint
|
||||
}
|
||||
|
||||
cdm.chunkBuffer.put(chkRef, chk)
|
||||
|
||||
if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize {
|
||||
// The chunk was bigger than the buffer itself.
|
||||
// Flushing to not keep partial chunks in buffer.
|
||||
if err := cdm.flushBuffer(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
return chkRef, nil
|
||||
}
|
||||
|
||||
func chunkRef(seq, offset uint64) (chunkRef uint64) {
|
||||
return (seq << 32) | offset
|
||||
}
|
||||
|
||||
// shouldCutNewFile decides the cutting of a new file based on time and size retention.
|
||||
// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
|
||||
// Time retention: so that we can delete old chunks with some time guarantee in low load environments.
|
||||
func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int, maxt int64) bool {
|
||||
return cdm.curFileNumBytes == 0 || // First head chunk file.
|
||||
(maxt-cdm.curFileMint > cdm.maxFileTime && cdm.curFileNumBytes > HeadChunkFileHeaderSize) || // Time duration reached for the existing file.
|
||||
cdm.curFileNumBytes+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
|
||||
}
|
||||
|
||||
func (cdm *ChunkDiskMapper) cut(mint int64) (returnErr error) {
|
||||
// Sync current tail to disk and close.
|
||||
if err := cdm.finalizeCurFile(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, int64(MaxHeadChunkFileSize))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
// The file should not be closed if there is no error,
|
||||
// its kept open in the ChunkDiskMapper.
|
||||
if returnErr != nil {
|
||||
var merr tsdb_errors.MultiError
|
||||
merr.Add(returnErr)
|
||||
merr.Add(newFile.Close())
|
||||
returnErr = merr.Err()
|
||||
}
|
||||
}()
|
||||
|
||||
cdm.size += cdm.curFileNumBytes
|
||||
atomic.StoreInt64(&cdm.curFileNumBytes, int64(n))
|
||||
|
||||
if cdm.curFile != nil {
|
||||
cdm.readPathMtx.Lock()
|
||||
cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt
|
||||
cdm.readPathMtx.Unlock()
|
||||
}
|
||||
|
||||
mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), int(MaxHeadChunkFileSize))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cdm.curFileSequence = seq
|
||||
cdm.curFileMint = mint
|
||||
cdm.curFile = newFile
|
||||
if cdm.chkWriter != nil {
|
||||
cdm.chkWriter.Reset(newFile)
|
||||
} else {
|
||||
cdm.chkWriter = bufio.NewWriterSize(newFile, writeBufferSize)
|
||||
}
|
||||
|
||||
cdm.readPathMtx.Lock()
|
||||
cdm.closers[cdm.curFileSequence] = mmapFile
|
||||
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
|
||||
cdm.readPathMtx.Unlock()
|
||||
|
||||
cdm.curFileMaxt = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// finalizeCurFile writes all pending data to the current tail file,
|
||||
// truncates its size, and closes it.
|
||||
func (cdm *ChunkDiskMapper) finalizeCurFile() error {
|
||||
if cdm.curFile == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := cdm.flushBuffer(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cdm.curFile.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cdm.curFile.Close()
|
||||
}
|
||||
|
||||
func (cdm *ChunkDiskMapper) write(b []byte) error {
|
||||
n, err := cdm.chkWriter.Write(b)
|
||||
atomic.AddInt64(&cdm.curFileNumBytes, int64(n))
|
||||
return err
|
||||
}
|
||||
|
||||
func (cdm *ChunkDiskMapper) writeAndAppendToCRC32(b []byte) error {
|
||||
if err := cdm.write(b); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := cdm.crc32.Write(b)
|
||||
return err
|
||||
}
|
||||
|
||||
func (cdm *ChunkDiskMapper) writeCRC32() error {
|
||||
return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0]))
|
||||
}
|
||||
|
||||
// flushBuffer flushes the current in-memory chunks.
|
||||
// Assumes that writePathMtx is _write_ locked before calling this method.
|
||||
func (cdm *ChunkDiskMapper) flushBuffer() error {
|
||||
if err := cdm.chkWriter.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
cdm.chunkBuffer.clear()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Chunk returns a chunk from a given reference.
|
||||
// Note: The returned chunk will turn invalid after closing ChunkDiskMapper.
|
||||
func (cdm *ChunkDiskMapper) Chunk(ref uint64) (chunkenc.Chunk, error) {
|
||||
cdm.readPathMtx.RLock()
|
||||
// We hold this read lock for the entire duration because if the Close()
|
||||
// is called, the data in the byte slice will get corrupted as the mmapped
|
||||
// file will be closed.
|
||||
defer cdm.readPathMtx.RUnlock()
|
||||
|
||||
var (
|
||||
// Get the upper 4 bytes.
|
||||
// These contain the head chunk file index.
|
||||
sgmIndex = int(ref >> 32)
|
||||
// Get the lower 4 bytes.
|
||||
// These contain the head chunk file offset where the chunk starts.
|
||||
// We skip the series ref and the mint/maxt beforehand.
|
||||
chkStart = int((ref<<32)>>32) + SeriesRefSize + (2 * MintMaxtSize)
|
||||
chkCRC32 = newCRC32()
|
||||
)
|
||||
|
||||
if cdm.closed {
|
||||
return nil, ErrChunkDiskMapperClosed
|
||||
}
|
||||
|
||||
// If it is the current open file, then the chunks can be in the buffer too.
|
||||
if sgmIndex == cdm.curFileSequence {
|
||||
chunk := cdm.chunkBuffer.get(ref)
|
||||
if chunk != nil {
|
||||
return chunk, nil
|
||||
}
|
||||
}
|
||||
|
||||
mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
|
||||
if !ok {
|
||||
if sgmIndex > cdm.curFileSequence {
|
||||
return nil, errors.Errorf("head chunk file index %d more than current open file", sgmIndex)
|
||||
}
|
||||
return nil, errors.Errorf("head chunk file index %d does not exist on disk", sgmIndex)
|
||||
}
|
||||
|
||||
if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
|
||||
return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v, file:%d", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len(), sgmIndex)
|
||||
}
|
||||
|
||||
// Encoding.
|
||||
chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]
|
||||
|
||||
// Data length.
|
||||
// With the minimum chunk length this should never cause us reading
|
||||
// over the end of the slice.
|
||||
chkDataLenStart := chkStart + ChunkEncodingSize
|
||||
c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
|
||||
chkDataLen, n := binary.Uvarint(c)
|
||||
if n <= 0 {
|
||||
return nil, errors.Errorf("reading chunk length failed with %d", n)
|
||||
}
|
||||
|
||||
// Verify the chunk data end.
|
||||
chkDataEnd := chkDataLenStart + n + int(chkDataLen)
|
||||
if chkDataEnd > mmapFile.byteSlice.Len() {
|
||||
return nil, errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len())
|
||||
}
|
||||
|
||||
// Check the CRC.
|
||||
sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
|
||||
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
|
||||
return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
|
||||
}
|
||||
|
||||
// The chunk data itself.
|
||||
chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
|
||||
return cdm.pool.Get(chunkenc.Encoding(chkEnc), chkData)
|
||||
}
|
||||
|
||||
// IterateAllChunks iterates on all the chunks in its byte slices in the order of the head chunk file sequence
|
||||
// and runs the provided function on each chunk. It returns on the first error encountered.
|
||||
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
|
||||
// to set the maxt of all the file.
|
||||
func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef, chunkRef uint64, mint, maxt int64) error) (err error) {
|
||||
cdm.writePathMtx.Lock()
|
||||
defer cdm.writePathMtx.Unlock()
|
||||
|
||||
defer func() {
|
||||
if err == nil {
|
||||
cdm.fileMaxtSet = true
|
||||
}
|
||||
}()
|
||||
|
||||
chkCRC32 := newCRC32()
|
||||
|
||||
// Iterate files in ascending order.
|
||||
segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
|
||||
for seg := range cdm.mmappedChunkFiles {
|
||||
segIDs = append(segIDs, seg)
|
||||
}
|
||||
sort.Ints(segIDs)
|
||||
for _, segID := range segIDs {
|
||||
mmapFile := cdm.mmappedChunkFiles[segID]
|
||||
fileEnd := mmapFile.byteSlice.Len()
|
||||
if segID == cdm.curFileSequence {
|
||||
fileEnd = int(cdm.curFileNumBytes)
|
||||
}
|
||||
idx := HeadChunkFileHeaderSize
|
||||
for idx < fileEnd {
|
||||
if fileEnd-idx < MaxHeadChunkMetaSize {
|
||||
// Check for all 0s which marks the end of the file.
|
||||
allZeros := true
|
||||
for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) {
|
||||
if b != byte(0) {
|
||||
allZeros = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if allZeros {
|
||||
break
|
||||
}
|
||||
return &corruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
|
||||
}
|
||||
}
|
||||
chkCRC32.Reset()
|
||||
chunkRef := chunkRef(uint64(segID), uint64(idx))
|
||||
|
||||
startIdx := idx
|
||||
seriesRef := binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize))
|
||||
idx += SeriesRefSize
|
||||
mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
|
||||
idx += MintMaxtSize
|
||||
maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
|
||||
idx += MintMaxtSize
|
||||
|
||||
// We preallocate file to help with m-mapping (especially windows systems).
|
||||
// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
|
||||
// We are not considering possible file corruption that can cause it to be 0.
|
||||
// Additionally we are checking mint and maxt just to be sure.
|
||||
if seriesRef == 0 && mint == 0 && maxt == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
idx += ChunkEncodingSize // Skip encoding.
|
||||
dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
|
||||
idx += n + int(dataLen) // Skip the data.
|
||||
|
||||
// In the beginning we only checked for the chunk meta size.
|
||||
// Now that we have added the chunk data length, we check for sufficient bytes again.
|
||||
if idx+CRCSize > fileEnd {
|
||||
return &corruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
|
||||
}
|
||||
}
|
||||
|
||||
// Check CRC.
|
||||
sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
|
||||
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
|
||||
return err
|
||||
}
|
||||
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
|
||||
return &corruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
|
||||
}
|
||||
}
|
||||
idx += CRCSize
|
||||
|
||||
if maxt > mmapFile.maxt {
|
||||
mmapFile.maxt = maxt
|
||||
}
|
||||
|
||||
if err := f(seriesRef, chunkRef, mint, maxt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if idx > fileEnd {
|
||||
// It should be equal to the slice length.
|
||||
return &corruptionErr{
|
||||
Dir: cdm.dir.Name(),
|
||||
FileIndex: segID,
|
||||
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Truncate deletes the head chunk files which are strictly below the mint.
|
||||
// mint should be in milliseconds.
|
||||
func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
|
||||
if !cdm.fileMaxtSet {
|
||||
return errors.New("maxt of the files are not set")
|
||||
}
|
||||
cdm.readPathMtx.RLock()
|
||||
|
||||
// Sort the file indices, else if files deletion fails in between,
|
||||
// it can lead to unsequential files as the map is not sorted.
|
||||
chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles))
|
||||
for seq := range cdm.mmappedChunkFiles {
|
||||
chkFileIndices = append(chkFileIndices, seq)
|
||||
}
|
||||
sort.Ints(chkFileIndices)
|
||||
|
||||
var removedFiles []int
|
||||
for _, seq := range chkFileIndices {
|
||||
if seq == cdm.curFileSequence {
|
||||
continue
|
||||
}
|
||||
if cdm.mmappedChunkFiles[seq].maxt < mint {
|
||||
removedFiles = append(removedFiles, seq)
|
||||
}
|
||||
}
|
||||
cdm.readPathMtx.RUnlock()
|
||||
|
||||
return cdm.deleteFiles(removedFiles)
|
||||
}
|
||||
|
||||
func (cdm *ChunkDiskMapper) deleteFiles(removedFiles []int) error {
|
||||
cdm.readPathMtx.Lock()
|
||||
for _, seq := range removedFiles {
|
||||
if err := cdm.closers[seq].Close(); err != nil {
|
||||
cdm.readPathMtx.Unlock()
|
||||
return err
|
||||
}
|
||||
cdm.size -= int64(cdm.mmappedChunkFiles[seq].byteSlice.Len())
|
||||
delete(cdm.mmappedChunkFiles, seq)
|
||||
delete(cdm.closers, seq)
|
||||
}
|
||||
cdm.readPathMtx.Unlock()
|
||||
|
||||
// We actually delete the files separately to not block the readPathMtx for long.
|
||||
for _, seq := range removedFiles {
|
||||
if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Repair deletes all the head chunk files after the one which had the corruption
|
||||
// (including the corrupt file).
|
||||
func (cdm *ChunkDiskMapper) Repair(originalErr error) error {
|
||||
err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
|
||||
cerr, ok := err.(*corruptionErr)
|
||||
if !ok {
|
||||
return errors.Wrap(originalErr, "cannot handle error")
|
||||
}
|
||||
|
||||
// Delete all the head chunk files following the corrupt head chunk file.
|
||||
segs := []int{}
|
||||
for seg := range cdm.mmappedChunkFiles {
|
||||
if seg >= cerr.FileIndex {
|
||||
segs = append(segs, seg)
|
||||
}
|
||||
}
|
||||
return cdm.deleteFiles(segs)
|
||||
}
|
||||
|
||||
// Size returns the size of the chunk files.
|
||||
func (cdm *ChunkDiskMapper) Size() int64 {
|
||||
n := atomic.LoadInt64(&cdm.curFileNumBytes)
|
||||
return cdm.size + n
|
||||
}
|
||||
|
||||
// Close closes all the open files in ChunkDiskMapper.
|
||||
// It is not longer safe to access chunks from this struct after calling Close.
|
||||
func (cdm *ChunkDiskMapper) Close() error {
|
||||
// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
|
||||
// The lock order should not be reversed here else it can cause deadlocks.
|
||||
cdm.writePathMtx.Lock()
|
||||
defer cdm.writePathMtx.Unlock()
|
||||
cdm.readPathMtx.Lock()
|
||||
defer cdm.readPathMtx.Unlock()
|
||||
|
||||
if cdm.closed {
|
||||
return nil
|
||||
}
|
||||
cdm.closed = true
|
||||
|
||||
var merr tsdb_errors.MultiError
|
||||
merr.Add(closeAllFromMap(cdm.closers))
|
||||
merr.Add(cdm.finalizeCurFile())
|
||||
merr.Add(cdm.dir.Close())
|
||||
|
||||
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
|
||||
cdm.closers = map[int]io.Closer{}
|
||||
|
||||
return merr.Err()
|
||||
}
|
||||
|
||||
func closeAllFromMap(cs map[int]io.Closer) error {
|
||||
var merr tsdb_errors.MultiError
|
||||
for _, c := range cs {
|
||||
merr.Add(c.Close())
|
||||
}
|
||||
return merr.Err()
|
||||
}
|
||||
|
||||
const inBufferShards = 128 // 128 is a randomly chosen number.
|
||||
|
||||
// chunkBuffer is a thread safe buffer for chunks.
|
||||
type chunkBuffer struct {
|
||||
inBufferChunks [inBufferShards]map[uint64]chunkenc.Chunk
|
||||
inBufferChunksMtxs [inBufferShards]sync.RWMutex
|
||||
}
|
||||
|
||||
func newChunkBuffer() *chunkBuffer {
|
||||
cb := &chunkBuffer{}
|
||||
for i := 0; i < inBufferShards; i++ {
|
||||
cb.inBufferChunks[i] = make(map[uint64]chunkenc.Chunk)
|
||||
}
|
||||
return cb
|
||||
}
|
||||
|
||||
func (cb *chunkBuffer) put(ref uint64, chk chunkenc.Chunk) {
|
||||
shardIdx := ref % inBufferShards
|
||||
|
||||
cb.inBufferChunksMtxs[shardIdx].Lock()
|
||||
cb.inBufferChunks[shardIdx][ref] = chk
|
||||
cb.inBufferChunksMtxs[shardIdx].Unlock()
|
||||
}
|
||||
|
||||
func (cb *chunkBuffer) get(ref uint64) chunkenc.Chunk {
|
||||
shardIdx := ref % inBufferShards
|
||||
|
||||
cb.inBufferChunksMtxs[shardIdx].RLock()
|
||||
defer cb.inBufferChunksMtxs[shardIdx].RUnlock()
|
||||
|
||||
return cb.inBufferChunks[shardIdx][ref]
|
||||
}
|
||||
|
||||
func (cb *chunkBuffer) clear() {
|
||||
for i := 0; i < inBufferShards; i++ {
|
||||
cb.inBufferChunksMtxs[i].Lock()
|
||||
cb.inBufferChunks[i] = make(map[uint64]chunkenc.Chunk)
|
||||
cb.inBufferChunksMtxs[i].Unlock()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,275 @@
|
|||
// Copyright 2020 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestHeadReadWriter_WriteChunk_Chunk_IterateChunks(t *testing.T) {
|
||||
hrw, close := testHeadReadWriter(t)
|
||||
defer func() {
|
||||
testutil.Ok(t, hrw.Close())
|
||||
close()
|
||||
}()
|
||||
|
||||
expectedBytes := []byte{}
|
||||
nextChunkOffset := uint64(HeadChunkFileHeaderSize)
|
||||
chkCRC32 := newCRC32()
|
||||
|
||||
type expectedDataType struct {
|
||||
seriesRef, chunkRef uint64
|
||||
mint, maxt int64
|
||||
chunk chunkenc.Chunk
|
||||
}
|
||||
expectedData := []expectedDataType{}
|
||||
|
||||
var buf [MaxHeadChunkMetaSize]byte
|
||||
totalChunks := 0
|
||||
var firstFileName string
|
||||
for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
|
||||
for i := 0; i < 100; i++ {
|
||||
seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
|
||||
totalChunks++
|
||||
expectedData = append(expectedData, expectedDataType{
|
||||
seriesRef: seriesRef,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
chunkRef: chkRef,
|
||||
chunk: chunk,
|
||||
})
|
||||
|
||||
if hrw.curFileSequence != 1 {
|
||||
// We are checking for bytes written only for the first file.
|
||||
continue
|
||||
}
|
||||
|
||||
// Calculating expected bytes written on disk for first file.
|
||||
firstFileName = hrw.curFile.Name()
|
||||
testutil.Equals(t, chunkRef(1, nextChunkOffset), chkRef)
|
||||
|
||||
bytesWritten := 0
|
||||
chkCRC32.Reset()
|
||||
|
||||
binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef)
|
||||
bytesWritten += SeriesRefSize
|
||||
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
|
||||
bytesWritten += MintMaxtSize
|
||||
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
|
||||
bytesWritten += MintMaxtSize
|
||||
buf[bytesWritten] = byte(chunk.Encoding())
|
||||
bytesWritten += ChunkEncodingSize
|
||||
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
|
||||
bytesWritten += n
|
||||
|
||||
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
|
||||
_, err := chkCRC32.Write(buf[:bytesWritten])
|
||||
testutil.Ok(t, err)
|
||||
expectedBytes = append(expectedBytes, chunk.Bytes()...)
|
||||
_, err = chkCRC32.Write(chunk.Bytes())
|
||||
testutil.Ok(t, err)
|
||||
|
||||
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
|
||||
|
||||
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
|
||||
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
|
||||
}
|
||||
}
|
||||
|
||||
// Checking on-disk bytes for the first file.
|
||||
testutil.Assert(t, len(hrw.mmappedChunkFiles) == 3 && len(hrw.closers) == 3, "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))
|
||||
|
||||
actualBytes, err := ioutil.ReadFile(firstFileName)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
// Check header of the segment file.
|
||||
testutil.Equals(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
|
||||
testutil.Equals(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))
|
||||
|
||||
// Remaining chunk data.
|
||||
fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
|
||||
testutil.Equals(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])
|
||||
|
||||
// Test for the next chunk header to be all 0s. That marks the end of the file.
|
||||
for _, b := range actualBytes[fileEnd : fileEnd+MaxHeadChunkMetaSize] {
|
||||
testutil.Equals(t, byte(0), b)
|
||||
}
|
||||
|
||||
// Testing reading of chunks.
|
||||
for _, exp := range expectedData {
|
||||
actChunk, err := hrw.Chunk(exp.chunkRef)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, exp.chunk.Bytes(), actChunk.Bytes())
|
||||
}
|
||||
|
||||
// Testing IterateAllChunks method.
|
||||
dir := hrw.dir.Name()
|
||||
testutil.Ok(t, hrw.Close())
|
||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
||||
testutil.Ok(t, err)
|
||||
|
||||
idx := 0
|
||||
err = hrw.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64) error {
|
||||
t.Helper()
|
||||
|
||||
expData := expectedData[idx]
|
||||
testutil.Equals(t, expData.seriesRef, seriesRef)
|
||||
testutil.Equals(t, expData.chunkRef, chunkRef)
|
||||
testutil.Equals(t, expData.maxt, maxt)
|
||||
testutil.Equals(t, expData.maxt, maxt)
|
||||
|
||||
actChunk, err := hrw.Chunk(expData.chunkRef)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, expData.chunk.Bytes(), actChunk.Bytes())
|
||||
|
||||
idx++
|
||||
return nil
|
||||
})
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, len(expectedData), idx)
|
||||
|
||||
}
|
||||
|
||||
func TestHeadReadWriter_Truncate(t *testing.T) {
|
||||
hrw, close := testHeadReadWriter(t)
|
||||
defer func() {
|
||||
testutil.Ok(t, hrw.Close())
|
||||
close()
|
||||
}()
|
||||
|
||||
testutil.Assert(t, !hrw.fileMaxtSet, "")
|
||||
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil }))
|
||||
testutil.Assert(t, hrw.fileMaxtSet, "")
|
||||
|
||||
timeRange := 0
|
||||
fileTimeStep := 100
|
||||
totalFiles, after1stTruncation, after2ndTruncation := 7, 5, 3
|
||||
var timeToTruncate, timeToTruncateAfterRestart int64
|
||||
|
||||
cutFile := func(i int) {
|
||||
testutil.Ok(t, hrw.cut(int64(timeRange)))
|
||||
|
||||
mint := timeRange + 1 // Just after the the new file cut.
|
||||
maxt := timeRange + fileTimeStep - 1 // Just before the next file.
|
||||
|
||||
// Write a chunks to set maxt for the segment.
|
||||
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
|
||||
testutil.Ok(t, err)
|
||||
|
||||
if i == totalFiles-after1stTruncation+1 {
|
||||
// Truncate the segment files before the 5th segment.
|
||||
timeToTruncate = int64(mint)
|
||||
} else if i == totalFiles-after2ndTruncation+1 {
|
||||
// Truncate the segment files before the 3rd segment after restart.
|
||||
timeToTruncateAfterRestart = int64(mint)
|
||||
}
|
||||
|
||||
timeRange += fileTimeStep
|
||||
}
|
||||
|
||||
// Cut segments.
|
||||
for i := 1; i <= totalFiles; i++ {
|
||||
cutFile(i)
|
||||
}
|
||||
|
||||
// Verifying the the remaining files.
|
||||
verifyRemainingFiles := func(remainingFiles int) {
|
||||
t.Helper()
|
||||
|
||||
files, err := ioutil.ReadDir(hrw.dir.Name())
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, remainingFiles, len(files))
|
||||
testutil.Equals(t, remainingFiles, len(hrw.mmappedChunkFiles))
|
||||
testutil.Equals(t, remainingFiles, len(hrw.closers))
|
||||
|
||||
for i := 1; i <= totalFiles; i++ {
|
||||
_, ok := hrw.mmappedChunkFiles[i]
|
||||
if i < totalFiles-remainingFiles+1 {
|
||||
testutil.Equals(t, false, ok)
|
||||
} else {
|
||||
testutil.Equals(t, true, ok)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verify the number of segments.
|
||||
verifyRemainingFiles(totalFiles)
|
||||
|
||||
// Truncating files.
|
||||
testutil.Ok(t, hrw.Truncate(timeToTruncate))
|
||||
verifyRemainingFiles(after1stTruncation)
|
||||
|
||||
dir := hrw.dir.Name()
|
||||
testutil.Ok(t, hrw.Close())
|
||||
|
||||
// Restarted.
|
||||
var err error
|
||||
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
||||
testutil.Ok(t, err)
|
||||
|
||||
testutil.Assert(t, !hrw.fileMaxtSet, "")
|
||||
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64) error { return nil }))
|
||||
testutil.Assert(t, hrw.fileMaxtSet, "")
|
||||
|
||||
// Truncating files after restart.
|
||||
testutil.Ok(t, hrw.Truncate(timeToTruncateAfterRestart))
|
||||
verifyRemainingFiles(after2ndTruncation)
|
||||
|
||||
// Add another file to have an active file.
|
||||
totalFiles++
|
||||
cutFile(totalFiles)
|
||||
// Truncating till current time should not delete the current active file.
|
||||
testutil.Ok(t, hrw.Truncate(time.Now().UnixNano()/1e6))
|
||||
verifyRemainingFiles(1)
|
||||
}
|
||||
|
||||
func testHeadReadWriter(t *testing.T) (hrw *ChunkDiskMapper, close func()) {
|
||||
tmpdir, err := ioutil.TempDir("", "data")
|
||||
testutil.Ok(t, err)
|
||||
hrw, err = NewChunkDiskMapper(tmpdir, chunkenc.NewPool())
|
||||
testutil.Ok(t, err)
|
||||
return hrw, func() {
|
||||
testutil.Ok(t, os.RemoveAll(tmpdir))
|
||||
}
|
||||
}
|
||||
|
||||
func randomChunk(t *testing.T) chunkenc.Chunk {
|
||||
chunk := chunkenc.NewXORChunk()
|
||||
len := rand.Int() % 120
|
||||
app, err := chunk.Appender()
|
||||
testutil.Ok(t, err)
|
||||
for i := 0; i < len; i++ {
|
||||
app.Append(rand.Int63(), rand.Float64())
|
||||
}
|
||||
return chunk
|
||||
}
|
||||
|
||||
func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef uint64, chunkRef uint64, mint, maxt int64, chunk chunkenc.Chunk) {
|
||||
var err error
|
||||
seriesRef = uint64(rand.Int63())
|
||||
mint = int64((idx)*1000 + 1)
|
||||
maxt = int64((idx + 1) * 1000)
|
||||
chunk = randomChunk(t)
|
||||
chunkRef, err = hrw.WriteChunk(seriesRef, mint, maxt, chunk)
|
||||
testutil.Ok(t, err)
|
||||
return
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
# Head Chunks on Disk Format
|
||||
|
||||
The following describes the format of a chunks file,
|
||||
which is created in the `wal/chunks/` inside the data directory.
|
||||
|
||||
Chunks in the files are referenced from the index by uint64 composed of
|
||||
in-file offset (lower 4 bytes) and segment sequence number (upper 4 bytes).
|
||||
|
||||
```
|
||||
┌──────────────────────────────┐
|
||||
│ magic(0x85BD40DD) <4 byte> │
|
||||
├──────────────────────────────┤
|
||||
│ version(1) <1 byte> │
|
||||
├──────────────────────────────┤
|
||||
│ padding(0) <3 byte> │
|
||||
├──────────────────────────────┤
|
||||
│ ┌──────────────────────────┐ │
|
||||
│ │ Chunk 1 │ │
|
||||
│ ├──────────────────────────┤ │
|
||||
│ │ ... │ │
|
||||
│ ├──────────────────────────┤ │
|
||||
│ │ Chunk N │ │
|
||||
│ └──────────────────────────┘ │
|
||||
└──────────────────────────────┘
|
||||
```
|
||||
|
||||
|
||||
# Chunk
|
||||
|
||||
Unlike chunks in the on-disk blocks, here we additionally store series reference that the chunks belongs to and the mint/maxt of the chunks. This is because we don't have an index associated with these chunks, hence these meta information are used while replaying the chunks.
|
||||
|
||||
```
|
||||
┌─────────────────────┬───────────────────────┬───────────────────────┬───────────────────┬───────────────┬──────────────┬────────────────┐
|
||||
| series ref <8 byte> | mint <8 byte, uint64> | maxt <8 byte, uint64> | encoding <1 byte> | len <uvarint> | data <bytes> │ CRC32 <4 byte> │
|
||||
└─────────────────────┴───────────────────────┴───────────────────────┴───────────────────┴───────────────┴──────────────┴────────────────┘
|
||||
```
|
|
@ -28,11 +28,16 @@ func OpenMmapFile(path string) (*MmapFile, error) {
|
|||
return OpenMmapFileWithSize(path, 0)
|
||||
}
|
||||
|
||||
func OpenMmapFileWithSize(path string, size int) (*MmapFile, error) {
|
||||
func OpenMmapFileWithSize(path string, size int) (mf *MmapFile, retErr error) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "try lock file")
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
f.Close()
|
||||
}
|
||||
}()
|
||||
if size <= 0 {
|
||||
info, err := f.Stat()
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue