Browse Source

Verify chunk format version (#544)

Verify chunk format version.
pull/5805/head
zhulongcheng 6 years ago committed by Krasi Georgiev
parent
commit
4d03c70800
  1. 71
      block_test.go
  2. 34
      chunks/chunks.go

71
block_test.go

@ -15,6 +15,8 @@ package tsdb
import (
"context"
"encoding/binary"
"errors"
"io/ioutil"
"math/rand"
"os"
@ -22,6 +24,7 @@ import (
"testing"
"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
)
@ -77,6 +80,74 @@ func TestCreateBlock(t *testing.T) {
testutil.Ok(t, err)
}
func TestCorruptedChunk(t *testing.T) {
for name, test := range map[string]struct {
corrFunc func(f *os.File) // Func that applies the corruption.
expErr error
}{
"invalid header size": {
func(f *os.File) {
err := f.Truncate(1)
testutil.Ok(t, err)
},
errors.New("invalid chunk header in segment 0: invalid size"),
},
"invalid magic number": {
func(f *os.File) {
magicChunksOffset := int64(0)
_, err := f.Seek(magicChunksOffset, 0)
testutil.Ok(t, err)
// Set invalid magic number.
b := make([]byte, chunks.MagicChunksSize)
binary.BigEndian.PutUint32(b[:chunks.MagicChunksSize], 0x00000000)
n, err := f.Write(b)
testutil.Ok(t, err)
testutil.Equals(t, chunks.MagicChunksSize, n)
},
errors.New("invalid magic number 0"),
},
"invalid chunk format version": {
func(f *os.File) {
chunksFormatVersionOffset := int64(4)
_, err := f.Seek(chunksFormatVersionOffset, 0)
testutil.Ok(t, err)
// Set invalid chunk format version.
b := make([]byte, chunks.ChunksFormatVersionSize)
b[0] = 0
n, err := f.Write(b)
testutil.Ok(t, err)
testutil.Equals(t, chunks.ChunksFormatVersionSize, n)
},
errors.New("invalid chunk format version 0"),
},
} {
t.Run(name, func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_open_block_chunk_corrupted")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(tmpdir))
}()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 0))
files, err := sequenceFiles(chunkDir(blockDir))
testutil.Ok(t, err)
testutil.Assert(t, len(files) > 0, "No chunk created.")
f, err := os.OpenFile(files[0], os.O_RDWR, 0666)
testutil.Ok(t, err)
// Apply corruption function.
test.corrFunc(f)
testutil.Ok(t, f.Close())
_, err = OpenBlock(nil, blockDir, nil)
testutil.Equals(t, test.expErr.Error(), err.Error())
})
}
}
// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []Series) string {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)

34
chunks/chunks.go

@ -27,12 +27,20 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/tsdb/errors"
"github.com/prometheus/tsdb/fileutil"
)
const (
// MagicChunks is 4 bytes at the head of a series file.
MagicChunks = 0x85BD40DD
// MagicChunksSize is the size in bytes of MagicChunks.
MagicChunksSize = 4
chunksFormatV1 = 1
ChunksFormatVersionSize = 1
chunkHeaderSize = MagicChunksSize + ChunksFormatVersionSize
)
// Meta holds information about a chunk of data.
@ -93,8 +101,6 @@ type Writer struct {
const (
defaultChunkSegmentSize = 512 * 1024 * 1024
chunksFormatV1 = 1
)
// NewWriter returns a new writer against the given directory.
@ -170,9 +176,8 @@ func (w *Writer) cut() error {
}
// Write header metadata for new file.
metab := make([]byte, 8)
binary.BigEndian.PutUint32(metab[:4], MagicChunks)
binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
metab[4] = chunksFormatV1
if _, err := f.Write(metab); err != nil {
@ -373,13 +378,18 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
var totalSize int64
for i, b := range cr.bs {
if b.Len() < 4 {
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
if b.Len() < chunkHeaderSize {
return nil, errors.Wrapf(errInvalidSize, "invalid chunk header in segment %d", i)
}
// Verify magic number.
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
if m := binary.BigEndian.Uint32(b.Range(0, MagicChunksSize)); m != MagicChunks {
return nil, errors.Errorf("invalid magic number %x", m)
}
// Verify chunk format version.
if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return nil, errors.Errorf("invalid chunk format version %d", v)
}
totalSize += int64(b.Len())
}
cr.size = totalSize
@ -409,7 +419,15 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
cs = append(cs, f)
bs = append(bs, realByteSlice(f.Bytes()))
}
return newReader(bs, cs, pool)
reader, err := newReader(bs, cs, pool)
if err != nil {
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(closeAll(cs))
return nil, merr
}
return reader, nil
}
func (s *Reader) Close() error {

Loading…
Cancel
Save