Merge pull request #189 from sunhay/checksum-checks

Index checksum validation on reads
pull/5805/head
Fabian Reinartz 2017-11-09 15:35:09 +00:00 committed by GitHub
commit 798f2bdb0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 15 deletions

View File

@ -2,7 +2,7 @@
The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB. The following describes the format of a single chunks file, which is created in the `chunks/` directory of a block. The maximum size per segment file is 512MiB.
Chunks in the files are referenced from the index by the in-file offset in the 4 LSB and the segment sequence number in the bigher 4 MSBs. Chunks in the files are referenced from the index by the in-file offset in the 4 LSB and the segment sequence number in the higher 4 MSBs.
``` ```
┌────────────────────────────────────────┬──────────────────────┐ ┌────────────────────────────────────────┬──────────────────────┐

View File

@ -133,7 +133,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
return renameFile(tmp, path) return renameFile(tmp, path)
} }
// Block represents a directory of time series data covering a continous time range. // Block represents a directory of time series data covering a continuous time range.
type Block struct { type Block struct {
mtx sync.RWMutex mtx sync.RWMutex
closing bool closing bool

View File

@ -575,11 +575,14 @@ type indexReader struct {
// prevents memory faults when applications work with read symbols after // prevents memory faults when applications work with read symbols after
// the block has been unmapped. // the block has been unmapped.
symbols map[uint32]string symbols map[uint32]string
crc32 hash.Hash32
} }
var ( var (
errInvalidSize = fmt.Errorf("invalid size") errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag") errInvalidFlag = fmt.Errorf("invalid flag")
errInvalidChecksum = fmt.Errorf("invalid checksum")
) )
// NewIndexReader returns a new IndexReader on the given directory. // NewIndexReader returns a new IndexReader on the given directory.
@ -595,6 +598,7 @@ func newIndexReader(dir string) (*indexReader, error) {
b: f.b, b: f.b,
c: f, c: f,
symbols: map[uint32]string{}, symbols: map[uint32]string{},
crc32: newCRC32(),
} }
// Verify magic number. // Verify magic number.
@ -630,9 +634,11 @@ func (r *indexReader) readTOC() error {
r.toc.postings = d.be64() r.toc.postings = d.be64()
r.toc.postingsTable = d.be64() r.toc.postingsTable = d.be64()
// TODO(fabxc): validate checksum. if valid, err := r.checkCRC(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid {
return errors.Wrap(err, "TOC checksum")
}
return nil return d.err()
} }
func (r *indexReader) decbufAt(off int) decbuf { func (r *indexReader) decbufAt(off int) decbuf {
@ -642,6 +648,20 @@ func (r *indexReader) decbufAt(off int) decbuf {
return decbuf{b: r.b[off:]} return decbuf{b: r.b[off:]}
} }
func (r *indexReader) checkCRC(crc uint32, off, cnt int) (bool, error) {
r.crc32.Reset()
if len(r.b) < off+cnt {
return false, errInvalidSize
}
if _, err := r.crc32.Write(r.b[off : off+cnt]); err != nil {
return false, errors.Wrap(err, "write to hash")
}
if r.crc32.Sum32() != crc {
return false, errInvalidChecksum
}
return true, nil
}
// readSymbols reads the symbol table fully into memory and allocates proper strings for them. // readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them // Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed. // after the reader is closed.
@ -651,7 +671,8 @@ func (r *indexReader) readSymbols(off int) error {
} }
var ( var (
d1 = r.decbufAt(int(off)) d1 = r.decbufAt(int(off))
d2 = d1.decbuf(d1.be32int()) l = d1.be32int()
d2 = d1.decbuf(l)
origLen = d2.len() origLen = d2.len()
cnt = d2.be32int() cnt = d2.be32int()
basePos = uint32(off) + 4 basePos = uint32(off) + 4
@ -664,6 +685,9 @@ func (r *indexReader) readSymbols(off int) error {
nextPos = basePos + uint32(origLen-d2.len()) nextPos = basePos + uint32(origLen-d2.len())
cnt-- cnt--
} }
if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return errors.Wrap(err, "symbol table checksum")
}
return d2.err() return d2.err()
} }
@ -674,7 +698,8 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
var ( var (
d1 = r.decbufAt(int(off)) d1 = r.decbufAt(int(off))
d2 = d1.decbuf(d1.be32int()) l = d1.be32int()
d2 = d1.decbuf(l)
cnt = d2.be32() cnt = d2.be32()
) )
@ -692,7 +717,10 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
cnt-- cnt--
} }
// TODO(fabxc): verify checksum from remainer of d1. if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return res, errors.Wrap(err, "offset table checksum")
}
return res, d2.err() return res, d2.err()
} }
@ -749,7 +777,8 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
} }
d1 := r.decbufAt(int(off)) d1 := r.decbufAt(int(off))
d2 := d1.decbuf(d1.be32int()) l := d1.be32int()
d2 := d1.decbuf(l)
nc := d2.be32int() nc := d2.be32int()
d2.be32() // consume unused value entry count. d2.be32() // consume unused value entry count.
@ -758,7 +787,9 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
return nil, errors.Wrap(d2.err(), "read label value index") return nil, errors.Wrap(d2.err(), "read label value index")
} }
// TODO(fabxc): verify checksum in 4 remaining bytes of d1. if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "read label values checksum")
}
st := &serializedStringTuples{ st := &serializedStringTuples{
l: nc, l: nc,
@ -786,7 +817,9 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d1 := r.decbufAt(int(ref)) d1 := r.decbufAt(int(ref))
d2 := d1.decbuf(int(d1.uvarint())) l := d1.uvarint()
sl := len(r.b[ref:]) - d1.len() // # bytes in l
d2 := d1.decbuf(l)
*lbls = (*lbls)[:0] *lbls = (*lbls)[:0]
*chks = (*chks)[:0] *chks = (*chks)[:0]
@ -849,7 +882,9 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
}) })
} }
// TODO(fabxc): verify CRC32. if valid, err := r.checkCRC(d1.be32(), int(ref)+sl, l); !valid {
return errors.Wrap(err, "series checksum")
}
return nil return nil
} }
@ -864,7 +899,8 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
} }
d1 := r.decbufAt(int(off)) d1 := r.decbufAt(int(off))
d2 := d1.decbuf(d1.be32int()) l := d1.be32int()
d2 := d1.decbuf(l)
d2.be32() // consume unused postings list length. d2.be32() // consume unused postings list length.
@ -872,7 +908,9 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
return nil, errors.Wrap(d2.err(), "get postings bytes") return nil, errors.Wrap(d2.err(), "get postings bytes")
} }
// TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify. if valid, err := r.checkCRC(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "postings checksum")
}
return newBigEndianPostings(d2.get()), nil return newBigEndianPostings(d2.get()), nil
} }