Validate index series/postings/symbol table checksums on read

pull/5805/head
Sunny Klair 2017-10-26 15:34:31 -04:00
parent ab02ea4de4
commit b65dd43c5b
1 changed files with 28 additions and 14 deletions

View File

@ -626,7 +626,7 @@ func (r *indexReader) readTOC() error {
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
if valid, err := r.validCrc(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid {
if valid, err := r.checkCrc(d.be32(), len(r.b)-indexTOCLen, indexTOCLen-4); !valid {
return errors.Wrap(err, "TOC checksum")
}
@ -640,7 +640,7 @@ func (r *indexReader) decbufAt(off int) decbuf {
return decbuf{b: r.b[off:]}
}
func (r *indexReader) validCrc(crc uint32, off, cnt int) (bool, error) {
func (r *indexReader) checkCrc(crc uint32, off, cnt int) (bool, error) {
c2 := newCRC32()
if len(r.b) < off+cnt {
return false, errInvalidSize
@ -663,7 +663,8 @@ func (r *indexReader) readSymbols(off int) error {
}
var (
d1 = r.decbufAt(int(off))
d2 = d1.decbuf(d1.be32int())
l = d1.be32int()
d2 = d1.decbuf(l)
origLen = d2.len()
cnt = d2.be32int()
basePos = uint32(off) + 4
@ -676,6 +677,9 @@ func (r *indexReader) readSymbols(off int) error {
nextPos = basePos + uint32(origLen-d2.len())
cnt--
}
if valid, err := r.checkCrc(d1.be32(), int(off)+4, l); !valid {
return errors.Wrap(err, "symbol table checksum")
}
return d2.err()
}
@ -685,10 +689,10 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
const sep = "\xff"
var (
d1 = r.decbufAt(int(off))
tableLen = d1.be32int()
d2 = d1.decbuf(tableLen)
cnt = d2.be32()
d1 = r.decbufAt(int(off))
l = d1.be32int()
d2 = d1.decbuf(l)
cnt = d2.be32()
)
res := make(map[string]uint32, 512)
@ -705,7 +709,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
cnt--
}
if valid, err := r.validCrc(d1.be32(), int(off)+4, tableLen); !valid {
if valid, err := r.checkCrc(d1.be32(), int(off)+4, l); !valid {
return res, errors.Wrap(err, "offset table checksum")
}
@ -765,7 +769,8 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
}
d1 := r.decbufAt(int(off))
d2 := d1.decbuf(d1.be32int())
l := d1.be32int()
d2 := d1.decbuf(l)
nc := d2.be32int()
d2.be32() // consume unused value entry count.
@ -774,7 +779,9 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
return nil, errors.Wrap(d2.err(), "read label value index")
}
// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
if valid, err := r.checkCrc(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "read label values checksum")
}
st := &serializedStringTuples{
l: nc,
@ -802,7 +809,9 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d1 := r.decbufAt(int(ref))
d2 := d1.decbuf(int(d1.uvarint()))
l := d1.uvarint()
sl := len(r.b[ref:]) - d1.len() // # bytes in l
d2 := d1.decbuf(l)
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
@ -865,7 +874,9 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
})
}
// TODO(fabxc): verify CRC32.
if valid, err := r.checkCrc(d1.be32(), int(ref)+sl, l); !valid {
return errors.Wrap(err, "series checksum")
}
return nil
}
@ -880,7 +891,8 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
}
d1 := r.decbufAt(int(off))
d2 := d1.decbuf(d1.be32int())
l := d1.be32int()
d2 := d1.decbuf(l)
d2.be32() // consume unused postings list length.
@ -888,7 +900,9 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
return nil, errors.Wrap(d2.err(), "get postings bytes")
}
// TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify.
if valid, err := r.checkCrc(d1.be32(), int(off)+4, l); !valid {
return nil, errors.Wrap(err, "postings checksum")
}
return newBigEndianPostings(d2.get()), nil
}