// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunks

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"hash"
	"hash/crc32"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"

	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/fileutil"
)

// Segment header fields constants.
const (
	// MagicChunks is 4 bytes at the head of a series file.
	MagicChunks = 0x85BD40DD
	// MagicChunksSize is the size in bytes of MagicChunks.
	MagicChunksSize = 4

	chunksFormatV1          = 1
	ChunksFormatVersionSize = 1

	segmentHeaderPaddingSize = 3
	// SegmentHeaderSize defines the total size of the header part.
	SegmentHeaderSize = MagicChunksSize + ChunksFormatVersionSize + segmentHeaderPaddingSize
)

// Chunk fields constants.
const (
	// MaxChunkLengthFieldSize defines the maximum size of the data length part.
	MaxChunkLengthFieldSize = binary.MaxVarintLen32
	// ChunkEncodingSize defines the size of the chunk encoding part.
	ChunkEncodingSize = 1
)
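
// Together these constants give an 8 byte segment header: the magic number
// (4 bytes, big-endian), the format version (1 byte), and 3 bytes of padding;
// see cutSegmentFile for where the header is written and newReader for where
// it is verified.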

// Meta holds information about a chunk of data.
type Meta struct {
	// Ref and Chunk hold either a reference that can be used to retrieve
	// chunk data or the data itself.
	// When it is a reference, the upper 4 bytes hold the segment index and
	// the lower 4 bytes the offset within the segment at which the chunk bytes start.
	// Generally, only one of them is set.
	Ref   uint64
	Chunk chunkenc.Chunk

	// Time range the data covers.
	// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
	MinTime, MaxTime int64
}
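
// Illustrative example: a chunk whose data length field starts at byte offset
// 1000 of the segment with index 3 gets Ref = 3<<32 | 1000. See
// (*Writer).writeChunks for the packing and (*Reader).Chunk for the unpacking.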

// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
	buf = append(buf[:0], byte(cm.Chunk.Encoding()))
	if _, err := h.Write(buf[:1]); err != nil {
		return err
	}
	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
		return err
	}
	return nil
}

// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
	// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
	return cm.MinTime <= maxt && mint <= cm.MaxTime
}

var errInvalidSize = fmt.Errorf("invalid size")

var castagnoliTable *crc32.Table

func init() {
	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}

// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32() hash.Hash32 {
	return crc32.New(castagnoliTable)
}

// Writer implements the ChunkWriter interface for the standard
// serialization format.
type Writer struct {
	dirFile *os.File
	files   []*os.File
	wbuf    *bufio.Writer
	n       int64
	crc32   hash.Hash
	buf     [binary.MaxVarintLen32]byte

	segmentSize int64
}

const (
	// DefaultChunkSegmentSize is the default chunks segment size.
	DefaultChunkSegmentSize = 512 * 1024 * 1024
)

// NewWriterWithSegSize returns a new writer against the given directory
// and allows setting a custom size for the segments.
func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
	return newWriter(dir, segmentSize)
}

// NewWriter returns a new writer against the given directory
// using the default segment size.
func NewWriter(dir string) (*Writer, error) {
	return newWriter(dir, DefaultChunkSegmentSize)
}

func newWriter(dir string, segmentSize int64) (*Writer, error) {
	if segmentSize <= 0 {
		segmentSize = DefaultChunkSegmentSize
	}

	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, err
	}
	dirFile, err := fileutil.OpenDir(dir)
	if err != nil {
		return nil, err
	}
	return &Writer{
		dirFile:     dirFile,
		n:           0,
		crc32:       newCRC32(),
		segmentSize: segmentSize,
	}, nil
}

func (w *Writer) tail() *os.File {
	if len(w.files) == 0 {
		return nil
	}
	return w.files[len(w.files)-1]
}

// finalizeTail writes all pending data to the current tail file,
// truncates its size, and closes it.
func (w *Writer) finalizeTail() error {
	tf := w.tail()
	if tf == nil {
		return nil
	}

	if err := w.wbuf.Flush(); err != nil {
		return err
	}
	if err := tf.Sync(); err != nil {
		return err
	}
	// As the file was pre-allocated, we truncate any superfluous zero bytes.
	off, err := tf.Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}
	if err := tf.Truncate(off); err != nil {
		return err
	}

	return tf.Close()
}

func (w *Writer) cut() error {
	// Sync current tail to disk and close.
	if err := w.finalizeTail(); err != nil {
		return err
	}

	n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, w.segmentSize)
	if err != nil {
		return err
	}
	w.n = int64(n)

	w.files = append(w.files, f)
	if w.wbuf != nil {
		w.wbuf.Reset(f)
	} else {
		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
	}

	return nil
}

func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) {
	p, seq, err := nextSequenceFile(dirFile.Name())
	if err != nil {
		return 0, nil, 0, err
	}
	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return 0, nil, 0, err
	}
	if segmentSize > 0 {
		if err = fileutil.Preallocate(f, segmentSize, true); err != nil {
			return 0, nil, 0, err
		}
	}
	if err = dirFile.Sync(); err != nil {
		return 0, nil, 0, err
	}

	// Write header metadata for new file.
	metab := make([]byte, SegmentHeaderSize)
	binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
	metab[4] = chunksFormat

	n, err := f.Write(metab)
	if err != nil {
		return 0, nil, 0, err
	}

	return n, f, seq, nil
}
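
// Note that cutSegmentFile preallocates the full segment size up front
// (when segmentSize > 0); finalizeTail later truncates the file back to the
// bytes actually written.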

func (w *Writer) write(b []byte) error {
	n, err := w.wbuf.Write(b)
	w.n += int64(n)
	return err
}

// MergeOverlappingChunks merges chunks whose time ranges overlap into a
// single chunk. When two samples share a timestamp, the last appearing
// sample is retained.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
	if len(chks) < 2 {
		return chks, nil
	}
	newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
	newChks = append(newChks, chks[0])
	last := 0
	for _, c := range chks[1:] {
		// We need to check only the last chunk in newChks.
		// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping).
		//         (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
		// So c never overlaps with newChks[last-1] or anything before that.
		if c.MinTime > newChks[last].MaxTime {
			newChks = append(newChks, c)
			last++
			continue
		}
		nc := &newChks[last]
		if c.MaxTime > nc.MaxTime {
			nc.MaxTime = c.MaxTime
		}
		chk, err := MergeChunks(nc.Chunk, c.Chunk)
		if err != nil {
			return nil, err
		}
		nc.Chunk = chk
	}

	return newChks, nil
}
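
// Illustrative example: given chunks covering [1, 10], [5, 20] and [30, 40],
// MergeOverlappingChunks merges the first two into a single chunk covering
// [1, 20] and leaves [30, 40] as is.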

// MergeChunks vertically merges a and b, i.e., if there is any sample
// with the same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
	newChunk := chunkenc.NewXORChunk()
	app, err := newChunk.Appender()
	if err != nil {
		return nil, err
	}
	ait := a.Iterator(nil)
	bit := b.Iterator(nil)
	aok, bok := ait.Next(), bit.Next()
	for aok && bok {
		at, av := ait.At()
		bt, bv := bit.At()
		if at < bt {
			app.Append(at, av)
			aok = ait.Next()
		} else if bt < at {
			app.Append(bt, bv)
			bok = bit.Next()
		} else {
			app.Append(bt, bv)
			aok = ait.Next()
			bok = bit.Next()
		}
	}
	for aok {
		at, av := ait.At()
		app.Append(at, av)
		aok = ait.Next()
	}
	for bok {
		bt, bv := bit.At()
		app.Append(bt, bv)
		bok = bit.Next()
	}
	if ait.Err() != nil {
		return nil, ait.Err()
	}
	if bit.Err() != nil {
		return nil, bit.Err()
	}
	return newChunk, nil
}
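
// Illustrative example: merging a = (1, 1.0), (3, 3.0) with
// b = (2, 2.0), (3, 30.0) yields (1, 1.0), (2, 2.0), (3, 30.0);
// on the timestamp collision at t=3, b's value wins.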

// WriteChunks writes as many chunks as possible to the current segment,
// cuts a new segment when the current segment is full, and
// writes the rest of the chunks in the new segment.
func (w *Writer) WriteChunks(chks ...Meta) error {
	var (
		batchSize  = int64(0)
		batchStart = 0
		batches    = make([][]Meta, 1)
		batchID    = 0
		firstBatch = true
	)

	for i, chk := range chks {
		// Each chunk contains: data length + encoding + the data itself + crc32.
		chkSize := int64(MaxChunkLengthFieldSize) // The data length is a variable length field so use the maximum possible value.
		chkSize += ChunkEncodingSize              // The chunk encoding.
		chkSize += int64(len(chk.Chunk.Bytes()))  // The data itself.
		chkSize += crc32.Size                     // The 4 bytes of crc32.
		batchSize += chkSize

		// Cut a new batch when it is not the first chunk (to avoid empty segments) and
		// the batch is too large to fit in the current segment.
		cutNewBatch := (i != 0) && (batchSize+SegmentHeaderSize > w.segmentSize)

		// If the segment already has some data then
		// the first batch size calculation should account for that.
		if firstBatch && w.n > SegmentHeaderSize {
			cutNewBatch = batchSize+w.n > w.segmentSize
			if cutNewBatch {
				firstBatch = false
			}
		}

		if cutNewBatch {
			batchStart = i
			batches = append(batches, []Meta{})
			batchID++
			batchSize = chkSize
		}
		batches[batchID] = chks[batchStart : i+1]
	}

	// Create a new segment when one doesn't already exist.
	if w.n == 0 {
		if err := w.cut(); err != nil {
			return err
		}
	}

	for i, chks := range batches {
		if err := w.writeChunks(chks); err != nil {
			return err
		}
		// Cut a new segment only when there are more chunks to write.
		// Avoid creating a new empty segment at the end of the write.
		if i < len(batches)-1 {
			if err := w.cut(); err != nil {
				return err
			}
		}
	}
	return nil
}
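
// Each chunk written by writeChunks below is laid out as:
//
//	┌───────────────┬───────────────────┬──────────────┬─────────────────┐
//	│ len <uvarint> │ encoding <1 byte> │ data <bytes> │ CRC32 <4 bytes> │
//	└───────────────┴───────────────────┴──────────────┴─────────────────┘
//
// where the checksum covers the encoding byte and the data (see Meta.writeHash).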

// writeChunks writes the chunks into the current segment irrespective
// of the configured segment size limit. A segment must already have been
// started before calling this.
func (w *Writer) writeChunks(chks []Meta) error {
	if len(chks) == 0 {
		return nil
	}

	var seq = uint64(w.seq()) << 32
	for i := range chks {
		chk := &chks[i]

		// The reference is set to the segment index and the offset where
		// the data starts for this chunk.
		//
		// The upper 4 bytes are for the segment index and
		// the lower 4 bytes are for the segment offset at which to start reading this chunk.
		chk.Ref = seq | uint64(w.n)

		n := binary.PutUvarint(w.buf[:], uint64(len(chk.Chunk.Bytes())))

		if err := w.write(w.buf[:n]); err != nil {
			return err
		}
		w.buf[0] = byte(chk.Chunk.Encoding())
		if err := w.write(w.buf[:1]); err != nil {
			return err
		}
		if err := w.write(chk.Chunk.Bytes()); err != nil {
			return err
		}

		w.crc32.Reset()
		if err := chk.writeHash(w.crc32, w.buf[:]); err != nil {
			return err
		}
		if err := w.write(w.crc32.Sum(w.buf[:0])); err != nil {
			return err
		}
	}
	return nil
}

func (w *Writer) seq() int {
	return len(w.files) - 1
}

func (w *Writer) Close() error {
	if err := w.finalizeTail(); err != nil {
		return err
	}

	// Close the dir file; if it is left open, renames fail on Windows.
	return w.dirFile.Close()
}

// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	Len() int
	Range(start, end int) []byte
}

type realByteSlice []byte

func (b realByteSlice) Len() int {
	return len(b)
}

func (b realByteSlice) Range(start, end int) []byte {
	return b[start:end]
}

func (b realByteSlice) Sub(start, end int) ByteSlice {
	return b[start:end]
}
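
// In practice the reader's ByteSlices are backed by memory-mapped segment
// files (see NewDirReader), so Range returns a subslice of the mapped region
// without copying.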

// Reader implements a ChunkReader for a serialized byte stream
// of series data.
type Reader struct {
	// The underlying bytes holding the encoded series data.
	// Each slice holds the data for a different segment.
	bs   []ByteSlice
	cs   []io.Closer // Closers for resources behind the byte slices.
	size int64       // The total size of bytes in the reader.
	pool chunkenc.Pool
}

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
	cr := Reader{pool: pool, bs: bs, cs: cs}
	for i, b := range cr.bs {
		if b.Len() < SegmentHeaderSize {
			return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
		}
		// Verify magic number.
		if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
			return nil, errors.Errorf("invalid magic number %x", m)
		}

		// Verify chunk format version.
		if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
			return nil, errors.Errorf("invalid chunk format version %d", v)
		}
		cr.size += int64(b.Len())
	}
	return &cr, nil
}

// NewDirReader returns a new Reader against sequentially numbered files in the
// given directory.
func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
	files, err := sequenceFiles(dir)
	if err != nil {
		return nil, err
	}
	if pool == nil {
		pool = chunkenc.NewPool()
	}

	var (
		bs   []ByteSlice
		cs   []io.Closer
		merr tsdb_errors.MultiError
	)
	for _, fn := range files {
		f, err := fileutil.OpenMmapFile(fn)
		if err != nil {
			merr.Add(errors.Wrap(err, "mmap files"))
			merr.Add(closeAll(cs))
			return nil, merr
		}
		cs = append(cs, f)
		bs = append(bs, realByteSlice(f.Bytes()))
	}

	reader, err := newReader(bs, cs, pool)
	if err != nil {
		merr.Add(err)
		merr.Add(closeAll(cs))
		return nil, merr
	}
	return reader, nil
}
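
// A minimal read-side usage sketch (hypothetical caller code; error
// handling elided):
//
//	r, _ := NewDirReader("data/chunks", nil) // nil selects the default chunkenc pool.
//	defer r.Close()
//	chk, _ := r.Chunk(meta.Ref) // meta is a hypothetical Meta produced at write time.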

func (s *Reader) Close() error {
	return closeAll(s.cs)
}

// Size returns the size of the chunks.
func (s *Reader) Size() int64 {
	return s.size
}

// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
	var (
		// Get the upper 4 bytes.
		// These contain the segment index.
		sgmIndex = int(ref >> 32)
		// Get the lower 4 bytes.
		// These contain the segment offset where the data for this chunk starts.
		chkStart = int((ref << 32) >> 32)
		chkCRC32 = newCRC32()
	)

	if sgmIndex >= len(s.bs) {
		return nil, errors.Errorf("segment index %d out of range", sgmIndex)
	}

	sgmBytes := s.bs[sgmIndex]

	if chkStart+MaxChunkLengthFieldSize > sgmBytes.Len() {
		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, sgmBytes.Len())
	}
	// With the minimum chunk length this should never cause us reading
	// over the end of the slice.
	c := sgmBytes.Range(chkStart, chkStart+MaxChunkLengthFieldSize)
	chkDataLen, n := binary.Uvarint(c)
	if n <= 0 {
		return nil, errors.Errorf("reading chunk length failed with %d", n)
	}

	chkEncStart := chkStart + n
	chkEnd := chkEncStart + ChunkEncodingSize + int(chkDataLen) + crc32.Size
	chkDataStart := chkEncStart + ChunkEncodingSize
	chkDataEnd := chkEnd - crc32.Size

	if chkEnd > sgmBytes.Len() {
		return nil, errors.Errorf("segment doesn't include enough bytes to read the chunk - required:%v, available:%v", chkEnd, sgmBytes.Len())
	}

	sum := sgmBytes.Range(chkDataEnd, chkEnd)
	if _, err := chkCRC32.Write(sgmBytes.Range(chkEncStart, chkDataEnd)); err != nil {
		return nil, err
	}

	if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
		return nil, errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act)
	}

	chkData := sgmBytes.Range(chkDataStart, chkDataEnd)
	chkEnc := sgmBytes.Range(chkEncStart, chkEncStart+ChunkEncodingSize)[0]
	return s.pool.Get(chunkenc.Encoding(chkEnc), chkData)
}

func nextSequenceFile(dir string) (string, int, error) {
	names, err := fileutil.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}

	i := uint64(0)
	for _, n := range names {
		j, err := strconv.ParseUint(n, 10, 64)
		if err != nil {
			continue
		}
		// The files are not necessarily found in numerical order: for example,
		// with names '1000000' and '200000', '1000000' sorts first.
		// Though this is a very rare case, we check for the max id anyway.
		if j > i {
			i = j
		}
	}
	return segmentFile(dir, int(i+1)), int(i + 1), nil
}

func segmentFile(baseDir string, index int) string {
	return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index))
}
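
// Segment files are named by zero-padded sequence number, so the first three
// segments created in a directory are "000001", "000002" and "000003".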

func sequenceFiles(dir string) ([]string, error) {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var res []string
	for _, fi := range files {
		if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
			continue
		}
		res = append(res, filepath.Join(dir, fi.Name()))
	}
	return res, nil
}

func closeAll(cs []io.Closer) error {
	var merr tsdb_errors.MultiError

	for _, c := range cs {
		merr.Add(c.Close())
	}
	return merr.Err()
}